1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.io.IOException;
23 import java.security.PrivilegedAction;
24 import java.security.PrivilegedExceptionAction;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.hbase.client.HConnectionManager;
35 import org.apache.hadoop.hbase.master.HMaster;
36 import org.apache.hadoop.hbase.regionserver.HRegion;
37 import org.apache.hadoop.hbase.regionserver.HRegionServer;
38 import org.apache.hadoop.hbase.security.User;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.JVMClusterUtil;
41 import org.apache.hadoop.hbase.util.Threads;
42 import org.apache.hadoop.hdfs.DistributedFileSystem;
43 import org.apache.hadoop.io.MapWritable;
44 import org.apache.zookeeper.KeeperException;
45
46
47
48
49
50
51
52 public class MiniHBaseCluster {
53 static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
54 private Configuration conf;
55 public LocalHBaseCluster hbaseCluster;
56 private static int index;
57
58
59
60
61
62
63
64 public MiniHBaseCluster(Configuration conf, int numRegionServers)
65 throws IOException, InterruptedException {
66 this(conf, 1, numRegionServers);
67 }
68
69
70
71
72
73
74
75
76 public MiniHBaseCluster(Configuration conf, int numMasters,
77 int numRegionServers)
78 throws IOException, InterruptedException {
79 this.conf = conf;
80 conf.set(HConstants.MASTER_PORT, "0");
81 init(numMasters, numRegionServers);
82 }
83
84 public Configuration getConfiguration() {
85 return this.conf;
86 }
87
88
89
90
91 public static class MiniHBaseClusterMaster extends HMaster {
92 private final Map<HServerInfo, List<HMsg>> messages =
93 new ConcurrentHashMap<HServerInfo, List<HMsg>>();
94
95 private final Map<HServerInfo, IOException> exceptions =
96 new ConcurrentHashMap<HServerInfo, IOException>();
97
98 public MiniHBaseClusterMaster(final Configuration conf)
99 throws IOException, KeeperException, InterruptedException {
100 super(conf);
101 }
102
103
104
105
106
107
108 void addMessage(final HServerInfo hsi, HMsg msg) {
109 synchronized(this.messages) {
110 List<HMsg> hmsgs = this.messages.get(hsi);
111 if (hmsgs == null) {
112 hmsgs = new ArrayList<HMsg>();
113 this.messages.put(hsi, hmsgs);
114 }
115 hmsgs.add(msg);
116 }
117 }
118
119 void addException(final HServerInfo hsi, final IOException ex) {
120 this.exceptions.put(hsi, ex);
121 }
122
123
124
125
126
127
128
129
130
131
132 @Override
133 protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
134 final HMsg[] msgs) throws IOException {
135 IOException ex = this.exceptions.remove(hsi);
136 if (ex != null) {
137 throw ex;
138 }
139 HMsg [] answerMsgs = msgs;
140 synchronized (this.messages) {
141 List<HMsg> hmsgs = this.messages.get(hsi);
142 if (hmsgs != null && !hmsgs.isEmpty()) {
143 int size = answerMsgs.length;
144 HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
145 System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
146 for (int i = 0; i < hmsgs.size(); i++) {
147 newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
148 }
149 answerMsgs = newAnswerMsgs;
150 hmsgs.clear();
151 }
152 }
153 return super.adornRegionServerAnswer(hsi, answerMsgs);
154 }
155 }
156
157
158
159
160
161
162
163 public static class MiniHBaseClusterRegionServer extends HRegionServer {
164 private Thread shutdownThread = null;
165 private User user = null;
166
167 public MiniHBaseClusterRegionServer(Configuration conf)
168 throws IOException, InterruptedException {
169 super(conf);
170 this.user = User.getCurrent();
171 }
172
173 public void setHServerInfo(final HServerInfo hsi) {
174 this.serverInfo = hsi;
175 }
176
177
178
179
180
181
182
183
184
185 @Override
186 protected void handleReportForDutyResponse(MapWritable c) throws IOException {
187 super.handleReportForDutyResponse(c);
188
189 this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
190 }
191
192 @Override
193 public void run() {
194 try {
195 this.user.runAs(new PrivilegedAction<Object>(){
196 public Object run() {
197 runRegionServer();
198 return null;
199 }
200 });
201 } catch (Throwable t) {
202 LOG.error("Exception in run", t);
203 } finally {
204
205 if (this.shutdownThread != null) {
206 this.shutdownThread.start();
207 Threads.shutdown(this.shutdownThread, 30000);
208 }
209 }
210 }
211
212 private void runRegionServer() {
213 super.run();
214 }
215
216 @Override
217 public void kill() {
218 super.kill();
219 }
220
221 public void abort(final String reason, final Throwable cause) {
222 this.user.runAs(new PrivilegedAction<Object>() {
223 public Object run() {
224 abortRegionServer(reason, cause);
225 return null;
226 }
227 });
228 }
229
230 private void abortRegionServer(String reason, Throwable cause) {
231 super.abort(reason, cause);
232 }
233 }
234
235
236
237
238
239 static class SingleFileSystemShutdownThread extends Thread {
240 private final FileSystem fs;
241 SingleFileSystemShutdownThread(final FileSystem fs) {
242 super("Shutdown of " + fs);
243 this.fs = fs;
244 }
245 @Override
246 public void run() {
247 try {
248 LOG.info("Hook closing fs=" + this.fs);
249 this.fs.close();
250 } catch (NullPointerException npe) {
251 LOG.debug("Need to fix these: " + npe.toString());
252 } catch (IOException e) {
253 LOG.warn("Running hook", e);
254 }
255 }
256 }
257
258 private void init(final int nMasterNodes, final int nRegionNodes)
259 throws IOException, InterruptedException {
260 try {
261
262 hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
263 MiniHBaseCluster.MiniHBaseClusterMaster.class,
264 MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
265
266
267 for (int i=0; i<nRegionNodes; i++) {
268 Configuration rsConf = HBaseConfiguration.create(conf);
269 User user = HBaseTestingUtility.getDifferentUser(rsConf,
270 ".hfs."+index++);
271 hbaseCluster.addRegionServer(rsConf, i, user);
272 }
273
274 hbaseCluster.startup();
275 } catch (IOException e) {
276 shutdown();
277 throw e;
278 } catch (Throwable t) {
279 LOG.error("Error starting cluster", t);
280 shutdown();
281 throw new IOException("Shutting down", t);
282 }
283 }
284
285
286
287
288
289
290
291 public JVMClusterUtil.RegionServerThread startRegionServer()
292 throws IOException {
293 final Configuration newConf = HBaseConfiguration.create(conf);
294 User rsUser =
295 HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
296 JVMClusterUtil.RegionServerThread t = null;
297 try {
298 t = hbaseCluster.addRegionServer(
299 newConf, hbaseCluster.getRegionServers().size(), rsUser);
300 t.start();
301 t.waitForServerOnline();
302 } catch (InterruptedException ie) {
303 throw new IOException("Interrupted executing UserGroupInformation.doAs()", ie);
304 }
305 return t;
306 }
307
308
309
310
311
312 public String abortRegionServer(int serverNumber) {
313 HRegionServer server = getRegionServer(serverNumber);
314 LOG.info("Aborting " + server.toString());
315 server.abort("Aborting for tests", new Exception("Trace info"));
316 return server.toString();
317 }
318
319
320
321
322
323
324
325 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
326 return stopRegionServer(serverNumber, true);
327 }
328
329
330
331
332
333
334
335
336
337
338
339 public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
340 final boolean shutdownFS) {
341 JVMClusterUtil.RegionServerThread server =
342 hbaseCluster.getRegionServers().get(serverNumber);
343 LOG.info("Stopping " + server.toString());
344 server.getRegionServer().stop("Stopping rs " + serverNumber);
345 return server;
346 }
347
348
349
350
351
352
353
354 public String waitOnRegionServer(final int serverNumber) {
355 return this.hbaseCluster.waitOnRegionServer(serverNumber);
356 }
357
358
359
360
361
362
363
364
365 public JVMClusterUtil.MasterThread startMaster() throws IOException {
366 Configuration c = HBaseConfiguration.create(conf);
367 User user =
368 HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
369
370 JVMClusterUtil.MasterThread t = null;
371 try {
372 t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
373 t.start();
374 t.waitForServerOnline();
375 } catch (InterruptedException ie) {
376 throw new IOException("Interrupted executing UserGroupInformation.doAs()", ie);
377 }
378 return t;
379 }
380
381
382
383
384
385
386 public HServerAddress getHMasterAddress() {
387 return this.hbaseCluster.getActiveMaster().getMasterAddress();
388 }
389
390
391
392
393
394 public HMaster getMaster() {
395 return this.hbaseCluster.getActiveMaster();
396 }
397
398
399
400
401
402 public HMaster getMaster(final int serverNumber) {
403 return this.hbaseCluster.getMaster(serverNumber);
404 }
405
406
407
408
409
410 public String abortMaster(int serverNumber) {
411 HMaster server = getMaster(serverNumber);
412 LOG.info("Aborting " + server.toString());
413 server.abort("Aborting for tests", new Exception("Trace info"));
414 return server.toString();
415 }
416
417
418
419
420
421
422
423 public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
424 return stopMaster(serverNumber, true);
425 }
426
427
428
429
430
431
432
433
434
435
436
437 public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
438 final boolean shutdownFS) {
439 JVMClusterUtil.MasterThread server =
440 hbaseCluster.getMasters().get(serverNumber);
441 LOG.info("Stopping " + server.toString());
442 server.getMaster().stop("Stopping master " + serverNumber);
443 return server;
444 }
445
446
447
448
449
450
451
452 public String waitOnMaster(final int serverNumber) {
453 return this.hbaseCluster.waitOnMaster(serverNumber);
454 }
455
456
457
458
459
460
461
462
463
464 public boolean waitForActiveAndReadyMaster() throws InterruptedException {
465 List<JVMClusterUtil.MasterThread> mts;
466 while ((mts = getMasterThreads()).size() > 0) {
467 for (JVMClusterUtil.MasterThread mt : mts) {
468 if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
469 return true;
470 }
471 }
472 Thread.sleep(200);
473 }
474 return false;
475 }
476
477
478
479
480 public List<JVMClusterUtil.MasterThread> getMasterThreads() {
481 return this.hbaseCluster.getMasters();
482 }
483
484
485
486
487 public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
488 return this.hbaseCluster.getLiveMasters();
489 }
490
491
492
493
494 public void join() {
495 this.hbaseCluster.join();
496 }
497
498
499
500
501
502 public void shutdown() throws IOException {
503 if (this.hbaseCluster != null) {
504 this.hbaseCluster.shutdown();
505 }
506 HConnectionManager.deleteAllConnections(false);
507 }
508
509
510
511
512
513 public void flushcache() throws IOException {
514 for (JVMClusterUtil.RegionServerThread t:
515 this.hbaseCluster.getRegionServers()) {
516 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
517 r.flushcache();
518 }
519 }
520 }
521
522
523
524
525
526 public void flushcache(byte [] tableName) throws IOException {
527 for (JVMClusterUtil.RegionServerThread t:
528 this.hbaseCluster.getRegionServers()) {
529 for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
530 if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
531 r.flushcache();
532 }
533 }
534 }
535 }
536
537
538
539
540 public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
541 return this.hbaseCluster.getRegionServers();
542 }
543
544
545
546
547 public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
548 return this.hbaseCluster.getLiveRegionServers();
549 }
550
551
552
553
554
555
556 public HRegionServer getRegionServer(int serverNumber) {
557 return hbaseCluster.getRegionServer(serverNumber);
558 }
559
560 public List<HRegion> getRegions(byte[] tableName) {
561 List<HRegion> ret = new ArrayList<HRegion>();
562 for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
563 HRegionServer hrs = rst.getRegionServer();
564 for (HRegion region : hrs.getOnlineRegionsLocalContext()) {
565 if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
566 ret.add(region);
567 }
568 }
569 }
570 return ret;
571 }
572
573
574
575
576
577 public int getServerWithMeta() {
578 return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
579 }
580
581
582
583
584
585
586
587 public int getServerWith(byte[] regionName) {
588 int index = -1;
589 int count = 0;
590 for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
591 HRegionServer hrs = rst.getRegionServer();
592 HRegion metaRegion =
593 hrs.getOnlineRegion(regionName);
594 if (metaRegion != null) {
595 index = count;
596 break;
597 }
598 count++;
599 }
600 return index;
601 }
602
603
604
605
606
607
608
609 public void addExceptionToSendRegionServer(final int serverNumber,
610 IOException ex) throws IOException {
611 MiniHBaseClusterRegionServer hrs =
612 (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
613 addExceptionToSendRegionServer(hrs, ex);
614 }
615
616
617
618
619
620
621
622 public void addExceptionToSendRegionServer(
623 final MiniHBaseClusterRegionServer hrs, IOException ex)
624 throws IOException {
625 ((MiniHBaseClusterMaster)getMaster()).addException(hrs.getHServerInfo(),ex);
626 }
627
628
629
630
631
632
633
634
635 public void addMessageToSendRegionServer(final int serverNumber,
636 final HMsg msg)
637 throws IOException {
638 MiniHBaseClusterRegionServer hrs =
639 (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
640 addMessageToSendRegionServer(hrs, msg);
641 }
642
643
644
645
646
647
648
649
650 public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs,
651 final HMsg msg)
652 throws IOException {
653 ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
654 }
655
656
657
658
659
660
661
662 public long countServedRegions() {
663 long count = 0;
664 for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
665 count += rst.getRegionServer().getNumberOfOnlineRegions();
666 }
667 return count;
668 }
669 }