View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.vmpipe.support;
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.IdleStatus;
24  import org.apache.mina.common.IoSession;
25  import org.apache.mina.common.IoFilter.WriteRequest;
26  import org.apache.mina.common.support.AbstractIoFilterChain;
27  
28  import edu.emory.mathcs.backport.java.util.Queue;
29  import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
30  import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
31  
32  /**
33   * @author The Apache Directory Project (mina-dev@directory.apache.org)
34   * @version $Rev: 575603 $, $Date: 2007-09-14 12:04:45 +0200 (Fri, 14 Sep 2007) $
35   */
36  public class VmPipeFilterChain extends AbstractIoFilterChain {
37  
38      private final Queue eventQueue = new ConcurrentLinkedQueue();
39  
40      private final AtomicBoolean flushEnabled = new AtomicBoolean();
41      private final AtomicBoolean sessionOpened = new AtomicBoolean();
42  
43      public VmPipeFilterChain(IoSession session) {
44          super(session);
45      }
46  
47      public void start() {
48          flushEnabled.set(true);
49          flushEvents();
50          flushPendingDataQueues((VmPipeSessionImpl) getSession());
51      }
52  
53      private void pushEvent(Event e) {
54          eventQueue.offer(e);
55          if (flushEnabled.get()) {
56              flushEvents();
57          }
58      }
59  
60      private void flushEvents() {
61          Event e;
62          while ((e = (Event) eventQueue.poll()) != null) {
63              fireEvent(e);
64          }
65      }
66  
67      private void fireEvent(Event e) {
68          IoSession session = getSession();
69          EventType type = e.getType();
70          Object data = e.getData();
71  
72          if (type == EventType.RECEIVED) {
73              VmPipeSessionImpl s = (VmPipeSessionImpl) session;
74              if (sessionOpened.get() && s.getTrafficMask().isReadable() && s.getLock().tryLock()) {
75                  try {
76                      int byteCount = 1;
77                      if (data instanceof ByteBuffer) {
78                          byteCount = ((ByteBuffer) data).remaining();
79                      }
80  
81                      s.increaseReadBytes(byteCount);
82  
83                      super.fireMessageReceived(s, data);
84                  } finally {
85                      s.getLock().unlock();
86                  }
87                  
88                  flushPendingDataQueues(s);
89              } else {
90                  s.pendingDataQueue.add(data);
91              }
92          } else if (type == EventType.WRITE) {
93              super.fireFilterWrite(session, (WriteRequest) data);
94          } else if (type == EventType.SENT) {
95              super.fireMessageSent(session, (WriteRequest) data);
96          } else if (type == EventType.EXCEPTION) {
97              super.fireExceptionCaught(session, (Throwable) data);
98          } else if (type == EventType.IDLE) {
99              super.fireSessionIdle(session, (IdleStatus) data);
100         } else if (type == EventType.OPENED) {
101             super.fireSessionOpened(session);
102             sessionOpened.set(true);
103         } else if (type == EventType.CREATED) {
104             super.fireSessionCreated(session);
105         } else if (type == EventType.CLOSED) {
106             super.fireSessionClosed(session);
107         } else if (type == EventType.CLOSE) {
108             super.fireFilterClose(session);
109         }
110     }
111 
112     private static void flushPendingDataQueues( VmPipeSessionImpl s ) {
113         s.updateTrafficMask();
114         s.getRemoteSession().updateTrafficMask();
115     }
116     
117     public void fireFilterClose(IoSession session) {
118         pushEvent(new Event(EventType.CLOSE, null));
119     }
120 
121     public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
122         pushEvent(new Event(EventType.WRITE, writeRequest));
123     }
124 
125     public void fireExceptionCaught(IoSession session, Throwable cause) {
126         pushEvent(new Event(EventType.EXCEPTION, cause));
127     }
128 
129     public void fireMessageSent(IoSession session, WriteRequest request) {
130         pushEvent(new Event(EventType.SENT, request));
131     }
132 
133     public void fireSessionClosed(IoSession session) {
134         pushEvent(new Event(EventType.CLOSED, null));
135     }
136 
137     public void fireSessionCreated(IoSession session) {
138         pushEvent(new Event(EventType.CREATED, null));
139     }
140 
141     public void fireSessionIdle(IoSession session, IdleStatus status) {
142         pushEvent(new Event(EventType.IDLE, status));
143     }
144 
145     public void fireSessionOpened(IoSession session) {
146         pushEvent(new Event(EventType.OPENED, null));
147     }
148 
149     public void fireMessageReceived(IoSession session, Object message) {
150         pushEvent(new Event(EventType.RECEIVED, message));
151     }
152 
153     protected void doWrite(IoSession session, WriteRequest writeRequest) {
154         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
155         if (s.isConnected()) {
156             if (s.getTrafficMask().isWritable() && s.getLock().tryLock()) {
157                 try {
158                     Object message = writeRequest.getMessage();
159 
160                     int byteCount = 1;
161                     Object messageCopy = message;
162                     if (message instanceof ByteBuffer) {
163                         ByteBuffer rb = (ByteBuffer) message;
164                         rb.mark();
165                         byteCount = rb.remaining();
166                         ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
167                         wb.put(rb);
168                         wb.flip();
169                         rb.reset();
170                         messageCopy = wb;
171                     }
172 
173                     // Avoid unwanted side effect that scheduledWrite* becomes negative
174                     // by increasing them.
175                     s.increaseScheduledWriteBytes(byteCount);
176                     s.increaseScheduledWriteRequests();
177 
178                     s.increaseWrittenBytes(byteCount);
179                     s.increaseWrittenMessages();
180 
181                     s.getRemoteSession().getFilterChain().fireMessageReceived(
182                             s.getRemoteSession(), messageCopy);
183                     s.getFilterChain().fireMessageSent(s, writeRequest);
184                 } finally {
185                     s.getLock().unlock();
186                 }
187                 
188                 flushPendingDataQueues( s );
189             } else {
190                 s.pendingDataQueue.add(writeRequest);
191             }
192         } else {
193             writeRequest.getFuture().setWritten(false);
194         }
195     }
196 
197     protected void doClose(IoSession session) {
198         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
199         s.getLock().lock();
200         try {
201             if (!session.getCloseFuture().isClosed()) {
202                 s.getServiceListeners().fireSessionDestroyed(s);
203                 s.getRemoteSession().close();
204             }
205         } finally {
206             s.getLock().unlock();
207         }
208     }
209 
210     // FIXME Copied and pasted from {@link ExecutorFilter}.
211     private static class EventType {
212         public static final EventType CREATED = new EventType("CREATED");
213 
214         public static final EventType OPENED = new EventType("OPENED");
215 
216         public static final EventType CLOSED = new EventType("CLOSED");
217 
218         public static final EventType RECEIVED = new EventType("RECEIVED");
219 
220         public static final EventType SENT = new EventType("SENT");
221 
222         public static final EventType IDLE = new EventType("IDLE");
223 
224         public static final EventType EXCEPTION = new EventType("EXCEPTION");
225 
226         public static final EventType WRITE = new EventType("WRITE");
227 
228         public static final EventType CLOSE = new EventType("CLOSE");
229 
230         private final String value;
231 
232         private EventType(String value) {
233             this.value = value;
234         }
235 
236         public String toString() {
237             return value;
238         }
239     }
240 
241     private static class Event {
242         private final EventType type;
243 
244         private final Object data;
245 
246         private Event(EventType type, Object data) {
247             this.type = type;
248             this.data = data;
249         }
250 
251         public Object getData() {
252             return data;
253         }
254 
255         public EventType getType() {
256             return type;
257         }
258     }
259 }