1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.client;
5 
6 import nl.justobjects.pushlet.core.Event;
7 import nl.justobjects.pushlet.core.EventParser;
8 import nl.justobjects.pushlet.core.Protocol;
9 import nl.justobjects.pushlet.util.PushletException;
10
11import java.io.IOException;
12import java.io.InputStreamReader;
13import java.io.Reader;
14import java.io.OutputStream;
15import java.net.*;
16import java.util.Map;
17
18/**
19 * Client API for Java HTTP client applets or apps.
20 * <p/>
21 * Use this class within Java client applications or applets.
22 * Implement a PushletClientListener to receive callbacks for
23 * data-related Event objects pushed by the server.
24 *
25 * @author Just van den Broecke - Just Objects &copy;
26 * @version $Id: PushletClient.java,v 1.18 2007/11/10 13:52:47 justb Exp $
27 * @see PushletClientListener
28 * @see nl.justobjects.pushlet.test.PushletApplet
29 * @see nl.justobjects.pushlet.test.PushletPingApplication
30 */
31public class PushletClient implements Protocol {
32    /**
33     * Pushlet URL.
34     */
35    private String pushletURL;
36
37    /**
38     * Debug flag for verbose output.
39     */
40    private boolean debug;
41
42    /**
43     * Id gotten on join ack
44     */
45    private String id;
46
47    /**
48     * Internal listener for data events pushed by server.
49     */
50    private DataEventListener dataEventListener;
51
52    /**
53     * Constructor with full pushlet URL.
54     */
55    public PushletClient(String aPushletURL) {
56        pushletURL = aPushletURL;
57    }
58
59    /**
60     * Constructor with host and port using default URI.
61     */
62    public PushletClient(String aHost, int aPort) {
63        this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
64    }
65
66    /**
67     * Set proxy options and optional proxy authentication.
68     * <p/>
69     * Contributed by Dele Olajide
70     * See http://groups.yahoo.com/group/pushlet/message/634
71     * <p/>
72     * Usage:
73     * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet");
74     * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....);
75     * <p/>
76     * use pushletClient further as normal
77     */
78    public void setProxyOptions(String aProxyHost,
79                                String aProxyPort, String theNonProxyHosts,
80                                String aUserName, String aPassword, String anNTLMDomain) {
81
82        // Enable proxying
83        System.setProperty("http.proxySet", "true");
84        System.setProperty("http.proxyHost", aProxyHost);
85        System.setProperty("http.proxyPort", aProxyPort);
86
87        // Set optional non-proxy hosts
88        if (theNonProxyHosts != null) {
89            System.setProperty("http.nonProxyHosts", theNonProxyHosts);
90        }
91
92        // If user name specified configure proxy authentication
93        if (aUserName != null) {
94            System.setProperty("http.proxyUser", aUserName);
95            System.setProperty("http.proxyPassword", aPassword);
96
97            // See inner class below
98            Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));
99
00            // Optional NT domain
01            if (anNTLMDomain != null) {
02                System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
03            }
04        }
05    }
06
07    /**
08     * Join server, starts session.
09     */
10    public void join() throws PushletException {
11        Event event = new Event(E_JOIN);
12        event.setField(P_FORMAT, FORMAT_XML);
13        Event response = doControl(event);
14        throwOnNack(response);
15
16        // Join Ack received
17        id = response.getField(P_ID);
18    }
19
20    /**
21     * Leave server, stops session.
22     */
23    public void leave() throws PushletException {
24        stopListen();
25        throwOnInvalidSession();
26        Event event = new Event(E_LEAVE);
27        event.setField(P_ID, id);
28        Event response = doControl(event);
29
30        throwOnNack(response);
31        id = null;
32    }
33
34    /**
35     * Open data channel.
36     */
37    public void listen(PushletClientListener aListener) throws PushletException {
38        listen(aListener, MODE_STREAM);
39    }
40
41    /**
42     * Open data channel in stream or push mode.
43     */
44    public void listen(PushletClientListener aListener, String aMode) throws PushletException {
45        listen(aListener, aMode, null);
46    }
47
48    /**
49     * Open data channel in stream or push mode with a subject.
50     */
51    public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
52        throwOnInvalidSession();
53        stopListen();
54
55        String listenURL = pushletURL
56                + "?" + P_EVENT + "=" + E_LISTEN
57                + "&" + P_ID + "=" + id
58                + "&" + P_MODE + "=" + aMode;
59        if (aSubject != null) {
60            listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
61        }
62
63        // Start listener thread (sync call).
64        startDataEventListener(aListener, listenURL);
65    }
66
67    /**
68     * Immediate listener: joins/subscribes and listens in one action.
69     */
70    public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
71        stopListen();
72
73        String listenURL = pushletURL
74                + "?" + P_EVENT + "=" + E_JOIN_LISTEN
75                + "&" + P_FORMAT + "=" + FORMAT_XML
76                + "&" + P_MODE + "=" + aMode
77                + "&" + P_SUBJECT + "=" + aSubject;
78
79        // Start listener thread (sync call).
80        startDataEventListener(aListener, listenURL);
81    }
82
83    /**
84     * Publish an event through server.
85     */
86    public void publish(String aSubject, Map theAttributes) throws PushletException {
87        throwOnInvalidSession();
88        Event event = new Event(E_PUBLISH, theAttributes);
89        event.setField(P_SUBJECT, aSubject);
90        event.setField(P_ID, id);
91        Event response = doControl(event);
92        throwOnNack(response);
93    }
94
95    /**
96     * Subscribes, returning subscription id.
97     */
98    public String subscribe(String aSubject, String aLabel) throws PushletException {
99        throwOnInvalidSession();
00        Event event = new Event(E_SUBSCRIBE);
01        event.setField(P_ID, id);
02        event.setField(P_SUBJECT, aSubject);
03
04        // Optional label, is returned in data events
05        if (aLabel != null) {
06            event.setField(P_SUBSCRIPTION_LABEL, aLabel);
07        }
08
09        // Send request
10        Event response = doControl(event);
11        throwOnNack(response);
12
13        return response.getField(P_SUBSCRIPTION_ID);
14    }
15
16    /**
17     * Subscribes, returning subscription id.
18     */
19    public String subscribe(String aSubject) throws PushletException {
20        return subscribe(aSubject, null);
21    }
22
23    /**
24     * Unsubscribes with subscription id.
25     */
26    public void unsubscribe(String aSubscriptionId) throws PushletException {
27        throwOnInvalidSession();
28        Event event = new Event(E_UNSUBSCRIBE);
29        event.setField(P_ID, id);
30
31        // Optional subscription id
32        if (aSubscriptionId != null) {
33            event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
34        }
35
36        Event response = doControl(event);
37        throwOnNack(response);
38    }
39
40    /**
41     * Unsubscribes from all subjects.
42     */
43    public void unsubscribe() throws PushletException {
44        unsubscribe(null);
45    }
46
47    /**
48     * Stop the listener.
49     */
50    public void stopListen() throws PushletException {
51        if (dataEventListener != null) {
52            unsubscribe();
53            dataEventListener.stop();
54            dataEventListener = null;
55        }
56    }
57
58    public void setDebug(boolean b) {
59        debug = b;
60    }
61
62    /**
63     * Starts DataEventListener and waits for its thread to start.
64     */
65    protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
66        // Suggestion by Jeff Nowakowski 29.oct.2006
67        dataEventListener = new DataEventListener(aListener, aListenURL);
68
69        synchronized (dataEventListener) {
70            dataEventListener.start();
71            try {
72                // Wait for data event listener (thread) to start
73                dataEventListener.wait();
74            } catch (InterruptedException e) {
75            }
76        }
77    }
78
79    private void throwOnNack(Event anEvent) throws PushletException {
80        if (anEvent.getEventType().equals(E_NACK)) {
81            throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
82        }
83    }
84
85    private void throwOnInvalidSession() throws PushletException {
86        if (id == null) {
87            throw new PushletException("Invalid pushlet session");
88        }
89    }
90
91    private Reader openURL(String aURL) throws PushletException {
92        // Open URL connection with server
93        try {
94            p("Connecting to " + aURL);
95            URL url = new URL(aURL);
96            URLConnection urlConnection = url.openConnection();
97
98            // Disable any kind of caching.
99            urlConnection.setUseCaches(false);
00            urlConnection.setDefaultUseCaches(false);
01
02            // TODO: later version may use POST
03            // Enable HTTP POST
04            // urlConnection.setDoOutput(true);
05
06            // Do the POST with Event in XML in body
07            // OutputStream os = urlConnection.getOutputStream();
08            // os.write(anEvent.toXML().getBytes());
09            // os.flush();
10            // os.close();
11
12            // Get the stream from the server.
13            // reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
14            // Note: somehow the client does not work with some JVMs when using
15            // BufferedInputStream... So do unbuffered input.
16            // p("Opening urlConnection inputstream");
17            return new InputStreamReader(urlConnection.getInputStream());
18
19        } catch (Throwable t) {
20            warn("openURL() could not open " + aURL, t);
21            throw new PushletException(" could not open " + aURL, t);
22        }
23    }
24
25
26    private Event doControl(Event aControlEvent) throws PushletException {
27        String controlURL = pushletURL + "?" + aControlEvent.toQueryString();
28
29        p("doControl to " + controlURL);
30
31        // Open URL connection with server
32        Reader reader = openURL(controlURL);
33
34        // Get Pushlet event from stream
35        Event event = null;
36        try {
37            p("Getting event...");
38            // Get next event from server
39            event = EventParser.parse(reader);
40            p("Event received " + event);
41            return event;
42        } catch (Throwable t) {
43            // Stop and report error.
44            warn("doControl() exception", t);
45            throw new PushletException(" error parsing response from" + controlURL, t);
46        }
47    }
48
49    /**
50     * Util: print.
51     */
52    private void p(String s) {
53        if (debug) {
54            System.out.println("[PushletClient] " + s);
55        }
56    }
57
58    /**
59     * Util: warn.
60     */
61    private void warn(String s) {
62        warn(s, null);
63    }
64
65    /**
66     * Util: warn with exception.
67     */
68    private void warn(String s, Throwable t) {
69        System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);
70
71        if (t != null) {
72            t.printStackTrace();
73        }
74    }
75
76    /**
77     * Internal listener for the Pushlet data channel.
78     */
79    private class DataEventListener implements Runnable {
80        /**
81         * Client's listener that gets called back on events.
82         */
83        private PushletClientListener listener;
84
85        /**
86         * Receiver receiveThread.
87         */
88        private Thread receiveThread = null;
89        private Reader reader;
90        private String refreshURL;
91        private String listenURL;
92
93        public DataEventListener(PushletClientListener aListener, String aListenURL) {
94            listener = aListener;
95            listenURL = aListenURL;
96        }
97
98        public void start() {
99            // All ok: start a receiver receiveThread
00            receiveThread = new Thread(this);
01            receiveThread.start();
02
03        }
04
05        /**
06         * Stop listening; may restart later with start().
07         */
08        public void stop() {
09            p("In stop()");
10            bailout();
11        }
12
13        /**
14         * Receive event objects from server and callback listener.
15         */
16        public void run() {
17            p("Start run()");
18            try {
19                while (receiveThread != null && receiveThread.isAlive()) {
20                    // Connect to server
21                    reader = openURL(listenURL);
22
23                    synchronized (this) {
24                        // Inform the calling thread we're ready to receive events.
25                        // Suggestion by Jeff Nowakowski 29.oct.2006
26                        this.notify();
27                    }
28
29                    // Get events while we're alive.
30                    while (receiveThread != null && receiveThread.isAlive()) {
31                        Event event = null;
32                        try {
33                            // p("Getting event...");
34                            // Get next event from server
35                            event = EventParser.parse(reader);
36                            p("Event received " + event);
37                        } catch (Throwable t) {
38
39                            // Stop and report error.
40                            // warn("Stop run() on exception", t);
41                            if (listener != null) {
42                                listener.onError("exception during receive: " + t);
43                            }
44
45                            break;
46                        }
47
48                        // Handle event by calling listener
49                        if (event != null && listener != null) {
50                            // p("received: " + event.toXML());
51                            String eventType = event.getEventType();
52                            if (eventType.equals(E_HEARTBEAT)) {
53                                listener.onHeartbeat(event);
54                            } else if (eventType.equals(E_DATA)) {
55                                listener.onData(event);
56                            } else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
57                                id = event.getField(P_ID);
58                            } else if (eventType.equals(E_LISTEN_ACK)) {
59                                p("Listen ack ok");
60                            } else if (eventType.equals(E_REFRESH_ACK)) {
61                                // ignore
62                            } else if (eventType.equals(E_ABORT)) {
63                                listener.onAbort(event);
64                                listener = null;
65                                break;
66                            } else if (eventType.equals(E_REFRESH)) {
67                                refresh(event);
68                            } else {
69                                warn("unsupported event type received: " + eventType);
70                            }
71                        }
72                    }
73                }
74            } catch (Throwable t) {
75                warn("Exception in run() ", t);
76                // bailout();
77            }
78        }
79
80        private void disconnect() {
81            p("start disconnect()");
82            if (reader != null) {
83                try {
84                    // this blocks, find another way
85                    // reader.close();
86                    p("Closed reader ok");
87                } catch (Exception ignore) {
88                } finally {
89                    reader = null;
90                }
91            }
92            p("end disconnect()");
93        }
94
95        /**
96         * Stop receiver receiveThread.
97         */
98        public void stopThread() {
99            p("In stopThread()");
00
01            // Keep a reference such that we can kill it from here.
02            Thread targetThread = receiveThread;
03
04            receiveThread = null;
05
06            // This should stop the main loop for this receiveThread.
07            // Killing a receiveThread on a blcing read is tricky.
08            // See also http://gee.cs.oswego.edu/dl/cpj/cancel.html
09            if ((targetThread != null) && targetThread.isAlive()) {
10
11                targetThread.interrupt();
12
13                try {
14
15                    // Wait for it to die
16                    targetThread.join(500);
17                } catch (InterruptedException ignore) {
18                }
19
20                // If current receiveThread refuses to die,
21                // take more rigorous methods.
22                if (targetThread.isAlive()) {
23
24                    // Not preferred but may be needed
25                    // to stop during a blocking read.
26                    targetThread.stop();
27
28                    // Wait for it to die
29                    try {
30                        targetThread.join(500);
31                    } catch (Throwable ignore) {
32                    }
33                }
34
35                p("Stopped receiveThread alive=" + targetThread.isAlive());
36
37            }
38        }
39
40        /**
41         * Stop listening on stream from server.
42         */
43        public void bailout() {
44            p("In bailout()");
45            stopThread();
46            disconnect();
47        }
48
49        /**
50         * Handle refresh, by pausing.
51         */
52        private void refresh(Event aRefreshEvent) throws PushletException {
53            try {
54                // Wait for specified time.
55                Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
56            } catch (Throwable t) {
57                warn("abort while refresing");
58                refreshURL = null;
59                return;
60            }
61
62            // If stopped during sleep, don't proceed
63            if (receiveThread == null) {
64                return;
65            }
66
67            // Create url to refresh
68            refreshURL = pushletURL
69                    + "?" + P_ID + "=" + id
70                    + "&" + P_EVENT + "=" + E_REFRESH
71                    ;
72
73            if (reader != null) {
74                try {
75                    reader.close();
76
77                } catch (IOException ignore) {
78
79                }
80                reader = null;
81            }
82
83            reader = openURL(refreshURL);
84        }
85    }
86
87    /**
88     * Authenticator
89     */
90    private static class HTTPAuthenticateProxy extends Authenticator {
91
92        /**
93         * Contributed by Dele Olajide
94         * See http://groups.yahoo.com/group/pushlet/message/634
95         */
96
97        private String thePassword = "";
98        private String theUser = "";
99
00        public HTTPAuthenticateProxy(String username, String password) {
01
02            thePassword = password;
03            theUser = username;
04        }
05
06        protected PasswordAuthentication getPasswordAuthentication() {
07            // System.out.println("[HttpAuthenticateProxy] Username = " + theUser);
08            // System.out.println("[HttpAuthenticateProxy] Password = " + thePassword);
09
10            return new PasswordAuthentication(theUser, thePassword.toCharArray());
11        }
12
13    }
14
15}
16
17/*
18 * $Log: PushletClient.java,v $
19 * Revision 1.18  2007/11/10 13:52:47  justb
20 * make startDataEventListener method protected to allow overriding
21 *
22 * Revision 1.17  2006/10/29 16:47:57  justb
23 * included patch from Jeff Nowakowski: wait until listener thread runs
24 *
25 * Revision 1.16  2005/05/06 20:08:20  justb
26 * client enhancements
27 *
28 * Revision 1.15  2005/03/27 17:42:27  justb
29 * enhancements
30 *
31 * Revision 1.14  2005/03/25 23:54:04  justb
32 * *** empty log message ***
33 *
34 * Revision 1.13  2005/02/28 16:59:40  justb
35 * fixes for leave and disconnect
36 *
37 * Revision 1.12  2005/02/28 15:57:54  justb
38 * added SimpleListener example
39 *
40 * Revision 1.11  2005/02/21 12:31:44  justb
41 * added proxy contribution from Dele Olajide
42 *
43 * Revision 1.10  2005/02/20 13:05:32  justb
44 * removed the Postlet (integrated in Pushlet protocol)
45 *
46 * Revision 1.9  2005/02/18 10:07:23  justb
47 * many renamings of classes (make names compact)
48 *
49 * Revision 1.8  2005/02/18 09:54:12  justb
50 * refactor: rename Publisher Dispatcher and single Subscriber class
51 *
52 * Revision 1.7  2005/02/15 15:46:30  justb
53 * client API improves
54 *
55 * Revision 1.6  2005/02/15 13:28:56  justb
56 * first quick rewrite adapt for v2 protocol
57 *
58 * Revision 1.5  2004/10/25 21:23:44  justb
59 * *** empty log message ***
60 *
61 * Revision 1.4  2004/10/24 13:52:51  justb
62 * small fixes in client lib
63 *
64 * Revision 1.3  2004/10/24 12:58:18  justb
65 * revised client and test classes for new protocol
66 *
67 * Revision 1.2  2004/09/03 22:35:37  justb
68 * Almost complete rewrite, just checking in now
69 *
70 * Revision 1.1  2004/03/10 20:14:17  justb
71 * renamed all *JavaPushletClient* to *PushletClient*
72 *
73 * Revision 1.10  2004/03/10 15:45:55  justb
74 * many cosmetic changes
75 *
76 * Revision 1.9  2003/08/17 20:30:20  justb
77 * cosmetic changes
78 *
79 * Revision 1.8  2003/08/15 08:37:40  justb
80 * fix/add Copyright+LGPL file headers and footers
81 *
82 *
83 */