View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.EnumSet;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.core.filterchain.IoFilterAdapter;
30  import org.apache.mina.core.filterchain.IoFilterChain;
31  import org.apache.mina.core.filterchain.IoFilterEvent;
32  import org.apache.mina.core.session.IdleStatus;
33  import org.apache.mina.core.session.IoEventType;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.WriteRequest;
36  
37  /**
38   * A filter that forwards I/O events to {@link Executor} to enforce a certain
39   * thread model while allowing the events per session to be processed
40   * simultaneously. You can apply various thread model by inserting this filter
41   * to a {@link IoFilterChain}.
42   * 
43   * <h2>Life Cycle Management</h2>
44   * 
45   * Please note that this filter doesn't manage the life cycle of the {@link Executor}.
46   * If you created this filter using {@link #ExecutorFilter(Executor)} or similar
47   * constructor that accepts an {@link Executor} that you've instantiated, you have
48   * full control and responsibility of managing its life cycle (e.g. calling
49   * {@link ExecutorService#shutdown()}.
50   * <p> 
51   * If you created this filter using convenience constructors like
52   * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
53   * {@link #destroy()} explicitly.
54   * 
55   * <h2>Event Ordering</h2>
56   * 
57   * All convenience constructors of this filter creates a new
58   * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of event is
59   * maintained like the following:
60   * <ul>
61   * <li>All event handler methods are called exclusively.
62   *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
63   * <li>The event order is never mixed up.
64   *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
65   * </ul>
66   * However, if you specified other {@link Executor} instance in the constructor,
67   * the order of events are not maintained at all.  This means more than one event
68   * handler methods can be invoked at the same time with mixed order.  For example,
69   * let's assume that messageReceived, messageSent, and sessionClosed events are
70   * fired.
71   * <ul>
72   * <li>All event handler methods can be called simultaneously.
73   *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
74   * <li>The event order can be mixed up.
75   *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
76   *           is invoked.)</li>
77   * </ul>
78   * If you need to maintain the order of events per session, please specify an
79   * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
80   * 
81   * <h2>Selective Filtering</h2>
82   * 
83   * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
84   * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
85   * underlying executor, which is most common setting.
86   * <p>
87   * If you want to submit only a certain set of event types, you can specify them
88   * in the constructor.  For example, you could configure a thread pool for
89   * write operation for the maximum performance:
90   * <pre><code>
91   * IoService service = ...;
92   * DefaultIoFilterChainBuilder chain = service.getFilterChain();
93   * 
94   * chain.addLast("codec", new ProtocolCodecFilter(...));
95   * // Use one thread pool for most events.
96   * chain.addLast("executor1", new ExecutorFilter());
97   * // and another dedicated thread pool for 'filterWrite' events.
98   * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
99   * </code></pre>
100  * 
101  * <h2>Preventing {@link OutOfMemoryError}</h2>
102  * 
103  * Please refer to {@link IoEventQueueThrottle}, which is specified as
104  * a parameter of the convenience constructors.
105  * 
106  * @author The Apache MINA Project (dev@mina.apache.org)
107  * @version $Rev: 762171 $, $Date: 2009-04-06 00:03:59 +0200 (Mon, 06 Apr 2009) $
108  * 
109  * @see OrderedThreadPoolExecutor
110  * @see UnorderedThreadPoolExecutor
111  * @org.apache.xbean.XBean
112  */
113 public class ExecutorFilter extends IoFilterAdapter {
114     /** The list of handled events */
115     private EnumSet<IoEventType> eventTypes;
116     
117     /** The associated executor */
118     private Executor executor;
119     
120     /** A flag set if the executor can be managed */ 
121     private boolean manageableExecutor;
122     
123     /** The default pool size */
124     private static final int DEFAULT_MAX_POOL_SIZE = 16;
125     
126     /** The number of thread to create at startup */
127     private static final int BASE_THREAD_NUMBER = 0;
128     
129     /** The default KeepAlive time, in seconds */
130     private static final long DEFAULT_KEEPALIVE_TIME = 30;
131     
132     /** 
133      * A set of flags used to tell if the Executor has been created 
134      * in the constructor or passed as an argument. In the second case, 
135      * the executor state can be managed.
136      **/
137     private static final boolean MANAGEABLE_EXECUTOR = true;
138     private static final boolean NOT_MANAGEABLE_EXECUTOR = false;
139     
140     /** A list of default EventTypes to be handled by the executor */
141     private static IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
142         IoEventType.EXCEPTION_CAUGHT,
143         IoEventType.MESSAGE_RECEIVED, 
144         IoEventType.MESSAGE_SENT,
145         IoEventType.SESSION_CLOSED, 
146         IoEventType.SESSION_IDLE,
147         IoEventType.SESSION_OPENED
148     };
149     
150 
151     /**
152      * (Convenience constructor) Creates a new instance with a new
153      * {@link OrderedThreadPoolExecutor}, no thread in the pool, and a 
154      * maximum of 16 threads in the pool. All the event will be handled 
155      * by this default executor.
156      */
157     public ExecutorFilter() {
158         // Create a new default Executor
159         Executor executor = createDefaultExecutor(
160             BASE_THREAD_NUMBER,
161             DEFAULT_MAX_POOL_SIZE,
162             DEFAULT_KEEPALIVE_TIME,
163             TimeUnit.SECONDS,
164             Executors.defaultThreadFactory(),
165             null);
166         
167         // Initialize the filter
168         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
169     }
170     
171     /**
172      * (Convenience constructor) Creates a new instance with a new
173      * {@link OrderedThreadPoolExecutor}, no thread in the pool, but 
174      * a maximum of threads in the pool is given. All the event will be handled 
175      * by this default executor.
176      * 
177      * @param maximumPoolSize The maximum pool size
178      */
179     public ExecutorFilter(int maximumPoolSize) {
180         // Create a new default Executor
181         Executor executor = createDefaultExecutor(
182             BASE_THREAD_NUMBER,
183             maximumPoolSize,
184             DEFAULT_KEEPALIVE_TIME,
185             TimeUnit.SECONDS,
186             Executors.defaultThreadFactory(),
187             null);
188         
189         // Initialize the filter
190         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
191     }
192     
193     /**
194      * (Convenience constructor) Creates a new instance with a new
195      * {@link OrderedThreadPoolExecutor}, a number of thread to start with, a  
196      * maximum of threads the pool can contain. All the event will be handled 
197      * by this default executor.
198      *
199      * @param corePoolSize The initial pool size
200      * @param maximumPoolSize The maximum pool size
201      */
202     public ExecutorFilter(int corePoolSize, int maximumPoolSize) {
203         // Create a new default Executor
204         Executor executor = createDefaultExecutor(
205             corePoolSize,
206             maximumPoolSize,
207             DEFAULT_KEEPALIVE_TIME,
208             TimeUnit.SECONDS,
209             Executors.defaultThreadFactory(),
210             null);
211         
212         // Initialize the filter
213         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
214     }
215     
216     /**
217      * (Convenience constructor) Creates a new instance with a new
218      * {@link OrderedThreadPoolExecutor}.
219      * 
220      * @param corePoolSize The initial pool size
221      * @param maximumPoolSize The maximum pool size
222      * @param keepAliveTime Default duration for a thread
223      * @param unit Time unit used for the keepAlive value
224      */
225     public ExecutorFilter(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
226             TimeUnit unit) {
227         // Create a new default Executor
228         Executor executor = createDefaultExecutor(
229             corePoolSize,
230             maximumPoolSize,
231             keepAliveTime,
232             unit,
233             Executors.defaultThreadFactory(),
234             null);
235         
236         // Initialize the filter
237         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
238     }
239 
240     /**
241      * (Convenience constructor) Creates a new instance with a new
242      * {@link OrderedThreadPoolExecutor}.
243      * 
244      * @param corePoolSize The initial pool size
245      * @param maximumPoolSize The maximum pool size
246      * @param keepAliveTime Default duration for a thread
247      * @param unit Time unit used for the keepAlive value
248      * @param queueHandler The queue used to store events
249      */
250     public ExecutorFilter(
251             int corePoolSize, int maximumPoolSize, 
252             long keepAliveTime, TimeUnit unit,
253             IoEventQueueHandler queueHandler) {
254         // Create a new default Executor
255         Executor executor = createDefaultExecutor(
256             corePoolSize,
257             maximumPoolSize,
258             keepAliveTime,
259             unit,
260             Executors.defaultThreadFactory(),
261             queueHandler);
262         
263         // Initialize the filter
264         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
265     }
266 
267     /**
268      * (Convenience constructor) Creates a new instance with a new
269      * {@link OrderedThreadPoolExecutor}.
270      * 
271      * @param corePoolSize The initial pool size
272      * @param maximumPoolSize The maximum pool size
273      * @param keepAliveTime Default duration for a thread
274      * @param unit Time unit used for the keepAlive value
275      * @param threadFactory The factory used to create threads
276      */
277     public ExecutorFilter(
278             int corePoolSize, int maximumPoolSize, 
279             long keepAliveTime, TimeUnit unit,
280             ThreadFactory threadFactory) {
281         // Create a new default Executor
282         Executor executor = createDefaultExecutor(
283             corePoolSize,
284             maximumPoolSize,
285             keepAliveTime,
286             unit,
287             threadFactory,
288             null);
289         
290         // Initialize the filter
291         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
292     }
293 
294     /**
295      * (Convenience constructor) Creates a new instance with a new
296      * {@link OrderedThreadPoolExecutor}.
297      * 
298      * @param corePoolSize The initial pool size
299      * @param maximumPoolSize The maximum pool size
300      * @param keepAliveTime Default duration for a thread
301      * @param unit Time unit used for the keepAlive value
302      * @param threadFactory The factory used to create threads
303      * @param queueHandler The queue used to store events
304      */
305     public ExecutorFilter(
306             int corePoolSize, int maximumPoolSize, 
307             long keepAliveTime, TimeUnit unit,
308             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
309         // Create a new default Executor
310         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, queueHandler);
311         
312         // Initialize the filter
313         init(executor, MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
314     }
315 
316     /**
317      * (Convenience constructor) Creates a new instance with a new
318      * {@link OrderedThreadPoolExecutor}.
319      * 
320      * @param eventTypes The event for which the executor will be used
321      */
322     public ExecutorFilter(IoEventType... eventTypes) {
323         // Create a new default Executor
324         Executor executor = createDefaultExecutor(
325             BASE_THREAD_NUMBER,
326             DEFAULT_MAX_POOL_SIZE,
327             DEFAULT_KEEPALIVE_TIME,
328             TimeUnit.SECONDS,
329             Executors.defaultThreadFactory(),
330             null);
331         
332         // Initialize the filter
333         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
334     }
335     
336     /**
337      * (Convenience constructor) Creates a new instance with a new
338      * {@link OrderedThreadPoolExecutor}.
339      * 
340      * @param maximumPoolSize The maximum pool size
341      * @param eventTypes The event for which the executor will be used
342      */
343     public ExecutorFilter(int maximumPoolSize, IoEventType... eventTypes) {
344         // Create a new default Executor
345         Executor executor = createDefaultExecutor(
346             BASE_THREAD_NUMBER,
347             maximumPoolSize,
348             DEFAULT_KEEPALIVE_TIME,
349             TimeUnit.SECONDS,
350             Executors.defaultThreadFactory(),
351             null);
352         
353         // Initialize the filter
354         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
355     }
356     
357     /**
358      * (Convenience constructor) Creates a new instance with a new
359      * {@link OrderedThreadPoolExecutor}.
360      * 
361      * @param corePoolSize The initial pool size
362      * @param maximumPoolSize The maximum pool size
363      * @param eventTypes The event for which the executor will be used
364      */
365     public ExecutorFilter(int corePoolSize, int maximumPoolSize, IoEventType... eventTypes) {
366         // Create a new default Executor
367         Executor executor = createDefaultExecutor(
368             corePoolSize,
369             maximumPoolSize,
370             DEFAULT_KEEPALIVE_TIME,
371             TimeUnit.SECONDS,
372             Executors.defaultThreadFactory(),
373             null);
374         
375         // Initialize the filter
376         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
377     }
378     
379     /**
380      * (Convenience constructor) Creates a new instance with a new
381      * {@link OrderedThreadPoolExecutor}.
382      * 
383      * @param corePoolSize The initial pool size
384      * @param maximumPoolSize The maximum pool size
385      * @param keepAliveTime Default duration for a thread
386      * @param unit Time unit used for the keepAlive value
387      * @param eventTypes The event for which the executor will be used
388      */
389     public ExecutorFilter(
390             int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
391             IoEventType... eventTypes) {
392         // Create a new default Executor
393         Executor executor = createDefaultExecutor(
394             corePoolSize,
395             maximumPoolSize,
396             keepAliveTime,
397             unit,
398             Executors.defaultThreadFactory(),
399             null);
400         
401         // Initialize the filter
402         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
403     }
404     
405     /**
406      * (Convenience constructor) Creates a new instance with a new
407      * {@link OrderedThreadPoolExecutor}.
408      * 
409      * @param corePoolSize The initial pool size
410      * @param maximumPoolSize The maximum pool size
411      * @param keepAliveTime Default duration for a thread
412      * @param unit Time unit used for the keepAlive value
413      * @param queueHandler The queue used to store events
414      * @param eventTypes The event for which the executor will be used
415      */
416     public ExecutorFilter(
417             int corePoolSize, int maximumPoolSize, 
418             long keepAliveTime, TimeUnit unit,
419             IoEventQueueHandler queueHandler, IoEventType... eventTypes) {
420         // Create a new default Executor
421         Executor executor = createDefaultExecutor(
422             corePoolSize,
423             maximumPoolSize,
424             keepAliveTime,
425             unit,
426             Executors.defaultThreadFactory(),
427             queueHandler);
428         
429         // Initialize the filter
430         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
431     }
432 
433     /**
434      * (Convenience constructor) Creates a new instance with a new
435      * {@link OrderedThreadPoolExecutor}.
436      * 
437      * @param corePoolSize The initial pool size
438      * @param maximumPoolSize The maximum pool size
439      * @param keepAliveTime Default duration for a thread
440      * @param unit Time unit used for the keepAlive value
441      * @param threadFactory The factory used to create threads
442      * @param eventTypes The event for which the executor will be used
443      */
444     public ExecutorFilter(
445             int corePoolSize, int maximumPoolSize, 
446             long keepAliveTime, TimeUnit unit,
447             ThreadFactory threadFactory, IoEventType... eventTypes) {
448         // Create a new default Executor
449         Executor executor = createDefaultExecutor(
450             corePoolSize,
451             maximumPoolSize,
452             keepAliveTime,
453             unit,
454             threadFactory,
455             null);
456         
457         // Initialize the filter
458         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
459     }
460 
461     /**
462      * (Convenience constructor) Creates a new instance with a new
463      * {@link OrderedThreadPoolExecutor}.
464      * 
465      * @param corePoolSize The initial pool size
466      * @param maximumPoolSize The maximum pool size
467      * @param keepAliveTime Default duration for a thread
468      * @param unit Time unit used for the keepAlive value
469      * @param threadFactory The factory used to create threads
470      * @param queueHandler The queue used to store events
471      * @param eventTypes The event for which the executor will be used
472      */
473     public ExecutorFilter(
474             int corePoolSize, int maximumPoolSize, 
475             long keepAliveTime, TimeUnit unit,
476             ThreadFactory threadFactory, IoEventQueueHandler queueHandler, 
477             IoEventType... eventTypes) {
478         // Create a new default Executor
479         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
480             keepAliveTime, unit, threadFactory, queueHandler);
481         
482         // Initialize the filter
483         init(executor, MANAGEABLE_EXECUTOR, eventTypes);
484     }
485     
486     /**
487      * Creates a new instance with the specified {@link Executor}.
488      * 
489      * @param executor the user's managed Executor to use in this filter
490      */
491     public ExecutorFilter(Executor executor) {
492         // Initialize the filter
493         init(executor, NOT_MANAGEABLE_EXECUTOR, DEFAULT_EVENT_SET);
494     }
495 
496     /**
497      * Creates a new instance with the specified {@link Executor}.
498      * 
499      * @param executor the user's managed Executor to use in this filter
500      * @param eventTypes The event for which the executor will be used
501      */
502     public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
503         // Initialize the filter
504         init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
505     }
506     
507     /**
508      * Create an OrderedThreadPool executor.
509      *
510      * @param corePoolSize The initial pool sizePoolSize
511      * @param maximumPoolSize The maximum pool size
512      * @param keepAliveTime Default duration for a thread
513      * @param unit Time unit used for the keepAlive value
514      * @param threadFactory The factory used to create threads
515      * @param queueHandler The queue used to store events
516      * @return An instance of the created Executor
517      */
518     private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
519         TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
520         // Create a new Executor
521         Executor executor = new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize, 
522             keepAliveTime, unit, threadFactory, queueHandler);
523         
524         return executor;
525     }
526     
527     /**
528      * Create an EnumSet from an array of EventTypes, and set the associated
529      * eventTypes field.
530      *
531      * @param eventTypes The array of handled events
532      */
533     private void initEventTypes(IoEventType... eventTypes) {
534         if ((eventTypes == null) || (eventTypes.length == 0)) {
535             eventTypes = DEFAULT_EVENT_SET;
536         }
537 
538         // Copy the list of handled events in the event set
539         this.eventTypes = EnumSet.of(eventTypes[0], eventTypes);
540         
541         // Check that we don't have the SESSION_CREATED event in the set
542         if (this.eventTypes.contains( IoEventType.SESSION_CREATED )) {
543             this.eventTypes = null;
544             throw new IllegalArgumentException(IoEventType.SESSION_CREATED
545                 + " is not allowed.");
546         }
547     }
548 
549     /**
550      * Creates a new instance of ExecutorFilter. This private constructor is called by all
551      * the public constructor.
552      *
553      * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
554      * @param manageableExecutor Tells if the Executor's Life Cycle can be managed or not
555      * @param eventTypes The lit of event which are handled by the executor
556      * @param
557      */
558     private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
559         if (executor == null) {
560             throw new NullPointerException("executor");
561         }
562 
563         initEventTypes(eventTypes);
564         this.executor = executor;
565         this.manageableExecutor = manageableExecutor;
566     }
567     
568     /**
569      * Shuts down the underlying executor if this filter hase been created via
570      * a convenience constructor.
571      */
572     @Override
573     public void destroy() {
574         if (manageableExecutor) {
575             ((ExecutorService) executor).shutdown();
576         }
577     }
578 
579     /**
580      * Returns the underlying {@link Executor} instance this filter uses.
581      * 
582      * @return The underlying {@link Executor}
583      */
584     public final Executor getExecutor() {
585         return executor;
586     }
587 
588     /**
589      * Fires the specified event through the underlying executor.
590      */
591     protected void fireEvent(IoFilterEvent event) {
592         getExecutor().execute(event);
593     }
594 
595     /**
596      * A trigger fired when adding this filter in a chain. As this filter can be
597      * added only once in a chain, if the chain already contains the same filter,
598      * and exception will be thrown.
599      * 
600      * @param parent The chain in which we want to inject this filter
601      * @param name The Fitler's name
602      * @param nextFilter The next filter in the chain
603      * 
604      * @throws IllegalArgumentException If the filter is already present in the chain
605      */
606     @Override
607     public void onPreAdd(IoFilterChain parent, String name,
608             NextFilter nextFilter) throws Exception {
609         if (parent.contains(this)) {
610             throw new IllegalArgumentException(
611                     "You can't add the same filter instance more than once.  Create another instance and add it.");
612         }
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     @Override
619     public final void sessionOpened(NextFilter nextFilter, IoSession session) {
620         if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
621             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
622                 session, null); 
623             fireEvent(event);
624         } else {
625             nextFilter.sessionOpened(session);
626         }
627     }
628 
629     /**
630      * {@inheritDoc}
631      */
632     @Override
633     public final void sessionClosed(NextFilter nextFilter, IoSession session) {
634         if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
635             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED,
636                 session, null); 
637             fireEvent(event);
638         } else {
639             nextFilter.sessionClosed(session);
640         }
641     }
642 
643     /**
644      * {@inheritDoc}
645      */
646     @Override
647     public final void sessionIdle(NextFilter nextFilter, IoSession session,
648             IdleStatus status) {
649         if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
650             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE,
651                 session, status); 
652             fireEvent(event);
653         } else {
654             nextFilter.sessionIdle(session, status);
655         }
656     }
657 
658     /**
659      * {@inheritDoc}
660      */
661     @Override
662     public final void exceptionCaught(NextFilter nextFilter, IoSession session,
663             Throwable cause) {
664         if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
665             IoFilterEvent event = new IoFilterEvent(nextFilter,
666                 IoEventType.EXCEPTION_CAUGHT, session, cause); 
667             fireEvent(event);
668         } else {
669             nextFilter.exceptionCaught(session, cause);
670         }
671     }
672 
673     /**
674      * {@inheritDoc}
675      */
676     @Override
677     public final void messageReceived(NextFilter nextFilter, IoSession session,
678             Object message) {
679         if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
680             IoFilterEvent event = new IoFilterEvent(nextFilter,
681                 IoEventType.MESSAGE_RECEIVED, session, message); 
682             fireEvent(event);
683         } else {
684             nextFilter.messageReceived(session, message);
685         }
686     }
687 
688     /**
689      * {@inheritDoc}
690      */
691     @Override
692     public final void messageSent(NextFilter nextFilter, IoSession session,
693             WriteRequest writeRequest) {
694         if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
695             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT,
696                 session, writeRequest); 
697             fireEvent(event);
698         } else {
699             nextFilter.messageSent(session, writeRequest);
700         }
701     }
702 
703     /**
704      * {@inheritDoc}
705      */
706     @Override
707     public final void filterWrite(NextFilter nextFilter, IoSession session,
708             WriteRequest writeRequest) {
709         if (eventTypes.contains(IoEventType.WRITE)) {
710             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.WRITE, session,
711                 writeRequest); 
712             fireEvent(event);
713         } else {
714             nextFilter.filterWrite(session, writeRequest);
715         }
716     }
717 
718     /**
719      * {@inheritDoc}
720      */
721     @Override
722     public final void filterClose(NextFilter nextFilter, IoSession session)
723             throws Exception {
724         if (eventTypes.contains(IoEventType.CLOSE)) {
725             IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.CLOSE, session,
726                 null); 
727             fireEvent(event);
728         } else {
729             nextFilter.filterClose(session);
730         }
731     }
732 }