View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.ConnectException;
24  import java.net.InetSocketAddress;
25  import java.net.SocketAddress;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.Iterator;
30  import java.util.Set;
31  
32  import org.apache.mina.common.ConnectFuture;
33  import org.apache.mina.common.ExceptionMonitor;
34  import org.apache.mina.common.IoConnector;
35  import org.apache.mina.common.IoConnectorConfig;
36  import org.apache.mina.common.IoHandler;
37  import org.apache.mina.common.IoServiceConfig;
38  import org.apache.mina.common.support.AbstractIoFilterChain;
39  import org.apache.mina.common.support.BaseIoConnector;
40  import org.apache.mina.common.support.DefaultConnectFuture;
41  import org.apache.mina.util.NamePreservingRunnable;
42  import org.apache.mina.util.NewThreadExecutor;
43  import org.apache.mina.util.Queue;
44  
45  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
46  
47  /**
48   * {@link IoConnector} for socket transport (TCP/IP).
49   *
50   * @author The Apache Directory Project (mina-dev@directory.apache.org)
51   * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
52   */
53  public class SocketConnector extends BaseIoConnector {
54      /**
55       * @noinspection StaticNonFinalField
56       */
57      private static volatile int nextId = 0;
58  
59      private final Object lock = new Object();
60  
61      private final int id = nextId++;
62  
63      private final String threadName = "SocketConnector-" + id;
64  
65      private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
66  
67      private final Queue connectQueue = new Queue();
68  
69      private final SocketIoProcessor[] ioProcessors;
70  
71      private final int processorCount;
72  
73      private final Executor executor;
74  
75      /**
76       * @noinspection FieldAccessedSynchronizedAndUnsynchronized
77       */
78      private Selector selector;
79  
80      private Worker worker;
81  
82      private int processorDistributor = 0;
83  
84      private int workerTimeout = 60; // 1 min.
85  
86      /**
87       * Create a connector with a single processing thread using a NewThreadExecutor 
88       */
89      public SocketConnector() {
90          this(1, new NewThreadExecutor());
91      }
92  
93      /**
94       * Create a connector with the desired number of processing threads
95       *
96       * @param processorCount Number of processing threads
97       * @param executor Executor to use for launching threads
98       */
99      public SocketConnector(int processorCount, Executor executor) {
100         if (processorCount < 1) {
101             throw new IllegalArgumentException(
102                     "Must have at least one processor");
103         }
104 
105         this.executor = executor;
106         this.processorCount = processorCount;
107         ioProcessors = new SocketIoProcessor[processorCount];
108 
109         for (int i = 0; i < processorCount; i++) {
110             ioProcessors[i] = new SocketIoProcessor(
111                     "SocketConnectorIoProcessor-" + id + "." + i, executor);
112         }
113     }
114 
115     /**
116      * How many seconds to keep the connection thread alive between connection requests
117      *
118      * @return the number of seconds to keep connection thread alive.
119      *         0 means that the connection thread will terminate immediately
120      *         when there's no connection to make.
121      */
122     public int getWorkerTimeout() {
123         return workerTimeout;
124     }
125 
126     /**
127      * Set how many seconds the connection worker thread should remain alive once idle before terminating itself.
128      *
129      * @param workerTimeout the number of seconds to keep thread alive.
130      *                      Must be >=0.  If 0 is specified, the connection
131      *                      worker thread will terminate immediately when
132      *                      there's no connection to make.
133      */
134     public void setWorkerTimeout(int workerTimeout) {
135         if (workerTimeout < 0) {
136             throw new IllegalArgumentException("Must be >= 0");
137         }
138         this.workerTimeout = workerTimeout;
139     }
140 
141     public ConnectFuture connect(SocketAddress address, IoHandler handler,
142             IoServiceConfig config) {
143         return connect(address, null, handler, config);
144     }
145 
146     public ConnectFuture connect(SocketAddress address,
147             SocketAddress localAddress, IoHandler handler,
148             IoServiceConfig config) {
149         if (address == null)
150             throw new NullPointerException("address");
151         if (handler == null)
152             throw new NullPointerException("handler");
153 
154         if (!(address instanceof InetSocketAddress))
155             throw new IllegalArgumentException("Unexpected address type: "
156                     + address.getClass());
157 
158         if (localAddress != null
159                 && !(localAddress instanceof InetSocketAddress))
160             throw new IllegalArgumentException(
161                     "Unexpected local address type: " + localAddress.getClass());
162 
163         if (config == null) {
164             config = getDefaultConfig();
165         }
166 
167         SocketChannel ch = null;
168         boolean success = false;
169         try {
170             ch = SocketChannel.open();
171             ch.socket().setReuseAddress(true);
172             if (localAddress != null) {
173                 ch.socket().bind(localAddress);
174             }
175 
176             ch.configureBlocking(false);
177 
178             if (ch.connect(address)) {
179                 DefaultConnectFuture future = new DefaultConnectFuture();
180                 newSession(ch, handler, config, future);
181                 success = true;
182                 return future;
183             }
184 
185             success = true;
186         } catch (IOException e) {
187             return DefaultConnectFuture.newFailedFuture(e);
188         } finally {
189             if (!success && ch != null) {
190                 try {
191                     ch.close();
192                 } catch (IOException e) {
193                     ExceptionMonitor.getInstance().exceptionCaught(e);
194                 }
195             }
196         }
197 
198         ConnectionRequest request = new ConnectionRequest(ch, handler, config);
199         synchronized (lock) {
200             try {
201                 startupWorker();
202             } catch (IOException e) {
203                 try {
204                     ch.close();
205                 } catch (IOException e2) {
206                     ExceptionMonitor.getInstance().exceptionCaught(e2);
207                 }
208     
209                 return DefaultConnectFuture.newFailedFuture(e);
210             }
211     
212             synchronized (connectQueue) {
213                 connectQueue.push(request);
214             }
215             selector.wakeup();
216         }
217 
218         return request;
219     }
220 
221     public IoServiceConfig getDefaultConfig() {
222         return defaultConfig;
223     }
224 
225     /**
226      * Sets the config this connector will use by default.
227      * 
228      * @param defaultConfig the default config.
229      * @throws NullPointerException if the specified value is <code>null</code>.
230      */
231     public void setDefaultConfig(SocketConnectorConfig defaultConfig) {
232         if (defaultConfig == null) {
233             throw new NullPointerException("defaultConfig");
234         }
235         this.defaultConfig = defaultConfig;
236     }
237     
238     private Selector getSelector() {
239         synchronized (lock) {
240             return this.selector;
241         }
242     }
243 
244     private void startupWorker() throws IOException {
245         synchronized (lock) {
246             if (worker == null) {
247                 selector = Selector.open();
248                 worker = new Worker();
249                 executor.execute(new NamePreservingRunnable(worker, threadName));
250             }
251         }
252     }
253 
254     private void registerNew() {
255         if (connectQueue.isEmpty())
256             return;
257 
258         Selector selector = getSelector();
259         for (;;) {
260             ConnectionRequest req;
261             synchronized (connectQueue) {
262                 req = (ConnectionRequest) connectQueue.pop();
263             }
264 
265             if (req == null)
266                 break;
267 
268             SocketChannel ch = req.channel;
269             try {
270                 ch.register(selector, SelectionKey.OP_CONNECT, req);
271             } catch (IOException e) {
272                 req.setException(e);
273                 try {
274                     ch.close();
275                 } catch (IOException e2) {
276                     ExceptionMonitor.getInstance().exceptionCaught(e2);
277                 }
278             }
279         }
280     }
281 
282     private void processSessions(Set keys) {
283         Iterator it = keys.iterator();
284 
285         while (it.hasNext()) {
286             SelectionKey key = (SelectionKey) it.next();
287 
288             if (!key.isConnectable())
289                 continue;
290 
291             SocketChannel ch = (SocketChannel) key.channel();
292             ConnectionRequest entry = (ConnectionRequest) key.attachment();
293 
294             boolean success = false;
295             try {
296                 if (ch.finishConnect()) {
297                     key.cancel();
298                     newSession(ch, entry.handler, entry.config, entry);
299                 }
300                 success = true;
301             } catch (Throwable e) {
302                 entry.setException(e);
303             } finally {
304                 if (!success) {
305                     key.cancel();
306                     try {
307                         ch.close();
308                     } catch (IOException e) {
309                         ExceptionMonitor.getInstance().exceptionCaught(e);
310                     }
311                 }
312             }
313         }
314 
315         keys.clear();
316     }
317 
318     private void processTimedOutSessions(Set keys) {
319         long currentTime = System.currentTimeMillis();
320         Iterator it = keys.iterator();
321 
322         while (it.hasNext()) {
323             SelectionKey key = (SelectionKey) it.next();
324 
325             if (!key.isValid())
326                 continue;
327 
328             ConnectionRequest entry = (ConnectionRequest) key.attachment();
329 
330             if (currentTime >= entry.deadline) {
331                 entry.setException(new ConnectException());
332                 try {
333                     key.channel().close();
334                 } catch (IOException e) {
335                     ExceptionMonitor.getInstance().exceptionCaught(e);
336                 } finally {
337                     key.cancel();
338                 }
339             }
340         }
341     }
342 
343     private void newSession(SocketChannel ch, IoHandler handler,
344             IoServiceConfig config, ConnectFuture connectFuture)
345             throws IOException {
346         SocketSessionImpl session = new SocketSessionImpl(this,
347                 nextProcessor(), getListeners(), config, ch, handler, ch
348                         .socket().getRemoteSocketAddress());
349         try {
350             getFilterChainBuilder().buildFilterChain(session.getFilterChain());
351             config.getFilterChainBuilder().buildFilterChain(
352                     session.getFilterChain());
353             config.getThreadModel().buildFilterChain(session.getFilterChain());
354         } catch (Throwable e) {
355             throw (IOException) new IOException("Failed to create a session.")
356                     .initCause(e);
357         }
358 
359         // Set the ConnectFuture of the specified session, which will be
360         // removed and notified by AbstractIoFilterChain eventually.
361         session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
362                 connectFuture);
363 
364         // Forward the remaining process to the SocketIoProcessor.
365         session.getIoProcessor().addNew(session);
366     }
367 
368     private SocketIoProcessor nextProcessor() {
369         if (this.processorDistributor == Integer.MAX_VALUE) {
370             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
371         }
372 
373         return ioProcessors[processorDistributor++ % processorCount];
374     }
375 
376     private class Worker implements Runnable {
377         private long lastActive = System.currentTimeMillis();
378 
379         public void run() {
380             Selector selector = getSelector();
381             for (;;) {
382                 try {
383                     int nKeys = selector.select(1000);
384 
385                     registerNew();
386 
387                     if (nKeys > 0) {
388                         processSessions(selector.selectedKeys());
389                     }
390 
391                     processTimedOutSessions(selector.keys());
392 
393                     if (selector.keys().isEmpty()) {
394                         if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) {
395                             synchronized (lock) {
396                                 if (selector.keys().isEmpty()
397                                         && connectQueue.isEmpty()) {
398                                     worker = null;
399                                     try {
400                                         selector.close();
401                                     } catch (IOException e) {
402                                         ExceptionMonitor.getInstance()
403                                                 .exceptionCaught(e);
404                                     } finally {
405                                         SocketConnector.this.selector = null;
406                                     }
407                                     break;
408                                 }
409                             }
410                         }
411                     } else {
412                         lastActive = System.currentTimeMillis();
413                     }
414                 } catch (IOException e) {
415                     ExceptionMonitor.getInstance().exceptionCaught(e);
416 
417                     try {
418                         Thread.sleep(1000);
419                     } catch (InterruptedException e1) {
420                         ExceptionMonitor.getInstance().exceptionCaught(e1);
421                     }
422                 }
423             }
424         }
425     }
426 
427     private class ConnectionRequest extends DefaultConnectFuture {
428         private final SocketChannel channel;
429 
430         private final long deadline;
431 
432         private final IoHandler handler;
433 
434         private final IoServiceConfig config;
435 
436         private ConnectionRequest(SocketChannel channel, IoHandler handler,
437                 IoServiceConfig config) {
438             this.channel = channel;
439             long timeout;
440             if (config instanceof IoConnectorConfig) {
441                 timeout = ((IoConnectorConfig) config)
442                         .getConnectTimeoutMillis();
443             } else {
444                 timeout = ((IoConnectorConfig) getDefaultConfig())
445                         .getConnectTimeoutMillis();
446             }
447             this.deadline = System.currentTimeMillis() + timeout;
448             this.handler = handler;
449             this.config = config;
450         }
451     }
452 }