1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.Set;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.RejectedExecutionHandler;
32 import java.util.concurrent.SynchronousQueue;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.ThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.apache.mina.core.session.AttributeKey;
39 import org.apache.mina.core.session.DummySession;
40 import org.apache.mina.core.session.IoEvent;
41 import org.apache.mina.core.session.IoSession;
42
43
44
45
46
47
48
49
50
51
52
53 public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
54
55 private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
56
57
58 private static final int DEFAULT_MAX_THREAD_POOL = 16;
59
60
61 private static final int DEFAULT_KEEP_ALIVE = 30;
62
63 private static final IoSession EXIT_SIGNAL = new DummySession();
64
65
66 private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue");
67
68 private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();
69
70 private final Set<Worker> workers = new HashSet<Worker>();
71
72 private volatile int largestPoolSize;
73 private final AtomicInteger idleWorkers = new AtomicInteger();
74
75 private long completedTaskCount;
76 private volatile boolean shutdown;
77
78 private final IoEventQueueHandler eventQueueHandler;
79
80
81
82
83
84
85
86
87
/**
 * Creates an executor with the default core size (0), the default maximum
 * size (16), a 30 second keep-alive, the default thread factory and no
 * event queue handler.
 */
public OrderedThreadPoolExecutor() {
    this(
            DEFAULT_INITIAL_THREAD_POOL_SIZE,
            DEFAULT_MAX_THREAD_POOL,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
92
93
94
95
96
97
98
99
100
101
/**
 * Creates an executor with the default core size (0), a 30 second
 * keep-alive, the default thread factory and no event queue handler.
 *
 * @param maximumPoolSize the maximum number of worker threads
 */
public OrderedThreadPoolExecutor(int maximumPoolSize) {
    this(
            DEFAULT_INITIAL_THREAD_POOL_SIZE,
            maximumPoolSize,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
106
107
108
109
110
111
112
113
114
115
/**
 * Creates an executor with a 30 second keep-alive, the default thread
 * factory and no event queue handler.
 *
 * @param corePoolSize    the number of workers that are kept even when idle
 * @param maximumPoolSize the maximum number of worker threads
 */
public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
    this(
            corePoolSize,
            maximumPoolSize,
            DEFAULT_KEEP_ALIVE,
            TimeUnit.SECONDS,
            Executors.defaultThreadFactory(),
            null);
}
120
121
122
123
124
125
126
127
128
129
130
/**
 * Creates an executor with the default thread factory and no event queue
 * handler.
 *
 * @param corePoolSize    the number of workers that are kept even when idle
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime   how long an idle worker waits for work before it may exit
 * @param unit            the unit of {@code keepAliveTime}
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            Executors.defaultThreadFactory(),
            null);
}
136
137
138
139
140
141
142
143
144
145
146
/**
 * Creates an executor that uses the default thread factory.
 *
 * @param corePoolSize    the number of workers that are kept even when idle
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime   how long an idle worker waits for work before it may exit
 * @param unit            the unit of {@code keepAliveTime}
 * @param queueHandler    the event queue handler, or {@code null} for none
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        IoEventQueueHandler queueHandler) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            Executors.defaultThreadFactory(),
            queueHandler);
}
154
155
156
157
158
159
160
161
162
163
164
/**
 * Creates an executor without an event queue handler.
 *
 * @param corePoolSize    the number of workers that are kept even when idle
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime   how long an idle worker waits for work before it may exit
 * @param unit            the unit of {@code keepAliveTime}
 * @param threadFactory   the factory used to create worker threads
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory) {
    this(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            threadFactory,
            null);
}
171
172
173
174
175
176
177
178
179
180
181
/**
 * Creates a fully configured executor.
 *
 * <p>The superclass is first initialised with placeholder sizes (core 0,
 * maximum 1) because argument validation can only run after the mandatory
 * {@code super(...)} call; the real sizes are installed afterwards.
 *
 * @param corePoolSize    the number of workers that are kept even when idle
 * @param maximumPoolSize the maximum number of worker threads
 * @param keepAliveTime   how long an idle worker waits for work before it may exit
 * @param unit            the unit of {@code keepAliveTime}
 * @param threadFactory   the factory used to create worker threads
 * @param queueHandler    the event queue handler, or {@code null} for none
 * @throws IllegalArgumentException if {@code corePoolSize} is negative, or
 *         {@code maximumPoolSize} is zero or smaller than {@code corePoolSize}
 */
public OrderedThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
    super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit,
            new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());

    if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
        throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
    }

    if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
        throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
    }

    // Install the maximum BEFORE the core size. The superclass still has its
    // placeholder maximum of 1 here, and since Java 9
    // ThreadPoolExecutor.setCorePoolSize rejects a core size larger than the
    // current maximum, so the opposite order fails for any corePoolSize > 1.
    super.setMaximumPoolSize(maximumPoolSize);
    super.setCorePoolSize(corePoolSize);

    this.eventQueueHandler = queueHandler;
}
207
208
209
210
211
212 public IoEventQueueHandler getQueueHandler() {
213 return eventQueueHandler;
214 }
215
216
217
218
219 @Override
220 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
221
222 }
223
224
225
226
227
228 private void addWorker() {
229 synchronized (workers) {
230 if (workers.size() >= super.getMaximumPoolSize()) {
231 return;
232 }
233
234
235 Worker worker = new Worker();
236 Thread thread = getThreadFactory().newThread(worker);
237
238
239 idleWorkers.incrementAndGet();
240
241
242 thread.start();
243 workers.add(worker);
244
245 if (workers.size() > largestPoolSize) {
246 largestPoolSize = workers.size();
247 }
248 }
249 }
250
251
252
253
254 private void addWorkerIfNecessary() {
255 if (idleWorkers.get() == 0) {
256 synchronized (workers) {
257 if (workers.isEmpty() || (idleWorkers.get() == 0)) {
258 addWorker();
259 }
260 }
261 }
262 }
263
264 private void removeWorker() {
265 synchronized (workers) {
266 if (workers.size() <= super.getCorePoolSize()) {
267 return;
268 }
269 waitingSessions.offer(EXIT_SIGNAL);
270 }
271 }
272
273
274
275
276 @Override
277 public int getMaximumPoolSize() {
278 return super.getMaximumPoolSize();
279 }
280
281
282
283
284 @Override
285 public void setMaximumPoolSize(int maximumPoolSize) {
286 if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) {
287 throw new IllegalArgumentException("maximumPoolSize: "
288 + maximumPoolSize);
289 }
290
291 synchronized (workers) {
292 super.setMaximumPoolSize( maximumPoolSize );
293 int difference = workers.size() - maximumPoolSize;
294 while (difference > 0) {
295 removeWorker();
296 --difference;
297 }
298 }
299 }
300
301
302
303
304 @Override
305 public boolean awaitTermination(long timeout, TimeUnit unit)
306 throws InterruptedException {
307
308 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
309
310 synchronized (workers) {
311 while (!isTerminated()) {
312 long waitTime = deadline - System.currentTimeMillis();
313 if (waitTime <= 0) {
314 break;
315 }
316
317 workers.wait(waitTime);
318 }
319 }
320 return isTerminated();
321 }
322
323
324
325
326 @Override
327 public boolean isShutdown() {
328 return shutdown;
329 }
330
331
332
333
334 @Override
335 public boolean isTerminated() {
336 if (!shutdown) {
337 return false;
338 }
339
340 synchronized (workers) {
341 return workers.isEmpty();
342 }
343 }
344
345
346
347
348 @Override
349 public void shutdown() {
350 if (shutdown) {
351 return;
352 }
353
354 shutdown = true;
355
356 synchronized (workers) {
357 for (int i = workers.size(); i > 0; i --) {
358 waitingSessions.offer(EXIT_SIGNAL);
359 }
360 }
361 }
362
363
364
365
366 @Override
367 public List<Runnable> shutdownNow() {
368 shutdown();
369
370 List<Runnable> answer = new ArrayList<Runnable>();
371 IoSession session;
372
373 while ((session = waitingSessions.poll()) != null) {
374 if (session == EXIT_SIGNAL) {
375 waitingSessions.offer(EXIT_SIGNAL);
376 Thread.yield();
377 continue;
378 }
379
380 Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
381
382 synchronized (tasksQueue) {
383
384 for (Runnable task: tasksQueue) {
385 getQueueHandler().polled(this, (IoEvent) task);
386 answer.add(task);
387 }
388
389 tasksQueue.clear();
390 }
391 }
392
393 return answer;
394 }
395
396
397
398
399 @Override
400 public void execute(Runnable task) {
401 if (shutdown) {
402 rejectTask(task);
403 }
404
405
406 checkTaskType(task);
407
408 IoEvent event = (IoEvent) task;
409 IoSession session = event.getSession();
410
411
412 Queue<Runnable> tasksQueue = getTasksQueue(session);
413 boolean offerSession;
414 boolean offerEvent = true;
415
416
417
418
419 if (eventQueueHandler != null) {
420 offerEvent = eventQueueHandler.accept(this, event);
421 }
422
423 if (offerEvent) {
424
425 synchronized (tasksQueue) {
426 offerSession = tasksQueue.isEmpty();
427
428
429 tasksQueue.offer(event);
430 }
431 } else {
432 offerSession = false;
433 }
434
435 if (offerSession) {
436 waitingSessions.offer(session);
437 }
438
439 addWorkerIfNecessary();
440
441 if (offerEvent) {
442 if (eventQueueHandler != null) {
443 eventQueueHandler.offered(this, event);
444 }
445 }
446 }
447
448 private void rejectTask(Runnable task) {
449 getRejectedExecutionHandler().rejectedExecution(task, this);
450 }
451
452 private void checkTaskType(Runnable task) {
453 if (!(task instanceof IoEvent)) {
454 throw new IllegalArgumentException("task must be an IoEvent or its subclass.");
455 }
456 }
457
458
459
460
461 @Override
462 public int getActiveCount() {
463 synchronized (workers) {
464 return workers.size() - idleWorkers.get();
465 }
466 }
467
468
469
470
471 @Override
472 public long getCompletedTaskCount() {
473 synchronized (workers) {
474 long answer = completedTaskCount;
475 for (Worker w: workers) {
476 answer += w.completedTaskCount;
477 }
478
479 return answer;
480 }
481 }
482
483
484
485
486 @Override
487 public int getLargestPoolSize() {
488 return largestPoolSize;
489 }
490
491
492
493
494 @Override
495 public int getPoolSize() {
496 synchronized (workers) {
497 return workers.size();
498 }
499 }
500
501
502
503
504 @Override
505 public long getTaskCount() {
506 return getCompletedTaskCount();
507 }
508
509
510
511
512 @Override
513 public boolean isTerminating() {
514 synchronized (workers) {
515 return isShutdown() && !isTerminated();
516 }
517 }
518
519
520
521
522 @Override
523 public int prestartAllCoreThreads() {
524 int answer = 0;
525 synchronized (workers) {
526 for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) {
527 addWorker();
528 answer ++;
529 }
530 }
531 return answer;
532 }
533
534
535
536
537 @Override
538 public boolean prestartCoreThread() {
539 synchronized (workers) {
540 if (workers.size() < super.getCorePoolSize()) {
541 addWorker();
542 return true;
543 } else {
544 return false;
545 }
546 }
547 }
548
549
550
551
552 @Override
553 public BlockingQueue<Runnable> getQueue() {
554 throw new UnsupportedOperationException();
555 }
556
557
558
559
560 @Override
561 public void purge() {
562
563 }
564
565
566
567
568 @Override
569 public boolean remove(Runnable task) {
570 checkTaskType(task);
571 IoEvent event = (IoEvent) task;
572 IoSession session = event.getSession();
573 Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE);
574
575 if (tasksQueue == null) {
576 return false;
577 }
578
579 boolean removed;
580
581 synchronized (tasksQueue) {
582 removed = tasksQueue.remove(task);
583 }
584
585 if (removed) {
586 getQueueHandler().polled(this, event);
587 }
588
589 return removed;
590 }
591
592
593
594
595 @Override
596 public int getCorePoolSize() {
597 return super.getCorePoolSize();
598 }
599
600
601
602
603 @Override
604 public void setCorePoolSize(int corePoolSize) {
605 if (corePoolSize < 0) {
606 throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
607 }
608 if (corePoolSize > super.getMaximumPoolSize()) {
609 throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize");
610 }
611
612 synchronized (workers) {
613 if (super.getCorePoolSize()> corePoolSize) {
614 for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) {
615 removeWorker();
616 }
617 }
618 super.setCorePoolSize(corePoolSize);
619 }
620 }
621
622 private Queue<Runnable> getTasksQueue(IoSession session) {
623 Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE);
624
625 if (tasksQueue == null) {
626 tasksQueue = new ConcurrentLinkedQueue<Runnable>();
627 Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue);
628
629 if (oldTasksQueue != null) {
630 tasksQueue = oldTasksQueue;
631 }
632 }
633
634 return tasksQueue;
635 }
636
637 private class Worker implements Runnable {
638
639 private volatile long completedTaskCount;
640 private Thread thread;
641
642 public void run() {
643 thread = Thread.currentThread();
644
645 try {
646 for (;;) {
647 IoSession session = fetchSession();
648
649 idleWorkers.decrementAndGet();
650
651 if (session == null) {
652 synchronized (workers) {
653 if (workers.size() > getCorePoolSize()) {
654
655 workers.remove(this);
656 break;
657 }
658 }
659 }
660
661 if (session == EXIT_SIGNAL) {
662 break;
663 }
664
665 try {
666 if (session != null) {
667 runTasks(getTasksQueue(session));
668 }
669 } finally {
670 idleWorkers.incrementAndGet();
671 }
672 }
673 } finally {
674 synchronized (workers) {
675 workers.remove(this);
676 OrderedThreadPoolExecutor.this.completedTaskCount += completedTaskCount;
677 workers.notifyAll();
678 }
679 }
680 }
681
682 private IoSession fetchSession() {
683 IoSession session = null;
684 long currentTime = System.currentTimeMillis();
685 long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS);
686 for (;;) {
687 try {
688 long waitTime = deadline - currentTime;
689 if (waitTime <= 0) {
690 break;
691 }
692
693 try {
694 session = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
695 break;
696 } finally {
697 if (session == null) {
698 currentTime = System.currentTimeMillis();
699 }
700 }
701 } catch (InterruptedException e) {
702
703 continue;
704 }
705 }
706 return session;
707 }
708
709 private void runTasks(Queue<Runnable> tasksQueue) {
710 for (;;) {
711 Runnable task;
712
713 synchronized (tasksQueue) {
714 if ( tasksQueue.isEmpty()) {
715 break;
716 }
717
718 task = tasksQueue.poll();
719
720 if (task == null) {
721 break;
722 }
723 }
724
725 if (eventQueueHandler != null) {
726 eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
727 }
728
729 runTask(task);
730 }
731 }
732
733 private void runTask(Runnable task) {
734 beforeExecute(thread, task);
735 boolean ran = false;
736 try {
737 task.run();
738 ran = true;
739 afterExecute(task, null);
740 completedTaskCount ++;
741 } catch (RuntimeException e) {
742 if (!ran) {
743 afterExecute(task, e);
744 }
745 throw e;
746 }
747 }
748 }
749 }