1
4 package nl.justobjects.pushlet.client;
5
6 import nl.justobjects.pushlet.core.Event;
7 import nl.justobjects.pushlet.core.EventParser;
8 import nl.justobjects.pushlet.core.Protocol;
9 import nl.justobjects.pushlet.util.PushletException;
10
11import java.io.IOException;
12import java.io.InputStreamReader;
13import java.io.Reader;
14import java.io.OutputStream;
15import java.net.*;
16import java.util.Map;
17
18
34public class PushletClient implements Protocol {
35
38 private String pushletURL;
39
40
43 private boolean debug;
44
45
48 private String id;
49
50
53 protected DataEventListener dataEventListener;
54
55
58 public PushletClient(String aPushletURL) {
59 pushletURL = aPushletURL;
60 }
61
62
65 public PushletClient(String aHost, int aPort) {
66 this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
67 }
68
69
81 public void setProxyOptions(String aProxyHost,
82 String aProxyPort, String theNonProxyHosts,
83 String aUserName, String aPassword, String anNTLMDomain) {
84
85 System.setProperty("http.proxySet", "true");
87 System.setProperty("http.proxyHost", aProxyHost);
88 System.setProperty("http.proxyPort", aProxyPort);
89
90 if (theNonProxyHosts != null) {
92 System.setProperty("http.nonProxyHosts", theNonProxyHosts);
93 }
94
95 if (aUserName != null) {
97 System.setProperty("http.proxyUser", aUserName);
98 System.setProperty("http.proxyPassword", aPassword);
99
00 Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));
02
03 if (anNTLMDomain != null) {
05 System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
06 }
07 }
08 }
09
10
13 public void join() throws PushletException {
14 Event event = new Event(E_JOIN);
15 event.setField(P_FORMAT, FORMAT_XML);
16 Event response = doControl(event);
17 throwOnNack(response);
18
19 id = response.getField(P_ID);
21 }
22
23
26 public void leave() throws PushletException {
27 stopListen();
28 throwOnInvalidSession();
29 Event event = new Event(E_LEAVE);
30 event.setField(P_ID, id);
31 Event response = doControl(event);
32
33 throwOnNack(response);
34 id = null;
35 }
36
37
40 public void listen(PushletClientListener aListener) throws PushletException {
41 listen(aListener, MODE_STREAM);
42 }
43
44
47 public void listen(PushletClientListener aListener, String aMode) throws PushletException {
48 listen(aListener, aMode, null);
49 }
50
51
54 public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
55 throwOnInvalidSession();
56 stopListen();
57
58 String listenURL = pushletURL
59 + "?" + P_EVENT + "=" + E_LISTEN
60 + "&" + P_ID + "=" + id
61 + "&" + P_MODE + "=" + aMode;
62 if (aSubject != null) {
63 listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
64 }
65
66 startDataEventListener(aListener, listenURL);
68 }
69
70
73 public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
74 stopListen();
75
76 String listenURL = pushletURL
77 + "?" + P_EVENT + "=" + E_JOIN_LISTEN
78 + "&" + P_FORMAT + "=" + FORMAT_XML
79 + "&" + P_MODE + "=" + aMode
80 + "&" + P_SUBJECT + "=" + aSubject;
81
82 startDataEventListener(aListener, listenURL);
84 }
85
86
89 public void publish(String aSubject, Map theAttributes) throws PushletException {
90 throwOnInvalidSession();
91 Event event = new Event(E_PUBLISH, theAttributes);
92 event.setField(P_SUBJECT, aSubject);
93 event.setField(P_ID, id);
94 Event response = doControl(event);
95 throwOnNack(response);
96 }
97
98
01 public String subscribe(String aSubject, String aLabel) throws PushletException {
02 throwOnInvalidSession();
03 Event event = new Event(E_SUBSCRIBE);
04 event.setField(P_ID, id);
05 event.setField(P_SUBJECT, aSubject);
06
07 if (aLabel != null) {
09 event.setField(P_SUBSCRIPTION_LABEL, aLabel);
10 }
11
12 Event response = doControl(event);
14 throwOnNack(response);
15
16 return response.getField(P_SUBSCRIPTION_ID);
17 }
18
19
22 public String subscribe(String aSubject) throws PushletException {
23 return subscribe(aSubject, null);
24 }
25
26
29 public void unsubscribe(String aSubscriptionId) throws PushletException {
30 throwOnInvalidSession();
31 Event event = new Event(E_UNSUBSCRIBE);
32 event.setField(P_ID, id);
33
34 if (aSubscriptionId != null) {
36 event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
37 }
38
39 Event response = doControl(event);
40 throwOnNack(response);
41 }
42
43
46 public void unsubscribe() throws PushletException {
47 unsubscribe(null);
48 }
49
50
53 public void stopListen() throws PushletException {
54 if (dataEventListener != null) {
55 unsubscribe();
56 dataEventListener.stop();
57 dataEventListener = null;
58 }
59 }
60
61 public void setDebug(boolean b) {
62 debug = b;
63 }
64
65
68 protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
69 dataEventListener = new DataEventListener(aListener, aListenURL);
71
72 synchronized (dataEventListener) {
73 dataEventListener.start();
74 try {
75 dataEventListener.wait();
77 } catch (InterruptedException e) {
78 }
79 }
80 }
81
82 protected void throwOnNack(Event anEvent) throws PushletException {
83 if (anEvent.getEventType().equals(E_NACK)) {
84 throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
85 }
86 }
87
88 protected void throwOnInvalidSession() throws PushletException {
89 if (id == null) {
90 throw new PushletException("Invalid pushlet session");
91 }
92 }
93
94 protected Reader openURL(String aURL) throws PushletException {
95 try {
97 p("Connecting to " + aURL);
98 URL url = new URL(aURL);
99 URLConnection urlConnection = url.openConnection();
00
01 urlConnection.setUseCaches(false);
03 urlConnection.setDefaultUseCaches(false);
04
05
09
15 return new InputStreamReader(urlConnection.getInputStream());
21
22 } catch (Throwable t) {
23 warn("openURL() could not open " + aURL, t);
24 throw new PushletException(" could not open " + aURL, t);
25 }
26 }
27
28
29
32 protected Event doControl(Event aControlEvent) throws PushletException {
33 String controlURL = pushletURL + "?" + aControlEvent.toQueryString();
34
35 p("doControl to " + controlURL);
36
37 Reader reader = openURL(controlURL);
39
40 Event event = null;
42 try {
43 p("Getting event...");
44 event = EventParser.parse(reader);
46 p("Event received " + event);
47 return event;
48 } catch (Throwable t) {
49 warn("doControl() exception", t);
51 throw new PushletException(" error parsing response from" + controlURL, t);
52 }
53 }
54
55
58 protected void p(String s) {
59 if (debug) {
60 System.out.println("[PushletClient] " + s);
61 }
62 }
63
64
67 protected void warn(String s) {
68 warn(s, null);
69 }
70
71
74 protected void warn(String s, Throwable t) {
75 System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);
76
77 if (t != null) {
78 t.printStackTrace();
79 }
80 }
81
82
85 protected class DataEventListener implements Runnable {
86
89 private PushletClientListener listener;
90
91
94 private Thread receiveThread = null;
95 private Reader reader;
96 private String refreshURL;
97 private String listenURL;
98
99 public DataEventListener(PushletClientListener aListener, String aListenURL) {
00 listener = aListener;
01 listenURL = aListenURL;
02 }
03
04 public void start() {
05 receiveThread = new Thread(this);
07 receiveThread.start();
08
09 }
10
11
14 public void stop() {
15 p("In stop()");
16 bailout();
17 }
18
19
22 public void run() {
23 p("Start run()");
24 try {
25 while (receiveThread != null && receiveThread.isAlive()) {
26 reader = openURL(listenURL);
28
29 synchronized (this) {
30 this.notify();
33 }
34
35 while (receiveThread != null && receiveThread.isAlive()) {
37 Event event = null;
38 try {
39 event = EventParser.parse(reader);
42 p("Event received " + event);
43 } catch (Throwable t) {
44
45 if (listener != null) {
48 listener.onError("exception during receive: " + t);
49 }
50
51 break;
52 }
53
54 if (event != null && listener != null) {
56 String eventType = event.getEventType();
58 if (eventType.equals(E_HEARTBEAT)) {
59 listener.onHeartbeat(event);
60 } else if (eventType.equals(E_DATA)) {
61 listener.onData(event);
62 } else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
63 id = event.getField(P_ID);
64 } else if (eventType.equals(E_LISTEN_ACK)) {
65 p("Listen ack ok");
66 } else if (eventType.equals(E_REFRESH_ACK)) {
67 } else if (eventType.equals(E_ABORT)) {
69 listener.onAbort(event);
70 listener = null;
71 break;
72 } else if (eventType.equals(E_REFRESH)) {
73 refresh(event);
74 } else {
75 handleUnknownEventType(eventType, event, listener);
76 }
77 }
78 }
79 }
80 } catch (Throwable t) {
81 warn("Exception in run() ", t);
82 }
84 }
85
86 protected void disconnect() {
87 p("start disconnect()");
88 if (reader != null) {
89 try {
90 p("Closed reader ok");
93 } catch (Exception ignore) {
94 } finally {
95 reader = null;
96 }
97 }
98 p("end disconnect()");
99 }
00
01
04 public void stopThread() {
05 p("In stopThread()");
06
07 Thread targetThread = receiveThread;
09
10 receiveThread = null;
11
12 if ((targetThread != null) && targetThread.isAlive()) {
16
17 targetThread.interrupt();
18
19 try {
20
21 targetThread.join(500);
23 } catch (InterruptedException ignore) {
24 }
25
26 if (targetThread.isAlive()) {
29
30 targetThread.stop();
33
34 try {
36 targetThread.join(500);
37 } catch (Throwable ignore) {
38 }
39 }
40
41 p("Stopped receiveThread alive=" + targetThread.isAlive());
42
43 }
44 }
45
46
49 public void bailout() {
50 p("In bailout()");
51 stopThread();
52 disconnect();
53 }
54
55
58 protected void refresh(Event aRefreshEvent) throws PushletException {
59 try {
60 Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
62 } catch (Throwable t) {
63 warn("abort while refresing");
64 refreshURL = null;
65 return;
66 }
67
68 if (receiveThread == null) {
70 return;
71 }
72
73 refreshURL = pushletURL
75 + "?" + P_ID + "=" + id
76 + "&" + P_EVENT + "=" + E_REFRESH
77 ;
78
79 if (reader != null) {
80 try {
81 reader.close();
82
83 } catch (IOException ignore) {
84
85 }
86 reader = null;
87 }
88
89 reader = openURL(refreshURL);
90 }
91
92
95 protected void handleUnknownEventType(String eventType, Event event, PushletClientListener listener) {
96 warn("unsupported event type received: " + eventType);
97 }
98 }
99
00
03 private static class HTTPAuthenticateProxy extends Authenticator {
04
05
09
10 private String thePassword = "";
11 private String theUser = "";
12
13 public HTTPAuthenticateProxy(String username, String password) {
14
15 thePassword = password;
16 theUser = username;
17 }
18
19 protected PasswordAuthentication getPasswordAuthentication() {
20
23 return new PasswordAuthentication(theUser, thePassword.toCharArray());
24 }
25
26 }
27
28}
29
30