View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.mina.core.ExceptionMonitor;
35  import org.apache.mina.core.IoUtil;
36  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
37  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
38  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
39  import org.apache.mina.core.future.ConnectFuture;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.future.IoFuture;
42  import org.apache.mina.core.future.WriteFuture;
43  import org.apache.mina.core.session.AbstractIoSession;
44  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
45  import org.apache.mina.core.session.IdleStatus;
46  import org.apache.mina.core.session.IdleStatusChecker;
47  import org.apache.mina.core.session.IoSession;
48  import org.apache.mina.core.session.IoSessionConfig;
49  import org.apache.mina.core.session.IoSessionDataStructureFactory;
50  import org.apache.mina.core.session.IoSessionInitializationException;
51  import org.apache.mina.core.session.IoSessionInitializer;
52  import org.apache.mina.util.NamePreservingRunnable;
53  
54  
55  /**
56   * Base implementation of {@link IoService}s.
57   *
58   * @author The Apache MINA Project (dev@mina.apache.org)
59   * @version $Rev: 672585 $, $Date: 2008-06-28 23:36:52 +0200 (sam, 28 jun 2008) $
60   */
61  public abstract class AbstractIoService implements IoService {
62      private static final AtomicInteger id = new AtomicInteger();
63  
64      private final IoServiceListener serviceActivationListener =
65          new IoServiceListener() {
66              public void serviceActivated(IoService service) {
67                  // Update lastIoTime.
68                  AbstractIoService s = (AbstractIoService) service;
69                  s.setLastReadTime(s.getActivationTime());
70                  s.setLastWriteTime(s.getActivationTime());
71                  s.lastThroughputCalculationTime = s.getActivationTime();
72  
73                  // Start idleness notification.
74                  idleStatusChecker.addService(s);
75              }
76  
77              public void serviceDeactivated(IoService service) {
78                  idleStatusChecker.removeService((AbstractIoService) service);
79              }
80  
81              public void serviceIdle(IoService service, IdleStatus idleStatus) {}
82              public void sessionCreated(IoSession session) {}
83              public void sessionDestroyed(IoSession session) {}
84      };
85  
86      /**
87       * Current filter chain builder.
88       */
89      private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
90  
91      /**
92       * Current handler.
93       */
94      private IoHandler handler;
95  
96      private IoSessionDataStructureFactory sessionDataStructureFactory =
97          new DefaultIoSessionDataStructureFactory();
98  
99      /**
100      * Maintains the {@link IoServiceListener}s of this service.
101      */
102     private final IoServiceListenerSupport listeners;
103 
104     private final Executor executor;
105     private final String threadName;
106     private final boolean createdExecutor;
107 
108     /**
109      * A lock object which must be acquired when related resources are
110      * destroyed.
111      */
112     protected final Object disposalLock = new Object();
113     private volatile boolean disposing;
114     private volatile boolean disposed;
115     private IoFuture disposalFuture;
116 
117     private final AtomicLong readBytes = new AtomicLong();
118     private final AtomicLong writtenBytes = new AtomicLong();
119     private final AtomicLong readMessages = new AtomicLong();
120     private final AtomicLong writtenMessages = new AtomicLong();
121     private long lastReadTime;
122     private long lastWriteTime;
123 
124     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
125     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
126 
127     private final Object throughputCalculationLock = new Object();
128     private int throughputCalculationInterval = 3;
129 
130     private long lastThroughputCalculationTime;
131     private long lastReadBytes;
132     private long lastWrittenBytes;
133     private long lastReadMessages;
134     private long lastWrittenMessages;
135     private double readBytesThroughput;
136     private double writtenBytesThroughput;
137     private double readMessagesThroughput;
138     private double writtenMessagesThroughput;
139     private double largestReadBytesThroughput;
140     private double largestWrittenBytesThroughput;
141     private double largestReadMessagesThroughput;
142     private double largestWrittenMessagesThroughput;
143 
144     private final IdleStatusChecker idleStatusChecker = new IdleStatusChecker();
145     private final Object idlenessCheckLock = new Object();
146     private int idleTimeForRead;
147     private int idleTimeForWrite;
148     private int idleTimeForBoth;
149 
150     private int idleCountForBoth;
151     private int idleCountForRead;
152     private int idleCountForWrite;
153 
154     private long lastIdleTimeForBoth;
155     private long lastIdleTimeForRead;
156     private long lastIdleTimeForWrite;
157 
158     /**
159      * The default {@link IoSessionConfig} which will be used to configure new sessions.
160      */
161     private final IoSessionConfig sessionConfig;
162 
163     /**
164 	 * Constructor for {@link AbstractIoService}. You need to provide a default
165 	 * session configuration and an {@link Executor} for handling I/O events. If
166 	 * null {@link Executor} is provided, a default one will be created using
167 	 * {@link Executors#newCachedThreadPool()}.
168 	 * 
169 	 * @param sessionConfig
170 	 *            the default configuration for the managed {@link IoSession}
171 	 * @param executor
172 	 *            the {@link Executor} used for handling execution of I/O
173 	 *            events. Can be <code>null</code>.
174 	 */
175     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
176         if (sessionConfig == null) {
177             throw new NullPointerException("sessionConfig");
178         }
179 
180         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(
181                 sessionConfig.getClass())) {
182             throw new IllegalArgumentException("sessionConfig type: "
183                     + sessionConfig.getClass() + " (expected: "
184                     + getTransportMetadata().getSessionConfigType() + ")");
185         }
186 
187         listeners = new IoServiceListenerSupport(this);
188         listeners.add(serviceActivationListener);
189         this.sessionConfig = sessionConfig;
190 
191         // Make JVM load the exception monitor before some transports
192         // change the thread context class loader.
193         ExceptionMonitor.getInstance();
194 
195         if (executor == null) {
196             this.executor = Executors.newCachedThreadPool();
197             createdExecutor = true;
198         } else {
199             this.executor = executor;
200             createdExecutor = false;
201         }
202 
203         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
204 
205         executeWorker(idleStatusChecker.getNotifyingTask(), "idleStatusChecker");
206     }
207 
208     /**
209      * {@inheritDoc}
210      */
211     public final IoFilterChainBuilder getFilterChainBuilder() {
212         return filterChainBuilder;
213     }
214 
215     /**
216      * {@inheritDoc}
217      */
218     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
219         if (builder == null) {
220             builder = new DefaultIoFilterChainBuilder();
221         }
222         filterChainBuilder = builder;
223     }
224 
225     /**
226      * {@inheritDoc}
227      */
228     public final DefaultIoFilterChainBuilder getFilterChain() {
229         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
230             return (DefaultIoFilterChainBuilder) filterChainBuilder;
231         } else {
232             throw new IllegalStateException(
233                     "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
234         }
235     }
236 
237     /**
238      * {@inheritDoc}
239      */
240     public final void addListener(IoServiceListener listener) {
241         listeners.add(listener);
242     }
243 
244     /**
245      * {@inheritDoc}
246      */
247     public final void removeListener(IoServiceListener listener) {
248         listeners.remove(listener);
249     }
250 
251     /**
252      * {@inheritDoc}
253      */
254     public final boolean isActive() {
255         return listeners.isActive();
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     public final boolean isDisposing() {
262         return disposing;
263     }
264 
265     /**
266      * {@inheritDoc}
267      */
268     public final boolean isDisposed() {
269         return disposed;
270     }
271 
272     /**
273      * {@inheritDoc}
274      */
275     public final void dispose() {
276         if (disposed) {
277             return;
278         }
279 
280         IoFuture disposalFuture;
281         synchronized (disposalLock) {
282             disposalFuture = this.disposalFuture;
283             if (!disposing) {
284                 disposing = true;
285                 try {
286                     this.disposalFuture = disposalFuture = dispose0();
287                 } catch (Exception e) {
288                     ExceptionMonitor.getInstance().exceptionCaught(e);
289                 } finally {
290                     if (disposalFuture == null) {
291                         disposed = true;
292                     }
293                 }
294             }
295         }
296 
297         idleStatusChecker.getNotifyingTask().cancel();
298         if (disposalFuture != null) {
299             disposalFuture.awaitUninterruptibly();
300         }
301         if (createdExecutor) {
302             ExecutorService e = (ExecutorService) executor;
303             e.shutdown();
304             while (!e.isTerminated()) {
305                 try {
306                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
307                 } catch (InterruptedException e1) {
308                     // Ignore; it should end shortly.
309                 }
310             }
311         }
312 
313         disposed = true;
314     }
315 
316     /**
317      * Implement this method to release any acquired resources.  This method
318      * is invoked only once by {@link #dispose()}.
319      */
320     protected abstract IoFuture dispose0() throws Exception;
321 
322     /**
323      * {@inheritDoc}
324      */
325     public final Map<Long, IoSession> getManagedSessions() {
326         return listeners.getManagedSessions();
327     }
328 
329     /**
330      * {@inheritDoc}
331      */
332     public final long getCumulativeManagedSessionCount() {
333         return listeners.getCumulativeManagedSessionCount();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     public final int getLargestManagedSessionCount() {
340         return listeners.getLargestManagedSessionCount();
341     }
342 
343     /**
344      * {@inheritDoc}
345      */
346     public final int getManagedSessionCount() {
347         return listeners.getManagedSessionCount();
348     }
349 
350     /**
351      * {@inheritDoc}
352      */
353     public final IoHandler getHandler() {
354         return handler;
355     }
356 
357     /**
358      * {@inheritDoc}
359      */
360     public final void setHandler(IoHandler handler) {
361         if (handler == null) {
362             throw new NullPointerException("handler");
363         }
364 
365         if (isActive()) {
366             throw new IllegalStateException("handler cannot be set while the service is active.");
367         }
368 
369         this.handler = handler;
370     }
371 
372     /**
373      * {@inheritDoc}
374      */
375     public IoSessionConfig getSessionConfig() {
376         return sessionConfig;
377     }
378 
379     /**
380      * {@inheritDoc}
381      */
382     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
383         return sessionDataStructureFactory;
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
390         if (sessionDataStructureFactory == null) {
391             throw new NullPointerException("sessionDataStructureFactory");
392         }
393 
394         if (isActive()) {
395             throw new IllegalStateException(
396                     "sessionDataStructureFactory cannot be set while the service is active.");
397         }
398 
399         this.sessionDataStructureFactory = sessionDataStructureFactory;
400     }
401 
402     /**
403      * {@inheritDoc}
404      */
405     public final long getReadBytes() {
406         return readBytes.get();
407     }
408 
409     public final void increaseReadBytes(long increment, long currentTime) {
410         readBytes.addAndGet(increment);
411         lastReadTime = currentTime;
412         idleCountForBoth = 0;
413         idleCountForRead = 0;
414     }
415 
416     /**
417      * {@inheritDoc}
418      */
419     public final long getReadMessages() {
420         return readMessages.get();
421     }
422 
423     public final void increaseReadMessages(long currentTime) {
424         readMessages.incrementAndGet();
425         lastReadTime = currentTime;
426         idleCountForBoth = 0;
427         idleCountForRead = 0;
428     }
429 
430     /**
431      * {@inheritDoc}
432      */
433     public final int getThroughputCalculationInterval() {
434         return throughputCalculationInterval;
435     }
436 
437     /**
438      * {@inheritDoc}
439      */
440     public final void setThroughputCalculationInterval(int throughputCalculationInterval) {
441         if (throughputCalculationInterval < 0) {
442             throw new IllegalArgumentException(
443                     "throughputCalculationInterval: " + throughputCalculationInterval);
444         }
445 
446         this.throughputCalculationInterval = throughputCalculationInterval;
447     }
448 
449     /**
450      * {@inheritDoc}
451      */
452     public final long getThroughputCalculationIntervalInMillis() {
453         return throughputCalculationInterval * 1000L;
454     }
455 
456     /**
457      * {@inheritDoc}
458      */
459     public final double getReadBytesThroughput() {
460         resetThroughput();
461         return readBytesThroughput;
462     }
463 
464     /**
465      * {@inheritDoc}
466      */
467     public final double getWrittenBytesThroughput() {
468         resetThroughput();
469         return writtenBytesThroughput;
470     }
471 
472     /**
473      * {@inheritDoc}
474      */
475     public final double getReadMessagesThroughput() {
476         resetThroughput();
477         return readMessagesThroughput;
478     }
479 
480     /**
481      * {@inheritDoc}
482      */
483     public final double getWrittenMessagesThroughput() {
484         resetThroughput();
485         return writtenMessagesThroughput;
486     }
487 
488     /**
489      * {@inheritDoc}
490      */
491     public final double getLargestReadBytesThroughput() {
492         return largestReadBytesThroughput;
493     }
494 
495     /**
496      * {@inheritDoc}
497      */
498     public final double getLargestWrittenBytesThroughput() {
499         return largestWrittenBytesThroughput;
500     }
501 
502     /**
503      * {@inheritDoc}
504      */
505     public final double getLargestReadMessagesThroughput() {
506         return largestReadMessagesThroughput;
507     }
508 
509     /**
510      * {@inheritDoc}
511      */
512     public final double getLargestWrittenMessagesThroughput() {
513         return largestWrittenMessagesThroughput;
514     }
515 
516     private void resetThroughput() {
517         if (getManagedSessionCount() == 0) {
518             readBytesThroughput = 0;
519             writtenBytesThroughput = 0;
520             readMessagesThroughput = 0;
521             writtenMessagesThroughput = 0;
522         }
523     }
524 
525     private void updateThroughput(long currentTime) {
526         synchronized (throughputCalculationLock) {
527             int interval = (int) (currentTime - lastThroughputCalculationTime);
528             long minInterval = getThroughputCalculationIntervalInMillis();
529             if (minInterval == 0 || interval < minInterval) {
530                 return;
531             }
532 
533             long readBytes = this.readBytes.get();
534             long writtenBytes = this.writtenBytes.get();
535             long readMessages = this.readMessages.get();
536             long writtenMessages = this.writtenMessages.get();
537 
538             readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
539             writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
540             readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
541             writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
542 
543             if (readBytesThroughput > largestReadBytesThroughput) {
544                 largestReadBytesThroughput = readBytesThroughput;
545             }
546             if (writtenBytesThroughput > largestWrittenBytesThroughput) {
547                 largestWrittenBytesThroughput = writtenBytesThroughput;
548             }
549             if (readMessagesThroughput > largestReadMessagesThroughput) {
550                 largestReadMessagesThroughput = readMessagesThroughput;
551             }
552             if (writtenMessagesThroughput > largestWrittenMessagesThroughput) {
553                 largestWrittenMessagesThroughput = writtenMessagesThroughput;
554             }
555 
556             lastReadBytes = readBytes;
557             lastWrittenBytes = writtenBytes;
558             lastReadMessages = readMessages;
559             lastWrittenMessages = writtenMessages;
560 
561             lastThroughputCalculationTime = currentTime;
562         }
563     }
564 
565     /**
566      * {@inheritDoc}
567      */
568     public final int getScheduledWriteBytes() {
569         return scheduledWriteBytes.get();
570     }
571 
572     public final void increaseScheduledWriteBytes(int increment) {
573         scheduledWriteBytes.addAndGet(increment);
574     }
575 
576     /**
577      * {@inheritDoc}
578      */
579     public final int getScheduledWriteMessages() {
580         return scheduledWriteMessages.get();
581     }
582 
583     public final void increaseScheduledWriteMessages() {
584         scheduledWriteMessages.incrementAndGet();
585     }
586 
587     public final void decreaseScheduledWriteMessages() {
588         scheduledWriteMessages.decrementAndGet();
589     }
590 
591     /**
592      * {@inheritDoc}
593      */
594     public final long getActivationTime() {
595         return listeners.getActivationTime();
596     }
597 
598     /**
599      * {@inheritDoc}
600      */
601     public final long getLastIoTime() {
602         return Math.max(lastReadTime, lastWriteTime);
603     }
604 
605     /**
606      * {@inheritDoc}
607      */
608     public final long getLastReadTime() {
609         return lastReadTime;
610     }
611 
612     protected final void setLastReadTime(long lastReadTime) {
613         this.lastReadTime = lastReadTime;
614     }
615 
616     /**
617      * {@inheritDoc}
618      */
619     public final long getLastWriteTime() {
620         return lastWriteTime;
621     }
622 
623     protected final void setLastWriteTime(long lastWriteTime) {
624         this.lastWriteTime = lastWriteTime;
625     }
626 
627     /**
628      * {@inheritDoc}
629      */
630     public final long getWrittenBytes() {
631         return writtenBytes.get();
632     }
633 
634     public final void increaseWrittenBytes(long increment, long currentTime) {
635         writtenBytes.addAndGet(increment);
636         lastWriteTime = currentTime;
637         idleCountForBoth = 0;
638         idleCountForWrite = 0;
639     }
640 
641     /**
642      * {@inheritDoc}
643      */
644     public final long getWrittenMessages() {
645         return writtenMessages.get();
646     }
647 
648     public final void increaseWrittenMessages(long currentTime) {
649         writtenMessages.incrementAndGet();
650         lastWriteTime = currentTime;
651         idleCountForBoth = 0;
652         idleCountForWrite = 0;
653     }
654 
655     /**
656      * {@inheritDoc}
657      */
658     public final int getIdleTime(IdleStatus status) {
659         if (status == IdleStatus.BOTH_IDLE) {
660             return idleTimeForBoth;
661         }
662 
663         if (status == IdleStatus.READER_IDLE) {
664             return idleTimeForRead;
665         }
666 
667         if (status == IdleStatus.WRITER_IDLE) {
668             return idleTimeForWrite;
669         }
670 
671         throw new IllegalArgumentException("Unknown idle status: " + status);
672     }
673 
674     /**
675      * {@inheritDoc}
676      */
677     public final long getIdleTimeInMillis(IdleStatus status) {
678         return getIdleTime(status) * 1000L;
679     }
680 
681     /**
682      * {@inheritDoc}
683      */
684     public final void setIdleTime(IdleStatus status, int idleTime) {
685         if (idleTime < 0) {
686             throw new IllegalArgumentException("Illegal idle time: " + idleTime);
687         }
688 
689         if (status == IdleStatus.BOTH_IDLE) {
690             idleTimeForBoth = idleTime;
691         } else if (status == IdleStatus.READER_IDLE) {
692             idleTimeForRead = idleTime;
693         } else if (status == IdleStatus.WRITER_IDLE) {
694             idleTimeForWrite = idleTime;
695         } else {
696             throw new IllegalArgumentException("Unknown idle status: " + status);
697         }
698 
699         if (idleTime == 0) {
700             if (status == IdleStatus.BOTH_IDLE) {
701                 idleCountForBoth = 0;
702             } else if (status == IdleStatus.READER_IDLE) {
703                 idleCountForRead = 0;
704             } else if (status == IdleStatus.WRITER_IDLE) {
705                 idleCountForWrite = 0;
706             }
707         }
708     }
709 
710     /**
711      * {@inheritDoc}
712      */
713     public final boolean isIdle(IdleStatus status) {
714         if (status == IdleStatus.BOTH_IDLE) {
715             return idleCountForBoth > 0;
716         }
717 
718         if (status == IdleStatus.READER_IDLE) {
719             return idleCountForRead > 0;
720         }
721 
722         if (status == IdleStatus.WRITER_IDLE) {
723             return idleCountForWrite > 0;
724         }
725 
726         throw new IllegalArgumentException("Unknown idle status: " + status);
727     }
728 
729     /**
730      * {@inheritDoc}
731      */
732     public final int getIdleCount(IdleStatus status) {
733         if (status == IdleStatus.BOTH_IDLE) {
734             return idleCountForBoth;
735         }
736 
737         if (status == IdleStatus.READER_IDLE) {
738             return idleCountForRead;
739         }
740 
741         if (status == IdleStatus.WRITER_IDLE) {
742             return idleCountForWrite;
743         }
744 
745         throw new IllegalArgumentException("Unknown idle status: " + status);
746     }
747 
748     /**
749      * {@inheritDoc}
750      */
751     public final long getLastIdleTime(IdleStatus status) {
752         if (status == IdleStatus.BOTH_IDLE) {
753             return lastIdleTimeForBoth;
754         }
755 
756         if (status == IdleStatus.READER_IDLE) {
757             return lastIdleTimeForRead;
758         }
759 
760         if (status == IdleStatus.WRITER_IDLE) {
761             return lastIdleTimeForWrite;
762         }
763 
764         throw new IllegalArgumentException("Unknown idle status: " + status);
765     }
766 
767     private void increaseIdleCount(IdleStatus status, long currentTime) {
768         if (status == IdleStatus.BOTH_IDLE) {
769             idleCountForBoth++;
770             lastIdleTimeForBoth = currentTime;
771         } else if (status == IdleStatus.READER_IDLE) {
772             idleCountForRead++;
773             lastIdleTimeForRead = currentTime;
774         } else if (status == IdleStatus.WRITER_IDLE) {
775             idleCountForWrite++;
776             lastIdleTimeForWrite = currentTime;
777         } else {
778             throw new IllegalArgumentException("Unknown idle status: " + status);
779         }
780     }
781 
782     public final void notifyIdleness(long currentTime) {
783         updateThroughput(currentTime);
784 
785         synchronized (idlenessCheckLock) {
786             notifyIdleness(
787                     currentTime,
788                     getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
789                     IdleStatus.BOTH_IDLE, Math.max(
790                             getLastIoTime(),
791                             getLastIdleTime(IdleStatus.BOTH_IDLE)));
792 
793             notifyIdleness(
794                     currentTime,
795                     getIdleTimeInMillis(IdleStatus.READER_IDLE),
796                     IdleStatus.READER_IDLE, Math.max(
797                             getLastReadTime(),
798                             getLastIdleTime(IdleStatus.READER_IDLE)));
799 
800             notifyIdleness(
801                     currentTime,
802                     getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
803                     IdleStatus.WRITER_IDLE, Math.max(
804                             getLastWriteTime(),
805                             getLastIdleTime(IdleStatus.WRITER_IDLE)));
806         }
807     }
808 
809     private void notifyIdleness(
810             long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
811         if (idleTime > 0 && lastIoTime != 0
812                 && currentTime - lastIoTime >= idleTime) {
813             increaseIdleCount(status, currentTime);
814             listeners.fireServiceIdle(status);
815         }
816     }
817 
818     /**
819      * {@inheritDoc}
820      */
821     public final int getBothIdleCount() {
822         return getIdleCount(IdleStatus.BOTH_IDLE);
823     }
824 
825     /**
826      * {@inheritDoc}
827      */
828     public final long getLastBothIdleTime() {
829         return getLastIdleTime(IdleStatus.BOTH_IDLE);
830     }
831 
832     /**
833      * {@inheritDoc}
834      */
835     public final long getLastReaderIdleTime() {
836         return getLastIdleTime(IdleStatus.READER_IDLE);
837     }
838 
839     /**
840      * {@inheritDoc}
841      */
842     public final long getLastWriterIdleTime() {
843         return getLastIdleTime(IdleStatus.WRITER_IDLE);
844     }
845 
846     /**
847      * {@inheritDoc}
848      */
849     public final int getReaderIdleCount() {
850         return getIdleCount(IdleStatus.READER_IDLE);
851     }
852 
853     /**
854      * {@inheritDoc}
855      */
856     public final int getWriterIdleCount() {
857         return getIdleCount(IdleStatus.WRITER_IDLE);
858     }
859 
860     /**
861      * {@inheritDoc}
862      */
863     public final int getBothIdleTime() {
864         return getIdleTime(IdleStatus.BOTH_IDLE);
865     }
866 
867     /**
868      * {@inheritDoc}
869      */
870     public final long getBothIdleTimeInMillis() {
871         return getIdleTimeInMillis(IdleStatus.BOTH_IDLE);
872     }
873 
874     /**
875      * {@inheritDoc}
876      */
877     public final int getReaderIdleTime() {
878         return getIdleTime(IdleStatus.READER_IDLE);
879     }
880 
881     /**
882      * {@inheritDoc}
883      */
884     public final long getReaderIdleTimeInMillis() {
885         return getIdleTimeInMillis(IdleStatus.READER_IDLE);
886     }
887 
888     /**
889      * {@inheritDoc}
890      */
891     public final int getWriterIdleTime() {
892         return getIdleTime(IdleStatus.WRITER_IDLE);
893     }
894 
895     /**
896      * {@inheritDoc}
897      */
898     public final long getWriterIdleTimeInMillis() {
899         return getIdleTimeInMillis(IdleStatus.WRITER_IDLE);
900     }
901 
902     /**
903      * {@inheritDoc}
904      */
905     public final boolean isBothIdle() {
906         return isIdle(IdleStatus.BOTH_IDLE);
907     }
908 
909     /**
910      * {@inheritDoc}
911      */
912     public final boolean isReaderIdle() {
913         return isIdle(IdleStatus.READER_IDLE);
914     }
915 
916     /**
917      * {@inheritDoc}
918      */
919     public final boolean isWriterIdle() {
920         return isIdle(IdleStatus.WRITER_IDLE);
921     }
922 
923     /**
924      * {@inheritDoc}
925      */
926     public final void setBothIdleTime(int idleTime) {
927         setIdleTime(IdleStatus.BOTH_IDLE, idleTime);
928     }
929 
930     /**
931      * {@inheritDoc}
932      */
933     public final void setReaderIdleTime(int idleTime) {
934         setIdleTime(IdleStatus.READER_IDLE, idleTime);
935     }
936 
937     /**
938      * {@inheritDoc}
939      */
940     public final void setWriterIdleTime(int idleTime) {
941         setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
942     }
943 
944     /**
945      * {@inheritDoc}
946      */
947     public final Set<WriteFuture> broadcast(Object message) {
948         // Convert to Set.  We do not return a List here because only the
949         // direct caller of MessageBroadcaster knows the order of write
950         // operations.
951         final List<WriteFuture> futures = IoUtil.broadcast(
952                 message, getManagedSessions().values());
953         return new AbstractSet<WriteFuture>() {
954             @Override
955             public Iterator<WriteFuture> iterator() {
956                 return futures.iterator();
957             }
958 
959             @Override
960             public int size() {
961                 return futures.size();
962             }
963         };
964     }
965 
966     public final IoServiceListenerSupport getListeners() {
967         return listeners;
968     }
969 
970     protected final IdleStatusChecker getIdleStatusChecker() {
971         return idleStatusChecker;
972     }
973 
974     protected final void executeWorker(Runnable worker) {
975         executeWorker(worker, null);
976     }
977 
978     protected final void executeWorker(Runnable worker, String suffix) {
979         String actualThreadName = threadName;
980         if (suffix != null) {
981             actualThreadName = actualThreadName + '-' + suffix;
982         }
983         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
984     }
985 
986     // TODO Figure out make it work without causing a compiler error / warning.
987     @SuppressWarnings("unchecked")
988     protected final void finishSessionInitialization(
989             IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
990         // Update lastIoTime if needed.
991         if (getLastReadTime() == 0) {
992             setLastReadTime(getActivationTime());
993         }
994         if (getLastWriteTime() == 0) {
995             setLastWriteTime(getActivationTime());
996         }
997 
998         // Every property but attributeMap should be set now.
999         // Now initialize the attributeMap.  The reason why we initialize
1000         // the attributeMap at last is to make sure all session properties
1001         // such as remoteAddress are provided to IoSessionDataStructureFactory.
1002         try {
1003             ((AbstractIoSession) session).setAttributeMap(
1004                     session.getService().getSessionDataStructureFactory().getAttributeMap(session));
1005         } catch (IoSessionInitializationException e) {
1006             throw e;
1007         } catch (Exception e) {
1008             throw new IoSessionInitializationException(
1009                     "Failed to initialize an attributeMap.", e);
1010         }
1011 
1012         try {
1013             ((AbstractIoSession) session).setWriteRequestQueue(
1014                     session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session));
1015         } catch (IoSessionInitializationException e) {
1016             throw e;
1017         } catch (Exception e) {
1018             throw new IoSessionInitializationException(
1019                     "Failed to initialize a writeRequestQueue.", e);
1020         }
1021 
1022         if (future != null && future instanceof ConnectFuture) {
1023             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
1024             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
1025         }
1026 
1027         if (sessionInitializer != null) {
1028             sessionInitializer.initializeSession(session, future);
1029         }
1030 
1031         finishSessionInitialization0(session, future);
1032     }
1033 
1034     /**
1035      * Implement this method to perform additional tasks required for session
1036      * initialization. Do not call this method directly;
1037      * {@link #finishSessionInitialization(IoSession, IoFuture, IoSessionInitializer)} will call
1038      * this method instead.
1039      */
1040     protected void finishSessionInitialization0(IoSession session, IoFuture future) {}
1041 
1042     protected static class ServiceOperationFuture extends DefaultIoFuture {
1043         public ServiceOperationFuture() {
1044             super(null);
1045         }
1046 
1047         public final boolean isDone() {
1048             return getValue() == Boolean.TRUE;
1049         }
1050 
1051         public final void setDone() {
1052             setValue(Boolean.TRUE);
1053         }
1054 
1055         public final Exception getException() {
1056             if (getValue() instanceof Exception) {
1057                 return (Exception) getValue();
1058             } else {
1059                 return null;
1060             }
1061         }
1062 
1063         public final void setException(Exception exception) {
1064             if (exception == null) {
1065                 throw new NullPointerException("exception");
1066             }
1067             setValue(exception);
1068         }
1069     }
1070 }