| PushletClient.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.client;
5
6 import nl.justobjects.pushlet.core.Event;
7 import nl.justobjects.pushlet.core.EventParser;
8 import nl.justobjects.pushlet.core.Protocol;
9 import nl.justobjects.pushlet.util.PushletException;
10
11import java.io.IOException;
12import java.io.InputStreamReader;
13import java.io.Reader;
14import java.io.OutputStream;
15import java.net.*;
16import java.util.Map;
17
18/**
19 * Client API for Java HTTP client applets or apps.
20 * <p/>
21 * Use this class within Java client applications or applets.
22 * Implement a PushletClientListener to receive callbacks for
23 * data-related Event objects pushed by the server.
24 *
25 * @author Just van den Broecke - Just Objects ©
26 * @version $Id: PushletClient.java,v 1.18 2007/11/10 13:52:47 justb Exp $
27 * @see PushletClientListener
28 * @see nl.justobjects.pushlet.test.PushletApplet
29 * @see nl.justobjects.pushlet.test.PushletPingApplication
30 */
31public class PushletClient implements Protocol {
32 /**
33 * Pushlet URL.
34 */
35 private String pushletURL;
36
37 /**
38 * Debug flag for verbose output.
39 */
40 private boolean debug;
41
42 /**
43 * Id gotten on join ack
44 */
45 private String id;
46
47 /**
48 * Internal listener for data events pushed by server.
49 */
50 private DataEventListener dataEventListener;
51
52 /**
53 * Constructor with full pushlet URL.
54 */
55 public PushletClient(String aPushletURL) {
56 pushletURL = aPushletURL;
57 }
58
59 /**
60 * Constructor with host and port using default URI.
61 */
62 public PushletClient(String aHost, int aPort) {
63 this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
64 }
65
66 /**
67 * Set proxy options and optional proxy authentication.
68 * <p/>
69 * Contributed by Dele Olajide
70 * See http://groups.yahoo.com/group/pushlet/message/634
71 * <p/>
72 * Usage:
73 * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet");
74 * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....);
75 * <p/>
76 * use pushletClient further as normal
77 */
78 public void setProxyOptions(String aProxyHost,
79 String aProxyPort, String theNonProxyHosts,
80 String aUserName, String aPassword, String anNTLMDomain) {
81
82 // Enable proxying
83 System.setProperty("http.proxySet", "true");
84 System.setProperty("http.proxyHost", aProxyHost);
85 System.setProperty("http.proxyPort", aProxyPort);
86
87 // Set optional non-proxy hosts
88 if (theNonProxyHosts != null) {
89 System.setProperty("http.nonProxyHosts", theNonProxyHosts);
90 }
91
92 // If user name specified configure proxy authentication
93 if (aUserName != null) {
94 System.setProperty("http.proxyUser", aUserName);
95 System.setProperty("http.proxyPassword", aPassword);
96
97 // See inner class below
98 Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));
99
00 // Optional NT domain
01 if (anNTLMDomain != null) {
02 System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
03 }
04 }
05 }
06
07 /**
08 * Join server, starts session.
09 */
10 public void join() throws PushletException {
11 Event event = new Event(E_JOIN);
12 event.setField(P_FORMAT, FORMAT_XML);
13 Event response = doControl(event);
14 throwOnNack(response);
15
16 // Join Ack received
17 id = response.getField(P_ID);
18 }
19
20 /**
21 * Leave server, stops session.
22 */
23 public void leave() throws PushletException {
24 stopListen();
25 throwOnInvalidSession();
26 Event event = new Event(E_LEAVE);
27 event.setField(P_ID, id);
28 Event response = doControl(event);
29
30 throwOnNack(response);
31 id = null;
32 }
33
34 /**
35 * Open data channel.
36 */
37 public void listen(PushletClientListener aListener) throws PushletException {
38 listen(aListener, MODE_STREAM);
39 }
40
41 /**
42 * Open data channel in stream or push mode.
43 */
44 public void listen(PushletClientListener aListener, String aMode) throws PushletException {
45 listen(aListener, aMode, null);
46 }
47
48 /**
49 * Open data channel in stream or push mode with a subject.
50 */
51 public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
52 throwOnInvalidSession();
53 stopListen();
54
55 String listenURL = pushletURL
56 + "?" + P_EVENT + "=" + E_LISTEN
57 + "&" + P_ID + "=" + id
58 + "&" + P_MODE + "=" + aMode;
59 if (aSubject != null) {
60 listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
61 }
62
63 // Start listener thread (sync call).
64 startDataEventListener(aListener, listenURL);
65 }
66
67 /**
68 * Immediate listener: joins/subscribes and listens in one action.
69 */
70 public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
71 stopListen();
72
73 String listenURL = pushletURL
74 + "?" + P_EVENT + "=" + E_JOIN_LISTEN
75 + "&" + P_FORMAT + "=" + FORMAT_XML
76 + "&" + P_MODE + "=" + aMode
77 + "&" + P_SUBJECT + "=" + aSubject;
78
79 // Start listener thread (sync call).
80 startDataEventListener(aListener, listenURL);
81 }
82
83 /**
84 * Publish an event through server.
85 */
86 public void publish(String aSubject, Map theAttributes) throws PushletException {
87 throwOnInvalidSession();
88 Event event = new Event(E_PUBLISH, theAttributes);
89 event.setField(P_SUBJECT, aSubject);
90 event.setField(P_ID, id);
91 Event response = doControl(event);
92 throwOnNack(response);
93 }
94
95 /**
96 * Subscribes, returning subscription id.
97 */
98 public String subscribe(String aSubject, String aLabel) throws PushletException {
99 throwOnInvalidSession();
00 Event event = new Event(E_SUBSCRIBE);
01 event.setField(P_ID, id);
02 event.setField(P_SUBJECT, aSubject);
03
04 // Optional label, is returned in data events
05 if (aLabel != null) {
06 event.setField(P_SUBSCRIPTION_LABEL, aLabel);
07 }
08
09 // Send request
10 Event response = doControl(event);
11 throwOnNack(response);
12
13 return response.getField(P_SUBSCRIPTION_ID);
14 }
15
16 /**
17 * Subscribes, returning subscription id.
18 */
19 public String subscribe(String aSubject) throws PushletException {
20 return subscribe(aSubject, null);
21 }
22
23 /**
24 * Unsubscribes with subscription id.
25 */
26 public void unsubscribe(String aSubscriptionId) throws PushletException {
27 throwOnInvalidSession();
28 Event event = new Event(E_UNSUBSCRIBE);
29 event.setField(P_ID, id);
30
31 // Optional subscription id
32 if (aSubscriptionId != null) {
33 event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
34 }
35
36 Event response = doControl(event);
37 throwOnNack(response);
38 }
39
40 /**
41 * Unsubscribes from all subjects.
42 */
43 public void unsubscribe() throws PushletException {
44 unsubscribe(null);
45 }
46
47 /**
48 * Stop the listener.
49 */
50 public void stopListen() throws PushletException {
51 if (dataEventListener != null) {
52 unsubscribe();
53 dataEventListener.stop();
54 dataEventListener = null;
55 }
56 }
57
58 public void setDebug(boolean b) {
59 debug = b;
60 }
61
62 /**
63 * Starts DataEventListener and waits for its thread to start.
64 */
65 protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
66 // Suggestion by Jeff Nowakowski 29.oct.2006
67 dataEventListener = new DataEventListener(aListener, aListenURL);
68
69 synchronized (dataEventListener) {
70 dataEventListener.start();
71 try {
72 // Wait for data event listener (thread) to start
73 dataEventListener.wait();
74 } catch (InterruptedException e) {
75 }
76 }
77 }
78
79 private void throwOnNack(Event anEvent) throws PushletException {
80 if (anEvent.getEventType().equals(E_NACK)) {
81 throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
82 }
83 }
84
85 private void throwOnInvalidSession() throws PushletException {
86 if (id == null) {
87 throw new PushletException("Invalid pushlet session");
88 }
89 }
90
91 private Reader openURL(String aURL) throws PushletException {
92 // Open URL connection with server
93 try {
94 p("Connecting to " + aURL);
95 URL url = new URL(aURL);
96 URLConnection urlConnection = url.openConnection();
97
98 // Disable any kind of caching.
99 urlConnection.setUseCaches(false);
00 urlConnection.setDefaultUseCaches(false);
01
02 // TODO: later version may use POST
03 // Enable HTTP POST
04 // urlConnection.setDoOutput(true);
05
06 // Do the POST with Event in XML in body
07 // OutputStream os = urlConnection.getOutputStream();
08 // os.write(anEvent.toXML().getBytes());
09 // os.flush();
10 // os.close();
11
12 // Get the stream from the server.
13 // reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
14 // Note: somehow the client does not work with some JVMs when using
15 // BufferedInputStream... So do unbuffered input.
16 // p("Opening urlConnection inputstream");
17 return new InputStreamReader(urlConnection.getInputStream());
18
19 } catch (Throwable t) {
20 warn("openURL() could not open " + aURL, t);
21 throw new PushletException(" could not open " + aURL, t);
22 }
23 }
24
25
26 private Event doControl(Event aControlEvent) throws PushletException {
27 String controlURL = pushletURL + "?" + aControlEvent.toQueryString();
28
29 p("doControl to " + controlURL);
30
31 // Open URL connection with server
32 Reader reader = openURL(controlURL);
33
34 // Get Pushlet event from stream
35 Event event = null;
36 try {
37 p("Getting event...");
38 // Get next event from server
39 event = EventParser.parse(reader);
40 p("Event received " + event);
41 return event;
42 } catch (Throwable t) {
43 // Stop and report error.
44 warn("doControl() exception", t);
45 throw new PushletException(" error parsing response from" + controlURL, t);
46 }
47 }
48
49 /**
50 * Util: print.
51 */
52 private void p(String s) {
53 if (debug) {
54 System.out.println("[PushletClient] " + s);
55 }
56 }
57
58 /**
59 * Util: warn.
60 */
61 private void warn(String s) {
62 warn(s, null);
63 }
64
65 /**
66 * Util: warn with exception.
67 */
68 private void warn(String s, Throwable t) {
69 System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);
70
71 if (t != null) {
72 t.printStackTrace();
73 }
74 }
75
76 /**
77 * Internal listener for the Pushlet data channel.
78 */
79 private class DataEventListener implements Runnable {
80 /**
81 * Client's listener that gets called back on events.
82 */
83 private PushletClientListener listener;
84
85 /**
86 * Receiver receiveThread.
87 */
88 private Thread receiveThread = null;
89 private Reader reader;
90 private String refreshURL;
91 private String listenURL;
92
93 public DataEventListener(PushletClientListener aListener, String aListenURL) {
94 listener = aListener;
95 listenURL = aListenURL;
96 }
97
98 public void start() {
99 // All ok: start a receiver receiveThread
00 receiveThread = new Thread(this);
01 receiveThread.start();
02
03 }
04
05 /**
06 * Stop listening; may restart later with start().
07 */
08 public void stop() {
09 p("In stop()");
10 bailout();
11 }
12
13 /**
14 * Receive event objects from server and callback listener.
15 */
16 public void run() {
17 p("Start run()");
18 try {
19 while (receiveThread != null && receiveThread.isAlive()) {
20 // Connect to server
21 reader = openURL(listenURL);
22
23 synchronized (this) {
24 // Inform the calling thread we're ready to receive events.
25 // Suggestion by Jeff Nowakowski 29.oct.2006
26 this.notify();
27 }
28
29 // Get events while we're alive.
30 while (receiveThread != null && receiveThread.isAlive()) {
31 Event event = null;
32 try {
33 // p("Getting event...");
34 // Get next event from server
35 event = EventParser.parse(reader);
36 p("Event received " + event);
37 } catch (Throwable t) {
38
39 // Stop and report error.
40 // warn("Stop run() on exception", t);
41 if (listener != null) {
42 listener.onError("exception during receive: " + t);
43 }
44
45 break;
46 }
47
48 // Handle event by calling listener
49 if (event != null && listener != null) {
50 // p("received: " + event.toXML());
51 String eventType = event.getEventType();
52 if (eventType.equals(E_HEARTBEAT)) {
53 listener.onHeartbeat(event);
54 } else if (eventType.equals(E_DATA)) {
55 listener.onData(event);
56 } else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
57 id = event.getField(P_ID);
58 } else if (eventType.equals(E_LISTEN_ACK)) {
59 p("Listen ack ok");
60 } else if (eventType.equals(E_REFRESH_ACK)) {
61 // ignore
62 } else if (eventType.equals(E_ABORT)) {
63 listener.onAbort(event);
64 listener = null;
65 break;
66 } else if (eventType.equals(E_REFRESH)) {
67 refresh(event);
68 } else {
69 warn("unsupported event type received: " + eventType);
70 }
71 }
72 }
73 }
74 } catch (Throwable t) {
75 warn("Exception in run() ", t);
76 // bailout();
77 }
78 }
79
80 private void disconnect() {
81 p("start disconnect()");
82 if (reader != null) {
83 try {
84 // this blocks, find another way
85 // reader.close();
86 p("Closed reader ok");
87 } catch (Exception ignore) {
88 } finally {
89 reader = null;
90 }
91 }
92 p("end disconnect()");
93 }
94
95 /**
96 * Stop receiver receiveThread.
97 */
98 public void stopThread() {
99 p("In stopThread()");
00
01 // Keep a reference such that we can kill it from here.
02 Thread targetThread = receiveThread;
03
04 receiveThread = null;
05
06 // This should stop the main loop for this receiveThread.
07 // Killing a receiveThread on a blcing read is tricky.
08 // See also http://gee.cs.oswego.edu/dl/cpj/cancel.html
09 if ((targetThread != null) && targetThread.isAlive()) {
10
11 targetThread.interrupt();
12
13 try {
14
15 // Wait for it to die
16 targetThread.join(500);
17 } catch (InterruptedException ignore) {
18 }
19
20 // If current receiveThread refuses to die,
21 // take more rigorous methods.
22 if (targetThread.isAlive()) {
23
24 // Not preferred but may be needed
25 // to stop during a blocking read.
26 targetThread.stop();
27
28 // Wait for it to die
29 try {
30 targetThread.join(500);
31 } catch (Throwable ignore) {
32 }
33 }
34
35 p("Stopped receiveThread alive=" + targetThread.isAlive());
36
37 }
38 }
39
40 /**
41 * Stop listening on stream from server.
42 */
43 public void bailout() {
44 p("In bailout()");
45 stopThread();
46 disconnect();
47 }
48
49 /**
50 * Handle refresh, by pausing.
51 */
52 private void refresh(Event aRefreshEvent) throws PushletException {
53 try {
54 // Wait for specified time.
55 Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
56 } catch (Throwable t) {
57 warn("abort while refresing");
58 refreshURL = null;
59 return;
60 }
61
62 // If stopped during sleep, don't proceed
63 if (receiveThread == null) {
64 return;
65 }
66
67 // Create url to refresh
68 refreshURL = pushletURL
69 + "?" + P_ID + "=" + id
70 + "&" + P_EVENT + "=" + E_REFRESH
71 ;
72
73 if (reader != null) {
74 try {
75 reader.close();
76
77 } catch (IOException ignore) {
78
79 }
80 reader = null;
81 }
82
83 reader = openURL(refreshURL);
84 }
85 }
86
87 /**
88 * Authenticator
89 */
90 private static class HTTPAuthenticateProxy extends Authenticator {
91
92 /**
93 * Contributed by Dele Olajide
94 * See http://groups.yahoo.com/group/pushlet/message/634
95 */
96
97 private String thePassword = "";
98 private String theUser = "";
99
00 public HTTPAuthenticateProxy(String username, String password) {
01
02 thePassword = password;
03 theUser = username;
04 }
05
06 protected PasswordAuthentication getPasswordAuthentication() {
07 // System.out.println("[HttpAuthenticateProxy] Username = " + theUser);
08 // System.out.println("[HttpAuthenticateProxy] Password = " + thePassword);
09
10 return new PasswordAuthentication(theUser, thePassword.toCharArray());
11 }
12
13 }
14
15}
16
17/*
18 * $Log: PushletClient.java,v $
19 * Revision 1.18 2007/11/10 13:52:47 justb
20 * make startDataEventListener method protected to allow overriding
21 *
22 * Revision 1.17 2006/10/29 16:47:57 justb
23 * included patch from Jeff Nowakowski: wait until listener thread runs
24 *
25 * Revision 1.16 2005/05/06 20:08:20 justb
26 * client enhancements
27 *
28 * Revision 1.15 2005/03/27 17:42:27 justb
29 * enhancements
30 *
31 * Revision 1.14 2005/03/25 23:54:04 justb
32 * *** empty log message ***
33 *
34 * Revision 1.13 2005/02/28 16:59:40 justb
35 * fixes for leave and disconnect
36 *
37 * Revision 1.12 2005/02/28 15:57:54 justb
38 * added SimpleListener example
39 *
40 * Revision 1.11 2005/02/21 12:31:44 justb
41 * added proxy contribution from Dele Olajide
42 *
43 * Revision 1.10 2005/02/20 13:05:32 justb
44 * removed the Postlet (integrated in Pushlet protocol)
45 *
46 * Revision 1.9 2005/02/18 10:07:23 justb
47 * many renamings of classes (make names compact)
48 *
49 * Revision 1.8 2005/02/18 09:54:12 justb
50 * refactor: rename Publisher Dispatcher and single Subscriber class
51 *
52 * Revision 1.7 2005/02/15 15:46:30 justb
53 * client API improves
54 *
55 * Revision 1.6 2005/02/15 13:28:56 justb
56 * first quick rewrite adapt for v2 protocol
57 *
58 * Revision 1.5 2004/10/25 21:23:44 justb
59 * *** empty log message ***
60 *
61 * Revision 1.4 2004/10/24 13:52:51 justb
62 * small fixes in client lib
63 *
64 * Revision 1.3 2004/10/24 12:58:18 justb
65 * revised client and test classes for new protocol
66 *
67 * Revision 1.2 2004/09/03 22:35:37 justb
68 * Almost complete rewrite, just checking in now
69 *
70 * Revision 1.1 2004/03/10 20:14:17 justb
71 * renamed all *JavaPushletClient* to *PushletClient*
72 *
73 * Revision 1.10 2004/03/10 15:45:55 justb
74 * many cosmetic changes
75 *
76 * Revision 1.9 2003/08/17 20:30:20 justb
77 * cosmetic changes
78 *
79 * Revision 1.8 2003/08/15 08:37:40 justb
80 * fix/add Copyright+LGPL file headers and footers
81 *
82 *
83 */| PushletClient.java |