| Dispatcher.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.core;
5
6 import nl.justobjects.pushlet.util.Log;
7 import nl.justobjects.pushlet.util.PushletException;
8
9 import java.lang.reflect.Method;
10import java.util.HashMap;
11import java.util.Map;
12
13/**
14 * Routes Events to Subscribers.
15 *
16 * @author Just van den Broecke - Just Objects ©
17 * @version $Id: Dispatcher.java,v 1.9 2007/12/04 13:55:53 justb Exp $
18 */
19public class Dispatcher implements Protocol, ConfigDefs {
20 /**
21 * Singleton pattern: single instance.
22 */
23 private static Dispatcher instance;
24 protected SessionManagerVisitor sessionManagerVisitor;
25
26 static {
27 try {
28 instance = (Dispatcher) Config.getClass(DISPATCHER_CLASS, "nl.justobjects.pushlet.core.Dispatcher").newInstance();
29 Log.info("Dispatcher created className=" + instance.getClass());
30 } catch (Throwable t) {
31 Log.fatal("Cannot instantiate Dispatcher from config", t);
32 }
33 }
34
35 /**
36 * Singleton pattern with factory method: protected constructor.
37 */
38 protected Dispatcher() {
39
40 }
41
42 /**
43 * Singleton pattern: get single instance.
44 */
45 public static Dispatcher getInstance() {
46 return instance;
47 }
48
49 /**
50 * Send event to all subscribers.
51 */
52 public synchronized void broadcast(Event anEvent) {
53 try {
54 // Let the SessionManager loop through Sessions, calling
55 // our Visitor Method for each Session. This is done to guard
56 // synchronization with SessionManager and to optimize by
57 // not getting an array of all sessions.
58 Object[] args = new Object[2];
59 args[1] = anEvent;
60 Method method = sessionManagerVisitor.getMethod("visitBroadcast");
61 SessionManager.getInstance().apply(sessionManagerVisitor, method, args);
62 } catch (Throwable t) {
63 Log.error("Error calling SessionManager.apply: ", t);
64 }
65 }
66
67 /**
68 * Send event to subscribers matching Event subject.
69 */
70 public synchronized void multicast(Event anEvent) {
71 try {
72 // Let the SessionManager loop through Sessions, calling
73 // our Visitor Method for each Session. This is done to guard
74 // synchronization with SessionManager and to optimize by
75 // not getting an array of all sessions.
76 Method method = sessionManagerVisitor.getMethod("visitMulticast");
77 Object[] args = new Object[2];
78 args[1] = anEvent;
79 SessionManager.getInstance().apply(sessionManagerVisitor, method, args);
80 } catch (Throwable t) {
81 Log.error("Error calling SessionManager.apply: ", t);
82 }
83 }
84
85
86 /**
87 * Send event to specific subscriber.
88 */
89 public synchronized void unicast(Event event, String aSessionId) {
90 // Get subscriber to send event to
91 Session session = SessionManager.getInstance().getSession(aSessionId);
92 if (session == null) {
93 Log.warn("unicast: session with id=" + aSessionId + " does not exist");
94 return;
95 }
96
97 // Send Event to subscriber.
98 session.getSubscriber().onEvent((Event) event.clone());
99 }
00
01 /**
02 * Start Dispatcher.
03 */
04 public void start() throws PushletException {
05 Log.info("Dispatcher started");
06
07 // Create callback for SessionManager visits.
08 sessionManagerVisitor = new SessionManagerVisitor();
09 }
10
11 /**
12 * Stop Dispatcher.
13 */
14 public void stop() {
15 // Send abort control event to all subscribers.
16 Log.info("Dispatcher stopped: broadcast abort to all subscribers");
17 broadcast(new Event(E_ABORT));
18 }
19
20 /**
21 * Supplies Visitor methods for callbacks from SessionManager.
22 */
23 private class SessionManagerVisitor {
24 private final Map visitorMethods = new HashMap(2);
25
26 SessionManagerVisitor() throws PushletException {
27
28 try {
29 // Setup Visitor Methods for callback from SessionManager
30 // This is a slight opitmization over creating Method objects
31 // on each invokation.
32 Class[] argsClasses = {Session.class, Event.class};
33 visitorMethods.put("visitMulticast", this.getClass().getMethod("visitMulticast", argsClasses));
34 visitorMethods.put("visitBroadcast", this.getClass().getMethod("visitBroadcast", argsClasses));
35 } catch (NoSuchMethodException e) {
36 throw new PushletException("Failed to setup SessionManagerVisitor", e);
37 }
38 }
39
40 /**
41 * Return Visitor Method by name.
42 */
43 public Method getMethod(String aName) {
44 return (Method) visitorMethods.get(aName);
45
46 }
47
48 /**
49 * Visitor method called by SessionManager.
50 */
51 public void visitBroadcast(Session aSession, Event event) {
52 aSession.getSubscriber().onEvent((Event) event.clone());
53 }
54
55 /**
56 * Visitor method called by SessionManager.
57 */
58 public void visitMulticast(Session aSession, Event event) {
59 Subscriber subscriber = aSession.getSubscriber();
60 Event clonedEvent;
61 Subscription subscription;
62
63 // Send only if the subscriber's criteria
64 // match the event.
65 if ((subscription = subscriber.match(event)) != null) {
66 // Personalize event
67 clonedEvent = (Event) event.clone();
68
69 // Set subscription id and optional label
70 clonedEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
71 if (subscription.getLabel() != null) {
72 event.setField(P_SUBSCRIPTION_LABEL, subscription.getLabel());
73 }
74
75 subscriber.onEvent(clonedEvent);
76 }
77 }
78 }
79}
80
81/*
82 * $Log: Dispatcher.java,v $
83 * Revision 1.9 2007/12/04 13:55:53 justb
84 * reimplement SessionManager concurrency (prev version was not thread-safe!)
85 *
86 * Revision 1.8 2007/11/23 14:33:07 justb
87 * core classes now configurable through factory
88 *
89 * Revision 1.7 2005/02/28 12:45:59 justb
90 * introduced Command class
91 *
92 * Revision 1.6 2005/02/28 09:14:55 justb
93 * sessmgr/dispatcher factory/singleton support
94 *
95 * Revision 1.5 2005/02/21 16:59:06 justb
96 * SessionManager and session lease introduced
97 *
98 * Revision 1.4 2005/02/21 11:50:46 justb
98 * phase1 of refactoring Subscriber into Session/Controller/Subscriber
00 *
01 * Revision 1.3 2005/02/18 12:36:47 justb
02 * changes for renaming and configurability
03 *
04 * Revision 1.2 2005/02/18 10:07:23 justb
05 * many renamings of classes (make names compact)
06 *
07 * Revision 1.1 2005/02/18 09:54:15 justb
08 * refactor: rename Publisher Dispatcher and single Subscriber class
09 *
10 * Revision 1.14 2005/02/16 14:39:34 justb
11 * fixed leave handling and added "poll" mode
12 *
13 * Revision 1.13 2004/10/24 20:50:35 justb
14 * refine subscription with label and sending sid and label on events
15 *
16 * Revision 1.12 2004/10/24 12:58:18 justb
17 * revised client and test classes for new protocol
18 *
19 * Revision 1.11 2004/09/26 21:39:43 justb
20 * allow multiple subscriptions and out-of-band requests
21 *
22 * Revision 1.10 2004/09/20 22:01:38 justb
23 * more changes for new protocol
24 *
25 * Revision 1.9 2004/09/03 22:35:37 justb
26 * Almost complete rewrite, just checking in now
27 *
28 * Revision 1.8 2004/08/13 23:36:05 justb
29 * rewrite of Pullet into Pushlet "pull" mode
30 *
31 * Revision 1.7 2004/08/12 13:18:54 justb
32 * cosmetic changes
33 *
34 * Revision 1.6 2004/03/10 15:45:55 justb
35 * many cosmetic changes
36 *
37 * Revision 1.5 2004/03/10 13:59:28 justb
38 * rewrite using Collection classes and finer synchronization
39 *
40 * Revision 1.4 2003/08/15 08:37:40 justb
41 * fix/add Copyright+LGPL file headers and footers
42 *
43 * Revision 1.3 2003/08/12 08:54:40 justb
44 * added getSubscriberCount() and use Log
45 *
46 * Revision 1.2 2003/05/18 16:15:08 justb
47 * support for XML encoded Events
48 *
49 * Revision 1.1.1.1 2002/09/24 21:02:31 justb
50 * import to sourceforge
51 *
52 * Revision 1.1.1.1 2002/09/20 22:48:18 justb
53 * import to SF
54 *
55 * Revision 1.1.1.1 2002/09/20 14:19:04 justb
56 * first import into SF
57 *
58 * Revision 1.3 2002/04/15 20:42:41 just
59 * reformatting and renaming GuardedQueue to EventQueue
60 *
61 * Revision 1.2 2000/08/21 20:48:29 just
62 * added CVS log and id tags plus copyrights
63 *
64 *
65 */
66| Dispatcher.java |