View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.Inet4Address;
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Queue;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.Executor;
37  
38  import org.apache.mina.core.RuntimeIoException;
39  import org.apache.mina.core.buffer.IoBuffer;
40  import org.apache.mina.core.service.AbstractIoAcceptor;
41  import org.apache.mina.core.service.IoAcceptor;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.session.AbstractIoSession;
44  import org.apache.mina.core.session.ExpiringSessionRecycler;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.IoSessionRecycler;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.util.ExceptionMonitor;
51  
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 * @org.apache.xbean.XBean
 *
 * @param <S> the type of the {@link IoSession} this acceptor can handle
 * @param <H> the type of the handle returned by {@link #open(SocketAddress)}
 *            for each bound local address
 */
60  public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H>
61          extends AbstractIoAcceptor {
62  
63      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
64  
65      /**
66       * A timeout used for the select, as we need to get out to deal with idle
67       * sessions
68       */
69      private static final long SELECT_TIMEOUT = 1000L;
70  
71      private final Object lock = new Object();
72  
73      private final IoProcessor<S> processor = new ConnectionlessAcceptorProcessor();
74      private final Queue<AcceptorOperationFuture> registerQueue =
75          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
76      private final Queue<AcceptorOperationFuture> cancelQueue =
77          new ConcurrentLinkedQueue<AcceptorOperationFuture>();
78  
79      private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
80      private final Map<String, H> boundHandles =
81          Collections.synchronizedMap(new HashMap<String, H>());
82  
83      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
84  
85      private final ServiceOperationFuture disposalFuture =
86          new ServiceOperationFuture();
87      private volatile boolean selectable;
88      
89      /** The thread responsible of accepting incoming requests */ 
90      private Acceptor acceptor;
91  
92      private long lastIdleCheckTime;
93      
94      private String getAddressAsString(SocketAddress address) {
95      	InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
96      	int port = ((InetSocketAddress)address).getPort();
97      	
98      	String result = null;
99      	
100     	if ( inetAddress instanceof Inet4Address ) {
101     		result = "/" + inetAddress.getHostAddress() + ":" + port;
102     	} else {
103     		// Inet6
104     		if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
105     			byte[] bytes = inetAddress.getAddress();
106     			
107     			result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
108     		} else {
109     			result = inetAddress.toString();
110     		}
111     	}
112     	
113     	return result;
114     }
115 
116     /**
117      * Creates a new instance.
118      */
119     protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
120         this(sessionConfig, null);
121     }
122 
123     /**
124      * Creates a new instance.
125      */
126     protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127         super(sessionConfig, executor);
128 
129         try {
130             init();
131             selectable = true;
132         } catch (RuntimeException e) {
133             throw e;
134         } catch (Exception e) {
135             throw new RuntimeIoException("Failed to initialize.", e);
136         } finally {
137             if (!selectable) {
138                 try {
139                     destroy();
140                 } catch (Exception e) {
141                     ExceptionMonitor.getInstance().exceptionCaught(e);
142                 }
143             }
144         }
145     }
146 
147     protected abstract void init() throws Exception;
148     protected abstract void destroy() throws Exception;
149     protected abstract int select() throws Exception;
150     protected abstract int select(long timeout) throws Exception;
151     protected abstract void wakeup();
152     protected abstract Iterator<H> selectedHandles();
153     protected abstract H open(SocketAddress localAddress) throws Exception;
154     protected abstract void close(H handle) throws Exception;
155     protected abstract SocketAddress localAddress(H handle) throws Exception;
156     protected abstract boolean isReadable(H handle);
157     protected abstract boolean isWritable(H handle);
158     protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
159 
160     protected abstract int send(S session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
161 
162     protected abstract S newSession(IoProcessor<S> processor, H handle, SocketAddress remoteAddress) throws Exception;
163 
164     protected abstract void setInterestedInWrite(S session, boolean interested) throws Exception;
165 
166     /**
167      * {@inheritDoc}
168      */
169     @Override
170     protected void dispose0() throws Exception {
171         unbind();
172         startupAcceptor();
173         wakeup();
174     }
175 
176     /**
177      * {@inheritDoc}
178      */
179     @Override
180     protected final Set<SocketAddress> bindInternal(
181             List<? extends SocketAddress> localAddresses) throws Exception {
182         // Create a bind request as a Future operation. When the selector
183         // have handled the registration, it will signal this future.
184         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
185 
186         // adds the Registration request to the queue for the Workers
187         // to handle
188         registerQueue.add(request);
189 
190         // creates the Acceptor instance and has the local
191         // executor kick it off.
192         startupAcceptor();
193         
194         // As we just started the acceptor, we have to unblock the select()
195         // in order to process the bind request we just have added to the 
196         // registerQueue.
197         wakeup();
198 
199         // Now, we wait until this request is completed.
200         request.awaitUninterruptibly();
201 
202         if (request.getException() != null) {
203             throw request.getException();
204         }
205 
206         // Update the local addresses.
207         // setLocalAddresses() shouldn't be called from the worker thread
208         // because of deadlock.
209         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
210 
211         for (H handle : boundHandles.values()) {
212             newLocalAddresses.add(localAddress(handle));
213         }
214         
215         return newLocalAddresses;
216     }
217 
218     /**
219      * {@inheritDoc}
220      */
221     @Override
222     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
223         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
224 
225         cancelQueue.add(request);
226         startupAcceptor();
227         wakeup();
228 
229         request.awaitUninterruptibly();
230 
231         if (request.getException() != null) {
232             throw request.getException();
233         }
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
240         if (isDisposing()) {
241             throw new IllegalStateException("Already disposed.");
242         }
243 
244         if (remoteAddress == null) {
245             throw new IllegalArgumentException("remoteAddress");
246         }
247 
248         synchronized (bindLock) {
249             if (!isActive()) {
250                 throw new IllegalStateException(
251                         "Can't create a session from a unbound service.");
252             }
253 
254             try {
255                 return newSessionWithoutLock(remoteAddress, localAddress);
256             } catch (RuntimeException e) {
257                 throw e;
258             } catch (Error e) {
259                 throw e;
260             } catch (Exception e) {
261                 throw new RuntimeIoException("Failed to create a session.", e);
262             }
263         }
264     }
265 
266     private IoSession newSessionWithoutLock(
267             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
268         H handle = boundHandles.get(getAddressAsString(localAddress));
269         
270         if (handle == null) {
271             throw new IllegalArgumentException("Unknown local address: " + localAddress);
272         }
273 
274         IoSession session;
275         IoSessionRecycler sessionRecycler = getSessionRecycler();
276         
277         synchronized (sessionRecycler) {
278             session = sessionRecycler.recycle(localAddress, remoteAddress);
279             
280             if (session != null) {
281                 return session;
282             }
283 
284             // If a new session needs to be created.
285             S newSession = newSession(processor, handle, remoteAddress);
286             getSessionRecycler().put(newSession);
287             session = newSession;
288         }
289 
290         initSession(session, null, null);
291 
292         try {
293             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
294             getListeners().fireSessionCreated(session);
295         } catch (Throwable t) {
296             ExceptionMonitor.getInstance().exceptionCaught(t);
297         }
298 
299         return session;
300     }
301 
302     public final IoSessionRecycler getSessionRecycler() {
303         return sessionRecycler;
304     }
305 
306     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
307         synchronized (bindLock) {
308             if (isActive()) {
309                 throw new IllegalStateException(
310                         "sessionRecycler can't be set while the acceptor is bound.");
311             }
312 
313             if (sessionRecycler == null) {
314                 sessionRecycler = DEFAULT_RECYCLER;
315             }
316             
317             this.sessionRecycler = sessionRecycler;
318         }
319     }
320 
321     private class ConnectionlessAcceptorProcessor implements IoProcessor<S> {
322 
323         public void add(S session) {
324         }
325 
326         public void flush(S session) {
327             if (scheduleFlush(session)) {
328                 wakeup();
329             }
330         }
331 
332         public void remove(S session) {
333             getSessionRecycler().remove(session);
334             getListeners().fireSessionDestroyed(session);
335         }
336 
337         public void updateTrafficControl(S session) {
338             throw new UnsupportedOperationException();
339         }
340 
341         public void dispose() {
342         }
343 
344         public boolean isDisposed() {
345             return false;
346         }
347 
348         public boolean isDisposing() {
349             return false;
350         }
351     }
352 
353     /**
354      * Starts the inner Acceptor thread.
355      */
356     private void startupAcceptor() {
357         if (!selectable) {
358             registerQueue.clear();
359             cancelQueue.clear();
360             flushingSessions.clear();
361         }
362 
363         synchronized (lock) {
364             if (acceptor == null) {
365                 acceptor = new Acceptor();
366                 executeWorker(acceptor);
367             }
368         }
369     }
370 
371     private boolean scheduleFlush(S session) {
372         // Set the schedule for flush flag if the session
373         // has not already be added to the flushingSessions
374         // queue
375         if (session.setScheduledForFlush(true)) {
376             flushingSessions.add(session);
377             return true;
378         } else {
379             return false;
380         }
381     }
382 
383     /**
384      * This private class is used to accept incoming connection from 
385      * clients. It's an infinite loop, which can be stopped when all
386      * the registered handles have been removed (unbound). 
387      */
388     private class Acceptor implements Runnable {
389         public void run() {
390             int nHandles = 0;
391             lastIdleCheckTime = System.currentTimeMillis();
392 
393             while (selectable) {
394                 try {
395                     int selected = select(SELECT_TIMEOUT);
396 
397                     nHandles += registerHandles();
398 
399                     if (selected > 0) {
400                         processReadySessions(selectedHandles());
401                     }
402 
403                     long currentTime = System.currentTimeMillis();
404                     flushSessions(currentTime);
405                     nHandles -= unregisterHandles();
406 
407                     notifyIdleSessions(currentTime);
408 
409                     if (nHandles == 0) {
410                         synchronized (lock) {
411                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
412                                 acceptor = null;
413                                 break;
414                             }
415                         }
416                     }
417                 } catch (Exception e) {
418                     ExceptionMonitor.getInstance().exceptionCaught(e);
419 
420                     try {
421                         Thread.sleep(1000);
422                     } catch (InterruptedException e1) {
423                     }
424                 }
425             }
426 
427             if (selectable && isDisposing()) {
428                 selectable = false;
429                 try {
430                     destroy();
431                 } catch (Exception e) {
432                     ExceptionMonitor.getInstance().exceptionCaught(e);
433                 } finally {
434                     disposalFuture.setValue(true);
435                 }
436             }
437         }
438     }
439 
440     @SuppressWarnings("unchecked")
441     private void processReadySessions(Iterator<H> handles) {
442         while (handles.hasNext()) {
443             H h = handles.next();
444             handles.remove();
445             
446             try {
447                 if (isReadable(h)) {
448                     readHandle(h);
449                 }
450 
451                 if (isWritable(h)) {
452                     for (IoSession session : getManagedSessions().values()) {
453                         scheduleFlush((S) session);
454                     }
455                 }
456             } catch (Throwable t) {
457                 ExceptionMonitor.getInstance().exceptionCaught(t);
458             }
459         }
460     }
461 
462     private void readHandle(H handle) throws Exception {
463         IoBuffer readBuf = IoBuffer.allocate(
464                 getSessionConfig().getReadBufferSize());
465 
466         SocketAddress remoteAddress = receive(handle, readBuf);
467         
468         if (remoteAddress != null) {
469             IoSession session = newSessionWithoutLock(
470                     remoteAddress, localAddress(handle));
471 
472             readBuf.flip();
473 
474             IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
475             newBuf.put(readBuf);
476             newBuf.flip();
477 
478             session.getFilterChain().fireMessageReceived(newBuf);
479         }
480     }
481 
482     private void flushSessions(long currentTime) {
483         for (;;) {
484             S session = flushingSessions.poll();
485             
486             if (session == null) {
487                 break;
488             }
489 
490             // Reset the Schedule for flush flag for this session,
491             // as we are flushing it now
492             session.unscheduledForFlush();
493 
494             try {
495                 boolean flushedAll = flush(session, currentTime);
496                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
497                     !session.isScheduledForFlush()) {
498                     scheduleFlush(session);
499                 }
500             } catch (Exception e) {
501                 session.getFilterChain().fireExceptionCaught(e);
502             }
503         }
504     }
505 
506     private boolean flush(S session, long currentTime) throws Exception {
507         // Clear OP_WRITE
508         setInterestedInWrite(session, false);
509 
510         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
511         final int maxWrittenBytes =
512             session.getConfig().getMaxReadBufferSize() +
513             (session.getConfig().getMaxReadBufferSize() >>> 1);
514 
515         int writtenBytes = 0;
516         
517         try {
518             for (;;) {
519                 WriteRequest req = session.getCurrentWriteRequest();
520                 
521                 if (req == null) {
522                     req = writeRequestQueue.poll(session);
523                     if (req == null) {
524                         break;
525                     }
526                     session.setCurrentWriteRequest(req);
527                 }
528 
529                 IoBuffer buf = (IoBuffer) req.getMessage();
530                 
531                 if (buf.remaining() == 0) {
532                     // Clear and fire event
533                     session.setCurrentWriteRequest(null);
534                     buf.reset();
535                     session.getFilterChain().fireMessageSent(req);
536                     continue;
537                 }
538 
539                 SocketAddress destination = req.getDestination();
540                 
541                 if (destination == null) {
542                     destination = session.getRemoteAddress();
543                 }
544 
545                 int localWrittenBytes = send(session, buf, destination);
546                 
547                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
548                     // Kernel buffer is full or wrote too much
549                     setInterestedInWrite(session, true);
550                     return false;
551                 } else {
552                     setInterestedInWrite(session, false);
553 
554                     // Clear and fire event
555                     session.setCurrentWriteRequest(null);
556                     writtenBytes += localWrittenBytes;
557                     buf.reset();
558                     session.getFilterChain().fireMessageSent(req);
559                 }
560             }
561         } finally {
562             session.increaseWrittenBytes(writtenBytes, currentTime);
563         }
564 
565         return true;
566     }
567 
568     private int registerHandles() {
569         for (;;) {
570             AcceptorOperationFuture req = registerQueue.poll();
571             
572             if (req == null) {
573                 break;
574             }
575 
576             Map<String, H> newHandles = new HashMap<String, H>();
577             List<SocketAddress> localAddresses = req.getLocalAddresses();
578             
579             try {
580                 for (SocketAddress socketAddress : localAddresses) {
581                     H handle = open(socketAddress);
582                     newHandles.put(getAddressAsString(localAddress(handle)), handle);
583                 }
584                 
585                 boundHandles.putAll(newHandles);
586 
587                 getListeners().fireServiceActivated();
588                 req.setDone();
589                 
590                 return newHandles.size();
591             } catch (Exception e) {
592                 req.setException(e);
593             } finally {
594                 // Roll back if failed to bind all addresses.
595                 if (req.getException() != null) {
596                     for (H handle : newHandles.values()) {
597                         try {
598                             close(handle);
599                         } catch (Exception e) {
600                             ExceptionMonitor.getInstance().exceptionCaught(e);
601                         }
602                     }
603                     
604                     wakeup();
605                 }
606             }
607         }
608 
609         return 0;
610     }
611 
612     private int unregisterHandles() {
613         int nHandles = 0;
614         
615         for (;;) {
616             AcceptorOperationFuture request = cancelQueue.poll();
617             if (request == null) {
618                 break;
619             }
620 
621             // close the channels
622             for (SocketAddress socketAddress : request.getLocalAddresses()) {
623                 H handle = boundHandles.remove(getAddressAsString(socketAddress));
624                 
625                 if (handle == null) {
626                     continue;
627                 }
628 
629                 try {
630                     close(handle);
631                     wakeup(); // wake up again to trigger thread death
632                 } catch (Throwable e) {
633                     ExceptionMonitor.getInstance().exceptionCaught(e);
634                 } finally {
635                     nHandles++;
636                 }
637             }
638 
639             request.setDone();
640         }
641 
642         return nHandles;
643     }
644 
645     private void notifyIdleSessions(long currentTime) {
646         // process idle sessions
647         if (currentTime - lastIdleCheckTime >= 1000) {
648             lastIdleCheckTime = currentTime;
649             AbstractIoSession.notifyIdleness(
650                     getListeners().getManagedSessions().values().iterator(),
651                     currentTime);
652         }
653     }
654 }