1
4 package nl.justobjects.pushlet.core;
5
6 import nl.justobjects.pushlet.util.PushletException;
7
8 import java.io.IOException;
9
10
16public class Controller implements Protocol, ConfigDefs {
17
18 private Session session;
19
20
23 protected Controller() {
24 }
25
26
33 public static Controller create(Session aSession) throws PushletException {
34 Controller controller;
35 try {
36 controller = (Controller) Config.getClass(CONTROLLER_CLASS, "nl.justobjects.pushlet.core.Controller").newInstance();
37 } catch (Throwable t) {
38 throw new PushletException("Cannot instantiate Controller from config", t);
39 }
40 controller.session = aSession;
41 return controller;
42 }
43
44
47 public void doCommand(Command aCommand) {
48 try {
49 session.kick();
51
52 session.setAddress(aCommand.httpReq.getRemoteAddr());
54
55 debug("doCommand() event=" + aCommand.reqEvent);
56
57 String eventType = aCommand.reqEvent.getEventType();
59
60 if (eventType.equals(Protocol.E_REFRESH)) {
62 doRefresh(aCommand);
64 } else if (eventType.equals(Protocol.E_SUBSCRIBE)) {
65 doSubscribe(aCommand);
67 } else if (eventType.equals(Protocol.E_UNSUBSCRIBE)) {
68 doUnsubscribe(aCommand);
70 } else if (eventType.equals(Protocol.E_JOIN)) {
71 doJoin(aCommand);
73 } else if (eventType.equals(Protocol.E_JOIN_LISTEN)) {
74 doJoinListen(aCommand);
76 } else if (eventType.equals(Protocol.E_LEAVE)) {
77 doLeave(aCommand);
79 } else if (eventType.equals(Protocol.E_HEARTBEAT)) {
80 doHeartbeat(aCommand);
82 } else if (eventType.equals(Protocol.E_PUBLISH)) {
83 doPublish(aCommand);
85 } else if (eventType.equals(Protocol.E_LISTEN)) {
86 doListen(aCommand);
88 }
89
90 if (eventType.endsWith(Protocol.E_LISTEN) ||
92 eventType.equals(Protocol.E_REFRESH)) {
93 getSubscriber().fetchEvents(aCommand);
96
97 } else {
98 sendControlResponse(aCommand);
00 }
01
02 } catch (Throwable t) {
03 warn("Exception in doCommand(): " + t);
04 t.printStackTrace();
05 }
06 }
07
08 public String toString() {
09 return session.toString();
10 }
11
12
15 protected void doHeartbeat(Command aCommand) {
16
17 aCommand.setResponseEvent(new Event(E_HEARTBEAT_ACK));
19 }
20
21
24 protected void doJoin(Command aCommand) throws PushletException {
25
26 Event responseEvent = null;
27
28 try {
29
30 session.start();
31
32 String format = aCommand.reqEvent.getField(P_FORMAT, FORMAT_JAVASCRIPT);
35
36 session.setFormat(format);
37 responseEvent = new Event(E_JOIN_ACK);
38
39 responseEvent.setField(P_ID, session.getId());
41 responseEvent.setField(P_FORMAT, format);
42 info("joined");
43 } catch (Throwable t) {
44 session.stop();
45 responseEvent = new Event(E_NACK);
46 responseEvent.setField(P_ID, session.getId());
47 responseEvent.setField(P_REASON, "unexpected error: " + t);
48 warn("doJoin() error: " + t);
49 t.printStackTrace();
50 } finally {
51 aCommand.setResponseEvent(responseEvent);
53 }
54
55 }
56
57
60 protected void doJoinListen(Command aCommand) throws PushletException {
61
62
67 doJoin(aCommand);
69 if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) {
70 doListen(aCommand);
72 if (!aCommand.getResponseEvent().getEventType().equals(E_NACK)) {
73 aCommand.getResponseEvent().setField(P_EVENT, E_JOIN_LISTEN_ACK);
75 }
76 }
77 }
78
79
82 protected void doLeave(Command aCommand) throws IOException {
83
84 Event responseEvent = null;
85
86 try {
87 session.stop();
89
90 responseEvent = new Event(E_LEAVE_ACK);
92
93 responseEvent.setField(P_ID, session.getId());
95 info("left");
96 } catch (Throwable t) {
97 responseEvent = new Event(E_NACK);
98 responseEvent.setField(P_ID, session.getId());
99 responseEvent.setField(P_REASON, "unexpected error: " + t);
00 warn("doLeave() error: " + t);
01 t.printStackTrace();
02 } finally {
03 aCommand.setResponseEvent(responseEvent);
05 }
06
07 }
08
09
12 protected void doListen(Command aCommand) throws PushletException {
13
14
15 String mode = MODE_STREAM;
16 if (Config.getBoolProperty(LISTEN_FORCE_PULL_ALL)) {
18 mode = MODE_PULL;
19 } else {
20
23 mode = aCommand.reqEvent.getField(P_MODE, MODE_STREAM);
27
28 String userAgent = aCommand.httpReq.getHeader("User-Agent");
29 if (userAgent != null) {
30 userAgent = userAgent.toLowerCase();
31 for (int i = 0; i < session.FORCED_PULL_AGENTS.length; i++) {
32 if ((userAgent.indexOf(session.FORCED_PULL_AGENTS[i]) != -1)) {
33 info("Forcing pull mode for agent=" + userAgent);
34 mode = MODE_PULL;
35 break;
36 }
37 }
38 } else {
39 userAgent = "unknown";
40 }
41 }
42
43 getSubscriber().setMode(mode);
44
45 Event listenAckEvent = new Event(E_LISTEN_ACK);
47
48 String subject = aCommand.reqEvent.getField(P_SUBJECT);
50 if (subject != null) {
51 String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL);
53
54 Subscription subscription = getSubscriber().addSubscription(subject, label);
56
57 listenAckEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
59 if (label != null) {
60 listenAckEvent.setField(P_SUBSCRIPTION_LABEL, label);
61 }
62 }
63
64 listenAckEvent.setField(P_ID, session.getId());
66 listenAckEvent.setField(P_MODE, mode);
67 listenAckEvent.setField(P_FORMAT, session.getFormat());
68
69 getSubscriber().start();
71
72 aCommand.setResponseEvent(listenAckEvent);
74
75 info("Listening mode=" + mode + " userAgent=" + session.getUserAgent());
76
77 }
78
79
82 protected void doPublish(Command aCommand) {
83 Event responseEvent = null;
84
85 try {
86 String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT);
87 if (subject == null) {
88 responseEvent = new Event(E_NACK);
90 responseEvent.setField(P_ID, session.getId());
91 responseEvent.setField(P_REASON, "no subject provided");
92 } else {
93 aCommand.reqEvent.setField(P_FROM, session.getId());
94 aCommand.reqEvent.setField(P_EVENT, E_DATA);
95
96 String to = aCommand.reqEvent.getField(P_TO);
98 if (to != null) {
99 Dispatcher.getInstance().unicast(aCommand.reqEvent, to);
00 } else {
01 debug("doPublish() event=" + aCommand.reqEvent);
03 Dispatcher.getInstance().multicast(aCommand.reqEvent);
04 }
05
06 responseEvent = new Event(E_PUBLISH_ACK);
08 }
09
10 } catch (Throwable t) {
11 responseEvent = new Event(E_NACK);
12 responseEvent.setField(P_ID, session.getId());
13 responseEvent.setField(P_REASON, "unexpected error: " + t);
14 warn("doPublish() error: " + t);
15 t.printStackTrace();
16 } finally {
17 aCommand.setResponseEvent(responseEvent);
19 }
20 }
21
22
25 protected void doRefresh(Command aCommand) {
26 aCommand.setResponseEvent(new Event(E_REFRESH_ACK));
28 }
29
30
33 protected void doSubscribe(Command aCommand) throws IOException {
34
35 Event responseEvent = null;
36 try {
37 String subject = aCommand.reqEvent.getField(Protocol.P_SUBJECT);
38 Subscription subscription = null;
39 if (subject == null) {
40 responseEvent = new Event(E_NACK);
42 responseEvent.setField(P_ID, session.getId());
43 responseEvent.setField(P_REASON, "no subject provided");
44 } else {
45
46 String label = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_LABEL);
47 subscription = getSubscriber().addSubscription(subject, label);
48
49 responseEvent = new Event(E_SUBSCRIBE_ACK);
51 responseEvent.setField(P_ID, session.getId());
52 responseEvent.setField(P_SUBJECT, subject);
53 responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
54 if (label != null) {
55 responseEvent.setField(P_SUBSCRIPTION_LABEL, label);
56 }
57 info("subscribed to " + subject + " sid=" + subscription.getId());
58 }
59
60 } catch (Throwable t) {
61 responseEvent = new Event(E_NACK);
62 responseEvent.setField(P_ID, session.getId());
63 responseEvent.setField(P_REASON, "unexpected error: " + t);
64 warn("doSubscribe() error: " + t);
65 t.printStackTrace();
66 } finally {
67 aCommand.setResponseEvent(responseEvent);
69 }
70 }
71
72
75 protected void doUnsubscribe(Command aCommand) throws IOException {
76
77
78 Event responseEvent = null;
79 try {
80 String subscriptionId = aCommand.reqEvent.getField(Protocol.P_SUBSCRIPTION_ID);
81 if (subscriptionId == null) {
82 getSubscriber().removeSubscriptions();
84 responseEvent = new Event(E_UNSUBSCRIBE_ACK);
85 responseEvent.setField(P_ID, session.getId());
86 info("unsubscribed all");
87 } else {
88 Subscription subscription = getSubscriber().removeSubscription(subscriptionId);
90 if (subscription == null) {
91 responseEvent = new Event(E_NACK);
93 responseEvent.setField(P_ID, session.getId());
94 responseEvent.setField(P_REASON, "no subscription for sid=" + subscriptionId);
95 warn("unsubscribe: no subscription for sid=" + subscriptionId);
96 } else {
97 responseEvent = new Event(E_UNSUBSCRIBE_ACK);
99 responseEvent.setField(P_ID, session.getId());
00 responseEvent.setField(P_SUBSCRIPTION_ID, subscription.getId());
01 responseEvent.setField(P_SUBJECT, subscription.getSubject());
02 if (subscription.getLabel() != null) {
03 responseEvent.setField(P_SUBSCRIPTION_LABEL, subscription.getLabel());
04 }
05 info("unsubscribed sid= " + subscriptionId);
06 }
07 }
08 } catch (Throwable t) {
09 responseEvent = new Event(E_NACK);
10 responseEvent.setField(P_ID, session.getId());
11 responseEvent.setField(P_REASON, "unexpected error: " + t);
12 warn("doUnsubscribe() error: " + t);
13 t.printStackTrace();
14 } finally {
15 aCommand.setResponseEvent(responseEvent);
17 }
18 }
19
20 public Subscriber getSubscriber() {
21 return session.getSubscriber();
22 }
23
24
27 protected void sendControlResponse(Command aCommand) {
28 try {
29
30 aCommand.sendResponseHeaders();
32
33 aCommand.getClientAdapter().start();
35
36 aCommand.getClientAdapter().push(aCommand.getResponseEvent());
38
39 aCommand.getClientAdapter().stop();
41 } catch (Throwable t) {
42 session.stop();
43 }
44 }
45
46
47
50 protected void info(String s) {
51 session.info("[Controller] " + s);
52 }
53
54
57 protected void warn(String s) {
58 session.warn("[Controller] " + s);
59 }
60
61
64 protected void debug(String s) {
65 session.debug("[Controller] " + s);
66 }
67
68
69}
70
71
03