1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.io.IOException;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.client.HBaseAdmin;
32 import org.apache.hadoop.hbase.regionserver.HRegionServer;
33 import org.apache.hadoop.hbase.security.User;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
36
37 import java.util.concurrent.CopyOnWriteArrayList;
38 import org.apache.hadoop.hbase.master.HMaster;
39 import org.apache.hadoop.hbase.util.JVMClusterUtil;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class LocalHBaseCluster {
58 static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
59 private final List<JVMClusterUtil.MasterThread> masterThreads =
60 new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
61 private final List<JVMClusterUtil.RegionServerThread> regionThreads =
62 new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
63 private final static int DEFAULT_NO = 1;
64
65 public static final String LOCAL = "local";
66
67 public static final String LOCAL_COLON = LOCAL + ":";
68 private final Configuration conf;
69 private final Class<? extends HMaster> masterClass;
70 private final Class<? extends HRegionServer> regionServerClass;
71
72
73
74
75
76
77 public LocalHBaseCluster(final Configuration conf)
78 throws IOException {
79 this(conf, DEFAULT_NO);
80 }
81
82
83
84
85
86
87
88
89 public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
90 throws IOException {
91 this(conf, 1, noRegionServers, getMasterImplementation(conf),
92 getRegionServerImplementation(conf));
93 }
94
95
96
97
98
99
100
101
102
103 public LocalHBaseCluster(final Configuration conf, final int noMasters,
104 final int noRegionServers)
105 throws IOException {
106 this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
107 getRegionServerImplementation(conf));
108 }
109
110 @SuppressWarnings("unchecked")
111 private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
112 return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
113 HRegionServer.class);
114 }
115
116 @SuppressWarnings("unchecked")
117 private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
118 return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
119 HMaster.class);
120 }
121
122
123
124
125
126
127
128
129
130
131
132 @SuppressWarnings("unchecked")
133 public LocalHBaseCluster(final Configuration conf, final int noMasters,
134 final int noRegionServers, final Class<? extends HMaster> masterClass,
135 final Class<? extends HRegionServer> regionServerClass)
136 throws IOException {
137 this.conf = conf;
138
139
140 conf.set(HConstants.MASTER_PORT, "0");
141 conf.set(HConstants.REGIONSERVER_PORT, "0");
142
143 this.masterClass =
144 (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
145 masterClass);
146 for (int i = 0; i < noMasters; i++) {
147 addMaster(new Configuration(conf), i);
148 }
149
150 this.regionServerClass =
151 (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
152 regionServerClass);
153
154 for (int i = 0; i < noRegionServers; i++) {
155 addRegionServer(new Configuration(conf), i);
156 }
157 }
158
159 public JVMClusterUtil.RegionServerThread addRegionServer()
160 throws IOException {
161 return addRegionServer(new Configuration(conf), this.regionThreads.size());
162 }
163
164 public JVMClusterUtil.RegionServerThread addRegionServer(
165 Configuration config, final int index)
166 throws IOException {
167
168
169
170 JVMClusterUtil.RegionServerThread rst =
171 JVMClusterUtil.createRegionServerThread(config,
172 this.regionServerClass, index);
173 this.regionThreads.add(rst);
174 return rst;
175 }
176
177 public JVMClusterUtil.RegionServerThread addRegionServer(
178 final Configuration config, final int index, User user)
179 throws IOException, InterruptedException {
180 return user.runAs(
181 new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
182 public JVMClusterUtil.RegionServerThread run() throws Exception {
183 return addRegionServer(config, index);
184 }
185 });
186 }
187
188 public JVMClusterUtil.MasterThread addMaster() throws IOException {
189 return addMaster(new Configuration(conf), this.masterThreads.size());
190 }
191
192 public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
193 throws IOException {
194
195
196
197 JVMClusterUtil.MasterThread mt =
198 JVMClusterUtil.createMasterThread(c,
199 this.masterClass, index);
200 this.masterThreads.add(mt);
201 return mt;
202 }
203
204 public JVMClusterUtil.MasterThread addMaster(
205 final Configuration c, final int index, User user)
206 throws IOException, InterruptedException {
207 return user.runAs(
208 new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
209 public JVMClusterUtil.MasterThread run() throws Exception {
210 return addMaster(c, index);
211 }
212 });
213 }
214
215
216
217
218
219 public HRegionServer getRegionServer(int serverNumber) {
220 return regionThreads.get(serverNumber).getRegionServer();
221 }
222
223
224
225
226 public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
227 return Collections.unmodifiableList(this.regionThreads);
228 }
229
230
231
232
233
234
235 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
236 List<JVMClusterUtil.RegionServerThread> liveServers =
237 new ArrayList<JVMClusterUtil.RegionServerThread>();
238 List<RegionServerThread> list = getRegionServers();
239 for (JVMClusterUtil.RegionServerThread rst: list) {
240 if (rst.isAlive()) liveServers.add(rst);
241 }
242 return liveServers;
243 }
244
245
246
247
248
249
250
251 public String waitOnRegionServer(int serverNumber) {
252 JVMClusterUtil.RegionServerThread regionServerThread =
253 this.regionThreads.remove(serverNumber);
254 while (regionServerThread.isAlive()) {
255 try {
256 LOG.info("Waiting on " +
257 regionServerThread.getRegionServer().getHServerInfo().toString());
258 regionServerThread.join();
259 } catch (InterruptedException e) {
260 e.printStackTrace();
261 } catch (IOException e) {
262 e.printStackTrace();
263 }
264 }
265 return regionServerThread.getName();
266 }
267
268
269
270
271
272
273
274 public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
275 while (rst.isAlive()) {
276 try {
277 LOG.info("Waiting on " +
278 rst.getRegionServer().getHServerInfo().toString());
279 rst.join();
280 } catch (InterruptedException e) {
281 e.printStackTrace();
282 } catch (IOException e) {
283 e.printStackTrace();
284 }
285 }
286 for (int i=0;i<regionThreads.size();i++) {
287 if (regionThreads.get(i) == rst) {
288 regionThreads.remove(i);
289 break;
290 }
291 }
292 return rst.getName();
293 }
294
295
296
297
298
299 public HMaster getMaster(int serverNumber) {
300 return masterThreads.get(serverNumber).getMaster();
301 }
302
303
304
305
306
307
308 public HMaster getActiveMaster() {
309 for (JVMClusterUtil.MasterThread mt : masterThreads) {
310 if (mt.getMaster().isActiveMaster()) {
311 return mt.getMaster();
312 }
313 }
314 return null;
315 }
316
317
318
319
320 public List<JVMClusterUtil.MasterThread> getMasters() {
321 return Collections.unmodifiableList(this.masterThreads);
322 }
323
324
325
326
327
328
329 public List<JVMClusterUtil.MasterThread> getLiveMasters() {
330 List<JVMClusterUtil.MasterThread> liveServers =
331 new ArrayList<JVMClusterUtil.MasterThread>();
332 List<JVMClusterUtil.MasterThread> list = getMasters();
333 for (JVMClusterUtil.MasterThread mt: list) {
334 if (mt.isAlive()) {
335 liveServers.add(mt);
336 }
337 }
338 return liveServers;
339 }
340
341
342
343
344
345
346
347 public String waitOnMaster(int serverNumber) {
348 JVMClusterUtil.MasterThread masterThread =
349 this.masterThreads.remove(serverNumber);
350 while (masterThread.isAlive()) {
351 try {
352 LOG.info("Waiting on " +
353 masterThread.getMaster().getServerName().toString());
354 masterThread.join();
355 } catch (InterruptedException e) {
356 e.printStackTrace();
357 }
358 }
359 return masterThread.getName();
360 }
361
362
363
364
365
366
367
368 public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
369 while (masterThread.isAlive()) {
370 try {
371 LOG.info("Waiting on " +
372 masterThread.getMaster().getServerName().toString());
373 masterThread.join();
374 } catch (InterruptedException e) {
375 e.printStackTrace();
376 }
377 }
378 for (int i=0;i<masterThreads.size();i++) {
379 if (masterThreads.get(i) == masterThread) {
380 masterThreads.remove(i);
381 break;
382 }
383 }
384 return masterThread.getName();
385 }
386
387
388
389
390
391 public void join() {
392 if (this.regionThreads != null) {
393 for(Thread t: this.regionThreads) {
394 if (t.isAlive()) {
395 try {
396 t.join();
397 } catch (InterruptedException e) {
398
399 }
400 }
401 }
402 }
403 if (this.masterThreads != null) {
404 for (Thread t : this.masterThreads) {
405 if (t.isAlive()) {
406 try {
407 t.join();
408 } catch (InterruptedException e) {
409
410 }
411 }
412 }
413 }
414 }
415
416
417
418
419 public void startup() {
420 JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
421 }
422
423
424
425
426 public void shutdown() {
427 JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
428 }
429
430
431
432
433
434 public static boolean isLocal(final Configuration c) {
435 final String mode = c.get(HConstants.CLUSTER_DISTRIBUTED);
436 return mode == null || mode.equals(HConstants.CLUSTER_IS_LOCAL);
437 }
438
439
440
441
442
443
444 public static void main(String[] args) throws IOException {
445 Configuration conf = HBaseConfiguration.create();
446 LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
447 cluster.startup();
448 HBaseAdmin admin = new HBaseAdmin(conf);
449 HTableDescriptor htd =
450 new HTableDescriptor(Bytes.toBytes(cluster.getClass().getName()));
451 admin.createTable(htd);
452 cluster.shutdown();
453 }
454 }