View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.ServerSocketChannel;
28  import java.nio.channels.SocketChannel;
29  import java.util.ArrayList;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  
36  import org.apache.mina.common.ExceptionMonitor;
37  import org.apache.mina.common.IoAcceptor;
38  import org.apache.mina.common.IoHandler;
39  import org.apache.mina.common.IoServiceConfig;
40  import org.apache.mina.common.support.BaseIoAcceptor;
41  import org.apache.mina.util.NamePreservingRunnable;
42  import org.apache.mina.util.NewThreadExecutor;
43  import org.apache.mina.util.Queue;
44  
45  import edu.emory.mathcs.backport.java.util.concurrent.Executor;
46  
47  /**
48   * {@link IoAcceptor} for socket transport (TCP/IP).
49   *
50   * @author The Apache Directory Project (mina-dev@directory.apache.org)
51   * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
52   */
53  public class SocketAcceptor extends BaseIoAcceptor {
54      /**
55       * @noinspection StaticNonFinalField
56       */
57      private static volatile int nextId = 0;
58  
59      private final Executor executor;
60  
61      private final Object lock = new Object();
62  
63      private final int id = nextId++;
64  
65      private final String threadName = "SocketAcceptor-" + id;
66  
67      private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
68  
69      private final Map channels = new HashMap();
70  
71      private final Queue registerQueue = new Queue();
72  
73      private final Queue cancelQueue = new Queue();
74  
75      private final SocketIoProcessor[] ioProcessors;
76  
77      private final int processorCount;
78  
79      /**
80       * @noinspection FieldAccessedSynchronizedAndUnsynchronized
81       */
82      private Selector selector;
83  
84      private Worker worker;
85  
86      private int processorDistributor = 0;
87  
88      /**
89       * Create an acceptor with a single processing thread using a NewThreadExecutor
90       */
91      public SocketAcceptor() {
92          this(1, new NewThreadExecutor());
93      }
94  
95      /**
96       * Create an acceptor with the desired number of processing threads
97       *
98       * @param processorCount Number of processing threads
99       * @param executor Executor to use for launching threads
100      */
101     public SocketAcceptor(int processorCount, Executor executor) {
102         if (processorCount < 1) {
103             throw new IllegalArgumentException(
104                     "Must have at least one processor");
105         }
106 
107         // The default reuseAddress of an accepted socket should be 'true'.
108         ((SocketSessionConfig) defaultConfig.getSessionConfig())
109                 .setReuseAddress(true);
110 
111         this.executor = executor;
112         this.processorCount = processorCount;
113         ioProcessors = new SocketIoProcessor[processorCount];
114 
115         for (int i = 0; i < processorCount; i++) {
116             ioProcessors[i] = new SocketIoProcessor(
117                     "SocketAcceptorIoProcessor-" + id + "." + i, executor);
118         }
119     }
120 
121     /**
122      * Binds to the specified <code>address</code> and handles incoming connections with the specified
123      * <code>handler</code>.  Backlog value is configured to the value of <code>backlog</code> property.
124      *
125      * @throws IOException if failed to bind
126      */
127     public void bind(SocketAddress address, IoHandler handler,
128             IoServiceConfig config) throws IOException {
129         if (handler == null) {
130             throw new NullPointerException("handler");
131         }
132 
133         if (address != null && !(address instanceof InetSocketAddress)) {
134             throw new IllegalArgumentException("Unexpected address type: "
135                     + address.getClass());
136         }
137 
138         if (config == null) {
139             config = getDefaultConfig();
140         }
141 
142         RegistrationRequest request = new RegistrationRequest(address, handler,
143                 config);
144 
145         synchronized (lock) {
146             startupWorker();
147     
148             synchronized (registerQueue) {
149                 registerQueue.push(request);
150             }
151     
152             selector.wakeup();
153         }
154 
155         synchronized (request) {
156             while (!request.done) {
157                 try {
158                     request.wait();
159                 } catch (InterruptedException e) {
160                     ExceptionMonitor.getInstance().exceptionCaught(e);
161                 }
162             }
163         }
164 
165         if (request.exception != null) {
166             throw request.exception;
167         }
168     }
169     
170     private Selector getSelector() {
171         synchronized (lock) {
172             return this.selector;
173         }
174     }
175 
176     private void startupWorker() throws IOException {
177         synchronized (lock) {
178             if (worker == null) {
179                 selector = Selector.open();
180                 worker = new Worker();
181 
182                 executor.execute(new NamePreservingRunnable(worker));
183             }
184         }
185     }
186 
187     public void unbind(SocketAddress address) {
188         if (address == null) {
189             throw new NullPointerException("address");
190         }
191 
192         CancellationRequest request = new CancellationRequest(address);
193 
194         try {
195             startupWorker();
196         } catch (IOException e) {
197             // IOException is thrown only when Worker thread is not
198             // running and failed to open a selector.  We simply throw
199             // IllegalArgumentException here because we can simply
200             // conclude that nothing is bound to the selector.
201             throw new IllegalArgumentException("Address not bound: " + address);
202         }
203 
204         synchronized (cancelQueue) {
205             cancelQueue.push(request);
206         }
207 
208         selector.wakeup();
209 
210         synchronized (request) {
211             while (!request.done) {
212                 try {
213                     request.wait();
214                 } catch (InterruptedException e) {
215                     ExceptionMonitor.getInstance().exceptionCaught(e);
216                 }
217             }
218         }
219 
220         if (request.exception != null) {
221             request.exception.fillInStackTrace();
222 
223             throw request.exception;
224         }
225     }
226 
227     public void unbindAll() {
228         List addresses;
229         synchronized (channels) {
230             addresses = new ArrayList(channels.keySet());
231         }
232 
233         for (Iterator i = addresses.iterator(); i.hasNext();) {
234             unbind((SocketAddress) i.next());
235         }
236     }
237 
238     private class Worker implements Runnable {
239         public void run() {
240             Thread.currentThread().setName(SocketAcceptor.this.threadName);
241 
242             Selector selector = getSelector();
243             for (;;) {
244                 try {
245                     int nKeys = selector.select();
246 
247                     registerNew();
248 
249                     if (nKeys > 0) {
250                         processSessions(selector.selectedKeys());
251                     }
252 
253                     cancelKeys();
254 
255                     if (selector.keys().isEmpty()) {
256                         synchronized (lock) {
257                             if (selector.keys().isEmpty()
258                                     && registerQueue.isEmpty()
259                                     && cancelQueue.isEmpty()) {
260                                 worker = null;
261                                 try {
262                                     selector.close();
263                                 } catch (IOException e) {
264                                     ExceptionMonitor.getInstance()
265                                             .exceptionCaught(e);
266                                 } finally {
267                                     SocketAcceptor.this.selector = null;
268                                 }
269                                 break;
270                             }
271                         }
272                     }
273                 } catch (IOException e) {
274                     ExceptionMonitor.getInstance().exceptionCaught(e);
275 
276                     try {
277                         Thread.sleep(1000);
278                     } catch (InterruptedException e1) {
279                         ExceptionMonitor.getInstance().exceptionCaught(e1);
280                     }
281                 }
282             }
283         }
284 
285         private void processSessions(Set keys) throws IOException {
286             Iterator it = keys.iterator();
287             while (it.hasNext()) {
288                 SelectionKey key = (SelectionKey) it.next();
289 
290                 it.remove();
291 
292                 if (!key.isAcceptable()) {
293                     continue;
294                 }
295 
296                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
297 
298                 SocketChannel ch = ssc.accept();
299 
300                 if (ch == null) {
301                     continue;
302                 }
303 
304                 boolean success = false;
305                 try {
306                     RegistrationRequest req = (RegistrationRequest) key
307                             .attachment();
308                     SocketSessionImpl session = new SocketSessionImpl(
309                             SocketAcceptor.this, nextProcessor(),
310                             getListeners(), req.config, ch, req.handler,
311                             req.address);
312                     getFilterChainBuilder().buildFilterChain(
313                             session.getFilterChain());
314                     req.config.getFilterChainBuilder().buildFilterChain(
315                             session.getFilterChain());
316                     req.config.getThreadModel().buildFilterChain(
317                             session.getFilterChain());
318                     session.getIoProcessor().addNew(session);
319                     success = true;
320                 } catch (Throwable t) {
321                     ExceptionMonitor.getInstance().exceptionCaught(t);
322                 } finally {
323                     if (!success) {
324                         ch.close();
325                     }
326                 }
327             }
328         }
329     }
330 
331     private SocketIoProcessor nextProcessor() {
332         if (this.processorDistributor == Integer.MAX_VALUE) {
333             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
334         }
335 
336         return ioProcessors[processorDistributor++ % processorCount];
337     }
338 
339     public IoServiceConfig getDefaultConfig() {
340         return defaultConfig;
341     }
342 
343     /**
344      * Sets the config this acceptor will use by default.
345      * 
346      * @param defaultConfig the default config.
347      * @throws NullPointerException if the specified value is <code>null</code>.
348      */
349     public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
350         if (defaultConfig == null) {
351             throw new NullPointerException("defaultConfig");
352         }
353         this.defaultConfig = defaultConfig;
354     }
355 
356     private void registerNew() {
357         if (registerQueue.isEmpty()) {
358             return;
359         }
360 
361         Selector selector = getSelector();
362         for (;;) {
363             RegistrationRequest req;
364 
365             synchronized (registerQueue) {
366                 req = (RegistrationRequest) registerQueue.pop();
367             }
368 
369             if (req == null) {
370                 break;
371             }
372 
373             ServerSocketChannel ssc = null;
374 
375             try {
376                 ssc = ServerSocketChannel.open();
377                 ssc.configureBlocking(false);
378 
379                 // Configure the server socket,
380                 SocketAcceptorConfig cfg;
381                 if (req.config instanceof SocketAcceptorConfig) {
382                     cfg = (SocketAcceptorConfig) req.config;
383                 } else {
384                     cfg = (SocketAcceptorConfig) getDefaultConfig();
385                 }
386 
387                 ssc.socket().setReuseAddress(cfg.isReuseAddress());
388                 ssc.socket().setReceiveBufferSize(
389                         ((SocketSessionConfig) cfg.getSessionConfig())
390                                 .getReceiveBufferSize());
391 
392                 // and bind.
393                 ssc.socket().bind(req.address, cfg.getBacklog());
394                 if (req.address == null || req.address.getPort() == 0) {
395                     req.address = (InetSocketAddress) ssc.socket()
396                             .getLocalSocketAddress();
397                 }
398                 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
399 
400                 synchronized (channels) {
401                     channels.put(req.address, ssc);
402                 }
403 
404                 getListeners().fireServiceActivated(this, req.address,
405                         req.handler, req.config);
406             } catch (IOException e) {
407                 req.exception = e;
408             } finally {
409                 synchronized (req) {
410                     req.done = true;
411 
412                     req.notifyAll();
413                 }
414 
415                 if (ssc != null && req.exception != null) {
416                     try {
417                         ssc.close();
418                     } catch (IOException e) {
419                         ExceptionMonitor.getInstance().exceptionCaught(e);
420                     }
421                 }
422             }
423         }
424     }
425 
426     private void cancelKeys() {
427         if (cancelQueue.isEmpty()) {
428             return;
429         }
430 
431         Selector selector = getSelector();
432         for (;;) {
433             CancellationRequest request;
434 
435             synchronized (cancelQueue) {
436                 request = (CancellationRequest) cancelQueue.pop();
437             }
438 
439             if (request == null) {
440                 break;
441             }
442 
443             ServerSocketChannel ssc;
444             synchronized (channels) {
445                 ssc = (ServerSocketChannel) channels.remove(request.address);
446             }
447 
448             // close the channel
449             try {
450                 if (ssc == null) {
451                     request.exception = new IllegalArgumentException(
452                             "Address not bound: " + request.address);
453                 } else {
454                     SelectionKey key = ssc.keyFor(selector);
455                     request.registrationRequest = (RegistrationRequest) key
456                             .attachment();
457                     key.cancel();
458 
459                     selector.wakeup(); // wake up again to trigger thread death
460 
461                     ssc.close();
462                 }
463             } catch (IOException e) {
464                 ExceptionMonitor.getInstance().exceptionCaught(e);
465             } finally {
466                 synchronized (request) {
467                     request.done = true;
468                     request.notifyAll();
469                 }
470 
471                 if (request.exception == null) {
472                     getListeners().fireServiceDeactivated(this,
473                             request.address,
474                             request.registrationRequest.handler,
475                             request.registrationRequest.config);
476                 }
477             }
478         }
479     }
480 
481     private static class RegistrationRequest {
482         private InetSocketAddress address;
483 
484         private final IoHandler handler;
485 
486         private final IoServiceConfig config;
487 
488         private IOException exception;
489 
490         private boolean done;
491 
492         private RegistrationRequest(SocketAddress address, IoHandler handler,
493                 IoServiceConfig config) {
494             this.address = (InetSocketAddress) address;
495             this.handler = handler;
496             this.config = config;
497         }
498     }
499 
500     private static class CancellationRequest {
501         private final SocketAddress address;
502 
503         private boolean done;
504 
505         private RegistrationRequest registrationRequest;
506 
507         private RuntimeException exception;
508 
509         private CancellationRequest(SocketAddress address) {
510             this.address = address;
511         }
512     }
513 }