View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.SocketAddress;
23  import java.net.SocketException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  
29  import org.apache.mina.common.IoFilter.WriteRequest;
30  import org.apache.mina.common.IoFilterChain;
31  import org.apache.mina.common.IoHandler;
32  import org.apache.mina.common.IoService;
33  import org.apache.mina.common.IoServiceConfig;
34  import org.apache.mina.common.IoSession;
35  import org.apache.mina.common.IoSessionConfig;
36  import org.apache.mina.common.RuntimeIOException;
37  import org.apache.mina.common.TransportType;
38  import org.apache.mina.common.ByteBuffer;
39  import org.apache.mina.common.support.BaseIoSession;
40  import org.apache.mina.common.support.BaseIoSessionConfig;
41  import org.apache.mina.common.support.IoServiceListenerSupport;
42  
43  /**
44   * An {@link IoSession} for socket transport (TCP/IP).
45   *
46   * @author The Apache Directory Project (mina-dev@directory.apache.org)
47   * @version $Rev: 556539 $, $Date: 2007-07-16 16:48:36 +0900 (월, 16  7월 2007) $
48   */
49  class SocketSessionImpl extends BaseIoSession {
50      private final IoService manager;
51  
52      private final IoServiceConfig serviceConfig;
53  
54      private final SocketSessionConfig config = new SessionConfigImpl();
55  
56      private final SocketIoProcessor ioProcessor;
57  
58      private final SocketFilterChain filterChain;
59  
60      private final SocketChannel ch;
61  
62      private final Queue<WriteRequest> writeRequestQueue;
63  
64      private final IoHandler handler;
65  
66      private final SocketAddress remoteAddress;
67  
68      private final SocketAddress localAddress;
69  
70      private final SocketAddress serviceAddress;
71  
72      private final IoServiceListenerSupport serviceListeners;
73  
74      private SelectionKey key;
75  
76      private int readBufferSize = 1024;
77  
78      /**
79       * Creates a new instance.
80       */
81      SocketSessionImpl(IoService manager, SocketIoProcessor ioProcessor,
82              IoServiceListenerSupport listeners, IoServiceConfig serviceConfig,
83              SocketChannel ch, IoHandler defaultHandler,
84              SocketAddress serviceAddress) {
85          this.manager = manager;
86          this.serviceListeners = listeners;
87          this.ioProcessor = ioProcessor;
88          this.filterChain = new SocketFilterChain(this);
89          this.ch = ch;
90          this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
91          this.handler = defaultHandler;
92          this.remoteAddress = ch.socket().getRemoteSocketAddress();
93          this.localAddress = ch.socket().getLocalSocketAddress();
94          this.serviceAddress = serviceAddress;
95          this.serviceConfig = serviceConfig;
96  
97          // Apply the initial session settings
98          IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99          if (sessionConfig instanceof SocketSessionConfig) {
100             SocketSessionConfig cfg = (SocketSessionConfig) sessionConfig;
101             this.config.setKeepAlive(cfg.isKeepAlive());
102             this.config.setOobInline(cfg.isOobInline());
103             this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
104             this.config.setReuseAddress(cfg.isReuseAddress());
105             this.config.setSendBufferSize(cfg.getSendBufferSize());
106             this.config.setSoLinger(cfg.getSoLinger());
107             this.config.setTcpNoDelay(cfg.isTcpNoDelay());
108 
109             if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
110                 this.config.setTrafficClass(cfg.getTrafficClass());
111             }
112         }
113     }
114 
115     public IoService getService() {
116         return manager;
117     }
118 
119     public IoServiceConfig getServiceConfig() {
120         return serviceConfig;
121     }
122 
123     public IoSessionConfig getConfig() {
124         return config;
125     }
126 
127     SocketIoProcessor getIoProcessor() {
128         return ioProcessor;
129     }
130 
131     public IoFilterChain getFilterChain() {
132         return filterChain;
133     }
134 
135     SocketChannel getChannel() {
136         return ch;
137     }
138 
139     IoServiceListenerSupport getServiceListeners() {
140         return serviceListeners;
141     }
142 
143     SelectionKey getSelectionKey() {
144         return key;
145     }
146 
147     void setSelectionKey(SelectionKey key) {
148         this.key = key;
149     }
150 
151     public IoHandler getHandler() {
152         return handler;
153     }
154 
155     @Override
156     protected void close0() {
157         filterChain.fireFilterClose(this);
158     }
159 
160     Queue<WriteRequest> getWriteRequestQueue() {
161         return writeRequestQueue;
162     }
163 
164     public int getScheduledWriteRequests() {
165         return writeRequestQueue.size();
166     }
167 
168     /**
169      * Returns the sum of the '<tt>remaining</tt>' of all {@link ByteBuffer}s
170      * in the writeRequestQueue queue.
171      *
172      * @throws ClassCastException if an element is not a {@link ByteBuffer}
173      */
174     public int getScheduledWriteBytes() {
175         int byteSize = 0;
176 
177         for (WriteRequest request : writeRequestQueue) {
178             byteSize += ((ByteBuffer) request.getMessage()).remaining();
179         }
180 
181         return byteSize;
182     }
183 
184     @Override
185     protected void write0(WriteRequest writeRequest) {
186         filterChain.fireFilterWrite(this, writeRequest);
187     }
188 
189     public TransportType getTransportType() {
190         return TransportType.SOCKET;
191     }
192 
193     public SocketAddress getRemoteAddress() {
194         return remoteAddress;
195     }
196 
197     public SocketAddress getLocalAddress() {
198         return localAddress;
199     }
200 
201     public SocketAddress getServiceAddress() {
202         return serviceAddress;
203     }
204 
205     @Override
206     protected void updateTrafficMask() {
207         this.ioProcessor.updateTrafficMask(this);
208     }
209 
210     int getReadBufferSize() {
211         return readBufferSize;
212     }
213     
214     void setReadBufferSize(int readBufferSize) {
215         this.readBufferSize = readBufferSize;
216     }
217 
218     private class SessionConfigImpl extends BaseIoSessionConfig implements
219             SocketSessionConfig {
220         public boolean isKeepAlive() {
221             try {
222                 return ch.socket().getKeepAlive();
223             } catch (SocketException e) {
224                 throw new RuntimeIOException(e);
225             }
226         }
227 
228         public void setKeepAlive(boolean on) {
229             try {
230                 ch.socket().setKeepAlive(on);
231             } catch (SocketException e) {
232                 throw new RuntimeIOException(e);
233             }
234         }
235 
236         public boolean isOobInline() {
237             try {
238                 return ch.socket().getOOBInline();
239             } catch (SocketException e) {
240                 throw new RuntimeIOException(e);
241             }
242         }
243 
244         public void setOobInline(boolean on) {
245             try {
246                 ch.socket().setOOBInline(on);
247             } catch (SocketException e) {
248                 throw new RuntimeIOException(e);
249             }
250         }
251 
252         public boolean isReuseAddress() {
253             try {
254                 return ch.socket().getReuseAddress();
255             } catch (SocketException e) {
256                 throw new RuntimeIOException(e);
257             }
258         }
259 
260         public void setReuseAddress(boolean on) {
261             try {
262                 ch.socket().setReuseAddress(on);
263             } catch (SocketException e) {
264                 throw new RuntimeIOException(e);
265             }
266         }
267 
268         public int getSoLinger() {
269             try {
270                 return ch.socket().getSoLinger();
271             } catch (SocketException e) {
272                 throw new RuntimeIOException(e);
273             }
274         }
275 
276         public void setSoLinger(int linger) {
277             try {
278                 if (linger < 0) {
279                     ch.socket().setSoLinger(false, 0);
280                 } else {
281                     ch.socket().setSoLinger(true, linger);
282                 }
283             } catch (SocketException e) {
284                 throw new RuntimeIOException(e);
285             }
286         }
287 
288         public boolean isTcpNoDelay() {
289             try {
290                 return ch.socket().getTcpNoDelay();
291             } catch (SocketException e) {
292                 throw new RuntimeIOException(e);
293             }
294         }
295 
296         public void setTcpNoDelay(boolean on) {
297             try {
298                 ch.socket().setTcpNoDelay(on);
299             } catch (SocketException e) {
300                 throw new RuntimeIOException(e);
301             }
302         }
303 
304         public int getTrafficClass() {
305             if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) {
306                 try {
307                     return ch.socket().getTrafficClass();
308                 } catch (SocketException e) {
309                     // Throw an exception only when setTrafficClass is also available.
310                     if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
311                         throw new RuntimeIOException(e);
312                     }
313                 }
314             }
315 
316             return 0;
317         }
318 
319         public void setTrafficClass(int tc) {
320             if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
321                 try {
322                     ch.socket().setTrafficClass(tc);
323                 } catch (SocketException e) {
324                     throw new RuntimeIOException(e);
325                 }
326             }
327         }
328 
329         public int getSendBufferSize() {
330             try {
331                 return ch.socket().getSendBufferSize();
332             } catch (SocketException e) {
333                 throw new RuntimeIOException(e);
334             }
335         }
336 
337         public void setSendBufferSize(int size) {
338             if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) {
339                 try {
340                     ch.socket().setSendBufferSize(size);
341                 } catch (SocketException e) {
342                     throw new RuntimeIOException(e);
343                 }
344             }
345         }
346 
347         public int getReceiveBufferSize() {
348             try {
349                 return ch.socket().getReceiveBufferSize();
350             } catch (SocketException e) {
351                 throw new RuntimeIOException(e);
352             }
353         }
354 
355         public void setReceiveBufferSize(int size) {
356             if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
357                 try {
358                     ch.socket().setReceiveBufferSize(size);
359                 } catch (SocketException e) {
360                     throw new RuntimeIOException(e);
361                 }
362             }
363         }
364     }
365 }