View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.SocketAddress;
23  import java.net.SocketException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  
29  import org.apache.mina.common.IoFilter.WriteRequest;
30  import org.apache.mina.common.IoFilterChain;
31  import org.apache.mina.common.IoHandler;
32  import org.apache.mina.common.IoService;
33  import org.apache.mina.common.IoServiceConfig;
34  import org.apache.mina.common.IoSession;
35  import org.apache.mina.common.IoSessionConfig;
36  import org.apache.mina.common.RuntimeIOException;
37  import org.apache.mina.common.TransportType;
38  import org.apache.mina.common.ByteBuffer;
39  import org.apache.mina.common.support.BaseIoSession;
40  import org.apache.mina.common.support.BaseIoSessionConfig;
41  import org.apache.mina.common.support.IoServiceListenerSupport;
42  
43  /**
44   * An {@link IoSession} for socket transport (TCP/IP).
45   *
46   * @author The Apache Directory Project (mina-dev@directory.apache.org)
47   * @version $Rev: 561242 $, $Date: 2007-07-31 15:43:29 +0900 (화, 31  7월 2007) $
48   */
49  class SocketSessionImpl extends BaseIoSession {
50      private final IoService manager;
51  
52      private final IoServiceConfig serviceConfig;
53  
54      private final SocketSessionConfig config = new SessionConfigImpl();
55  
56      private final SocketIoProcessor ioProcessor;
57  
58      private final SocketFilterChain filterChain;
59  
60      private final SocketChannel ch;
61  
62      private final Queue<WriteRequest> writeRequestQueue;
63  
64      private final IoHandler handler;
65  
66      private final SocketAddress remoteAddress;
67  
68      private final SocketAddress localAddress;
69  
70      private final SocketAddress serviceAddress;
71  
72      private final IoServiceListenerSupport serviceListeners;
73  
74      private SelectionKey key;
75  
76      private int readBufferSize = 1024;
77  
78      /**
79       * Creates a new instance.
80       */
81      SocketSessionImpl(IoService manager, SocketIoProcessor ioProcessor,
82              IoServiceListenerSupport listeners, IoServiceConfig serviceConfig,
83              SocketChannel ch, IoHandler defaultHandler,
84              SocketAddress serviceAddress) {
85          this.manager = manager;
86          this.serviceListeners = listeners;
87          this.ioProcessor = ioProcessor;
88          this.filterChain = new SocketFilterChain(this);
89          this.ch = ch;
90          this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
91          this.handler = defaultHandler;
92          this.remoteAddress = ch.socket().getRemoteSocketAddress();
93          this.localAddress = ch.socket().getLocalSocketAddress();
94          this.serviceAddress = serviceAddress;
95          this.serviceConfig = serviceConfig;
96  
97          // Apply the initial session settings
98          IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99          if (sessionConfig instanceof SocketSessionConfig) {
100             SocketSessionConfig cfg = (SocketSessionConfig) sessionConfig;
101             this.config.setKeepAlive(cfg.isKeepAlive());
102             this.config.setOobInline(cfg.isOobInline());
103             this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
104             this.config.setReuseAddress(cfg.isReuseAddress());
105             this.config.setSendBufferSize(cfg.getSendBufferSize());
106             this.config.setSoLinger(cfg.getSoLinger());
107             this.config.setTcpNoDelay(cfg.isTcpNoDelay());
108 
109             if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
110                 this.config.setTrafficClass(cfg.getTrafficClass());
111             }
112         }
113     }
114 
115     public IoService getService() {
116         return manager;
117     }
118 
119     public IoServiceConfig getServiceConfig() {
120         return serviceConfig;
121     }
122 
123     public IoSessionConfig getConfig() {
124         return config;
125     }
126 
127     SocketIoProcessor getIoProcessor() {
128         return ioProcessor;
129     }
130 
131     public IoFilterChain getFilterChain() {
132         return filterChain;
133     }
134 
135     SocketChannel getChannel() {
136         return ch;
137     }
138 
139     IoServiceListenerSupport getServiceListeners() {
140         return serviceListeners;
141     }
142 
143     SelectionKey getSelectionKey() {
144         return key;
145     }
146 
147     void setSelectionKey(SelectionKey key) {
148         this.key = key;
149     }
150 
151     public IoHandler getHandler() {
152         return handler;
153     }
154 
155     @Override
156     protected void close0() {
157         filterChain.fireFilterClose(this);
158     }
159 
160     Queue<WriteRequest> getWriteRequestQueue() {
161         return writeRequestQueue;
162     }
163 
164     public int getScheduledWriteRequests() {
165         int size = 0;
166         for (WriteRequest request : writeRequestQueue) {
167             Object message = request.getMessage();
168             if (message instanceof ByteBuffer) {
169                 if (((ByteBuffer) message).hasRemaining()) {
170                     size ++;
171                 }
172             } else {
173                 size ++;
174             }
175         }
176 
177         return size;
178     }
179 
180     /**
181      * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
182      * in the writeRequestQueue queue.
183      *
184      * @throws ClassCastException if an element is not a {@link ByteBuffer}
185      */
186     public int getScheduledWriteBytes() {
187         int byteSize = 0;
188 
189         for (WriteRequest request : writeRequestQueue) {
190             byteSize += ((ByteBuffer) request.getMessage()).remaining();
191         }
192 
193         return byteSize;
194     }
195 
196     @Override
197     protected void write0(WriteRequest writeRequest) {
198         filterChain.fireFilterWrite(this, writeRequest);
199     }
200 
201     public TransportType getTransportType() {
202         return TransportType.SOCKET;
203     }
204 
205     public SocketAddress getRemoteAddress() {
206         return remoteAddress;
207     }
208 
209     public SocketAddress getLocalAddress() {
210         return localAddress;
211     }
212 
213     public SocketAddress getServiceAddress() {
214         return serviceAddress;
215     }
216 
217     @Override
218     protected void updateTrafficMask() {
219         this.ioProcessor.updateTrafficMask(this);
220     }
221 
222     int getReadBufferSize() {
223         return readBufferSize;
224     }
225     
226     void setReadBufferSize(int readBufferSize) {
227         this.readBufferSize = readBufferSize;
228     }
229 
230     private class SessionConfigImpl extends BaseIoSessionConfig implements
231             SocketSessionConfig {
232         public boolean isKeepAlive() {
233             try {
234                 return ch.socket().getKeepAlive();
235             } catch (SocketException e) {
236                 throw new RuntimeIOException(e);
237             }
238         }
239 
240         public void setKeepAlive(boolean on) {
241             try {
242                 ch.socket().setKeepAlive(on);
243             } catch (SocketException e) {
244                 throw new RuntimeIOException(e);
245             }
246         }
247 
248         public boolean isOobInline() {
249             try {
250                 return ch.socket().getOOBInline();
251             } catch (SocketException e) {
252                 throw new RuntimeIOException(e);
253             }
254         }
255 
256         public void setOobInline(boolean on) {
257             try {
258                 ch.socket().setOOBInline(on);
259             } catch (SocketException e) {
260                 throw new RuntimeIOException(e);
261             }
262         }
263 
264         public boolean isReuseAddress() {
265             try {
266                 return ch.socket().getReuseAddress();
267             } catch (SocketException e) {
268                 throw new RuntimeIOException(e);
269             }
270         }
271 
272         public void setReuseAddress(boolean on) {
273             try {
274                 ch.socket().setReuseAddress(on);
275             } catch (SocketException e) {
276                 throw new RuntimeIOException(e);
277             }
278         }
279 
280         public int getSoLinger() {
281             try {
282                 return ch.socket().getSoLinger();
283             } catch (SocketException e) {
284                 throw new RuntimeIOException(e);
285             }
286         }
287 
288         public void setSoLinger(int linger) {
289             try {
290                 if (linger < 0) {
291                     ch.socket().setSoLinger(false, 0);
292                 } else {
293                     ch.socket().setSoLinger(true, linger);
294                 }
295             } catch (SocketException e) {
296                 throw new RuntimeIOException(e);
297             }
298         }
299 
300         public boolean isTcpNoDelay() {
301             try {
302                 return ch.socket().getTcpNoDelay();
303             } catch (SocketException e) {
304                 throw new RuntimeIOException(e);
305             }
306         }
307 
308         public void setTcpNoDelay(boolean on) {
309             try {
310                 ch.socket().setTcpNoDelay(on);
311             } catch (SocketException e) {
312                 throw new RuntimeIOException(e);
313             }
314         }
315 
316         public int getTrafficClass() {
317             if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) {
318                 try {
319                     return ch.socket().getTrafficClass();
320                 } catch (SocketException e) {
321                     // Throw an exception only when setTrafficClass is also available.
322                     if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
323                         throw new RuntimeIOException(e);
324                     }
325                 }
326             }
327 
328             return 0;
329         }
330 
331         public void setTrafficClass(int tc) {
332             if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
333                 try {
334                     ch.socket().setTrafficClass(tc);
335                 } catch (SocketException e) {
336                     throw new RuntimeIOException(e);
337                 }
338             }
339         }
340 
341         public int getSendBufferSize() {
342             try {
343                 return ch.socket().getSendBufferSize();
344             } catch (SocketException e) {
345                 throw new RuntimeIOException(e);
346             }
347         }
348 
349         public void setSendBufferSize(int size) {
350             if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) {
351                 try {
352                     ch.socket().setSendBufferSize(size);
353                 } catch (SocketException e) {
354                     throw new RuntimeIOException(e);
355                 }
356             }
357         }
358 
359         public int getReceiveBufferSize() {
360             try {
361                 return ch.socket().getReceiveBufferSize();
362             } catch (SocketException e) {
363                 throw new RuntimeIOException(e);
364             }
365         }
366 
367         public void setReceiveBufferSize(int size) {
368             if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
369                 try {
370                     ch.socket().setReceiveBufferSize(size);
371                 } catch (SocketException e) {
372                     throw new RuntimeIOException(e);
373                 }
374             }
375         }
376     }
377 }