| TestEventPushSources.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.test;
5
6 import nl.justobjects.pushlet.core.Dispatcher;
7 import nl.justobjects.pushlet.core.Event;
8 import nl.justobjects.pushlet.core.EventSource;
9 import nl.justobjects.pushlet.util.Rand;
10
11import java.io.BufferedReader;
12import java.io.IOException;
13import java.io.InputStream;
14import java.io.InputStreamReader;
15import java.net.URL;
16import java.util.StringTokenizer;
17import java.util.Vector;
18
19/**
20 * Event sources that push events (for testing).
21 *
22 * @author Just van den Broecke - Just Objects ©
23 * @version $Id: TestEventPushSources.java,v 1.10 2007/11/09 13:16:57 justb Exp $
24 */
25public class TestEventPushSources {
26
27 static public class AEXStocksEventPushSourceABN {
28 String pageURL = "http://ri2.rois.com/E36msPtnZC0e15CVb4KT97JAGfGSfCcrvv6*FcyZIoNyh/CTIB/RI2APISNAP?RIC=0%23.AEX&FORMAT=XML";
29 // This could be further expanded: getting the Reuters AEX stocks
30 // as XML from ABN with this URL, but we may get into legal problems...
31 }
32
33 /**
34 * Produces events from REAL stocks from the AEX.
35 */
36 static public class AEXStocksEventPushSource implements EventSource, Runnable {
37 /**
38 * Here we get our stocks from.
39 */
40 String pageURL = "http://www.debeurs.nl/koersen/aex.asp";
41 Thread thread = null;
42 volatile boolean active = false;
43
44 // Since Baan has been thrown out...
45 public final static int NR_OF_STOCKS = 24;
46
47 public final static String EMPTY = "wait...";
48 private int restarts = 1;
49
50 class Stock {
51 public String name = EMPTY;
52 public String rate = EMPTY;
53 volatile public boolean modified = false;
54 }
55
56 Vector stocksCache = new Vector(NR_OF_STOCKS);
57
58 public AEXStocksEventPushSource() {
59 for (int i = 0; i < NR_OF_STOCKS; i++) {
60 stocksCache.addElement(new Stock());
61 }
62 // updateCache();
63 }
64
65 /**
66 * Activate the event source.
67 */
68 synchronized public void activate() {
69 e("activating...");
70 // Stop a possibly running thread
71 stopThread();
72
73 // Start new thread and
74 thread = new Thread(this, "AEXStocksPublisher-" + (restarts++));
75 active = true;
76 thread.start();
77 e("activated");
78 }
79
80 /**
81 * Deactivate the event source.
82 */
83 synchronized public void passivate() {
84 e("passivating...");
85 active = false;
86 stopThread();
87
88 // Mark the cache modified so we'll send the contents
89 // on the next activation.
90 for (int i = 0; i < NR_OF_STOCKS; i++) {
91 ((Stock) stocksCache.elementAt(i)).modified = true;
92 }
93
94 e("passivated");
95 }
96
97
98 /**
99 * Deactivate the event source.
00 */
01 synchronized public void stop() {
02 }
03
04 public void run() {
05 // Publish cache content (if any) first.
06 publishStocks();
07
08 int count = 5; // enforce update first
09 while (active) {
10
11 // Only do work if active
12 // Update cache every 10 secs.
13 if (count == 5) {
14 updateCache();
15 count = 0;
16 }
17 count++;
18
19 // Do updates for changed stock rates
20 sendUpdates();
21
22 // If we were interrupted just return.
23 if (thread == null || thread.isInterrupted()) {
24 break;
25 }
26
27 // Sleep 2 secs before sending next updates
28 try {
29 thread.sleep(2000);
30 } catch (InterruptedException ie) {
31 break;
32 }
33 }
34
35 // Loop terminated: reset vars
36 thread = null;
37 active = false;
38 }
39
40 private String getStocksLine() {
41 BufferedReader br = null;
42 InputStream is = null;
43 String nextLine = "";
44
45 // Read line from server
46 try {
47 is = new URL(pageURL).openStream();
48 br = new BufferedReader(new InputStreamReader(is));
49 boolean foundLine = false;
50 while (!foundLine) {
51 nextLine = br.readLine();
52 if (nextLine == null) {
53 return "";
54 }
55 foundLine = (nextLine.indexOf("details.asp?iid=14053&parent=aex") != -1);
56 }
57 } catch (Exception e) {
58 e("could not open or read URL pageURL=" + pageURL + " ex=" + e);
59 return "";
60 } finally {
61 try {
62 if (is != null) is.close();
63 } catch (IOException ignore) {
64 }
65 }
66 return nextLine;
67 }
68
69 private void publishStocks() {
70 // Publish only modified stocks from the cache
71 for (int i = 0; i < NR_OF_STOCKS; i++) {
72 Stock nextStock = (Stock) stocksCache.elementAt(i);
73
74 // Publish modified stocks
75 if (nextStock.modified) {
76 publishStock(i, nextStock.name, nextStock.rate);
77 nextStock.modified = false;
78 try {
79 Thread.sleep(400);
80 } catch (InterruptedException ie) {
81 return;
82 }
83 }
84 }
85 }
86
87 private void publishStock(int index, String name, String rate) {
88 Event event = Event.createDataEvent("/stocks/aex");
89 event.setField("number", index + "");
90 event.setField("name", name);
91 event.setField("rate", rate);
92 p("publish: nr=" + index + " name=" + name + " rate=" + rate);
93 Dispatcher.getInstance().multicast(event);
94 }
95
96 private void sendUpdates() {
97 p("sending updates");
98 // In any case send a random stock value by
99 // making it modified, just to see something moving...
00 int randomIndex = Rand.randomInt(0, NR_OF_STOCKS - 1);
01 Stock randomStock = (Stock) stocksCache.elementAt(randomIndex);
02 randomStock.modified = true;
03
04 publishStocks();
05 }
06
07 private void stopThread() {
08 if (thread != null) {
09 thread.interrupt();
10 thread = null;
11 }
12 }
13
14 private void updateCache() {
15 p("updating Cache");
16
17 // Get the line with all stocks from HTML page
18 String stocksLine = getStocksLine();
19 if ("".equals(stocksLine)) {
20 e("updateCache: stocksLine == null");
21 return;
22 }
23
24 // Parse the stocksline and put in cache.
25 // Beware: this is the messy part!!
26 // We assume that stock/names and rates are located at
27 // regular positions in the line.
28 String delim = "<>";
29 StringTokenizer st = new StringTokenizer(stocksLine, delim);
30 String nextToken = "";
31 int count = 0;
32 String nextStock = "";
33 String nextQuote = "";
34 String currentQuote = null;
35 int index = -1;
36 while (st.hasMoreTokens()) {
37 nextToken = st.nextToken();
38 count++;
39 // The <TD> with the stock name
40 if ((count - 5) % 57 == 0) {
41 p("c=" + count + " s=" + nextToken);
42 nextStock = nextToken;
43 }
44
45 // The <TD> with the stock rate
46 if ((count - 10) % 57 == 0) {
47 nextQuote = nextToken;
48 index++;
49 p("c=" + count + " val=" + nextQuote);
50 Stock currentStock = (Stock) stocksCache.elementAt(index);
51
52 // Only update new or modified stocks
53 if (EMPTY.equals(currentStock.rate) || !currentStock.rate.equals(nextQuote)) {
54 p("modified: " + nextStock);
55 currentStock.name = nextStock;
56 currentStock.rate = nextQuote;
57 currentStock.modified = true;
58 }
59 }
60 }
61 }
62 }
63
64
65 /**
66 * Util: stderr print method.
67 */
68 public static void e(String s) {
69 System.out.println("AEXStocksEventPushSource: " + s);
70 }
71
72 /**
73 * Util: stdout print method.
74 */
75 public static void p(String s) {
76 // System.out.println(s);
77 }
78
79
80 public static void main(String[] args) {
81 // new TestEventPushSources$AEXStocksEventPushSource();
82 }
83}
84
85/*
86 * $Log: TestEventPushSources.java,v $
87 * Revision 1.10 2007/11/09 13:16:57 justb
88 * use Rand from util package (and add Rand.java to pushlet client jar)
89 *
90 * Revision 1.9 2005/02/28 09:14:56 justb
91 * sessmgr/dispatcher factory/singleton support
92 *
93 * Revision 1.8 2005/02/21 16:59:17 justb
94 * SessionManager and session lease introduced
95 *
96 * Revision 1.7 2005/02/18 10:07:23 justb
97 * many renamings of classes (make names compact)
98 *
99 * Revision 1.6 2005/02/18 09:54:15 justb
00 * refactor: rename Publisher Dispatcher and single Subscriber class
01 *
02 * Revision 1.5 2004/09/03 22:35:37 justb
03 * Almost complete rewrite, just checking in now
04 *
05 * Revision 1.4 2004/03/10 15:45:55 justb
06 * many cosmetic changes
07 *
08 * Revision 1.3 2003/08/15 08:37:41 justb
09 * fix/add Copyright+LGPL file headers and footers
10 *
11 * Revision 1.2 2003/05/18 16:15:08 justb
12 * support for XML encoded Events
13 *
14 * Revision 1.1.1.1 2002/09/24 21:02:33 justb
15 * import to sourceforge
16 *
17 * Revision 1.1.1.1 2002/09/20 22:48:20 justb
18 * import to SF
19 *
20 * Revision 1.1.1.1 2002/09/20 14:19:02 justb
21 * first import into SF
22 *
23 * Revision 1.6 2001/02/18 23:45:13 just
24 * fixes for AEX
25 *
26 * Revision 1.5 2000/10/30 14:16:09 just
27 * no message
28 *
29 * Revision 1.4 2000/09/24 21:02:43 just
30 * changes due to changed webpage in debeurs.nl
31 *
32 * Revision 1.3 2000/08/31 12:49:50 just
33 * added CVS comment tags for log and copyright
34 *
35 *
36 */
37| TestEventPushSources.java |