1   /**
2    * Copyright 2008 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.IOException;
23  import java.security.PrivilegedAction;
24  import java.security.PrivilegedExceptionAction;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentHashMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.hbase.client.HConnectionManager;
35  import org.apache.hadoop.hbase.master.HMaster;
36  import org.apache.hadoop.hbase.regionserver.HRegion;
37  import org.apache.hadoop.hbase.regionserver.HRegionServer;
38  import org.apache.hadoop.hbase.security.User;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.JVMClusterUtil;
41  import org.apache.hadoop.hbase.util.Threads;
42  import org.apache.hadoop.hdfs.DistributedFileSystem;
43  import org.apache.hadoop.io.MapWritable;
44  import org.apache.zookeeper.KeeperException;
45  
/**
 * This class creates a single process HBase cluster.  One thread is created
 * for each server.  The master uses the 'default' FileSystem.  The
 * RegionServers, if we are running on DistributedFilesystem, create a
 * FileSystem instance each and will close down their instance on the way out.
 */
52  public class MiniHBaseCluster {
  /** Logger shared by this class and its nested helper classes. */
  static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName());
  /** Configuration passed to the constructor; exposed via getConfiguration(). */
  private Configuration conf;
  /** The wrapped in-process cluster that the methods of this class delegate to. */
  public LocalHBaseCluster hbaseCluster;
  /**
   * Counter appended to the ".hfs." suffix whenever a distinct user is
   * created for a newly added server; incremented on every use.
   */
  private static int index;
