View Javadoc

1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.IOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.client.HBaseAdmin;
32  import org.apache.hadoop.hbase.regionserver.HRegionServer;
33  import org.apache.hadoop.hbase.security.User;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
36  
37  import java.util.concurrent.CopyOnWriteArrayList;
38  import org.apache.hadoop.hbase.master.HMaster;
39  import org.apache.hadoop.hbase.util.JVMClusterUtil;
40  
41  /**
42   * This class creates a single process HBase cluster. One thread is created for
43   * a master and one per region server.
44   *
45   * Call {@link #startup()} to start the cluster running and {@link #shutdown()}
46   * to close it all down. {@link #join} the cluster is you want to wait on
47   * shutdown completion.
48   *
49   * <p>Runs master on port 60000 by default.  Because we can't just kill the
50   * process -- not till HADOOP-1700 gets fixed and even then.... -- we need to
51   * be able to find the master with a remote client to run shutdown.  To use a
52   * port other than 60000, set the hbase.master to a value of 'local:PORT':
53   * that is 'local', not 'localhost', and the port number the master should use
54   * instead of 60000.
55   *
56   */
57  public class LocalHBaseCluster {
58    static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
59    private final List<JVMClusterUtil.MasterThread> masterThreads =
60      new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
61    private final List<JVMClusterUtil.RegionServerThread> regionThreads =
62      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
63    private final static int DEFAULT_NO = 1;
64    /** local mode */
65    public static final String LOCAL = "local";
66    /** 'local:' */
67    public static final String LOCAL_COLON = LOCAL + ":";
68    private final Configuration conf;
69    private final Class<? extends HMaster> masterClass;
70    private final Class<? extends HRegionServer> regionServerClass;
71  
72    /**
73     * Constructor.
74     * @param conf
75     * @throws IOException
76     */
77    public LocalHBaseCluster(final Configuration conf)
78    throws IOException {
79      this(conf, DEFAULT_NO);
80    }
81  
82    /**
83     * Constructor.
84     * @param conf Configuration to use.  Post construction has the master's
85     * address.
86     * @param noRegionServers Count of regionservers to start.
87     * @throws IOException
88     */
89    public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
90    throws IOException {
91      this(conf, 1, noRegionServers, getMasterImplementation(conf),
92          getRegionServerImplementation(conf));
93    }
94  
95    /**
96     * Constructor.
97     * @param conf Configuration to use.  Post construction has the active master
98     * address.
99     * @param noMasters Count of masters to start.
100    * @param noRegionServers Count of regionservers to start.
101    * @throws IOException
102    */
103   public LocalHBaseCluster(final Configuration conf, final int noMasters,
104       final int noRegionServers)
105   throws IOException {
106     this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
107         getRegionServerImplementation(conf));
108   }
109 
110   @SuppressWarnings("unchecked")
111   private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
112     return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
113        HRegionServer.class);
114   }
115 
116   @SuppressWarnings("unchecked")
117   private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
118     return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
119        HMaster.class);
120   }
121 
122   /**
123    * Constructor.
124    * @param conf Configuration to use.  Post construction has the master's
125    * address.
126    * @param noMasters Count of masters to start.
127    * @param noRegionServers Count of regionservers to start.
128    * @param masterClass
129    * @param regionServerClass
130    * @throws IOException
131    */
132   @SuppressWarnings("unchecked")
133   public LocalHBaseCluster(final Configuration conf, final int noMasters,
134     final int noRegionServers, final Class<? extends HMaster> masterClass,
135     final Class<? extends HRegionServer> regionServerClass)
136   throws IOException {
137     this.conf = conf;
138     // Always have masters and regionservers come up on port '0' so we don't
139     // clash over default ports.
140     conf.set(HConstants.MASTER_PORT, "0");
141     conf.set(HConstants.REGIONSERVER_PORT, "0");
142     // Start the HMasters.
143     this.masterClass =
144       (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
145           masterClass);
146     for (int i = 0; i < noMasters; i++) {
147       addMaster(new Configuration(conf), i);
148     }
149     // Start the HRegionServers.
150     this.regionServerClass =
151       (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
152        regionServerClass);
153 
154     for (int i = 0; i < noRegionServers; i++) {
155       addRegionServer(new Configuration(conf), i);
156     }
157   }
158 
159   public JVMClusterUtil.RegionServerThread addRegionServer()
160       throws IOException {
161     return addRegionServer(new Configuration(conf), this.regionThreads.size());
162   }
163 
164   public JVMClusterUtil.RegionServerThread addRegionServer(
165       Configuration config, final int index)
166   throws IOException {
167     // Create each regionserver with its own Configuration instance so each has
168     // its HConnection instance rather than share (see HBASE_INSTANCES down in
169     // the guts of HConnectionManager.
170     JVMClusterUtil.RegionServerThread rst =
171       JVMClusterUtil.createRegionServerThread(config,
172           this.regionServerClass, index);
173     this.regionThreads.add(rst);
174     return rst;
175   }
176 
177   public JVMClusterUtil.RegionServerThread addRegionServer(
178       final Configuration config, final int index, User user)
179   throws IOException, InterruptedException {
180     return user.runAs(
181         new PrivilegedExceptionAction<JVMClusterUtil.RegionServerThread>() {
182           public JVMClusterUtil.RegionServerThread run() throws Exception {
183             return addRegionServer(config, index);
184           }
185         });
186   }
187 
188   public JVMClusterUtil.MasterThread addMaster() throws IOException {
189     return addMaster(new Configuration(conf), this.masterThreads.size());
190   }
191 
192   public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
193   throws IOException {
194     // Create each master with its own Configuration instance so each has
195     // its HConnection instance rather than share (see HBASE_INSTANCES down in
196     // the guts of HConnectionManager.
197     JVMClusterUtil.MasterThread mt =
198       JVMClusterUtil.createMasterThread(c,
199         this.masterClass, index);
200     this.masterThreads.add(mt);
201     return mt;
202   }
203 
204   public JVMClusterUtil.MasterThread addMaster(
205       final Configuration c, final int index, User user)
206   throws IOException, InterruptedException {
207     return user.runAs(
208         new PrivilegedExceptionAction<JVMClusterUtil.MasterThread>() {
209           public JVMClusterUtil.MasterThread run() throws Exception {
210             return addMaster(c, index);
211           }
212         });
213   }
214 
215   /**
216    * @param serverNumber
217    * @return region server
218    */
219   public HRegionServer getRegionServer(int serverNumber) {
220     return regionThreads.get(serverNumber).getRegionServer();
221   }
222 
223   /**
224    * @return Read-only list of region server threads.
225    */
226   public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
227     return Collections.unmodifiableList(this.regionThreads);
228   }
229 
230   /**
231    * @return List of running servers (Some servers may have been killed or
232    * aborted during lifetime of cluster; these servers are not included in this
233    * list).
234    */
235   public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
236     List<JVMClusterUtil.RegionServerThread> liveServers =
237       new ArrayList<JVMClusterUtil.RegionServerThread>();
238     List<RegionServerThread> list = getRegionServers();
239     for (JVMClusterUtil.RegionServerThread rst: list) {
240       if (rst.isAlive()) liveServers.add(rst);
241     }
242     return liveServers;
243   }
244 
245   /**
246    * Wait for the specified region server to stop
247    * Removes this thread from list of running threads.
248    * @param serverNumber
249    * @return Name of region server that just went down.
250    */
251   public String waitOnRegionServer(int serverNumber) {
252     JVMClusterUtil.RegionServerThread regionServerThread =
253       this.regionThreads.remove(serverNumber);
254     while (regionServerThread.isAlive()) {
255       try {
256         LOG.info("Waiting on " +
257           regionServerThread.getRegionServer().getHServerInfo().toString());
258         regionServerThread.join();
259       } catch (InterruptedException e) {
260         e.printStackTrace();
261       } catch (IOException e) {
262         e.printStackTrace();
263       }
264     }
265     return regionServerThread.getName();
266   }
267 
268   /**
269    * Wait for the specified region server to stop
270    * Removes this thread from list of running threads.
271    * @param rst
272    * @return Name of region server that just went down.
273    */
274   public String waitOnRegionServer(JVMClusterUtil.RegionServerThread rst) {
275     while (rst.isAlive()) {
276       try {
277         LOG.info("Waiting on " +
278           rst.getRegionServer().getHServerInfo().toString());
279         rst.join();
280       } catch (InterruptedException e) {
281         e.printStackTrace();
282       } catch (IOException e) {
283         e.printStackTrace();
284       }
285     }
286     for (int i=0;i<regionThreads.size();i++) {
287       if (regionThreads.get(i) == rst) {
288         regionThreads.remove(i);
289         break;
290       }
291     }
292     return rst.getName();
293   }
294 
295   /**
296    * @param serverNumber
297    * @return the HMaster thread
298    */
299   public HMaster getMaster(int serverNumber) {
300     return masterThreads.get(serverNumber).getMaster();
301   }
302 
303   /**
304    * Gets the current active master, if available.  If no active master, returns
305    * null.
306    * @return the HMaster for the active master
307    */
308   public HMaster getActiveMaster() {
309     for (JVMClusterUtil.MasterThread mt : masterThreads) {
310       if (mt.getMaster().isActiveMaster()) {
311         return mt.getMaster();
312       }
313     }
314     return null;
315   }
316 
317   /**
318    * @return Read-only list of master threads.
319    */
320   public List<JVMClusterUtil.MasterThread> getMasters() {
321     return Collections.unmodifiableList(this.masterThreads);
322   }
323 
324   /**
325    * @return List of running master servers (Some servers may have been killed
326    * or aborted during lifetime of cluster; these servers are not included in
327    * this list).
328    */
329   public List<JVMClusterUtil.MasterThread> getLiveMasters() {
330     List<JVMClusterUtil.MasterThread> liveServers =
331       new ArrayList<JVMClusterUtil.MasterThread>();
332     List<JVMClusterUtil.MasterThread> list = getMasters();
333     for (JVMClusterUtil.MasterThread mt: list) {
334       if (mt.isAlive()) {
335         liveServers.add(mt);
336       }
337     }
338     return liveServers;
339   }
340 
341   /**
342    * Wait for the specified master to stop
343    * Removes this thread from list of running threads.
344    * @param serverNumber
345    * @return Name of master that just went down.
346    */
347   public String waitOnMaster(int serverNumber) {
348     JVMClusterUtil.MasterThread masterThread =
349       this.masterThreads.remove(serverNumber);
350     while (masterThread.isAlive()) {
351       try {
352         LOG.info("Waiting on " +
353           masterThread.getMaster().getServerName().toString());
354         masterThread.join();
355       } catch (InterruptedException e) {
356         e.printStackTrace();
357       }
358     }
359     return masterThread.getName();
360   }
361 
362   /**
363    * Wait for the specified master to stop
364    * Removes this thread from list of running threads.
365    * @param masterThread
366    * @return Name of master that just went down.
367    */
368   public String waitOnMaster(JVMClusterUtil.MasterThread masterThread) {
369     while (masterThread.isAlive()) {
370       try {
371         LOG.info("Waiting on " +
372           masterThread.getMaster().getServerName().toString());
373         masterThread.join();
374       } catch (InterruptedException e) {
375         e.printStackTrace();
376       }
377     }
378     for (int i=0;i<masterThreads.size();i++) {
379       if (masterThreads.get(i) == masterThread) {
380         masterThreads.remove(i);
381         break;
382       }
383     }
384     return masterThread.getName();
385   }
386 
387   /**
388    * Wait for Mini HBase Cluster to shut down.
389    * Presumes you've already called {@link #shutdown()}.
390    */
391   public void join() {
392     if (this.regionThreads != null) {
393         for(Thread t: this.regionThreads) {
394           if (t.isAlive()) {
395             try {
396               t.join();
397           } catch (InterruptedException e) {
398             // continue
399           }
400         }
401       }
402     }
403     if (this.masterThreads != null) {
404       for (Thread t : this.masterThreads) {
405         if (t.isAlive()) {
406           try {
407             t.join();
408           } catch (InterruptedException e) {
409             // continue
410           }
411         }
412       }
413     }
414   }
415 
416   /**
417    * Start the cluster.
418    */
419   public void startup() {
420     JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
421   }
422 
423   /**
424    * Shut down the mini HBase cluster
425    */
426   public void shutdown() {
427     JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
428   }
429 
430   /**
431    * @param c Configuration to check.
432    * @return True if a 'local' address in hbase.master value.
433    */
434   public static boolean isLocal(final Configuration c) {
435     final String mode = c.get(HConstants.CLUSTER_DISTRIBUTED);
436     return mode == null || mode.equals(HConstants.CLUSTER_IS_LOCAL);
437   }
438 
439   /**
440    * Test things basically work.
441    * @param args
442    * @throws IOException
443    */
444   public static void main(String[] args) throws IOException {
445     Configuration conf = HBaseConfiguration.create();
446     LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
447     cluster.startup();
448     HBaseAdmin admin = new HBaseAdmin(conf);
449     HTableDescriptor htd =
450       new HTableDescriptor(Bytes.toBytes(cluster.getClass().getName()));
451     admin.createTable(htd);
452     cluster.shutdown();
453   }
454 }