View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Queue;
26  import java.util.Set;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.RejectedExecutionHandler;
31  import java.util.concurrent.SynchronousQueue;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  
37  import org.apache.mina.common.AttributeKey;
38  import org.apache.mina.common.DummySession;
39  import org.apache.mina.common.IoEvent;
40  import org.apache.mina.common.IoSession;
41  import org.apache.mina.util.CircularQueue;
42  
43  /**
44   * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s.
45   * <p>
46   * If you don't need to maintain the order of events per session, please use
47   * {@link UnorderedThreadPoolExecutor}.
48  
49   * @author The Apache MINA Project (dev@mina.apache.org)
50   * @version $Rev: 595549 $, $Date: 2007-11-15 21:45:36 -0700 (Thu, 15 Nov 2007) $
51   */
52  public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
53  
54      private static final IoSession EXIT_SIGNAL = new DummySession();
55      private static final IoEventQueueHandler NOOP_QUEUE_HANDLER = new IoEventQueueHandler() {
56          public boolean accept(ThreadPoolExecutor executor, IoEvent event) {
57              return true;
58          }
59          public void offered(ThreadPoolExecutor executor, IoEvent event) {}
60          public void polled(ThreadPoolExecutor executor, IoEvent event) {}
61      };
62  
63      private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
64      private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
65      
66      private final Set<Worker> workers = new HashSet<Worker>();
67      
68      private volatile int corePoolSize;
69      private volatile int maximumPoolSize;
70      private volatile int largestPoolSize;
71      private final AtomicInteger idleWorkers = new AtomicInteger();
72      
73      private long completedTaskCount;
74      private volatile boolean shutdown;
75      
76      private final IoEventQueueHandler queueHandler;
77      
78      public OrderedThreadPoolExecutor() {
79          this(16);
80      }
81      
82      public OrderedThreadPoolExecutor(int maximumPoolSize) {
83          this(0, maximumPoolSize);
84      }
85      
86      public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
87          this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS);
88      }
89      
90      public OrderedThreadPoolExecutor(
91              int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
92          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory());
93      }
94      
95      public OrderedThreadPoolExecutor(
96              int corePoolSize, int maximumPoolSize, 
97              long keepAliveTime, TimeUnit unit,
98              IoEventQueueHandler queueHandler) {
99          this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler);
100     }
101 
102     public OrderedThreadPoolExecutor(
103             int corePoolSize, int maximumPoolSize, 
104             long keepAliveTime, TimeUnit unit,
105             ThreadFactory threadFactory) {
106         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null);
107     }
108 
109     public OrderedThreadPoolExecutor(
110             int corePoolSize, int maximumPoolSize, 
111             long keepAliveTime, TimeUnit unit,
112             ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
113         super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
114         if (corePoolSize < 0) {
115             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
116         }
117         
118         if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) {
119             throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
120         }
121         
122         if (queueHandler == null) {
123             queueHandler = NOOP_QUEUE_HANDLER;
124         }
125         
126         this.corePoolSize = corePoolSize;
127         this.maximumPoolSize = maximumPoolSize;
128         this.queueHandler = queueHandler;
129     }
130     
131     public IoEventQueueHandler getQueueHandler() {
132         return queueHandler;
133     }
134 
135     @Override
136     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
137         // Ignore the request.  It must always be AbortPolicy.
138     }
139 
140     private void addWorker() {
141         synchronized (workers) {
142             if (workers.size() >= maximumPoolSize) {
143                 return;
144             }
145 
146             Worker worker = new Worker();
147             Thread thread = getThreadFactory().newThread(worker);
148             idleWorkers.incrementAndGet();
149             thread.start();
150             workers.add(worker);
151             
152             if (workers.size() > largestPoolSize) {
153                 largestPoolSize = workers.size();
154             }
155         }
156     }
157     
158     private void addWorkerIfNecessary() {
159         if (idleWorkers.get() == 0) {
160             synchronized (workers) {
161                 if (workers.isEmpty() || idleWorkers.get() == 0) {
162                     addWorker();
163                 }
164             }
165         }
166     }
167     
168     private void removeWorker() {
169         synchronized (workers) {
170             if (workers.size() <= corePoolSize) {
171                 return;
172             }
173             waitingSessions.offer(EXIT_SIGNAL);
174         }
175     }
176     
177     @Override
178     public int getMaximumPoolSize() {
179         return maximumPoolSize;
180     }
181     
182     @Override
183     public void setMaximumPoolSize(int maximumPoolSize) {
184         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
185             throw new IllegalArgumentException("maximumPoolSize: "
186                     + maximumPoolSize);
187         }
188 
189         synchronized (workers) {
190             this.maximumPoolSize = maximumPoolSize;
191             int difference = workers.size() - maximumPoolSize;
192             while (difference > 0) {
193                 removeWorker();
194                 --difference;
195             }
196         }
197     }
198     
199     @Override
200     public boolean awaitTermination(long timeout, TimeUnit unit)
201             throws InterruptedException {
202         
203         long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
204         
205         synchronized (workers) {
206             while (!isTerminated()) {
207                 long waitTime = deadline - System.currentTimeMillis();
208                 if (waitTime <= 0) {
209                     break;
210                 }
211                 
212                 workers.wait(waitTime);
213             }
214         }
215         return isTerminated();
216     }
217 
218     @Override
219     public boolean isShutdown() {
220         return shutdown;
221     }
222 
223     @Override
224     public boolean isTerminated() {
225         if (!shutdown) {
226             return false;
227         }
228         
229         synchronized (workers) {
230             return workers.isEmpty();
231         }
232     }
233 
234     @Override
235     public void shutdown() {
236         if (shutdown) {
237             return;
238         }
239         
240         shutdown = true;
241 
242         synchronized (workers) {
243             for (int i = workers.size(); i > 0; i --) {
244                 waitingSessions.offer(EXIT_SIGNAL);
245             }
246         }
247     }
248 
249     @Override
250     public List<Runnable> shutdownNow() {
251         shutdown();
252         
253         List<Runnable> answer = new ArrayList<Runnable>();
254         IoSession session;
255         while ((session = waitingSessions.poll()) != null) {
256             if (session == EXIT_SIGNAL) {
257                 waitingSessions.offer(EXIT_SIGNAL);
258                 Thread.yield(); // Let others take the signal.
259                 continue;
260             }
261             
262             SessionBuffer buf = (SessionBuffer) session.getAttribute(BUFFER);
263             synchronized (buf.queue) {
264                 for (Runnable task: buf.queue) {
265                     getQueueHandler().polled(this, (IoEvent) task);
266                     answer.add(task);
267                 }
268                 buf.queue.clear();
269             }
270         }
271         
272         return answer;
273     }
274 
275     @Override
276     public void execute(Runnable task) {
277         if (shutdown) {
278             rejectTask(task);
279         }
280 
281         checkTaskType(task);
282         
283         IoEvent e = (IoEvent) task;
284         IoSession s = e.getSession();
285         SessionBuffer buf = getSessionBuffer(s);
286         Queue<Runnable> queue = buf.queue;
287         boolean offerSession;
288         boolean offerEvent = queueHandler.accept(this, e);
289         if (offerEvent) {
290             synchronized (queue) {
291                 queue.offer(e);
292                 if (buf.processingCompleted) {
293                     buf.processingCompleted = false;
294                     offerSession = true;
295                 } else {
296                     offerSession = false;
297                 }
298             }            
299         } else {
300             offerSession = false;
301         }
302         
303         if (offerSession) {
304             waitingSessions.offer(s);
305         }
306         
307         addWorkerIfNecessary();
308         
309         if (offerEvent) {
310             queueHandler.offered(this, e);
311         }
312     }
313     
314     private void rejectTask(Runnable task) {
315         getRejectedExecutionHandler().rejectedExecution(task, this);
316     }
317     
318     private void checkTaskType(Runnable task) {
319         if (!(task instanceof IoEvent)) {
320             throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
321         }
322     }
323 
324     @Override
325     public int getActiveCount() {
326         synchronized (workers) {
327             return workers.size() - idleWorkers.get();
328         }
329     }
330 
331     @Override
332     public long getCompletedTaskCount() {
333         synchronized (workers) {
334             long answer = completedTaskCount;
335             for (Worker w: workers) {
336                 answer += w.completedTaskCount;
337             }
338             
339             return answer;
340         }
341     }
342 
343     @Override
344     public int getLargestPoolSize() {
345         return largestPoolSize;
346     }
347 
348     @Override
349     public int getPoolSize() {
350         synchronized (workers) {
351             return workers.size();
352         }
353     }
354 
355     @Override
356     public long getTaskCount() {
357         return getCompletedTaskCount();
358     }
359 
360     @Override
361     public boolean isTerminating() {
362         synchronized (workers) {
363             return isShutdown() && !isTerminated();
364         }
365     }
366 
367     @Override
368     public int prestartAllCoreThreads() {
369         int answer = 0;
370         synchronized (workers) {
371             for (int i = corePoolSize - workers.size() ; i > 0; i --) {
372                 addWorker();
373                 answer ++;
374             }
375         }
376         return answer;
377     }
378 
379     @Override
380     public boolean prestartCoreThread() {
381         synchronized (workers) {
382             if (workers.size() < corePoolSize) {
383                 addWorker();
384                 return true;
385             } else {
386                 return false;
387             }
388         }
389     }
390     
391     @Override
392     public BlockingQueue<Runnable> getQueue() {
393         throw new UnsupportedOperationException();
394     }
395     
396     @Override
397     public void purge() {
398     }
399 
400     @Override
401     public boolean remove(Runnable task) {
402         checkTaskType(task);
403         IoEvent e = (IoEvent) task;
404         IoSession s = e.getSession();
405         SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER);
406         if (buffer == null) {
407             return false;
408         }
409         
410         boolean removed;
411         synchronized (buffer.queue) {
412             removed = buffer.queue.remove(task);
413         }
414         
415         if (removed) {
416             getQueueHandler().polled(this, e);
417         }
418         
419         return removed;
420     }
421     
422     @Override
423     public int getCorePoolSize() {
424         return corePoolSize;
425     }
426 
427     @Override
428     public void setCorePoolSize(int corePoolSize) {
429         if (corePoolSize < 0) {
430             throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
431         }
432         if (corePoolSize > maximumPoolSize) {
433             throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
434         }
435         
436         synchronized (workers) {
437             if (this.corePoolSize > corePoolSize) {
438                 for (int i = this.corePoolSize - corePoolSize; i > 0; i --) {
439                     removeWorker();
440                 }
441             }
442             this.corePoolSize = corePoolSize;
443         }
444     }
445 
446     private SessionBuffer getSessionBuffer(IoSession session) {
447         SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER);
448         if (buffer == null) {
449             buffer = new SessionBuffer();
450             SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer);
451             if (oldBuffer != null) {
452                 buffer = oldBuffer;
453             }
454         }
455         return buffer;
456     }
457     
458     private static class SessionBuffer {
459         private final Queue<Runnable> queue = new CircularQueue<Runnable>();
460         private boolean processingCompleted = true;
461     }
462     
463     private class Worker implements Runnable {
464         
465         private volatile long completedTaskCount;
466         private Thread thread;
467         
468         public void run() {
469             thread = Thread.currentThread();
470 
471             try {
472                 for (;;) {
473                     IoSession session = fetchSession();
474                     
475                     idleWorkers.decrementAndGet();
476                     
477                     if (session == null) {
478                         synchronized (workers) {
479                             if (workers.size() > corePoolSize) {
480                                 // Remove now to prevent duplicate exit.
481                                 workers.remove(this);
482                                 break;
483                             }
484                         }
485                     }
486                     
487                     if (session == EXIT_SIGNAL) {
488                         break;
489                     }
490                     
491                     try {
492                         runTasks(getSessionBuffer(session));
493                     } finally {
494                         idleWorkers.incrementAndGet();
495                     }
496                 }
497             } finally {
498                 synchronized (workers) {
499                     workers.remove(this);
500                     OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
501                     workers.notifyAll();
502                 }
503             }
504         }
505 
506         private IoSession fetchSession() {
507             IoSession session = null;
508             long currentTime = System.currentTimeMillis();
509             long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
510             for (;;) {
511                 try {
512                     long waitTime = deadline - currentTime;
513                     if (waitTime <= 0) {
514                         break;
515                     }
516 
517                     try {
518                         session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
519                         break;
520                     } finally {
521                         if (session == null) {
522                             currentTime = System.currentTimeMillis();
523                         }
524                     }
525                 } catch (InterruptedException e) {
526                     // Ignore.
527                     continue;
528                 }
529             }
530             return session;
531         }
532 
533         private void runTasks(SessionBuffer buf) {
534             for (;;) {
535                 Runnable task;
536                 synchronized (buf.queue) {
537                     task = buf.queue.poll();
538     
539                     if (task == null) {
540                         buf.processingCompleted = true;
541                         break;
542                     }
543                 }
544 
545                 queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
546 
547                 runTask(task);
548             }
549         }
550 
551         private void runTask(Runnable task) {
552             beforeExecute(thread, task);
553             boolean ran = false;
554             try {
555                 task.run();
556                 ran = true;
557                 afterExecute(task, null);
558                 completedTaskCount ++;
559             } catch (RuntimeException e) {
560                 if (!ran)
561                     afterExecute(task, e);
562                 throw e;
563             }
564         }
565     }
566 }