1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.executor;
21
22 import static org.junit.Assert.assertEquals;
23
24 import java.io.IOException;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.Server;
33 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
34 import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
35 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
36 import org.junit.Test;
37 import static org.mockito.Mockito.*;
38
39 public class TestExecutorService {
40 private static final Log LOG = LogFactory.getLog(TestExecutorService.class);
41
42 @Test
43 public void testExecutorService() throws Exception {
44 int maxThreads = 5;
45 int maxTries = 10;
46 int sleepInterval = 10;
47
48 Server mockedServer = mock(Server.class);
49 when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
50
51
52 ExecutorService executorService = new ExecutorService("unit_test");
53 executorService.startExecutorService(
54 ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
55
56 Executor executor =
57 executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
58 ThreadPoolExecutor pool = executor.threadPoolExecutor;
59
60
61 assertEquals(0, pool.getPoolSize());
62
63 AtomicBoolean lock = new AtomicBoolean(true);
64 AtomicInteger counter = new AtomicInteger(0);
65
66
67 for (int i = 0; i < maxThreads; i++) {
68 executorService.submit(
69 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
70 lock, counter));
71 }
72
73
74 int tries = 0;
75 while (counter.get() < maxThreads && tries < maxTries) {
76 LOG.info("Waiting for all event handlers to start...");
77 Thread.sleep(sleepInterval);
78 tries++;
79 }
80
81
82 assertEquals(maxThreads, counter.get());
83 assertEquals(maxThreads, pool.getPoolSize());
84
85
86 synchronized (lock) {
87 lock.set(false);
88 lock.notifyAll();
89 }
90
91
92 while (counter.get() < (maxThreads * 2) && tries < maxTries) {
93 System.out.println("Waiting for all event handlers to finish...");
94 Thread.sleep(sleepInterval);
95 tries++;
96 }
97
98 assertEquals(maxThreads * 2, counter.get());
99 assertEquals(maxThreads, pool.getPoolSize());
100
101
102
103 for (int i = 0; i < (2 * maxThreads); i++) {
104 executorService.submit(
105 new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
106 lock, counter));
107 }
108
109 synchronized (lock) {
110 lock.set(false);
111 lock.notifyAll();
112 }
113
114
115 Thread.sleep(executor.keepAliveTimeInMillis * 2);
116 assertEquals(maxThreads, pool.getPoolSize());
117 }
118
119 public static class TestEventHandler extends EventHandler {
120 private AtomicBoolean lock;
121 private AtomicInteger counter;
122
123 public TestEventHandler(Server server, EventType eventType,
124 AtomicBoolean lock, AtomicInteger counter) {
125 super(server, eventType);
126 this.lock = lock;
127 this.counter = counter;
128 }
129
130 @Override
131 public void process() throws IOException {
132 int num = counter.incrementAndGet();
133 LOG.info("Running process #" + num + ", threadName=" +
134 Thread.currentThread().getName());
135 synchronized (lock) {
136 while (lock.get()) {
137 try {
138 lock.wait();
139 } catch (InterruptedException e) {
140
141 }
142 }
143 }
144 counter.incrementAndGet();
145 }
146 }
147 }