1
4 package nl.justobjects.pushlet.test;
5
6 import nl.justobjects.pushlet.client.PushletClient;
7 import nl.justobjects.pushlet.client.PushletClientListener;
8 import nl.justobjects.pushlet.core.Event;
9 import nl.justobjects.pushlet.core.Protocol;
10import nl.justobjects.pushlet.util.PushletException;
11import nl.justobjects.pushlet.util.Rand;
12
13import java.util.HashMap;
14import java.util.Map;
15
16
26public class StressTester implements Protocol {
27 static private String host = "localhost";
28 static private int port = 8080;
29 static private int TESTER_COUNT = 10;
30 private static final String SUBJECT = "/test/ping";
31 private static final long MIN_PUBLISH_INTERVAL_MILLIS = 200;
32 private static final long MAX_PUBLISH_INTERVAL_MILLIS = 1000;
33 private static final long MIN_SUBSCRIBER_INTERVAL_MILLIS = 500;
34 private static final long MAX_SUBSCRIBER_INTERVAL_MILLIS = 1000;
35
36 public StressTester() {
37 }
38
39 public void run() {
40 new EventPublisher().start();
41 new EventSubscriber().start();
42 }
43
44
47 public void err(String s) {
48 System.out.println("[StressTester] ERROR" + s);
49 }
50
51
54 public void p(String s) {
55 System.out.println("[StressTester] " + s);
56 }
57
58 private class EventSubscriber extends Thread implements PushletClientListener {
59 private PushletClient pushletClient;
60
61 public void run() {
62 while (true) {
63 try {
66 pushletClient = new PushletClient(host, port);
67 pushletClient.join();
69 pushletClient.listen(this, Protocol.MODE_STREAM);
70 String subscriptionId = pushletClient.subscribe(SUBJECT);
73 pushletClient.unsubscribe(subscriptionId);
74
75 subscriptionId = pushletClient.subscribe(SUBJECT);
77 sleepRandom();
79 pushletClient.unsubscribe(subscriptionId);
81 pushletClient.leave();
82
83 } catch (Throwable t) {
84 err("Error in EventSubscriber t=" + t);
85 return;
86 }
87 }
88 }
89
90
93 public void onError(String message) {
94 }
96
97
00 public void onAbort(Event theEvent) {
01 }
03
04
07 public void onData(Event theEvent) {
08 long then = Long.parseLong(theEvent.getField("time"));
10 long delay = System.currentTimeMillis() - then;
11 }
13
14
17 public void onHeartbeat(Event theEvent) {
18 }
20
21 private void sleepRandom() throws InterruptedException {
22 Thread.sleep(Rand.randomLong(MIN_SUBSCRIBER_INTERVAL_MILLIS, MAX_SUBSCRIBER_INTERVAL_MILLIS));
23 }
24 }
25
26 private class EventPublisher extends Thread {
27 private PushletClient pushletClient;
28
29 public void run() {
30 try {
33 pushletClient = new PushletClient(host, port);
34 pushletClient.join();
35
36 } catch (PushletException pe) {
38 err("Error in EventPublisher pe=" + pe);
39 return;
40 }
41
42 Map eventData = new HashMap(2);
44 int seqNr = 1;
45 while (true) {
46 try {
47 eventData.put("seqNr", "" + seqNr++);
49 eventData.put("time", "" + System.currentTimeMillis());
50
51 pushletClient.publish(SUBJECT, eventData);
53
54 Thread.sleep(Rand.randomLong(MIN_PUBLISH_INTERVAL_MILLIS, MAX_PUBLISH_INTERVAL_MILLIS));
55 } catch (Exception e) {
56 p("EventPublisher exception: " + e);
57 return;
58 }
59 }
60 }
61
62 }
63
64
67 public static void main(String args[]) {
68 if (args.length > 0) {
69 TESTER_COUNT = Integer.parseInt(args[0]);
70 }
71 if (args.length == 3) {
72 host = args[1];
73 port = Integer.parseInt(args[2]);
74 }
75
76 for (int i = 0; i < TESTER_COUNT; i++) {
77 new StressTester().run();
78 }
79
80 }
81}
82
83
93