1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe.support;
21
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24
25 import org.apache.mina.common.ByteBuffer;
26 import org.apache.mina.common.IdleStatus;
27 import org.apache.mina.common.IoSession;
28 import org.apache.mina.common.IoFilter.WriteRequest;
29 import org.apache.mina.common.support.AbstractIoFilterChain;
30
31
32
33
34
35
36
37 public class VmPipeFilterChain extends AbstractIoFilterChain {
38
39 private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
40
41 private volatile boolean flushEnabled;
42 private volatile boolean sessionOpened;
43
44 public VmPipeFilterChain(IoSession session) {
45 super(session);
46 }
47
48 public void start() {
49 flushEnabled = true;
50 flushEvents();
51 flushPendingDataQueues( (VmPipeSessionImpl) getSession() );
52 }
53
54 private void pushEvent(Event e) {
55 eventQueue.offer(e);
56 if ( flushEnabled ) {
57 flushEvents();
58 }
59 }
60
61 private void flushEvents() {
62 Event e;
63 while ((e = eventQueue.poll()) != null) {
64 fireEvent(e);
65 }
66 }
67
68 private void fireEvent(Event e) {
69 VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
70 EventType type = e.getType();
71 Object data = e.getData();
72
73 if (type == EventType.RECEIVED) {
74 if( sessionOpened && session.getTrafficMask().isReadable() && session.getLock().tryLock()) {
75 try {
76 int byteCount = 1;
77 if (data instanceof ByteBuffer) {
78 byteCount = ((ByteBuffer) data).remaining();
79 }
80
81 session.increaseReadBytes(byteCount);
82
83 super.fireMessageReceived(session, data);
84 } finally {
85 session.getLock().unlock();
86 }
87
88 flushPendingDataQueues( session );
89 } else {
90 session.pendingDataQueue.add(data);
91 }
92 } else if (type == EventType.WRITE) {
93 super.fireFilterWrite(session, (WriteRequest) data);
94 } else if (type == EventType.SENT) {
95 super.fireMessageSent(session, (WriteRequest) data);
96 } else if (type == EventType.EXCEPTION) {
97 super.fireExceptionCaught(session, (Throwable) data);
98 } else if (type == EventType.IDLE) {
99 super.fireSessionIdle(session, (IdleStatus) data);
100 } else if (type == EventType.OPENED) {
101 super.fireSessionOpened(session);
102 sessionOpened = true;
103 } else if (type == EventType.CREATED) {
104 session.getLock().lock();
105 try {
106 super.fireSessionCreated(session);
107 } finally {
108 session.getLock().unlock();
109 }
110 } else if (type == EventType.CLOSED) {
111 super.fireSessionClosed(session);
112 } else if (type == EventType.CLOSE) {
113 super.fireFilterClose(session);
114 }
115 }
116
117 private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
118 s.updateTrafficMask();
119 s.getRemoteSession().updateTrafficMask();
120 }
121
122 @Override
123 public void fireFilterClose(IoSession session) {
124 pushEvent(new Event(EventType.CLOSE, null));
125 }
126
127 @Override
128 public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
129 pushEvent(new Event(EventType.WRITE, writeRequest));
130 }
131
132 @Override
133 public void fireExceptionCaught(IoSession session, Throwable cause) {
134 pushEvent(new Event(EventType.EXCEPTION, cause));
135 }
136
137 @Override
138 public void fireMessageSent(IoSession session, WriteRequest request) {
139 pushEvent(new Event(EventType.SENT, request));
140 }
141
142 @Override
143 public void fireSessionClosed(IoSession session) {
144 pushEvent(new Event(EventType.CLOSED, null));
145 }
146
147 @Override
148 public void fireSessionCreated(IoSession session) {
149 pushEvent(new Event(EventType.CREATED, null));
150 }
151
152 @Override
153 public void fireSessionIdle(IoSession session, IdleStatus status) {
154 pushEvent(new Event(EventType.IDLE, status));
155 }
156
157 @Override
158 public void fireSessionOpened(IoSession session) {
159 pushEvent(new Event(EventType.OPENED, null));
160 }
161
162 @Override
163 public void fireMessageReceived(IoSession session, Object message) {
164 pushEvent(new Event(EventType.RECEIVED, message));
165 }
166
167 @Override
168 protected void doWrite(IoSession session, WriteRequest writeRequest) {
169 VmPipeSessionImpl s = (VmPipeSessionImpl) session;
170 if (s.isConnected()) {
171 if ( s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
172 try {
173 Object message = writeRequest.getMessage();
174
175 int byteCount = 1;
176 Object messageCopy = message;
177 if (message instanceof ByteBuffer) {
178 ByteBuffer rb = (ByteBuffer) message;
179 rb.mark();
180 byteCount = rb.remaining();
181 ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
182 wb.put(rb);
183 wb.flip();
184 rb.reset();
185 messageCopy = wb;
186 }
187
188
189
190 s.increaseScheduledWriteBytes(byteCount);
191 s.increaseScheduledWriteRequests();
192
193 s.increaseWrittenBytes(byteCount);
194 s.increaseWrittenMessages();
195
196 s.getRemoteSession().getFilterChain().fireMessageReceived(
197 s.getRemoteSession(), messageCopy);
198 s.getFilterChain().fireMessageSent(s, writeRequest);
199 } finally {
200 s.getLock().unlock();
201 }
202
203 flushPendingDataQueues( s );
204 } else {
205 s.pendingDataQueue.add(writeRequest);
206 }
207 } else {
208 writeRequest.getFuture().setWritten(false);
209 }
210 }
211
212 @Override
213 protected void doClose(IoSession session) {
214 VmPipeSessionImpl s = (VmPipeSessionImpl) session;
215
216 try {
217 s.getLock().lock();
218
219 if (!session.getCloseFuture().isClosed()) {
220 s.getServiceListeners().fireSessionDestroyed(s);
221 s.getRemoteSession().close();
222 }
223 } finally {
224 s.getLock().unlock();
225 }
226 }
227
228
229 private static class EventType {
230 public static final EventType CREATED = new EventType("CREATED");
231
232 public static final EventType OPENED = new EventType("OPENED");
233
234 public static final EventType CLOSED = new EventType("CLOSED");
235
236 public static final EventType RECEIVED = new EventType("RECEIVED");
237
238 public static final EventType SENT = new EventType("SENT");
239
240 public static final EventType IDLE = new EventType("IDLE");
241
242 public static final EventType EXCEPTION = new EventType("EXCEPTION");
243
244 public static final EventType WRITE = new EventType("WRITE");
245
246 public static final EventType CLOSE = new EventType("CLOSE");
247
248 private final String value;
249
250 private EventType(String value) {
251 this.value = value;
252 }
253
254 @Override
255 public String toString() {
256 return value;
257 }
258 }
259
260 private static class Event {
261 private final EventType type;
262
263 private final Object data;
264
265 private Event(EventType type, Object data) {
266 this.type = type;
267 this.data = data;
268 }
269
270 public Object getData() {
271 return data;
272 }
273
274 public EventType getType() {
275 return type;
276 }
277 }
278 }