1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.vmpipe;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Queue;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26
27 import org.apache.mina.common.AbstractIoSession;
28 import org.apache.mina.common.DefaultIoFilterChain;
29 import org.apache.mina.common.IdleStatus;
30 import org.apache.mina.common.IoBuffer;
31 import org.apache.mina.common.IoEvent;
32 import org.apache.mina.common.IoEventType;
33 import org.apache.mina.common.IoProcessor;
34 import org.apache.mina.common.IoSession;
35 import org.apache.mina.common.WriteRequest;
36 import org.apache.mina.common.WriteRequestQueue;
37 import org.apache.mina.common.WriteToClosedSessionException;
38
39
40
41
42
43 class VmPipeFilterChain extends DefaultIoFilterChain {
44
45 private final Queue<IoEvent> eventQueue = new ConcurrentLinkedQueue<IoEvent>();
46 private final IoProcessor<VmPipeSessionImpl> processor = new VmPipeIoProcessor();
47
48 private volatile boolean flushEnabled;
49 private volatile boolean sessionOpened;
50
51 VmPipeFilterChain(AbstractIoSession session) {
52 super(session);
53 }
54
55 IoProcessor<VmPipeSessionImpl> getProcessor() {
56 return processor;
57 }
58
59 public void start() {
60 flushEnabled = true;
61 flushEvents();
62 flushPendingDataQueues((VmPipeSessionImpl) getSession());
63 }
64
65 private void pushEvent(IoEvent e) {
66 eventQueue.add(e);
67 if (flushEnabled) {
68 flushEvents();
69 }
70 }
71
72 private void flushEvents() {
73 IoEvent e;
74 while ((e = eventQueue.poll()) != null) {
75 fireEvent(e);
76 }
77 }
78
79 private void fireEvent(IoEvent e) {
80 IoSession session = getSession();
81 IoEventType type = e.getType();
82 Object data = e.getParameter();
83
84 if (type == IoEventType.MESSAGE_RECEIVED) {
85 VmPipeSessionImpl s = (VmPipeSessionImpl) session;
86 if (sessionOpened && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
87 try {
88 if (!s.getTrafficMask().isReadable()) {
89 s.receivedMessageQueue.add(data);
90 } else {
91 super.fireMessageReceived(data);
92 }
93 } finally {
94 s.getLock().unlock();
95 }
96 } else {
97 s.receivedMessageQueue.add(data);
98 }
99 } else if (type == IoEventType.WRITE) {
100 super.fireFilterWrite((WriteRequest) data);
101 } else if (type == IoEventType.MESSAGE_SENT) {
102 super.fireMessageSent((WriteRequest) data);
103 } else if (type == IoEventType.EXCEPTION_CAUGHT) {
104 super.fireExceptionCaught((Throwable) data);
105 } else if (type == IoEventType.SESSION_IDLE) {
106 super.fireSessionIdle((IdleStatus) data);
107 } else if (type == IoEventType.SESSION_OPENED) {
108 super.fireSessionOpened();
109 sessionOpened = true;
110 } else if (type == IoEventType.SESSION_CREATED) {
111 super.fireSessionCreated();
112 } else if (type == IoEventType.SESSION_CLOSED) {
113 super.fireSessionClosed();
114 } else if (type == IoEventType.CLOSE) {
115 super.fireFilterClose();
116 }
117 }
118
119 private static void flushPendingDataQueues(VmPipeSessionImpl s) {
120 s.getProcessor().updateTrafficMask(s);
121 s.getRemoteSession().getProcessor().updateTrafficMask(s);
122 }
123
124 @Override
125 public void fireFilterClose() {
126 pushEvent(new IoEvent(IoEventType.CLOSE, getSession(), null));
127 }
128
129 @Override
130 public void fireFilterWrite(WriteRequest writeRequest) {
131 pushEvent(new IoEvent(IoEventType.WRITE, getSession(), writeRequest));
132 }
133
134 @Override
135 public void fireExceptionCaught(Throwable cause) {
136 pushEvent(new IoEvent(IoEventType.EXCEPTION_CAUGHT, getSession(), cause));
137 }
138
139 @Override
140 public void fireMessageSent(WriteRequest request) {
141 pushEvent(new IoEvent(IoEventType.MESSAGE_SENT, getSession(), request));
142 }
143
144 @Override
145 public void fireSessionClosed() {
146 pushEvent(new IoEvent(IoEventType.SESSION_CLOSED, getSession(), null));
147 }
148
149 @Override
150 public void fireSessionCreated() {
151 pushEvent(new IoEvent(IoEventType.SESSION_CREATED, getSession(), null));
152 }
153
154 @Override
155 public void fireSessionIdle(IdleStatus status) {
156 pushEvent(new IoEvent(IoEventType.SESSION_IDLE, getSession(), status));
157 }
158
159 @Override
160 public void fireSessionOpened() {
161 pushEvent(new IoEvent(IoEventType.SESSION_OPENED, getSession(), null));
162 }
163
164 @Override
165 public void fireMessageReceived(Object message) {
166 pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(), message));
167 }
168
169 private class VmPipeIoProcessor implements IoProcessor<VmPipeSessionImpl> {
170 public void flush(VmPipeSessionImpl session) {
171 WriteRequestQueue queue = session.getWriteRequestQueue0();
172 if (queue.isEmpty(session)) {
173 return;
174 }
175 if (session.isConnected()) {
176 if (session.getLock().tryLock()) {
177 try {
178 WriteRequest req;
179 while ((req = queue.poll(session)) != null) {
180 Object message = req.getMessage();
181 Object messageCopy = message;
182 if (message instanceof IoBuffer) {
183 IoBuffer rb = (IoBuffer) message;
184 rb.mark();
185 IoBuffer wb = IoBuffer.allocate(rb.remaining());
186 wb.put(rb);
187 wb.flip();
188 rb.reset();
189 messageCopy = wb;
190 }
191
192 session.getRemoteSession().getFilterChain().fireMessageReceived(
193 messageCopy);
194 session.getFilterChain().fireMessageSent(req);
195 }
196 } finally {
197 session.getLock().unlock();
198 }
199
200 flushPendingDataQueues(session);
201 }
202 } else {
203 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
204 WriteRequest req;
205 while ((req = queue.poll(session)) != null) {
206 failedRequests.add(req);
207 }
208
209 if (!failedRequests.isEmpty()) {
210 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
211 for (WriteRequest r: failedRequests) {
212 r.getFuture().setException(cause);
213 }
214 session.getFilterChain().fireExceptionCaught(cause);
215 }
216 }
217 }
218
219 public void remove(VmPipeSessionImpl session) {
220 try {
221 session.getLock().lock();
222 if (!session.getCloseFuture().isClosed()) {
223 session.getServiceListeners().fireSessionDestroyed(session);
224 session.getRemoteSession().close();
225 }
226 } finally {
227 session.getLock().unlock();
228 }
229 }
230
231 public void add(VmPipeSessionImpl session) {
232 }
233
234 public void updateTrafficMask(VmPipeSessionImpl session) {
235 if (session.getTrafficMask().isReadable()) {
236 List<Object> data = new ArrayList<Object>();
237 session.receivedMessageQueue.drainTo(data);
238 for (Object aData : data) {
239 VmPipeFilterChain.this.fireMessageReceived(aData);
240 }
241 }
242
243 if (session.getTrafficMask().isWritable()) {
244 flush(session);
245 }
246 }
247
248 public void dispose() {
249 }
250
251 public boolean isDisposed() {
252 return false;
253 }
254
255 public boolean isDisposing() {
256 return false;
257 }
258 }
259 }