| EventQueue.java |
1 // Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
2 // Distributable under LGPL license. See terms of license at gnu.org.
3
4 package nl.justobjects.pushlet.core;
5
6 /**
7 * FIFO queue with guarded suspension.
8 * <b>Purpose</b><br>
9 * <p/>
10 * <b>Implementation</b><br>
11 * FIFO queue class implemented with circular array. The enQueue() and
12 * deQueue() methods use guarded suspension according to a readers/writers
13 * pattern, implemented with java.lang.Object.wait()/notify().
14 * <p/>
15 * <b>Examples</b><br>
16 * <p/>
17 * <br>
18 *
19 * @author Just van den Broecke - Just Objects ©
20 * @version $Id: EventQueue.java,v 1.3 2007/11/23 14:33:07 justb Exp $
21 */
22public class EventQueue {
23 /**
24 * Defines maximum queue size
25 */
26 private int capacity = 8;
27 private Event[] queue = null;
28 private int front, rear;
29
30 /**
31 * Construct queue with default (8) capacity.
32 */
33 public EventQueue() {
34 this(8);
35 }
36
37 /**
38 * Construct queue with specified capacity.
39 */
40 public EventQueue(int capacity) {
41 this.capacity = capacity;
42 queue = new Event[capacity];
43 front = rear = 0;
44 }
45
46 /**
47 * Put item in queue; waits() indefinitely if queue is full.
48 */
49 public synchronized boolean enQueue(Event item) throws InterruptedException {
50 return enQueue(item, -1);
51 }
52
53 /**
54 * Put item in queue; if full wait maxtime.
55 */
56 public synchronized boolean enQueue(Event item, long maxWaitTime) throws InterruptedException {
57
58 // Wait (optional maxtime) as long as the queue is full
59 while (isFull()) {
60 if (maxWaitTime > 0) {
61 // Wait at most maximum time
62 wait(maxWaitTime);
63
64 // Timed out or woken; if still full we
65 // had bad luck and return failure.
66 if (isFull()) {
67 return false;
68 }
69 } else {
70 wait();
71 }
72 }
73
74 // Put item in queue
75 queue[rear] = item;
76 rear = next(rear);
77
78 // Wake up waiters; NOTE: first waiter will eat item
79 notifyAll();
80 return true;
81 }
82
83 /**
84 * Get head; if empty wait until something in queue.
85 */
86 public synchronized Event deQueue() throws InterruptedException {
87 return deQueue(-1);
88 }
89
90 /**
91 * Get head; if empty wait for specified time at max.
92 */
93 public synchronized Event deQueue(long maxWaitTime) throws InterruptedException {
94 while (isEmpty()) {
95 if (maxWaitTime >= 0) {
96 wait(maxWaitTime);
97
98 // Timed out or woken; if still empty we
99 // had bad luck and return failure.
00 if (isEmpty()) {
01 return null;
02 }
03 } else {
04 // Wait indefinitely for something in queue.
05 wait();
06 }
07 }
08
09 // Dequeue item
10 Event result = fetchNext();
11
12 // Notify possible wait()-ing enQueue()-ers
13 notifyAll();
14
15 // Return dequeued item
16 return result;
17 }
18
19 /**
20 * Get all queued Events.
21 */
22 public synchronized Event[] deQueueAll(long maxWaitTime) throws InterruptedException {
23 while (isEmpty()) {
24 if (maxWaitTime >= 0) {
25 wait(maxWaitTime);
26
27 // Timed out or woken; if still empty we
28 // had bad luck and return failure.
29 if (isEmpty()) {
30 return null;
31 }
32 } else {
33 // Wait indefinitely for something in queue.
34 wait();
35 }
36 }
37
38 // Dequeue all items item
39 Event[] events = new Event[getSize()];
40 for (int i = 0; i < events.length; i++) {
41 events[i] = fetchNext();
42 }
43
44 // Notify possible wait()-ing enQueue()-ers
45 notifyAll();
46
47 // Return dequeued item
48 return events;
49 }
50
51 public synchronized int getSize() {
52 return (rear >= front) ? (rear - front) : (capacity - front + rear);
53 }
54
55 /**
56 * Is the queue empty ?
57 */
58 public synchronized boolean isEmpty() {
59 return front == rear;
60 }
61
62 /**
63 * Is the queue full ?
64 */
65 public synchronized boolean isFull() {
66 return (next(rear) == front);
67 }
68
69 /**
70 * Circular counter.
71 */
72 private int next(int index) {
73 return (index + 1 < capacity ? index + 1 : 0);
74 }
75
76 /**
77 * Circular counter.
78 */
79 private Event fetchNext() {
80 Event temp = queue[front];
81 queue[front] = null;
82 front = next(front);
83 return temp;
84 }
85
86 public static void p(String s) {
87 System.out.println(s);
88 }
89
90 public static void main(String[] args) {
91 EventQueue q = new EventQueue(8);
92 Event event = new Event("t");
93 try {
94 q.enQueue(event);
95 p("(1) size = " + q.getSize());
96 q.enQueue(event);
97 p("(2) size = " + q.getSize());
98 q.deQueue();
99 p("(1) size = " + q.getSize());
00 q.deQueue();
01 p("(0) size = " + q.getSize());
02
03 q.enQueue(event);
04 q.enQueue(event);
05 q.enQueue(event);
06 p("(3) size = " + q.getSize());
07 q.deQueue();
08 p("(2) size = " + q.getSize());
09 q.enQueue(event);
10 q.enQueue(event);
11 q.enQueue(event);
12 p("(5) size = " + q.getSize());
13 q.enQueue(event);
14 q.enQueue(event);
15 p("(7) size = " + q.getSize());
16 q.deQueue();
17 q.deQueue();
18 q.deQueue();
19 p("(4) size = " + q.getSize());
20 q.deQueue();
21 q.deQueue();
22 q.deQueue();
23 ;
24 q.deQueue();
25 p("(0) size = " + q.getSize());
26
27 q.enQueue(event);
28 q.enQueue(event);
29 q.enQueue(event);
30 q.enQueue(event);
31 q.enQueue(event);
32 p("(5) size = " + q.getSize());
33
34 q.deQueue();
35 q.deQueue();
36 q.deQueue();
37 ;
38 q.deQueue();
39 p("(1) size = " + q.getSize());
40 } catch (InterruptedException ie) {
41 }
42 }
43}
44
45/*
46* $Log: EventQueue.java,v $
47* Revision 1.3 2007/11/23 14:33:07 justb
48* core classes now configurable through factory
49*
50* Revision 1.2 2005/02/21 11:50:46 justb
51* phase 1 of refactoring Subscriber into Session/Controller/Subscriber
52*
53* Revision 1.1 2005/02/18 10:07:23 justb
54* many renamings of classes (make names compact)
55*
56* Revision 1.6 2005/02/16 12:16:16 justb
57* added support for "poll" mode
58*
59* Revision 1.5 2005/01/13 14:47:15 justb
60* control evt: send response on same (control) connection
61*
62* Revision 1.4 2004/09/03 22:35:37 justb
63* Almost complete rewrite, just checking in now
64*
65* Revision 1.3 2003/08/15 08:37:40 justb
66* fix/add Copyright+LGPL file headers and footers
67*
68*
69*/
70| EventQueue.java |