View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.vmpipe.support;
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.IdleStatus;
24  import org.apache.mina.common.IoSession;
25  import org.apache.mina.common.IoFilter.WriteRequest;
26  import org.apache.mina.common.support.AbstractIoFilterChain;
27  
28  import edu.emory.mathcs.backport.java.util.Queue;
29  import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
30  
31  /**
32   * @author The Apache Directory Project (mina-dev@directory.apache.org)
33   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (금, 13  7월 2007) $
34   */
35  public class VmPipeFilterChain extends AbstractIoFilterChain {
36  
37      private final Queue eventQueue = new ConcurrentLinkedQueue();
38  
39      private boolean flushEnabled;
40  
41      public VmPipeFilterChain(IoSession session) {
42          super(session);
43      }
44  
45      public void start() {
46          flushEnabled = true;
47          flushEvents();
48      }
49  
50      private void pushEvent(Event e) {
51          eventQueue.offer(e);
52          if (flushEnabled) {
53              flushEvents();
54          }
55      }
56  
57      private void flushEvents() {
58          Event e;
59          while ((e = (Event) eventQueue.poll()) != null) {
60              fireEvent(e);
61          }
62      }
63  
64      private void fireEvent(Event e) {
65          IoSession session = getSession();
66          EventType type = e.getType();
67          Object data = e.getData();
68  
69          if (type == EventType.RECEIVED) {
70              VmPipeSessionImpl s = (VmPipeSessionImpl) session;
71              synchronized (s.lock) {
72                  if (!s.getTrafficMask().isReadable()) {
73                      synchronized (s.pendingDataQueue) {
74                          s.pendingDataQueue.push(data);
75                      }
76                  } else {
77                      int byteCount = 1;
78                      if (data instanceof ByteBuffer) {
79                          byteCount = ((ByteBuffer) data).remaining();
80                      }
81  
82                      s.increaseReadBytes(byteCount);
83  
84                      super.fireMessageReceived(s, data);
85                  }
86              }
87          } else if (type == EventType.WRITE) {
88              super.fireFilterWrite(session, (WriteRequest) data);
89          } else if (type == EventType.SENT) {
90              super.fireMessageSent(session, (WriteRequest) data);
91          } else if (type == EventType.EXCEPTION) {
92              super.fireExceptionCaught(session, (Throwable) data);
93          } else if (type == EventType.IDLE) {
94              super.fireSessionIdle(session, (IdleStatus) data);
95          } else if (type == EventType.OPENED) {
96              super.fireSessionOpened(session);
97          } else if (type == EventType.CREATED) {
98              super.fireSessionCreated(session);
99          } else if (type == EventType.CLOSED) {
100             super.fireSessionClosed(session);
101         } else if (type == EventType.CLOSE) {
102             super.fireFilterClose(session);
103         }
104     }
105 
106     public void fireFilterClose(IoSession session) {
107         pushEvent(new Event(EventType.CLOSE, null));
108     }
109 
110     public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
111         pushEvent(new Event(EventType.WRITE, writeRequest));
112     }
113 
114     public void fireExceptionCaught(IoSession session, Throwable cause) {
115         pushEvent(new Event(EventType.EXCEPTION, cause));
116     }
117 
118     public void fireMessageSent(IoSession session, WriteRequest request) {
119         pushEvent(new Event(EventType.SENT, request));
120     }
121 
122     public void fireSessionClosed(IoSession session) {
123         pushEvent(new Event(EventType.CLOSED, null));
124     }
125 
126     public void fireSessionCreated(IoSession session) {
127         pushEvent(new Event(EventType.CREATED, null));
128     }
129 
130     public void fireSessionIdle(IoSession session, IdleStatus status) {
131         pushEvent(new Event(EventType.IDLE, status));
132     }
133 
134     public void fireSessionOpened(IoSession session) {
135         pushEvent(new Event(EventType.OPENED, null));
136     }
137 
138     public void fireMessageReceived(IoSession session, Object message) {
139         pushEvent(new Event(EventType.RECEIVED, message));
140     }
141 
142     protected void doWrite(IoSession session, WriteRequest writeRequest) {
143         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
144         synchronized (s.lock) {
145             if (s.isConnected()) {
146 
147                 if (!s.getTrafficMask().isWritable()) {
148                     synchronized (s.pendingDataQueue) {
149                         s.pendingDataQueue.push(writeRequest);
150                     }
151                 } else {
152                     Object message = writeRequest.getMessage();
153 
154                     int byteCount = 1;
155                     Object messageCopy = message;
156                     if (message instanceof ByteBuffer) {
157                         ByteBuffer rb = (ByteBuffer) message;
158                         rb.mark();
159                         byteCount = rb.remaining();
160                         ByteBuffer wb = ByteBuffer.allocate(rb.remaining());
161                         wb.put(rb);
162                         wb.flip();
163                         rb.reset();
164                         messageCopy = wb;
165                     }
166 
167                     s.increaseWrittenBytes(byteCount);
168                     s.increaseWrittenMessages();
169 
170                     s.getRemoteSession().getFilterChain().fireMessageReceived(
171                             s.getRemoteSession(), messageCopy);
172                     s.getFilterChain().fireMessageSent(s, writeRequest);
173                 }
174             } else {
175                 writeRequest.getFuture().setWritten(false);
176             }
177         }
178     }
179 
180     protected void doClose(IoSession session) {
181         VmPipeSessionImpl s = (VmPipeSessionImpl) session;
182         synchronized (s.lock) {
183             if (!session.getCloseFuture().isClosed()) {
184                 s.getServiceListeners().fireSessionDestroyed(s);
185                 s.getRemoteSession().close();
186             }
187         }
188     }
189 
190     // FIXME Copied and pasted from {@link ExecutorFilter}.
191     private static class EventType {
192         public static final EventType CREATED = new EventType("CREATED");
193 
194         public static final EventType OPENED = new EventType("OPENED");
195 
196         public static final EventType CLOSED = new EventType("CLOSED");
197 
198         public static final EventType RECEIVED = new EventType("RECEIVED");
199 
200         public static final EventType SENT = new EventType("SENT");
201 
202         public static final EventType IDLE = new EventType("IDLE");
203 
204         public static final EventType EXCEPTION = new EventType("EXCEPTION");
205 
206         public static final EventType WRITE = new EventType("WRITE");
207 
208         public static final EventType CLOSE = new EventType("CLOSE");
209 
210         private final String value;
211 
212         private EventType(String value) {
213             this.value = value;
214         }
215 
216         public String toString() {
217             return value;
218         }
219     }
220 
221     private static class Event {
222         private final EventType type;
223 
224         private final Object data;
225 
226         public Event(EventType type, Object data) {
227             this.type = type;
228             this.data = data;
229         }
230 
231         public Object getData() {
232             return data;
233         }
234 
235         public EventType getType() {
236             return type;
237         }
238     }
239 }