1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3 
4 package nl.justobjects.pushlet.client;
5 
6 import nl.justobjects.pushlet.core.Event;
7 import nl.justobjects.pushlet.core.EventParser;
8 import nl.justobjects.pushlet.core.Protocol;
9 import nl.justobjects.pushlet.util.PushletException;
10
11import java.io.IOException;
12import java.io.InputStreamReader;
13import java.io.Reader;
14import java.io.OutputStream;
15import java.net.*;
16import java.util.Map;
17
18/**
19 * Client API for Java HTTP client applets or apps.
20 * <p/>
21 * Use this class within Java client applications or applets.
22 * Implement a PushletClientListener to receive callbacks for
23 * data-related Event objects pushed by the server.
24 * <p/>
25 * This class may also be used as a base class and be extended
26 * for custom clients, hence the presence of many proteced methods.
27 *
28 * @author Just van den Broecke - Just Objects &copy;
29 * @version $Id: PushletClient.java,v 1.19 2009/06/04 12:46:35 justb Exp $
30 * @see PushletClientListener
31 * @see nl.justobjects.pushlet.test.PushletApplet
32 * @see nl.justobjects.pushlet.test.PushletPingApplication
33 */
34public class PushletClient implements Protocol {
35    /**
36     * Pushlet URL.
37     */
38    private String pushletURL;
39
40    /**
41     * Debug flag for verbose output.
42     */
43    private boolean debug;
44
45    /**
46     * Id gotten on join ack
47     */
48    private String id;
49
50    /**
51     * Internal listener for data events pushed by server.
52     */
53    protected DataEventListener dataEventListener;
54
55    /**
56     * Constructor with full pushlet URL.
57     */
58    public PushletClient(String aPushletURL) {
59        pushletURL = aPushletURL;
60    }
61
62    /**
63     * Constructor with host and port using default URI.
64     */
65    public PushletClient(String aHost, int aPort) {
66        this("http://" + aHost + ":" + aPort + DEFAULT_SERVLET_URI);
67    }
68
69    /**
70     * Set proxy options and optional proxy authentication.
71     * <p/>
72     * Contributed by Dele Olajide
73     * See http://groups.yahoo.com/group/pushlet/message/634
74     * <p/>
75     * Usage:
76     * PushletClient pushletClient = new PushletClient("http:://www.domain.com/pushlet");
77     * pushletClient.setProxyOptions("proxy.bla.com", "8080", ....);
78     * <p/>
79     * use pushletClient further as normal
80     */
81    public void setProxyOptions(String aProxyHost,
82                                String aProxyPort, String theNonProxyHosts,
83                                String aUserName, String aPassword, String anNTLMDomain) {
84
85        // Enable proxying
86        System.setProperty("http.proxySet", "true");
87        System.setProperty("http.proxyHost", aProxyHost);
88        System.setProperty("http.proxyPort", aProxyPort);
89
90        // Set optional non-proxy hosts
91        if (theNonProxyHosts != null) {
92            System.setProperty("http.nonProxyHosts", theNonProxyHosts);
93        }
94
95        // If user name specified configure proxy authentication
96        if (aUserName != null) {
97            System.setProperty("http.proxyUser", aUserName);
98            System.setProperty("http.proxyPassword", aPassword);
99
00            // See inner class below
01            Authenticator.setDefault(new HTTPAuthenticateProxy(aUserName, aPassword));
02
03            // Optional NT domain
04            if (anNTLMDomain != null) {
05                System.setProperty("http.auth.ntlm.domain", anNTLMDomain);
06            }
07        }
08    }
09
10    /**
11     * Join server, starts session.
12     */
13    public void join() throws PushletException {
14        Event event = new Event(E_JOIN);
15        event.setField(P_FORMAT, FORMAT_XML);
16        Event response = doControl(event);
17        throwOnNack(response);
18
19        // Join Ack received
20        id = response.getField(P_ID);
21    }
22
23    /**
24     * Leave server, stops session.
25     */
26    public void leave() throws PushletException {
27        stopListen();
28        throwOnInvalidSession();
29        Event event = new Event(E_LEAVE);
30        event.setField(P_ID, id);
31        Event response = doControl(event);
32
33        throwOnNack(response);
34        id = null;
35    }
36
37    /**
38     * Open data channel.
39     */
40    public void listen(PushletClientListener aListener) throws PushletException {
41        listen(aListener, MODE_STREAM);
42    }
43
44    /**
45     * Open data channel in stream or push mode.
46     */
47    public void listen(PushletClientListener aListener, String aMode) throws PushletException {
48        listen(aListener, aMode, null);
49    }
50
51    /**
52     * Open data channel in stream or push mode with a subject.
53     */
54    public void listen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
55        throwOnInvalidSession();
56        stopListen();
57
58        String listenURL = pushletURL
59                + "?" + P_EVENT + "=" + E_LISTEN
60                + "&" + P_ID + "=" + id
61                + "&" + P_MODE + "=" + aMode;
62        if (aSubject != null) {
63            listenURL = listenURL + "&" + P_SUBJECT + "=" + aSubject;
64        }
65
66        // Start listener thread (sync call).
67        startDataEventListener(aListener, listenURL);
68    }
69
70    /**
71     * Immediate listener: joins/subscribes and listens in one action.
72     */
73    public void joinListen(PushletClientListener aListener, String aMode, String aSubject) throws PushletException {
74        stopListen();
75
76        String listenURL = pushletURL
77                + "?" + P_EVENT + "=" + E_JOIN_LISTEN
78                + "&" + P_FORMAT + "=" + FORMAT_XML
79                + "&" + P_MODE + "=" + aMode
80                + "&" + P_SUBJECT + "=" + aSubject;
81
82        // Start listener thread (sync call).
83        startDataEventListener(aListener, listenURL);
84    }
85
86    /**
87     * Publish an event through server.
88     */
89    public void publish(String aSubject, Map theAttributes) throws PushletException {
90        throwOnInvalidSession();
91        Event event = new Event(E_PUBLISH, theAttributes);
92        event.setField(P_SUBJECT, aSubject);
93        event.setField(P_ID, id);
94        Event response = doControl(event);
95        throwOnNack(response);
96    }
97
98    /**
99     * Subscribes, returning subscription id.
00     */
01    public String subscribe(String aSubject, String aLabel) throws PushletException {
02        throwOnInvalidSession();
03        Event event = new Event(E_SUBSCRIBE);
04        event.setField(P_ID, id);
05        event.setField(P_SUBJECT, aSubject);
06
07        // Optional label, is returned in data events
08        if (aLabel != null) {
09            event.setField(P_SUBSCRIPTION_LABEL, aLabel);
10        }
11
12        // Send request
13        Event response = doControl(event);
14        throwOnNack(response);
15
16        return response.getField(P_SUBSCRIPTION_ID);
17    }
18
19    /**
20     * Subscribes, returning subscription id.
21     */
22    public String subscribe(String aSubject) throws PushletException {
23        return subscribe(aSubject, null);
24    }
25
26    /**
27     * Unsubscribes with subscription id.
28     */
29    public void unsubscribe(String aSubscriptionId) throws PushletException {
30        throwOnInvalidSession();
31        Event event = new Event(E_UNSUBSCRIBE);
32        event.setField(P_ID, id);
33
34        // Optional subscription id
35        if (aSubscriptionId != null) {
36            event.setField(P_SUBSCRIPTION_ID, aSubscriptionId);
37        }
38
39        Event response = doControl(event);
40        throwOnNack(response);
41    }
42
43    /**
44     * Unsubscribes from all subjects.
45     */
46    public void unsubscribe() throws PushletException {
47        unsubscribe(null);
48    }
49
50    /**
51     * Stop the listener.
52     */
53    public void stopListen() throws PushletException {
54        if (dataEventListener != null) {
55            unsubscribe();
56            dataEventListener.stop();
57            dataEventListener = null;
58        }
59    }
60
61    public void setDebug(boolean b) {
62        debug = b;
63    }
64
65    /**
66     * Starts default DataEventListener and waits for its thread to start.
67     */
68    protected void startDataEventListener(PushletClientListener aListener, String aListenURL) {
69        // Suggestion by Jeff Nowakowski 29.oct.2006
70        dataEventListener = new DataEventListener(aListener, aListenURL);
71
72        synchronized (dataEventListener) {
73            dataEventListener.start();
74            try {
75                // Wait for data event listener (thread) to start
76                dataEventListener.wait();
77            } catch (InterruptedException e) {
78            }
79        }
80    }
81
82    protected void throwOnNack(Event anEvent) throws PushletException {
83        if (anEvent.getEventType().equals(E_NACK)) {
84            throw new PushletException("Negative response: reason=" + anEvent.getField(P_REASON));
85        }
86    }
87
88    protected void throwOnInvalidSession() throws PushletException {
89        if (id == null) {
90            throw new PushletException("Invalid pushlet session");
91        }
92    }
93
94    protected Reader openURL(String aURL) throws PushletException {
95        // Open URL connection with server
96        try {
97            p("Connecting to " + aURL);
98            URL url = new URL(aURL);
99            URLConnection urlConnection = url.openConnection();
00
01            // Disable any kind of caching.
02            urlConnection.setUseCaches(false);
03            urlConnection.setDefaultUseCaches(false);
04
05            // TODO: later version may use POST
06            // Enable HTTP POST
07            // urlConnection.setDoOutput(true);
08
09            // Do the POST with Event in XML in body
10            // OutputStream os = urlConnection.getOutputStream();
11            // os.write(anEvent.toXML().getBytes());
12            // os.flush();
13            // os.close();
14
15            // Get the stream from the server.
16            // reader = new BufferedReader(new InputStreamReader(urlConnection.getInputStream()));
17            // Note: somehow the client does not work with some JVMs when using
18            // BufferedInputStream... So do unbuffered input.
19            // p("Opening urlConnection inputstream");
20            return new InputStreamReader(urlConnection.getInputStream());
21
22        } catch (Throwable t) {
23            warn("openURL() could not open " + aURL, t);
24            throw new PushletException(" could not open " + aURL, t);
25        }
26    }
27
28
29    /**
30     * Send control events to server and return response.
31     */
32    protected Event doControl(Event aControlEvent) throws PushletException {
33        String controlURL = pushletURL + "?" + aControlEvent.toQueryString();
34
35        p("doControl to " + controlURL);
36
37        // Open URL connection with server
38        Reader reader = openURL(controlURL);
39
40        // Get Pushlet event from stream
41        Event event = null;
42        try {
43            p("Getting event...");
44            // Get next event from server
45            event = EventParser.parse(reader);
46            p("Event received " + event);
47            return event;
48        } catch (Throwable t) {
49            // Stop and report error.
50            warn("doControl() exception", t);
51            throw new PushletException(" error parsing response from" + controlURL, t);
52        }
53    }
54
55    /**
56     * Util: print.
57     */
58    protected void p(String s) {
59        if (debug) {
60            System.out.println("[PushletClient] " + s);
61        }
62    }
63
64    /**
65     * Util: warn.
66     */
67    protected void warn(String s) {
68        warn(s, null);
69    }
70
71    /**
72     * Util: warn with exception.
73     */
74    protected void warn(String s, Throwable t) {
75        System.err.println("[PushletClient] - WARN - " + s + " ex=" + t);
76
77        if (t != null) {
78            t.printStackTrace();
79        }
80    }
81
82    /**
83     * Internal (default) listener for the Pushlet data channel.
84     */
85    protected class DataEventListener implements Runnable {
86        /**
87         * Client's listener that gets called back on events.
88         */
89        private PushletClientListener listener;
90
91        /**
92         * Receiver receiveThread.
93         */
94        private Thread receiveThread = null;
95        private Reader reader;
96        private String refreshURL;
97        private String listenURL;
98
99        public DataEventListener(PushletClientListener aListener, String aListenURL) {
00            listener = aListener;
01            listenURL = aListenURL;
02        }
03
04        public void start() {
05            // All ok: start a receiver receiveThread
06            receiveThread = new Thread(this);
07            receiveThread.start();
08
09        }
10
11        /**
12         * Stop listening; may restart later with start().
13         */
14        public void stop() {
15            p("In stop()");
16            bailout();
17        }
18
19        /**
20         * Receive event objects from server and callback listener.
21         */
22        public void run() {
23            p("Start run()");
24            try {
25                while (receiveThread != null && receiveThread.isAlive()) {
26                    // Connect to server
27                    reader = openURL(listenURL);
28
29                    synchronized (this) {
30                        // Inform the calling thread we're ready to receive events.
31                        // Suggestion by Jeff Nowakowski 29.oct.2006
32                        this.notify();
33                    }
34
35                    // Get events while we're alive.
36                    while (receiveThread != null && receiveThread.isAlive()) {
37                        Event event = null;
38                        try {
39                            // p("Getting event...");
40                            // Get next event from server
41                            event = EventParser.parse(reader);
42                            p("Event received " + event);
43                        } catch (Throwable t) {
44
45                            // Stop and report error.
46                            // warn("Stop run() on exception", t);
47                            if (listener != null) {
48                                listener.onError("exception during receive: " + t);
49                            }
50
51                            break;
52                        }
53
54                        // Handle event by calling listener
55                        if (event != null && listener != null) {
56                            // p("received: " + event.toXML());
57                            String eventType = event.getEventType();
58                            if (eventType.equals(E_HEARTBEAT)) {
59                                listener.onHeartbeat(event);
60                            } else if (eventType.equals(E_DATA)) {
61                                listener.onData(event);
62                            } else if (eventType.equals(E_JOIN_LISTEN_ACK)) {
63                                id = event.getField(P_ID);
64                            } else if (eventType.equals(E_LISTEN_ACK)) {
65                                p("Listen ack ok");
66                            } else if (eventType.equals(E_REFRESH_ACK)) {
67                                // ignore
68                            } else if (eventType.equals(E_ABORT)) {
69                                listener.onAbort(event);
70                                listener = null;
71                                break;
72                            } else if (eventType.equals(E_REFRESH)) {
73                                refresh(event);
74                            } else {
75                                handleUnknownEventType(eventType, event, listener);
76                            }
77                        }
78                    }
79                }
80            } catch (Throwable t) {
81                warn("Exception in run() ", t);
82                // bailout();
83            }
84        }
85
86        protected void disconnect() {
87            p("start disconnect()");
88            if (reader != null) {
89                try {
90                    // this blocks, find another way
91                    // reader.close();
92                    p("Closed reader ok");
93                } catch (Exception ignore) {
94                } finally {
95                    reader = null;
96                }
97            }
98            p("end disconnect()");
99        }
00
01        /**
02         * Stop receiver receiveThread.
03         */
04        public void stopThread() {
05            p("In stopThread()");
06
07            // Keep a reference such that we can kill it from here.
08            Thread targetThread = receiveThread;
09
10            receiveThread = null;
11
12            // This should stop the main loop for this receiveThread.
13            // Killing a receiveThread on a blcing read is tricky.
14            // See also http://gee.cs.oswego.edu/dl/cpj/cancel.html
15            if ((targetThread != null) && targetThread.isAlive()) {
16
17                targetThread.interrupt();
18
19                try {
20
21                    // Wait for it to die
22                    targetThread.join(500);
23                } catch (InterruptedException ignore) {
24                }
25
26                // If current receiveThread refuses to die,
27                // take more rigorous methods.
28                if (targetThread.isAlive()) {
29
30                    // Not preferred but may be needed
31                    // to stop during a blocking read.
32                    targetThread.stop();
33
34                    // Wait for it to die
35                    try {
36                        targetThread.join(500);
37                    } catch (Throwable ignore) {
38                    }
39                }
40
41                p("Stopped receiveThread alive=" + targetThread.isAlive());
42
43            }
44        }
45
46        /**
47         * Stop listening on stream from server.
48         */
49        public void bailout() {
50            p("In bailout()");
51            stopThread();
52            disconnect();
53        }
54
55        /**
56         * Handle refresh, by pausing.
57         */
58        protected void refresh(Event aRefreshEvent) throws PushletException {
59            try {
60                // Wait for specified time.
61                Thread.sleep(Long.parseLong(aRefreshEvent.getField(P_WAIT)));
62            } catch (Throwable t) {
63                warn("abort while refresing");
64                refreshURL = null;
65                return;
66            }
67
68            // If stopped during sleep, don't proceed
69            if (receiveThread == null) {
70                return;
71            }
72
73            // Create url to refresh
74            refreshURL = pushletURL
75                    + "?" + P_ID + "=" + id
76                    + "&" + P_EVENT + "=" + E_REFRESH
77                    ;
78
79            if (reader != null) {
80                try {
81                    reader.close();
82
83                } catch (IOException ignore) {
84
85                }
86                reader = null;
87            }
88
89            reader = openURL(refreshURL);
90        }
91
92        /**
93         * Handle unknown Event (default behaviour).
94         */
95        protected void handleUnknownEventType(String eventType, Event event, PushletClientListener listener) {
96            warn("unsupported event type received: " + eventType);
97        }
98    }
99
00    /**
01     * Authenticator
02     */
03    private static class HTTPAuthenticateProxy extends Authenticator {
04
05        /**
06         * Contributed by Dele Olajide
07         * See http://groups.yahoo.com/group/pushlet/message/634
08         */
09
10        private String thePassword = "";
11        private String theUser = "";
12
13        public HTTPAuthenticateProxy(String username, String password) {
14
15            thePassword = password;
16            theUser = username;
17        }
18
19        protected PasswordAuthentication getPasswordAuthentication() {
20            // System.out.println("[HttpAuthenticateProxy] Username = " + theUser);
21            // System.out.println("[HttpAuthenticateProxy] Password = " + thePassword);
22
23            return new PasswordAuthentication(theUser, thePassword.toCharArray());
24        }
25
26    }
27
28}
29
30/*
31 * $Log: PushletClient.java,v $
32 * Revision 1.19  2009/06/04 12:46:35  justb
33 * PushletClient: add more hooks for extension (feat ID: 2799694 Craig M)
34 *
35 * Revision 1.18  2007/11/10 13:52:47  justb
36 * make startDataEventListener method protected to allow overriding
37 *
38 * Revision 1.17  2006/10/29 16:47:57  justb
39 * included patch from Jeff Nowakowski: wait until listener thread runs
40 *
41 * Revision 1.16  2005/05/06 20:08:20  justb
42 * client enhancements
43 *
44 * Revision 1.15  2005/03/27 17:42:27  justb
45 * enhancements
46 *
47 * Revision 1.14  2005/03/25 23:54:04  justb
48 * *** empty log message ***
49 *
50 * Revision 1.13  2005/02/28 16:59:40  justb
51 * fixes for leave and disconnect
52 *
53 * Revision 1.12  2005/02/28 15:57:54  justb
54 * added SimpleListener example
55 *
56 * Revision 1.11  2005/02/21 12:31:44  justb
57 * added proxy contribution from Dele Olajide
58 *
59 * Revision 1.10  2005/02/20 13:05:32  justb
60 * removed the Postlet (integrated in Pushlet protocol)
61 *
62 * Revision 1.9  2005/02/18 10:07:23  justb
63 * many renamings of classes (make names compact)
64 *
65 * Revision 1.8  2005/02/18 09:54:12  justb
66 * refactor: rename Publisher Dispatcher and single Subscriber class
67 *
68 * Revision 1.7  2005/02/15 15:46:30  justb
69 * client API improves
70 *
71 * Revision 1.6  2005/02/15 13:28:56  justb
72 * first quick rewrite adapt for v2 protocol
73 *
74 * Revision 1.5  2004/10/25 21:23:44  justb
75 * *** empty log message ***
76 *
77 * Revision 1.4  2004/10/24 13:52:51  justb
78 * small fixes in client lib
79 *
80 * Revision 1.3  2004/10/24 12:58:18  justb
81 * revised client and test classes for new protocol
82 *
83 * Revision 1.2  2004/09/03 22:35:37  justb
84 * Almost complete rewrite, just checking in now
85 *
86 * Revision 1.1  2004/03/10 20:14:17  justb
87 * renamed all *JavaPushletClient* to *PushletClient*
88 *
89 * Revision 1.10  2004/03/10 15:45:55  justb
90 * many cosmetic changes
91 *
92 * Revision 1.9  2003/08/17 20:30:20  justb
93 * cosmetic changes
94 *
95 * Revision 1.8  2003/08/15 08:37:40  justb
96 * fix/add Copyright+LGPL file headers and footers
97 *
98 *
99 */