| PushletClient.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.client;
5
6 import nl.justobjects.pushlet.core.Event;
7 import nl.justobjects.pushlet.core.EventParser;
8 import nl.justobjects.pushlet.core.Protocol;
9 import nl.justobjects.pushlet.util.PushletException;
10
11import java.io.IOException;
12import java.io.InputStreamReader;
13import java.io.Reader;
14import java.io.OutputStream;
15import java.net.*;
16import java.util.Map;
17
18/**
19 * Client API for Java HTTP client applets or apps.
20 * <p/>
21 * Use this class within Java client applications or applets.
22 * Implement a PushletClientListener to receive callbacks for
23 * data-related Event objects pushed by the server.
24 * <p/>
25 * This class may also be used as a base class and be extended
26 * for custom clients, hence the presence of many proteced methods.
27 *
28 * @author Just van den Broecke - Just Objects ©
29 * @version $Id: PushletClient.java,v 1.19 2009/06/04 12:46:35 justb Exp $
30 * @see PushletClientListener
31 * @see nl.justobjects.pushlet.test.PushletApplet
32 * @see nl.justobjects.pushlet.test.PushletPingApplication
33 */
34public class PushletClient implements Protocol {
35 /**
36 * Pushlet URL.
37 */
38 private String pushletURL;
39
40 /**
41 * Debug flag for verbose output.
42 */
43 private boolean debug;
44
45 /**
46 * Id gotten on join ack
47 */
48 private String id;
49
50 /**
51 * Internal listener for data events pushed by server.
52 */
53 protected DataEventListener dataEventListener;
54
55 /**
56 * Constructor with full pushlet URL.
57 */
58 public PushletClient(String aPushletURL) {
59 pushletURL = aPushletURL;
60 }
61
62 /**
63 * Constructor with host and port using default URI.
64 */
65 public PushletClient(String aHost, int aPort) {
66 this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
67 }
68
69 /**
70 * Set proxy options and optional proxy authentication.
71 * <p/>
72 * Contributed by Dele Olajide
73 * See http://groups.yahoo.com/group/pushlet/message/634
74 * <p/>
75 * Usage:
76 * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet");
77 * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....);
78 * <p/>
79 * use pushletClient further as normal
80 */
81 public void setProxyOptions(String aProxyHost,
82 String aProxyPort, String theNonProxyHosts,
83 String aUserName, String aPassword, String anNTLMDomain) {
84
85 // Enable proxying
86 System.setProperty("http.proxySet", "true");
87 System.setProperty("http.proxyHost", aProxyHost);
88 System.setProperty("http.proxyPort", aProxyPort);
89
90 // Set optional non-proxy hosts
91 if (theNonProxyHosts != null) {
92 System.setProperty("http.nonProxyHosts", theNonProxyHosts);
93 }
94
95 // If user name specified configure proxy authentication
96 if (aUserName != null) {
97 System.setProperty("http.proxyUser", aUserName);
98 System.setProperty("http.proxyPassword", aPassword);
99
00 // See inner class below
01 Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));
02
03 // Optional NT domain
04 if (anNTLMDomain != null) {
05 System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
06 }
07 }
08 }
09
10 /**
11 * Join server, starts session.
12 */
13 public void join() throws PushletException {
14 Event event = new Event(E_JOIN);
15 event.setField(P_FORMAT, FORMAT_XML);
16 Event response = doControl(event);
17 throwOnNack(response);
18
19 // Join Ack received
20 id = response.getField(P_ID);
21 }
22
23 /**
24 * Leave server, stops session.
25 */
26 public void leave() throws PushletException {
27 stopListen();
28 throwOnInvalidSession();
29 Event event = new Event(E_LEAVE);
30 event.setField(P_ID, id);
31 Event response = doControl(event);
32
33 throwOnNack(response);
34 id = null;
35 }
36
37 /**
38 * Open data channel.
39 */
40 public void listen(PushletClientListener aListener) throws PushletException {
41 listen(aListener, MODE_STREAM);
42 }
43
44 /**
45 * Open data channel in stream or push mode.
46 */
47 public void listen(PushletClientListener aListener, String aMode) throws PushletException {
48 listen(aListener, aMode, null);
49 }
50
51 /**
52 * Open data channel in stream or push mode with a subject.
53 */
54 public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
55 throwOnInvalidSession();
56 stopListen();
57
58 String listenURL = pushletURL
59 + "?" + P_EVENT + "=" + E_LISTEN
60 + "&" + P_ID + "=" + id
61 + "&" + P_MODE + "=" + aMode;
62 if (aSubject != null) {
63 listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
64 }
65
66 // Start listener thread (sync call).
67 startDataEventListener(aListener, listenURL);
68 }
69
70 /**
71 * Immediate listener: joins/subscribes and listens in one action.
72 */
73 public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
74 stopListen();
75
76 String listenURL = pushletURL
77 + "?" + P_EVENT + "=" + E_JOIN_LISTEN
78 + "&" + P_FORMAT + "=" + FORMAT_XML
79 + "&" + P_MODE + "=" + aMode
80 + "&" + P_SUBJECT + "=" + aSubject;
81
82 // Start listener thread (sync call).
83 startDataEventListener(aListener, listenURL);
84 }
85
86 /**
87 * Publish an event through server.
88 */
89 public void publish(String aSubject, Map theAttributes) throws PushletException {
90 throwOnInvalidSession();
91 Event event = new Event(E_PUBLISH, theAttributes);
92 event.setField(P_SUBJECT, aSubject);
93 event.setField(P_ID, id);
94 Event response = doControl(event);
95 throwOnNack(response);
96 }
97
98 /**
99 * Subscribes, returning subscription id.
00 */
01 public String subscribe(String aSubject, String aLabel) throws PushletException {
02 throwOnInvalidSession();
03 Event event = new Event(E_SUBSCRIBE);
04 event.setField(P_ID, id);
05 event.setField(P_SUBJECT, aSubject);
06
07 // Optional label, is returned in data events
08 if (aLabel != null) {
09 event.setField(P_SUBSCRIPTION_LABEL, aLabel);
10 }
11
12 // Send request
13 Event response = doControl(event);
14 throwOnNack(response);
15
16 return response.getField(P_SUBSCRIPTION_ID);
17 }
18
19 /**
20 * Subscribes, returning subscription id.
21 */
22 public String subscribe(String aSubject) throws PushletException {
23 return subscribe(aSubject, null);
24 }
25
26 /**
27 * Unsubscribes with subscription id.
28 */
29 public void unsubscribe(String aSubscriptionId) throws PushletException {
30 throwOnInvalidSession();
31 Event event = new Event(E_UNSUBSCRIBE);
32 event.setField(P_ID, id);
33
34 // Optional subscription id
35 if (aSubscriptionId != null) {
36 event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
37 }
38
39 Event response = doControl(event);
40 throwOnNack(response);
41 }
42
43 /**
44 * Unsubscribes from all subjects.
45 */
46 public void unsubscribe() throws PushletException {
47 unsubscribe(null);
48 }
49
50 /**
51 * Stop the listener.
52 */
53 public void stopListen() throws PushletException {
54 if (dataEventListener != null) {
55 unsubscribe();
56 dataEventListener.stop();
57 dataEventListener = null;
58 }
59 }
60
61 public void setDebug(boolean b) {
62 debug = b;
63 }
64
65 /**
66 * Starts default DataEventListener and waits for its thread to start.
67 */
68 protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
69 // Suggestion by Jeff Nowakowski 29.oct.2006
70 dataEventListener = new DataEventListener(aListener, aListenURL);
71
72 synchronized (dataEventListener) {
73 dataEventListener.start();
74 try {
75 // Wait for data event listener (thread) to start
76 dataEventListener.wait();
77 } catch (InterruptedException e) {
78 }
79 }
80 }
81
82 protected void throwOnNack(Event anEvent) throws PushletException {
83 if (anEvent.getEventType().equals(E_NACK)) {
84 throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
85 }
86 }
87
88 protected void throwOnInvalidSession() throws PushletException {
89 if (id == null) {
90 throw new PushletException("Invalid pushlet session");
91 }
92 }
93
94 protected Reader openURL(String aURL) throws PushletException {
95 // Open URL connection with server
96 try {
97 p("Connecting to " + aURL);
98 URL url = new URL(aURL);
99 URLConnection urlConnection = url.openConnection();
00
01 // Disable any kind of caching.
02 urlConnection.setUseCaches(false);
03 urlConnection.setDefaultUseCaches(false);
04
05 // TODO: later version may use POST
06 // Enable HTTP POST
07 // urlConnection.setDoOutput(true);
08
09 // Do the POST with Event in XML in body
10 // OutputStream os = urlConnection.getOutputStream();
11 // os.write(anEvent.toXML().getBytes());
12 // os.flush();
13 // os.close();
14
15 // Get the stream from the server.
16 // reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
17 // Note: somehow the client does not work with some JVMs when using
18 // BufferedInputStream... So do unbuffered input.
19 // p("Opening urlConnection inputstream");
20 return new InputStreamReader(urlConnection.getInputStream());
21
22 } catch (Throwable t) {
23 warn("openURL() could not open " + aURL, t);
24 throw new PushletException(" could not open " + aURL, t);
25 }
26 }
27
28
29 /**
30 * Send control events to server and return response.
31 */
32 protected Event doControl(Event aControlEvent) throws PushletException {
33 String controlURL = pushletURL + "?" + aControlEvent.toQueryString();
34
35 p("doControl to " + controlURL);
36
37 // Open URL connection with server
38 Reader reader = openURL(controlURL);
39
40 // Get Pushlet event from stream
41 Event event = null;
42 try {
43 p("Getting event...");
44 // Get next event from server
45 event = EventParser.parse(reader);
46 p("Event received " + event);
47 return event;
48 } catch (Throwable t) {
49 // Stop and report error.
50 warn("doControl() exception", t);
51 throw new PushletException(" error parsing response from" + controlURL, t);
52 }
53 }
54
55 /**
56 * Util: print.
57 */
58 protected void p(String s) {
59 if (debug) {
60 System.out.println("[PushletClient] " + s);
61 }
62 }
63
64 /**
65 * Util: warn.
66 */
67 protected void warn(String s) {
68 warn(s, null);
69 }
70
71 /**
72 * Util: warn with exception.
73 */
74 protected void warn(String s, Throwable t) {
75 System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);
76
77 if (t != null) {
78 t.printStackTrace();
79 }
80 }
81
82 /**
83 * Internal (default) listener for the Pushlet data channel.
84 */
85 protected class DataEventListener implements Runnable {
86 /**
87 * Client's listener that gets called back on events.
88 */
89 private PushletClientListener listener;
90
91 /**
92 * Receiver receiveThread.
93 */
94 private Thread receiveThread = null;
95 private Reader reader;
96 private String refreshURL;
97 private String listenURL;
98
99 public DataEventListener(PushletClientListener aListener, String aListenURL) {
00 listener = aListener;
01 listenURL = aListenURL;
02 }
03
04 public void start() {
05 // All ok: start a receiver receiveThread
06 receiveThread = new Thread(this);
07 receiveThread.start();
08
09 }
10
11 /**
12 * Stop listening; may restart later with start().
13 */
14 public void stop() {
15 p("In stop()");
16 bailout();
17 }
18
19 /**
20 * Receive event objects from server and callback listener.
21 */
22 public void run() {
23 p("Start run()");
24 try {
25 while (receiveThread != null && receiveThread.isAlive()) {
26 // Connect to server
27 reader = openURL(listenURL);
28
29 synchronized (this) {
30 // Inform the calling thread we're ready to receive events.
31 // Suggestion by Jeff Nowakowski 29.oct.2006
32 this.notify();
33 }
34
35 // Get events while we're alive.
36 while (receiveThread != null && receiveThread.isAlive()) {
37 Event event = null;
38 try {
39 // p("Getting event...");
40 // Get next event from server
41 event = EventParser.parse(reader);
42 p("Event received " + event);
43 } catch (Throwable t) {
44
45 // Stop and report error.
46 // warn("Stop run() on exception", t);
47 if (listener != null) {
48 listener.onError("exception during receive: " + t);
49 }
50
51 break;
52 }
53
54 // Handle event by calling listener
55 if (event != null && listener != null) {
56 // p("received: " + event.toXML());
57 String eventType = event.getEventType();
58 if (eventType.equals(E_HEARTBEAT)) {
59 listener.onHeartbeat(event);
60 } else if (eventType.equals(E_DATA)) {
61 listener.onData(event);
62 } else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
63 id = event.getField(P_ID);
64 } else if (eventType.equals(E_LISTEN_ACK)) {
65 p("Listen ack ok");
66 } else if (eventType.equals(E_REFRESH_ACK)) {
67 // ignore
68 } else if (eventType.equals(E_ABORT)) {
69 listener.onAbort(event);
70 listener = null;
71 break;
72 } else if (eventType.equals(E_REFRESH)) {
73 refresh(event);
74 } else {
75 handleUnknownEventType(eventType, event, listener);
76 }
77 }
78 }
79 }
80 } catch (Throwable t) {
81 warn("Exception in run() ", t);
82 // bailout();
83 }
84 }
85
86 protected void disconnect() {
87 p("start disconnect()");
88 if (reader != null) {
89 try {
90 // this blocks, find another way
91 // reader.close();
92 p("Closed reader ok");
93 } catch (Exception ignore) {
94 } finally {
95 reader = null;
96 }
97 }
98 p("end disconnect()");
99 }
00
01 /**
02 * Stop receiver receiveThread.
03 */
04 public void stopThread() {
05 p("In stopThread()");
06
07 // Keep a reference such that we can kill it from here.
08 Thread targetThread = receiveThread;
09
10 receiveThread = null;
11
12 // This should stop the main loop for this receiveThread.
13 // Killing a receiveThread on a blcing read is tricky.
14 // See also http://gee.cs.oswego.edu/dl/cpj/cancel.html
15 if ((targetThread != null) && targetThread.isAlive()) {
16
17 targetThread.interrupt();
18
19 try {
20
21 // Wait for it to die
22 targetThread.join(500);
23 } catch (InterruptedException ignore) {
24 }
25
26 // If current receiveThread refuses to die,
27 // take more rigorous methods.
28 if (targetThread.isAlive()) {
29
30 // Not preferred but may be needed
31 // to stop during a blocking read.
32 targetThread.stop();
33
34 // Wait for it to die
35 try {
36 targetThread.join(500);
37 } catch (Throwable ignore) {
38 }
39 }
40
41 p("Stopped receiveThread alive=" + targetThread.isAlive());
42
43 }
44 }
45
46 /**
47 * Stop listening on stream from server.
48 */
49 public void bailout() {
50 p("In bailout()");
51 stopThread();
52 disconnect();
53 }
54
55 /**
56 * Handle refresh, by pausing.
57 */
58 protected void refresh(Event aRefreshEvent) throws PushletException {
59 try {
60 // Wait for specified time.
61 Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
62 } catch (Throwable t) {
63 warn("abort while refresing");
64 refreshURL = null;
65 return;
66 }
67
68 // If stopped during sleep, don't proceed
69 if (receiveThread == null) {
70 return;
71 }
72
73 // Create url to refresh
74 refreshURL = pushletURL
75 + "?" + P_ID + "=" + id
76 + "&" + P_EVENT + "=" + E_REFRESH
77 ;
78
79 if (reader != null) {
80 try {
81 reader.close();
82
83 } catch (IOException ignore) {
84
85 }
86 reader = null;
87 }
88
89 reader = openURL(refreshURL);
90 }
91
92 /**
93 * Handle unknown Event (default behaviour).
94 */
95 protected void handleUnknownEventType(String eventType, Event event, PushletClientListener listener) {
96 warn("unsupported event type received: " + eventType);
97 }
98 }
99
00 /**
01 * Authenticator
02 */
03 private static class HTTPAuthenticateProxy extends Authenticator {
04
05 /**
06 * Contributed by Dele Olajide
07 * See http://groups.yahoo.com/group/pushlet/message/634
08 */
09
10 private String thePassword = "";
11 private String theUser = "";
12
13 public HTTPAuthenticateProxy(String username, String password) {
14
15 thePassword = password;
16 theUser = username;
17 }
18
19 protected PasswordAuthentication getPasswordAuthentication() {
20 // System.out.println("[HttpAuthenticateProxy] Username = " + theUser);
21 // System.out.println("[HttpAuthenticateProxy] Password = " + thePassword);
22
23 return new PasswordAuthentication(theUser, thePassword.toCharArray());
24 }
25
26 }
27
28}
29
30/*
31 * $Log: PushletClient.java,v $
32 * Revision 1.19 2009/06/04 12:46:35 justb
33 * PushletClient: add more hooks for extension (feat ID: 2799694 Craig M)
34 *
35 * Revision 1.18 2007/11/10 13:52:47 justb
36 * make startDataEventListener method protected to allow overriding
37 *
38 * Revision 1.17 2006/10/29 16:47:57 justb
39 * included patch from Jeff Nowakowski: wait until listener thread runs
40 *
41 * Revision 1.16 2005/05/06 20:08:20 justb
42 * client enhancements
43 *
44 * Revision 1.15 2005/03/27 17:42:27 justb
45 * enhancements
46 *
47 * Revision 1.14 2005/03/25 23:54:04 justb
48 * *** empty log message ***
49 *
50 * Revision 1.13 2005/02/28 16:59:40 justb
51 * fixes for leave and disconnect
52 *
53 * Revision 1.12 2005/02/28 15:57:54 justb
54 * added SimpleListener example
55 *
56 * Revision 1.11 2005/02/21 12:31:44 justb
57 * added proxy contribution from Dele Olajide
58 *
59 * Revision 1.10 2005/02/20 13:05:32 justb
60 * removed the Postlet (integrated in Pushlet protocol)
61 *
62 * Revision 1.9 2005/02/18 10:07:23 justb
63 * many renamings of classes (make names compact)
64 *
65 * Revision 1.8 2005/02/18 09:54:12 justb
66 * refactor: rename Publisher Dispatcher and single Subscriber class
67 *
68 * Revision 1.7 2005/02/15 15:46:30 justb
69 * client API improves
70 *
71 * Revision 1.6 2005/02/15 13:28:56 justb
72 * first quick rewrite adapt for v2 protocol
73 *
74 * Revision 1.5 2004/10/25 21:23:44 justb
75 * *** empty log message ***
76 *
77 * Revision 1.4 2004/10/24 13:52:51 justb
78 * small fixes in client lib
79 *
80 * Revision 1.3 2004/10/24 12:58:18 justb
81 * revised client and test classes for new protocol
82 *
83 * Revision 1.2 2004/09/03 22:35:37 justb
84 * Almost complete rewrite, just checking in now
85 *
86 * Revision 1.1 2004/03/10 20:14:17 justb
87 * renamed all *JavaPushletClient* to *PushletClient*
88 *
89 * Revision 1.10 2004/03/10 15:45:55 justb
90 * many cosmetic changes
91 *
92 * Revision 1.9 2003/08/17 20:30:20 justb
93 * cosmetic changes
94 *
95 * Revision 1.8 2003/08/15 08:37:40 justb
96 * fix/add Copyright+LGPL file headers and footers
97 *
98 *
99 */| PushletClient.java |