1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.executor;
21
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
34 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
35
36 import com.google.common.util.concurrent.ThreadFactoryBuilder;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class ExecutorService {
54 private static final Log LOG = LogFactory.getLog(ExecutorService.class);
55
56
57 private final ConcurrentHashMap<String, Executor> executorMap =
58 new ConcurrentHashMap<String, Executor>();
59
60
61 private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
62 new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
63
64
65 private final String servername;
66
67
68
69
70
71 public enum ExecutorType {
72
73
74 MASTER_CLOSE_REGION (1),
75 MASTER_OPEN_REGION (2),
76 MASTER_SERVER_OPERATIONS (3),
77 MASTER_TABLE_OPERATIONS (4),
78 MASTER_RS_SHUTDOWN (5),
79 MASTER_META_SERVER_OPERATIONS (6),
80
81
82 RS_OPEN_REGION (20),
83 RS_OPEN_ROOT (21),
84 RS_OPEN_META (22),
85 RS_CLOSE_REGION (23),
86 RS_CLOSE_ROOT (24),
87 RS_CLOSE_META (25);
88
89 ExecutorType(int value) {}
90
91
92
93
94
95 String getExecutorName(String serverName) {
96 return this.toString() + "-" + serverName;
97 }
98 }
99
100
101
102
103
104
105 public ExecutorType getExecutorServiceType(final EventHandler.EventType type) {
106 switch(type) {
107
108
109 case RS_ZK_REGION_CLOSED:
110 return ExecutorType.MASTER_CLOSE_REGION;
111
112 case RS_ZK_REGION_OPENED:
113 return ExecutorType.MASTER_OPEN_REGION;
114
115 case M_SERVER_SHUTDOWN:
116 return ExecutorType.MASTER_SERVER_OPERATIONS;
117
118 case M_META_SERVER_SHUTDOWN:
119 return ExecutorType.MASTER_META_SERVER_OPERATIONS;
120
121 case C_M_DELETE_TABLE:
122 case C_M_DISABLE_TABLE:
123 case C_M_ENABLE_TABLE:
124 case C_M_MODIFY_TABLE:
125 return ExecutorType.MASTER_TABLE_OPERATIONS;
126
127
128
129 case M_RS_OPEN_REGION:
130 return ExecutorType.RS_OPEN_REGION;
131
132 case M_RS_OPEN_ROOT:
133 return ExecutorType.RS_OPEN_ROOT;
134
135 case M_RS_OPEN_META:
136 return ExecutorType.RS_OPEN_META;
137
138 case M_RS_CLOSE_REGION:
139 return ExecutorType.RS_CLOSE_REGION;
140
141 case M_RS_CLOSE_ROOT:
142 return ExecutorType.RS_CLOSE_ROOT;
143
144 case M_RS_CLOSE_META:
145 return ExecutorType.RS_CLOSE_META;
146
147 default:
148 throw new RuntimeException("Unhandled event type " + type);
149 }
150 }
151
152
153
154
155
156 public ExecutorService(final String servername) {
157 super();
158 this.servername = servername;
159 }
160
161
162
163
164
165
166 void startExecutorService(String name, int maxThreads) {
167 if (this.executorMap.get(name) != null) {
168 throw new RuntimeException("An executor service with the name " + name +
169 " is already running!");
170 }
171 Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
172 if (this.executorMap.putIfAbsent(name, hbes) != null) {
173 throw new RuntimeException("An executor service with the name " + name +
174 " is already running (2)!");
175 }
176 LOG.debug("Starting executor service name=" + name +
177 ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
178 ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
179 }
180
181 boolean isExecutorServiceRunning(String name) {
182 return this.executorMap.containsKey(name);
183 }
184
185 public void shutdown() {
186 for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
187 List<Runnable> wasRunning =
188 entry.getValue().threadPoolExecutor.shutdownNow();
189 if (!wasRunning.isEmpty()) {
190 LOG.info(entry.getKey() + " had " + wasRunning + " on shutdown");
191 }
192 }
193 this.executorMap.clear();
194 }
195
196 Executor getExecutor(final ExecutorType type) {
197 return getExecutor(type.getExecutorName(this.servername));
198 }
199
200 Executor getExecutor(String name) {
201 Executor executor = this.executorMap.get(name);
202 if (executor == null) {
203 LOG.debug("Executor service [" + name + "] not found in " + this.executorMap);
204 }
205 return executor;
206 }
207
208
209 public void startExecutorService(final ExecutorType type, final int maxThreads) {
210 String name = type.getExecutorName(this.servername);
211 if (isExecutorServiceRunning(name)) {
212 LOG.debug("Executor service " + toString() + " already running on " +
213 this.servername);
214 return;
215 }
216 startExecutorService(name, maxThreads);
217 }
218
219 public void submit(final EventHandler eh) {
220 getExecutor(getExecutorServiceType(eh.getEventType())).submit(eh);
221 }
222
223
224
225
226
227
228
229
230 public void registerListener(final EventHandler.EventType type,
231 final EventHandlerListener listener) {
232 this.eventHandlerListeners.put(type, listener);
233 }
234
235
236
237
238
239
240
241 public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
242 return this.eventHandlerListeners.remove(type);
243 }
244
245
246
247
248 static class Executor {
249
250 final long keepAliveTimeInMillis = 1000;
251
252 final ThreadPoolExecutor threadPoolExecutor;
253
254 final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
255 private final String name;
256 private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
257
258 protected Executor(String name, int maxThreads,
259 final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
260 this.name = name;
261 this.eventHandlerListeners = eventHandlerListeners;
262
263 this.threadPoolExecutor = new ThreadPoolExecutor(maxThreads, maxThreads,
264 keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
265
266 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
267 tfb.setNameFormat(this.name + "-%d");
268 this.threadPoolExecutor.setThreadFactory(tfb.build());
269 }
270
271
272
273
274
275 void submit(final EventHandler event) {
276
277
278 EventHandlerListener listener =
279 this.eventHandlerListeners.get(event.getEventType());
280 if (listener != null) {
281 event.setListener(listener);
282 }
283 this.threadPoolExecutor.execute(event);
284 }
285 }
286 }