| Subscriber.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.core;
5
6 import nl.justobjects.pushlet.util.PushletException;
7 import nl.justobjects.pushlet.util.Rand;
8 import nl.justobjects.pushlet.util.Sys;
9
10import java.util.Collections;
11import java.util.HashMap;
12import java.util.Map;
13import java.net.URLEncoder;
14
15/**
16 * Handles data channel between dispatcher and client.
17 *
18 * @author Just van den Broecke - Just Objects ©
19 * @version $Id: Subscriber.java,v 1.26 2007/11/23 14:33:07 justb Exp $
20 */
21public class Subscriber implements Protocol, ConfigDefs {
22 private Session session;
23
24 /**
25 * Blocking queue.
26 */
27 private EventQueue eventQueue = new EventQueue(Config.getIntProperty(QUEUE_SIZE));
28
29 /**
30 * URL to be used in refresh requests in pull/poll modes.
31 */
32 private long queueReadTimeoutMillis = Config.getLongProperty(QUEUE_READ_TIMEOUT_MILLIS);
33 private long queueWriteTimeoutMillis = Config.getLongProperty(QUEUE_WRITE_TIMEOUT_MILLIS);
34 private long refreshTimeoutMillis = Config.getLongProperty(PULL_REFRESH_TIMEOUT_MILLIS);
35 volatile long lastAlive = Sys.now();
36
37 /**
38 * Map of active subscriptions, keyed by their subscription id.
39 */
40 private Map subscriptions = Collections.synchronizedMap(new HashMap(3));
41
42 /**
43 * Are we able to accept/send events ?.
44 */
45 private volatile boolean active;
46
47 /**
48 * Transfer mode (stream, pull, poll).
49 */
50 private String mode;
51
52
53 /**
54 * Protected constructor as we create through factory method.
55 */
56 protected Subscriber() {
57 }
58
59 /**
60 * Create instance through factory method.
61 *
62 * @param aSession the parent Session
63 * @return a Subscriber object (or derived)
64 * @throws PushletException exception, usually misconfiguration
65 */
66 public static Subscriber create(Session aSession) throws PushletException {
67 Subscriber subscriber;
68 try {
69 subscriber = (Subscriber) Config.getClass(SUBSCRIBER_CLASS, "nl.justobjects.pushlet.core.Subscriber").newInstance();
70 } catch (Throwable t) {
71 throw new PushletException("Cannot instantiate Subscriber from config", t);
72 }
73
74 subscriber.session = aSession;
75 return subscriber;
76 }
77
78 public void start() {
79 active = true;
80 }
81
82 public void stop() {
83 removeSubscriptions();
84 active = false;
85 }
86
87 public void bailout() {
88 session.stop();
89 }
90
91 /**
92 * Are we still active to handle events.
93 */
94 public boolean isActive() {
95 return active;
96 }
97
98 /**
99 * Return client session.
00 */
01 public Session getSession() {
02 return session;
03 }
04
05 /**
06 * Get (session) id.
07 */
08 public String getId() {
09 return session.getId();
10 }
11
12 /**
13 * Return subscriptions.
14 */
15 public Subscription[] getSubscriptions() {
16 // todo: Optimize
17 return (Subscription[]) subscriptions.values().toArray(new Subscription[0]);
18 }
19
20 /**
21 * Add a subscription.
22 */
23 public Subscription addSubscription(String aSubject, String aLabel) throws PushletException {
24 Subscription subscription = Subscription.create(aSubject, aLabel);
25 subscriptions.put(subscription.getId(), subscription);
26 info("Subscription added subject=" + aSubject + " sid=" + subscription.getId() + " label=" + aLabel);
27 return subscription;
28 }
29
30 /**
31 * Remove a subscription.
32 */
33 public Subscription removeSubscription(String aSubscriptionId) {
34 Subscription subscription = (Subscription) subscriptions.remove(aSubscriptionId);
35 if (subscription == null) {
36 warn("No subscription found sid=" + aSubscriptionId);
37 return null;
38 }
39 info("Subscription removed subject=" + subscription.getSubject() + " sid=" + subscription.getId() + " label=" + subscription.getLabel());
40 return subscription;
41 }
42
43 /**
44 * Remove all subscriptions.
45 */
46 public void removeSubscriptions() {
47 subscriptions.clear();
48 }
49
50 public String getMode() {
51 return mode;
52 }
53
54 public void setMode(String aMode) {
55 mode = aMode;
56 }
57
58 public long getRefreshTimeMillis() {
59 String minWaitProperty = PULL_REFRESH_WAIT_MIN_MILLIS;
60 String maxWaitProperty = PULL_REFRESH_WAIT_MAX_MILLIS;
61 if (mode.equals((MODE_POLL))) {
62 minWaitProperty = POLL_REFRESH_WAIT_MIN_MILLIS;
63 maxWaitProperty = POLL_REFRESH_WAIT_MAX_MILLIS;
64
65 }
66 return Rand.randomLong(Config.getLongProperty(minWaitProperty),
67 Config.getLongProperty(maxWaitProperty));
68 }
69
70 /**
71 * Get events from queue and push to client.
72 */
73 public void fetchEvents(Command aCommand) throws PushletException {
74
75 String refreshURL = aCommand.httpReq.getRequestURI() + "?" + P_ID + "=" + session.getId() + "&" + P_EVENT + "=" + E_REFRESH;
76
77 // This is the only thing required to support "poll" mode
78 if (mode.equals(MODE_POLL)) {
79 queueReadTimeoutMillis = 0;
80 refreshTimeoutMillis = Config.getLongProperty(POLL_REFRESH_TIMEOUT_MILLIS);
81 }
82
83 // Required for fast bailout (tomcat)
84 aCommand.httpRsp.setBufferSize(128);
85
86 // Try to prevent caching in any form.
87 aCommand.sendResponseHeaders();
88
89 // Let clientAdapter determine how to send event
90 ClientAdapter clientAdapter = aCommand.getClientAdapter();
91 Event responseEvent = aCommand.getResponseEvent();
92 try {
93 clientAdapter.start();
94
95 // Send first event (usually hb-ack or listen-ack)
96 clientAdapter.push(responseEvent);
97
98 // In pull/poll mode and when response is listen-ack or join-listen-ack,
99 // return and force refresh immediately
00 // such that the client recieves response immediately over this channel.
01 // This is usually when loading the browser app for the first time
02 if ((mode.equals(MODE_POLL) || mode.equals(MODE_PULL))
03 && responseEvent.getEventType().endsWith(Protocol.E_LISTEN_ACK)) {
04 sendRefresh(clientAdapter, refreshURL);
05
06 // We should come back later with refresh event...
07 return;
08 }
09 } catch (Throwable t) {
10 bailout();
11 return;
12 }
13
14
15 Event[] events = null;
16
17 // Main loop: as long as connected, get events and push to client
18 long eventSeqNr = 1;
19 while (isActive()) {
20 // Indicate we are still alive
21 lastAlive = Sys.now();
22
23 // Update session time to live
24 session.kick();
25
26 // Get next events; blocks until timeout or entire contents
27 // of event queue is returned. Note that "poll" mode
28 // will return immediately when queue is empty.
29 try {
30 // Put heartbeat in queue when starting to listen in stream mode
31 // This speeds up the return of *_LISTEN_ACK
32 if (mode.equals(MODE_STREAM) && eventSeqNr == 1) {
33 eventQueue.enQueue(new Event(E_HEARTBEAT));
34 }
35
36 events = eventQueue.deQueueAll(queueReadTimeoutMillis);
37 } catch (InterruptedException ie) {
38 warn("interrupted");
39 bailout();
40 }
41
42 // Send heartbeat when no events received
43 if (events == null) {
44 events = new Event[1];
45 events[0] = new Event(E_HEARTBEAT);
46 }
47
48 // ASSERT: one or more events available
49
50 // Send events to client using adapter
51 // debug("received event count=" + events.length);
52 for (int i = 0; i < events.length; i++) {
53 // Check for abort event
54 if (events[i].getEventType().equals(E_ABORT)) {
55 warn("Aborting Subscriber");
56 bailout();
57 }
58
59 // Push next Event to client
60 try {
61 // Set sequence number
62 events[i].setField(P_SEQ, eventSeqNr++);
63
64 // Push to client through client adapter
65 clientAdapter.push(events[i]);
66 } catch (Throwable t) {
67 bailout();
68 return;
69 }
70 }
71
72 // Force client refresh request in pull or poll modes
73 if (mode.equals(MODE_PULL) || mode.equals(MODE_POLL)) {
74 sendRefresh(clientAdapter, refreshURL);
75
76 // Always leave loop in pull/poll mode
77 break;
78 }
79 }
80 }
81
82 /**
83 * Determine if we should receive event.
84 */
85 public Subscription match(Event event) {
86 Subscription[] subscriptions = getSubscriptions();
87 for (int i = 0; i < subscriptions.length; i++) {
88 if (subscriptions[i].match(event)) {
89 return subscriptions[i];
90 }
91 }
92 return null;
93 }
94
95 /**
96 * Event from Dispatcher: enqueue it.
97 */
98 public void onEvent(Event theEvent) {
99 if (!isActive()) {
00 return;
01 }
02
03 // p("send: queue event: "+theEvent.getSubject());
04
05 // Check if we had any active continuation for at
06 // least 'timeOut' millisecs. If the client has left this
07 // instance there would be no way of knowing otherwise.
08 long now = Sys.now();
09 if (now - lastAlive > refreshTimeoutMillis) {
10 warn("not alive for at least: " + refreshTimeoutMillis + "ms, leaving...");
11 bailout();
12 return;
13 }
14
15 // Put event in queue; leave if queue full
16 try {
17 if (!eventQueue.enQueue(theEvent, queueWriteTimeoutMillis)) {
18 warn("queue full, bailing out...");
19 bailout();
20 }
21
22 // ASSERTION : Event in queue.
23 // see fetchEvents() where Events are dequeued and pushed to the client.
24 } catch (InterruptedException ie) {
25 bailout();
26 }
27
28 }
29
30 /**
31 * Send refresh command to pull/poll clients.
32 */
33 protected void sendRefresh(ClientAdapter aClientAdapter, String aRefreshURL) {
34 Event refreshEvent = new Event(E_REFRESH);
35
36 // Set wait time and url for refresh
37 refreshEvent.setField(P_WAIT, "" + getRefreshTimeMillis());
38 refreshEvent.setField(P_URL, aRefreshURL);
39
40 try {
41 // Push to client through client adapter
42 aClientAdapter.push(refreshEvent);
43
44 // Stop this round until refresh event
45 aClientAdapter.stop();
46 } catch (Throwable t) {
47 // Leave on any exception
48 bailout();
49 }
50 }
51
52 /**
53 * Info.
54 */
55 protected void info(String s) {
56 session.info("[Subscriber] " + s);
57 }
58
59 /**
60 * Exceptional print util.
61 */
62 protected void warn(String s) {
63 session.warn("[Subscriber] " + s);
64 }
65
66 /**
67 * Exceptional print util.
68 */
69 protected void debug(String s) {
70 session.debug("[Subscriber] " + s);
71 }
72
73
74 public String toString() {
75 return session.toString();
76 }
77}
78
79/*
80 * $Log: Subscriber.java,v $
81 * Revision 1.26 2007/11/23 14:33:07 justb
82 * core classes now configurable through factory
83 *
84 * Revision 1.25 2007/11/10 15:53:15 justb
85 * put heartbeat in queue when start fetching events in stream-mode
86 *
87 * Revision 1.24 2006/10/19 12:33:40 justb
88 * add atomic join-listen support (one request)
89 *
90 * Revision 1.22 2006/05/06 00:06:28 justb
91 * first rough version AJAX client
92 *
93 * Revision 1.21 2005/02/28 12:45:59 justb
94 * introduced Command class
95 *
96 * Revision 1.20 2005/02/21 16:59:09 justb
97 * SessionManager and session lease introduced
98 *
99 * Revision 1.19 2005/02/21 12:32:28 justb
00 * fixed publish event in Controller
01 *
02 * Revision 1.18 2005/02/21 11:50:46 justb
03 * phase1 of refactoring Subscriber into Session/Controller/Subscriber
04 *
05 * Revision 1.17 2005/02/20 13:05:32 justb
06 * removed the Postlet (integrated in Pushlet protocol)
07 *
08 * Revision 1.16 2005/02/18 12:36:47 justb
09 * changes for renaming and configurability
10 *
11 * Revision 1.15 2005/02/18 10:07:23 justb
12 * many renamings of classes (make names compact)
13 *
14 * Revision 1.14 2005/02/18 09:54:15 justb
15 * refactor: rename Publisher Dispatcher and single Subscriber class
16 *
17 * Revision 1.13 2005/02/16 14:39:34 justb
18 * fixed leave handling and added "poll" mode
19 *
20 * Revision 1.12 2005/01/24 13:42:00 justb
21 * new protocol changes (p_listen)
22 *
23 * Revision 1.11 2005/01/13 14:47:15 justb
24 * control evt: send response on same (control) connection
25 *
26 * Revision 1.10 2004/10/24 20:50:35 justb
27 * refine subscription with label and sending sid and label on events
28 *
29 * Revision 1.9 2004/10/24 12:58:18 justb
30 * revised client and test classes for new protocol
31 *
32 * Revision 1.8 2004/09/26 21:39:43 justb
33 * allow multiple subscriptions and out-of-band requests
34 *
35 * Revision 1.7 2004/09/20 22:01:38 justb
36 * more changes for new protocol
37 *
38 * Revision 1.6 2004/09/03 22:35:37 justb
39 * Almost complete rewrite, just checking in now
40 *
41 * Revision 1.5 2004/08/13 23:36:05 justb
42 * rewrite of Pullet into Pushlet "pull" mode
43 *
44 * Revision 1.4 2004/03/10 14:01:55 justb
45 * formatting and *Subscriber refactoring
46 *
47 * Revision 1.3 2003/08/15 08:37:40 justb
48 * fix/add Copyright+LGPL file headers and footers
49 *
50 * Revision 1.2 2003/05/18 16:15:08 justb
51 * support for XML encoded Events
52 *
53 * Revision 1.1.1.1 2002/09/24 21:02:32 justb
54 * import to sourceforge
55 *
56 * Revision 1.1.1.1 2002/09/20 22:48:18 justb
57 * import to SF
58 *
59 * Revision 1.1.1.1 2002/09/20 14:19:04 justb
60 * first import into SF
61 *
62 * Revision 1.3 2002/04/15 20:42:41 just
63 * reformatting and renaming GuardedQueue to EventQueue
64 *
65 * Revision 1.2 2000/08/21 20:48:29 just
66 * added CVS log and id tags plus copyrights
67 *
68 *
69 */
70| Subscriber.java |