57  
58    /**
59     * Start a MiniHBaseCluster.
60     * @param conf Configuration to be used for cluster
61     * @param numRegionServers initial number of region servers to start.
62     * @throws IOException
63     */
64    public MiniHBaseCluster(Configuration conf, int numRegionServers)
65    throws IOException, InterruptedException {
66      this(conf, 1, numRegionServers);
67    }
68  
69    /**
70     * Start a MiniHBaseCluster.
71     * @param conf Configuration to be used for cluster
72     * @param numMasters initial number of masters to start.
73     * @param numRegionServers initial number of region servers to start.
74     * @throws IOException
75     */
76    public MiniHBaseCluster(Configuration conf, int numMasters,
77        int numRegionServers)
78    throws IOException, InterruptedException {
79      this.conf = conf;
80      conf.set(HConstants.MASTER_PORT, "0");
81      init(numMasters, numRegionServers);
82    }
83  
84    public Configuration getConfiguration() {
85      return this.conf;
86    }
87  
88    /**
89     * Override Master so can add inject behaviors testing.
90     */
91    public static class MiniHBaseClusterMaster extends HMaster {
92      private final Map<HServerInfo, List<HMsg>> messages =
93        new ConcurrentHashMap<HServerInfo, List<HMsg>>();
94  
95      private final Map<HServerInfo, IOException> exceptions =
96        new ConcurrentHashMap<HServerInfo, IOException>();
97  
98      public MiniHBaseClusterMaster(final Configuration conf)
99      throws IOException, KeeperException, InterruptedException {
100       super(conf);
101     }
102 
103     /**
104      * Add a message to send to a regionserver next time it checks in.
105      * @param hsi RegionServer's HServerInfo.
106      * @param msg Message to add.
107      */
108     void addMessage(final HServerInfo hsi, HMsg msg) {
109       synchronized(this.messages) {
110         List<HMsg> hmsgs = this.messages.get(hsi);
111         if (hmsgs == null) {
112           hmsgs = new ArrayList<HMsg>();
113           this.messages.put(hsi, hmsgs);
114         }
115         hmsgs.add(msg);
116       }
117     }
118 
119     void addException(final HServerInfo hsi, final IOException ex) {
120       this.exceptions.put(hsi, ex);
121     }
122 
123     /**
124      * This implementation is special, exceptions will be treated first and
125      * message won't be sent back to the region servers even if some are
126      * specified.
127      * @param hsi the rs
128      * @param msgs Messages to add to
129      * @return
130      * @throws IOException will be throw if any added for this region server
131      */
132     @Override
133     protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
134         final HMsg[] msgs) throws IOException {
135       IOException ex = this.exceptions.remove(hsi);
136       if (ex != null) {
137         throw ex;
138       }
139       HMsg [] answerMsgs = msgs;
140       synchronized (this.messages) {
141         List<HMsg> hmsgs = this.messages.get(hsi);
142         if (hmsgs != null && !hmsgs.isEmpty()) {
143           int size = answerMsgs.length;
144           HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
145           System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
146           for (int i = 0; i < hmsgs.size(); i++) {
147             newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
148           }
149           answerMsgs = newAnswerMsgs;
150           hmsgs.clear();
151         }
152       }
153       return super.adornRegionServerAnswer(hsi, answerMsgs);
154     }
155   }
156 
157   /**
158    * Subclass so can get at protected methods (none at moment).  Also, creates
159    * a FileSystem instance per instantiation.  Adds a shutdown own FileSystem
160    * on the way out. Shuts down own Filesystem only, not All filesystems as
161    * the FileSystem system exit hook does.
162    */
163   public static class MiniHBaseClusterRegionServer extends HRegionServer {
164     private Thread shutdownThread = null;
165     private User user = null;
166 
167     public MiniHBaseClusterRegionServer(Configuration conf)
168         throws IOException, InterruptedException {
169       super(conf);
170       this.user = User.getCurrent();
171     }
172 
173     public void setHServerInfo(final HServerInfo hsi) {
174       this.serverInfo = hsi;
175     }
176 
177     /*
178      * @param c
179      * @param currentfs We return this if we did not make a new one.
180      * @param uniqueName Same name used to help identify the created fs.
181      * @return A new fs instance if we are up on DistributeFileSystem.
182      * @throws IOException
183      */
184 
185     @Override
186     protected void handleReportForDutyResponse(MapWritable c) throws IOException {
187       super.handleReportForDutyResponse(c);
188       // Run this thread to shutdown our filesystem on way out.
189       this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem());
190     }
191 
192     @Override
193     public void run() {
194       try {
195         this.user.runAs(new PrivilegedAction<Object>(){
196           public Object run() {
197             runRegionServer();
198             return null;
199           }
200         });
201       } catch (Throwable t) {
202         LOG.error("Exception in run", t);
203       } finally {
204         // Run this on the way out.
205         if (this.shutdownThread != null) {
206           this.shutdownThread.start();
207           Threads.shutdown(this.shutdownThread, 30000);
208         }
209       }
210     }
211 
212     private void runRegionServer() {
213       super.run();
214     }
215 
216     @Override
217     public void kill() {
218       super.kill();
219     }
220 
221     public void abort(final String reason, final Throwable cause) {
222       this.user.runAs(new PrivilegedAction<Object>() {
223         public Object run() {
224           abortRegionServer(reason, cause);
225           return null;
226         }
227       });
228     }
229 
230     private void abortRegionServer(String reason, Throwable cause) {
231       super.abort(reason, cause);
232     }
233   }
234 
235   /**
236    * Alternate shutdown hook.
237    * Just shuts down the passed fs, not all as default filesystem hook does.
238    */
239   static class SingleFileSystemShutdownThread extends Thread {
240     private final FileSystem fs;
241     SingleFileSystemShutdownThread(final FileSystem fs) {
242       super("Shutdown of " + fs);
243       this.fs = fs;
244     }
245     @Override
246     public void run() {
247       try {
248         LOG.info("Hook closing fs=" + this.fs);
249         this.fs.close();
250       } catch (NullPointerException npe) {
251         LOG.debug("Need to fix these: " + npe.toString());
252       } catch (IOException e) {
253         LOG.warn("Running hook", e);
254       }
255     }
256   }
257 
258   private void init(final int nMasterNodes, final int nRegionNodes)
259   throws IOException, InterruptedException {
260     try {
261       // start up a LocalHBaseCluster
262       hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
263           MiniHBaseCluster.MiniHBaseClusterMaster.class,
264           MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
265 
266       // manually add the regionservers as other users
267       for (int i=0; i<nRegionNodes; i++) {
268         Configuration rsConf = HBaseConfiguration.create(conf);
269         User user = HBaseTestingUtility.getDifferentUser(rsConf,
270             ".hfs."+index++);
271         hbaseCluster.addRegionServer(rsConf, i, user);
272       }
273 
274       hbaseCluster.startup();
275     } catch (IOException e) {
276       shutdown();
277       throw e;
278     } catch (Throwable t) {
279       LOG.error("Error starting cluster", t);
280       shutdown();
281       throw new IOException("Shutting down", t);
282     }
283   }
284 
285   /**
286    * Starts a region server thread running
287    *
288    * @throws IOException
289    * @return New RegionServerThread
290    */
291   public JVMClusterUtil.RegionServerThread startRegionServer()
292       throws IOException {
293     final Configuration newConf = HBaseConfiguration.create(conf);
294     User rsUser =
295         HBaseTestingUtility.getDifferentUser(newConf, ".hfs."+index++);
296     JVMClusterUtil.RegionServerThread t =  null;
297     try {
298       t = hbaseCluster.addRegionServer(
299           newConf, hbaseCluster.getRegionServers().size(), rsUser);
300       t.start();
301       t.waitForServerOnline();
302     } catch (InterruptedException ie) {
303       throw new IOException("Interrupted executing UserGroupInformation.doAs()", ie);
304     }
305     return t;
306   }
307 
308   /**
309    * Cause a region server to exit doing basic clean up only on its way out.
310    * @param serverNumber  Used as index into a list.
311    */
312   public String abortRegionServer(int serverNumber) {
313     HRegionServer server = getRegionServer(serverNumber);
314     LOG.info("Aborting " + server.toString());
315     server.abort("Aborting for tests", new Exception("Trace info"));
316     return server.toString();
317   }
318 
319   /**
320    * Shut down the specified region server cleanly
321    *
322    * @param serverNumber  Used as index into a list.
323    * @return the region server that was stopped
324    */
325   public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
326     return stopRegionServer(serverNumber, true);
327   }
328 
329   /**
330    * Shut down the specified region server cleanly
331    *
332    * @param serverNumber  Used as index into a list.
333    * @param shutdownFS True is we are to shutdown the filesystem as part of this
334    * regionserver's shutdown.  Usually we do but you do not want to do this if
335    * you are running multiple regionservers in a test and you shut down one
336    * before end of the test.
337    * @return the region server that was stopped
338    */
339   public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
340       final boolean shutdownFS) {
341     JVMClusterUtil.RegionServerThread server =
342       hbaseCluster.getRegionServers().get(serverNumber);
343     LOG.info("Stopping " + server.toString());
344     server.getRegionServer().stop("Stopping rs " + serverNumber);
345     return server;
346   }
347 
348   /**
349    * Wait for the specified region server to stop. Removes this thread from list
350    * of running threads.
351    * @param serverNumber
352    * @return Name of region server that just went down.
353    */
354   public String waitOnRegionServer(final int serverNumber) {
355     return this.hbaseCluster.waitOnRegionServer(serverNumber);
356   }
357 
358 
359   /**
360    * Starts a master thread running
361    *
362    * @throws IOException
363    * @return New RegionServerThread
364    */
365   public JVMClusterUtil.MasterThread startMaster() throws IOException {
366     Configuration c = HBaseConfiguration.create(conf);
367     User user =
368         HBaseTestingUtility.getDifferentUser(c, ".hfs."+index++);
369 
370     JVMClusterUtil.MasterThread t = null;
371     try {
372       t = hbaseCluster.addMaster(c, hbaseCluster.getMasters().size(), user);
373       t.start();
374       t.waitForServerOnline();
375     } catch (InterruptedException ie) {
376       throw new IOException("Interrupted executing UserGroupInformation.doAs()", ie);
377     }
378     return t;
379   }
380 
381   /**
382    * @return Returns the rpc address actually used by the currently active
383    * master server, because the supplied port is not necessarily the actual port
384    * used.
385    */
386   public HServerAddress getHMasterAddress() {
387     return this.hbaseCluster.getActiveMaster().getMasterAddress();
388   }
389 
390   /**
391    * Returns the current active master, if available.
392    * @return the active HMaster, null if none is active.
393    */
394   public HMaster getMaster() {
395     return this.hbaseCluster.getActiveMaster();
396   }
397 
398   /**
399    * Returns the master at the specified index, if available.
400    * @return the active HMaster, null if none is active.
401    */
402   public HMaster getMaster(final int serverNumber) {
403     return this.hbaseCluster.getMaster(serverNumber);
404   }
405 
406   /**
407    * Cause a master to exit without shutting down entire cluster.
408    * @param serverNumber  Used as index into a list.
409    */
410   public String abortMaster(int serverNumber) {
411     HMaster server = getMaster(serverNumber);
412     LOG.info("Aborting " + server.toString());
413     server.abort("Aborting for tests", new Exception("Trace info"));
414     return server.toString();
415   }
416 
417   /**
418    * Shut down the specified master cleanly
419    *
420    * @param serverNumber  Used as index into a list.
421    * @return the region server that was stopped
422    */
423   public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
424     return stopMaster(serverNumber, true);
425   }
426 
427   /**
428    * Shut down the specified master cleanly
429    *
430    * @param serverNumber  Used as index into a list.
431    * @param shutdownFS True is we are to shutdown the filesystem as part of this
432    * master's shutdown.  Usually we do but you do not want to do this if
433    * you are running multiple master in a test and you shut down one
434    * before end of the test.
435    * @return the master that was stopped
436    */
437   public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
438       final boolean shutdownFS) {
439     JVMClusterUtil.MasterThread server =
440       hbaseCluster.getMasters().get(serverNumber);
441     LOG.info("Stopping " + server.toString());
442     server.getMaster().stop("Stopping master " + serverNumber);
443     return server;
444   }
445 
446   /**
447    * Wait for the specified master to stop. Removes this thread from list
448    * of running threads.
449    * @param serverNumber
450    * @return Name of master that just went down.
451    */
452   public String waitOnMaster(final int serverNumber) {
453     return this.hbaseCluster.waitOnMaster(serverNumber);
454   }
455 
456   /**
457    * Blocks until there is an active master and that master has completed
458    * initialization.
459    *
460    * @return true if an active master becomes available.  false if there are no
461    *         masters left.
462    * @throws InterruptedException
463    */
464   public boolean waitForActiveAndReadyMaster() throws InterruptedException {
465     List<JVMClusterUtil.MasterThread> mts;
466     while ((mts = getMasterThreads()).size() > 0) {
467       for (JVMClusterUtil.MasterThread mt : mts) {
468         if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
469           return true;
470         }
471       }
472       Thread.sleep(200);
473     }
474     return false;
475   }
476 
477   /**
478    * @return List of master threads.
479    */
480   public List<JVMClusterUtil.MasterThread> getMasterThreads() {
481     return this.hbaseCluster.getMasters();
482   }
483 
484   /**
485    * @return List of live master threads (skips the aborted and the killed)
486    */
487   public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
488     return this.hbaseCluster.getLiveMasters();
489   }
490 
491   /**
492    * Wait for Mini HBase Cluster to shut down.
493    */
494   public void join() {
495     this.hbaseCluster.join();
496   }
497 
498   /**
499    * Shut down the mini HBase cluster
500    * @throws IOException
501    */
502   public void shutdown() throws IOException {
503     if (this.hbaseCluster != null) {
504       this.hbaseCluster.shutdown();
505     }
506     HConnectionManager.deleteAllConnections(false);
507   }
508 
509   /**
510    * Call flushCache on all regions on all participating regionservers.
511    * @throws IOException
512    */
513   public void flushcache() throws IOException {
514     for (JVMClusterUtil.RegionServerThread t:
515         this.hbaseCluster.getRegionServers()) {
516       for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
517         r.flushcache();
518       }
519     }
520   }
521 
522   /**
523    * Call flushCache on all regions of the specified table.
524    * @throws IOException
525    */
526   public void flushcache(byte [] tableName) throws IOException {
527     for (JVMClusterUtil.RegionServerThread t:
528         this.hbaseCluster.getRegionServers()) {
529       for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
530         if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
531           r.flushcache();
532         }
533       }
534     }
535   }
536 
537   /**
538    * @return List of region server threads.
539    */
540   public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
541     return this.hbaseCluster.getRegionServers();
542   }
543 
544   /**
545    * @return List of live region server threads (skips the aborted and the killed)
546    */
547   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() {
548     return this.hbaseCluster.getLiveRegionServers();
549   }
550 
551   /**
552    * Grab a numbered region server of your choice.
553    * @param serverNumber
554    * @return region server
555    */
556   public HRegionServer getRegionServer(int serverNumber) {
557     return hbaseCluster.getRegionServer(serverNumber);
558   }
559 
560   public List<HRegion> getRegions(byte[] tableName) {
561     List<HRegion> ret = new ArrayList<HRegion>();
562     for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) {
563       HRegionServer hrs = rst.getRegionServer();
564       for (HRegion region : hrs.getOnlineRegionsLocalContext()) {
565         if (Bytes.equals(region.getTableDesc().getName(), tableName)) {
566           ret.add(region);
567         }
568       }
569     }
570     return ret;
571   }
572 
573   /**
574    * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
575    * of HRS carrying regionName. Returns -1 if none found.
576    */
577   public int getServerWithMeta() {
578     return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
579   }
580 
581   /**
582    * Get the location of the specified region
583    * @param regionName Name of the region in bytes
584    * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
585    * of HRS carrying .META.. Returns -1 if none found.
586    */
587   public int getServerWith(byte[] regionName) {
588     int index = -1;
589     int count = 0;
590     for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
591       HRegionServer hrs = rst.getRegionServer();
592       HRegion metaRegion =
593         hrs.getOnlineRegion(regionName);
594       if (metaRegion != null) {
595         index = count;
596         break;
597       }
598       count++;
599     }
600     return index;
601   }
602 
603   /**
604    * Add an exception to send when a region server checks back in
605    * @param serverNumber Which server to send it to
606    * @param ex The exception that will be sent
607    * @throws IOException
608    */
609   public void addExceptionToSendRegionServer(final int serverNumber,
610       IOException ex) throws IOException {
611     MiniHBaseClusterRegionServer hrs =
612       (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
613     addExceptionToSendRegionServer(hrs, ex);
614   }
615 
616   /**
617    * Add an exception to send when a region server checks back in
618    * @param hrs Which server to send it to
619    * @param ex The exception that will be sent
620    * @throws IOException
621    */
622   public void addExceptionToSendRegionServer(
623       final MiniHBaseClusterRegionServer hrs, IOException ex)
624       throws IOException {
625     ((MiniHBaseClusterMaster)getMaster()).addException(hrs.getHServerInfo(),ex);
626   }
627 
628   /**
629    * Add a message to include in the responses send a regionserver when it
630    * checks back in.
631    * @param serverNumber Which server to send it to.
632    * @param msg The MESSAGE
633    * @throws IOException
634    */
635   public void addMessageToSendRegionServer(final int serverNumber,
636     final HMsg msg)
637   throws IOException {
638     MiniHBaseClusterRegionServer hrs =
639       (MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
640     addMessageToSendRegionServer(hrs, msg);
641   }
642 
643   /**
644    * Add a message to include in the responses send a regionserver when it
645    * checks back in.
646    * @param hrs Which region server.
647    * @param msg The MESSAGE
648    * @throws IOException
649    */
650   public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs,
651     final HMsg msg)
652   throws IOException {
653     ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
654   }
655 
656   /**
657    * Counts the total numbers of regions being served by the currently online
658    * region servers by asking each how many regions they have.  Does not look
659    * at META at all.  Count includes catalog tables.
660    * @return number of regions being served by all region servers
661    */
662   public long countServedRegions() {
663     long count = 0;
664     for (JVMClusterUtil.RegionServerThread rst : getLiveRegionServerThreads()) {
665       count += rst.getRegionServer().getNumberOfOnlineRegions();
666     }
667     return count;
668   }
669 }