/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Jdk14Logger;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.ZooKeeper;

/**
 * Facility for testing HBase. Replacement for the old HBaseTestCase and
 * HBaseClusterTestCase functionality.
 * Create an instance and keep it around for the duration of your HBase
 * testing.  This class is meant to be your one-stop shop for anything you
 * might need while testing.  Manages one cluster at a time only.  Depends on
 * log4j being on the classpath and on hbase-site.xml for logging and
 * test-run configuration.  It does not set logging levels nor make changes
 * to configuration parameters.
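 *
 * <p>A minimal usage sketch (illustrative only; the table and family names
 * below are placeholders):
 * <pre>
 * HBaseTestingUtility util = new HBaseTestingUtility();
 * util.startMiniCluster();
 * HTable table = util.createTable(Bytes.toBytes("test"), Bytes.toBytes("family"));
 * int rowCount = util.loadTable(table, Bytes.toBytes("family"));
 * // ... exercise the cluster ...
 * util.shutdownMiniCluster();
 * </pre>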
 */
public class HBaseTestingUtility {
  private final static Log LOG = LogFactory.getLog(HBaseTestingUtility.class);
  private Configuration conf;
  private MiniZooKeeperCluster zkCluster = null;
  /**
   * Set if we were passed a zkCluster.  If so, we won't shut down zk as
   * part of general shutdown.
   */
  private boolean passedZkCluster = false;
  private MiniDFSCluster dfsCluster = null;

  private MiniHBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;
  // If non-null, then a cluster is already running.
  private File clusterTestBuildDir = null;

  /**
   * System property key to get test directory value.
   * The name is fixed because mini dfs is hard-coded to put test data here.
   */
  public static final String TEST_DIRECTORY_KEY = "test.build.data";

  /**
   * Default parent directory for test output.
   */
  public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";

  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }

  public HBaseTestingUtility(Configuration conf) {
    this.conf = conf;
  }

  /**
   * @return Current mini hbase cluster; same as {@link #getHBaseCluster()}.
   */
  public MiniHBaseCluster getHbaseCluster() {
    return hbaseCluster;
  }

  /**
   * Returns this class's instance of {@link Configuration}.  Be careful how
   * you use the returned Configuration since {@link HConnection} instances
   * can be shared.  The Map of HConnections is keyed by the Configuration.  If
   * say, a Connection was being used against a cluster that had been shutdown,
   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
   * be wholesome.  Rather than using the returned Configuration directly, it
   * is usually best to make a copy and use that.  Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  public Configuration getConfiguration() {
    return this.conf;
  }

  /**
   * @return Where to write test data on local filesystem; usually
   * {@link #DEFAULT_TEST_DIRECTORY}
   * @see #setupClusterTestBuildDir()
   * @see #getTestFileSystem()
   */
  public static Path getTestDir() {
    return new Path(System.getProperty(TEST_DIRECTORY_KEY,
      DEFAULT_TEST_DIRECTORY));
  }

  /**
   * @param subdirName
   * @return Path to a subdirectory named <code>subdirName</code> under
   * {@link #getTestDir()}.
   * @see #setupClusterTestBuildDir()
   * @see #getTestFileSystem()
   */
  public static Path getTestDir(final String subdirName) {
    return new Path(getTestDir(), subdirName);
  }

  /**
   * Homes our cluster in a directory under {@link #DEFAULT_TEST_DIRECTORY}.
   * Gives it a random name so we can have many concurrent clusters running
   * if we need to.  Needs to amend the {@link #TEST_DIRECTORY_KEY} System
   * property, since that is what minidfscluster bases its data dir on.
   * Modifying a System property is not the way to do concurrent instances
   * -- another instance could grab the temporary value unintentionally --
   * but nothing can be done about it at the moment; single instance only is
   * how the minidfscluster works.
   * @return The calculated cluster test build directory.
   */
  public File setupClusterTestBuildDir() {
    String randomStr = UUID.randomUUID().toString();
    String dirStr = getTestDir(randomStr).toString();
    File dir = new File(dirStr).getAbsoluteFile();
    // Have it cleaned up on exit; note that File.deleteOnExit only removes
    // the directory if it is empty at JVM exit.
    dir.deleteOnExit();
    return dir;
  }

  /**
   * @throws IOException If a cluster -- zk, dfs, or hbase -- is already
   * running.
   */
  void isRunningCluster(String passedBuildPath) throws IOException {
    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
    throw new IOException("Cluster already running at " +
      this.clusterTestBuildDir);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param dir Where to home your dfs cluster.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final File dir)
  throws Exception {
    // This does the following to home the minidfscluster
    //     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
    // Some tests also do this:
    //  System.getProperty("test.cache.data", "build/test/cache");
    if (dir == null) {
      this.clusterTestBuildDir = setupClusterTestBuildDir();
    } else {
      this.clusterTestBuildDir = dir;
    }
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.toString());
    System.setProperty("test.cache.data", this.clusterTestBuildDir.toString());
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
      true, null, null, null, null);
    // Set this just-started cluster as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    this.conf.set("fs.defaultFS", fs.getUri().toString());
    // Do old style too just to be safe.
    this.conf.set("fs.default.name", fs.getUri().toString());
    return this.dfsCluster;
  }

  /**
   * Shuts down instance created by call to {@link #startMiniDFSCluster(int, File)}
   * or does nothing.
   * @throws Exception
   */
  public void shutdownMiniDFSCluster() throws Exception {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
    }
  }

  /**
   * Call this if you only want a zk cluster.
   * @see #startMiniCluster() if you want a zk + dfs + hbase mini cluster.
   * @throws Exception
   * @see #shutdownMiniZKCluster()
   * @return zk cluster started.
   */
  public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
    return startMiniZKCluster(setupClusterTestBuildDir());
  }

  private MiniZooKeeperCluster startMiniZKCluster(final File dir)
  throws Exception {
    this.passedZkCluster = false;
    if (this.zkCluster != null) {
      throw new IOException("Cluster already running at " + dir);
    }
    this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
    int clientPort = this.zkCluster.startup(dir);
    this.conf.set("hbase.zookeeper.property.clientPort",
      Integer.toString(clientPort));
    return this.zkCluster;
  }

  /**
   * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)}
   * or does nothing.
   * @throws IOException
   * @see #startMiniZKCluster()
   */
  public void shutdownMiniZKCluster() throws IOException {
    if (this.zkCluster != null) {
      this.zkCluster.shutdown();
      this.zkCluster = null;
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1, 1);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) or you
   * will get bind errors.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    return startMiniCluster(1, numSlaves);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
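   * <p>A minimal sketch (illustrative; assumes this utility instance is
   * named <code>util</code>):
   * <pre>
   * MiniHBaseCluster cluster = util.startMiniCluster(2, 3);
   * assertTrue(cluster.getRegionServerThreads().size() == 3);
   * util.shutdownMiniCluster();
   * </pre>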
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) or you
   * will get bind errors.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves)
  throws Exception {
    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
        numSlaves + " regionserver(s) and datanode(s)");
    // If we already put up a cluster, fail.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
    if (testBuildPath != null) {
      LOG.info("Using passed path: " + testBuildPath);
    }
    // Make a new random dir to home everything in.  Set it as system property.
    // minidfs reads home from system property.
    this.clusterTestBuildDir = testBuildPath == null ?
      setupClusterTestBuildDir() : new File(testBuildPath);
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    startMiniDFSCluster(numSlaves, this.clusterTestBuildDir);
    this.dfsCluster.waitClusterUp();

    // Start up a zk cluster.
    if (this.zkCluster == null) {
      startMiniZKCluster(this.clusterTestBuildDir);
    }
    return startMiniHBaseCluster(numMasters, numSlaves);
  }

  /**
   * Starts up mini hbase cluster.  Usually used after call to
   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
   * Usually you won't want this; you'll usually want {@link #startMiniCluster()}
   * instead.
   * @param numMasters Number of masters to start.
   * @param numSlaves Number of region servers to start.
   * @return Reference to the mini hbase cluster.
   * @throws IOException
   * @throws InterruptedException
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
      final int numSlaves)
  throws IOException, InterruptedException {
    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
    createRootDir();
    Configuration c = new Configuration(this.conf);
    this.hbaseCluster = new MiniHBaseCluster(c, numMasters, numSlaves);
    // Don't leave here till we've done a successful scan of the .META.
    HTable t = new HTable(c, HConstants.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      continue;
    }
    LOG.info("Minicluster is up");
    return this.hbaseCluster;
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a
   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   * @throws IOException
   * @throws InterruptedException
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
    // Don't leave here till we've done a successful scan of the .META.
    HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      continue;
    }
    LOG.info("HBase has been restarted");
  }

  /**
   * @return Current mini hbase cluster. Only has something in it after a call
   * to {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster getMiniHBaseCluster() {
    return this.hbaseCluster;
  }

  /**
   * Stops mini hbase, zk, and hdfs clusters.
   * @throws IOException
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    if (!this.passedZkCluster) shutdownMiniZKCluster();
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
    }
    // Clean up our directory.
    if (this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
      // Need to use deleteDirectory because File.delete requires that the
      // directory be empty.
      if (!FSUtils.deleteDirectory(FileSystem.getLocal(this.conf),
          new Path(this.clusterTestBuildDir.toString()))) {
        LOG.warn("Failed delete of " + this.clusterTestBuildDir.toString());
      }
      this.clusterTestBuildDir = null;
    }
    LOG.info("Minicluster is down");
  }

  /**
   * Shutdown HBase mini cluster.  Does not shut down zk or dfs if they are
   * running.
   * @throws IOException
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.join();
    }
    this.hbaseCluster = null;
  }

  /**
   * Creates an hbase rootdir in the user's home directory.  Also creates the
   * hbase version file.  Normally you won't make use of this method.  The
   * root hbasedir is created for you as part of mini cluster startup.  You'd
   * only use this method if you were doing manual operations.
   * @return Fully qualified path to hbase root dir
   * @throws IOException
   */
  public Path createRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
    this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Flushes all caches in the mini hbase cluster
   * @throws IOException
   */
  public void flush() throws IOException {
    this.hbaseCluster.flushcache();
  }

  /**
   * Flushes all caches in the given table of the mini hbase cluster
   * @param tableName table to flush
   * @throws IOException
   */
  public void flush(byte [] tableName) throws IOException {
    this.hbaseCluster.flushcache(tableName);
  }

  /**
   * Create a table.
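   * <p>A minimal sketch (illustrative; names are placeholders):
   * <pre>
   * HTable table = util.createTable(Bytes.toBytes("test"), Bytes.toBytes("family"));
   * Put put = new Put(Bytes.toBytes("row"));
   * put.add(Bytes.toBytes("family"), null, Bytes.toBytes("value"));
   * table.put(put);
   * </pre>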
   * @param tableName
   * @param family
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[] family)
  throws IOException {
    return createTable(tableName, new byte[][]{family});
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[][] families)
  throws IOException {
    return createTable(tableName, families,
        new Configuration(getConfiguration()));
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @param c Configuration to use
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[][] families,
      final Configuration c)
  throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      desc.addFamily(new HColumnDescriptor(family));
    }
    getHBaseAdmin().createTable(desc);
    return new HTable(c, tableName);
  }

  /**
   * Create a table.
   * @param tableName
   * @param family
   * @param numVersions
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[] family, int numVersions)
  throws IOException {
    return createTable(tableName, new byte[][]{family}, numVersions);
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @param numVersions
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[][] families,
      int numVersions)
  throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
          HColumnDescriptor.DEFAULT_COMPRESSION,
          HColumnDescriptor.DEFAULT_IN_MEMORY,
          HColumnDescriptor.DEFAULT_BLOCKCACHE,
          Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
          HColumnDescriptor.DEFAULT_BLOOMFILTER,
          HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
      desc.addFamily(hcd);
    }
    getHBaseAdmin().createTable(desc);
    return new HTable(new Configuration(getConfiguration()), tableName);
  }

  /**
   * Create a table.
   * @param tableName
   * @param families
   * @param numVersions Per-family max versions; must be parallel to
   * <code>families</code>.
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[][] families,
      int[] numVersions)
  throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    int i = 0;
    for (byte[] family : families) {
      HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions[i],
          HColumnDescriptor.DEFAULT_COMPRESSION,
          HColumnDescriptor.DEFAULT_IN_MEMORY,
          HColumnDescriptor.DEFAULT_BLOCKCACHE,
          Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
          HColumnDescriptor.DEFAULT_BLOOMFILTER,
          HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
      desc.addFamily(hcd);
      i++;
    }
    getHBaseAdmin().createTable(desc);
    return new HTable(new Configuration(getConfiguration()), tableName);
  }

  /**
   * Drop an existing table
   * @param tableName existing table
   * @throws IOException
   */
  public void deleteTable(byte[] tableName) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(getConfiguration());
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }

  /**
   * Truncate the named existing table by deleting all of its rows.
   * @param tableName existing table
   * @return HTable for the truncated table
   * @throws IOException
   */
  public HTable truncateTable(byte [] tableName) throws IOException {
    HTable table = new HTable(getConfiguration(), tableName);
    Scan scan = new Scan();
    ResultScanner resScan = table.getScanner(scan);
    for (Result res : resScan) {
      Delete del = new Delete(res.getRow());
      table.delete(del);
    }
    // Close the scanner to release its resources.
    resScan.close();
    return table;
  }

  /**
   * Load table with rows from 'aaa' to 'zzz'.
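   * <p>For example (illustrative; <code>util</code> and <code>table</code>
   * are assumed to exist):
   * <pre>
   * int rowCount = util.loadTable(table, Bytes.toBytes("family"));
   * assertTrue(rowCount == util.countRows(table));
   * </pre>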
   * @param t Table
   * @param f Family
   * @return Count of rows loaded.
   * @throws IOException
   */
  public int loadTable(final HTable t, final byte[] f) throws IOException {
    t.setAutoFlush(false);
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, null, k);
          t.put(put);
          rowCount++;
        }
      }
    }
    t.flushCommits();
    return rowCount;
  }

  /**
   * Load region with rows from 'aaa' to 'zzz'.
   * @param r Region
   * @param f Family
   * @return Count of rows loaded.
   * @throws IOException
   */
  public int loadRegion(final HRegion r, final byte[] f)
  throws IOException {
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, null, k);
          if (r.getLog() == null) put.setWriteToWAL(false);
          r.put(put);
          rowCount++;
        }
      }
    }
    return rowCount;
  }

  /**
   * Return the number of rows in the given table.
   */
  public int countRows(final HTable table) throws IOException {
    Scan scan = new Scan();
    ResultScanner results = table.getScanner(scan);
    int count = 0;
    for (@SuppressWarnings("unused") Result res : results) {
      count++;
    }
    results.close();
    return count;
  }

  /**
   * Return an md5 digest of the entire contents of a table.
   */
  public String checksumRows(final HTable table) throws Exception {
    Scan scan = new Scan();
    ResultScanner results = table.getScanner(scan);
    MessageDigest digest = MessageDigest.getInstance("MD5");
    for (Result res : results) {
      digest.update(res.getRow());
    }
    results.close();
    // MessageDigest.toString() does not return the digest value; render the
    // computed digest bytes instead.
    return Bytes.toStringBinary(digest.digest());
  }

  /**
   * Creates many regions, named from "aaa" to "zzz".
   *
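   * <p>For example (illustrative; <code>util</code> and <code>table</code>
   * are assumed to exist):
   * <pre>
   * int regionCount = util.createMultiRegions(table, Bytes.toBytes("family"));
   * </pre>
   *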
   * @param table  The table to use for the data.
   * @param columnFamily  The family to insert the data into.
   * @return count of regions created.
   * @throws IOException When creating the regions fails.
   */
  public int createMultiRegions(HTable table, byte[] columnFamily)
  throws IOException {
    return createMultiRegions(getConfiguration(), table, columnFamily);
  }

  public static final byte[][] KEYS = {
    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
  };

  /**
   * Creates many regions, named from "aaa" to "zzz".
   * @param c Configuration to use.
   * @param table  The table to use for the data.
   * @param columnFamily  The family to insert the data into.
   * @return count of regions created.
   * @throws IOException When creating the regions fails.
   */
  public int createMultiRegions(final Configuration c, final HTable table,
      final byte[] columnFamily)
  throws IOException {
    return createMultiRegions(c, table, columnFamily, KEYS);
  }

  /**
   * Creates the specified number of regions in the specified table.
   * @param c Configuration to use.
   * @param table The table to create regions in.
   * @param family The family to use.
   * @param numRegions Number of regions to create; must be at least 3.
   * @return count of regions created.
   * @throws IOException When creating the regions fails.
   */
  public int createMultiRegions(final Configuration c, final HTable table,
      final byte [] family, int numRegions)
  throws IOException {
    if (numRegions < 3) throw new IOException("Must create at least 3 regions");
    byte [] startKey = Bytes.toBytes("aaaaa");
    byte [] endKey = Bytes.toBytes("zzzzz");
    byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
    byte [][] regionStartKeys = new byte[splitKeys.length + 1][];
    for (int i = 0; i < splitKeys.length; i++) {
      regionStartKeys[i + 1] = splitKeys[i];
    }
    regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
    return createMultiRegions(c, table, family, regionStartKeys);
  }

  /**
   * Creates regions in the passed table with the specified start keys;
   * replaces the table's existing single region row in .META.
   * @param c Configuration to use.
   * @param table The table to add regions to.
   * @param columnFamily The family to use; added to the table descriptor if
   * not already present.
   * @param startKeys Region start keys, one region per key.
   * @return count of regions created.
   * @throws IOException When creating the regions fails.
   */
  public int createMultiRegions(final Configuration c, final HTable table,
      final byte[] columnFamily, byte [][] startKeys)
  throws IOException {
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
    HTableDescriptor htd = table.getTableDescriptor();
    if (!htd.hasFamily(columnFamily)) {
      HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
      htd.addFamily(hcd);
    }
    // remove empty region - this is tricky as the mini cluster during the test
    // setup already has the "<tablename>,,123456789" row with an empty start
    // and end key. Adding the custom regions below adds those blindly,
    // including the new start region from empty to "bbb". lg
    List<byte[]> rows = getMetaTableRows(htd.getName());
    List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
    // add custom ones
    int count = 0;
    for (int i = 0; i < startKeys.length; i++) {
      int j = (i + 1) % startKeys.length;
      HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
        startKeys[i], startKeys[j]);
      Put put = new Put(hri.getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(hri));
      meta.put(put);
      LOG.info("createMultiRegions: inserted " + hri.toString());
      newRegions.add(hri);
      count++;
    }
    // see comment above, remove "old" (or previous) single region
    for (byte[] row : rows) {
      LOG.info("createMultiRegions: deleting meta row -> " +
        Bytes.toStringBinary(row));
      meta.delete(new Delete(row));
    }
    // flush cache of regions
    HConnection conn = table.getConnection();
    conn.clearRegionCache();
    // assign all the new regions IF table is enabled.
    if (getHBaseAdmin().isTableEnabled(table.getTableName())) {
      for (HRegionInfo hri : newRegions) {
        hbaseCluster.getMaster().assignRegion(hri);
      }
    }
    return count;
  }

  /**
   * Create rows in META for regions of the specified table with the specified
   * start keys.  The first startKey should be a 0 length byte array if you
   * want to form a proper range of regions.
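   * <p>For example (illustrative; <code>util</code> and the descriptor
   * <code>desc</code> are assumed to exist):
   * <pre>
   * byte [][] startKeys = {HConstants.EMPTY_BYTE_ARRAY,
   *   Bytes.toBytes("bbb"), Bytes.toBytes("mmm")};
   * List&lt;HRegionInfo&gt; regions =
   *   util.createMultiRegionsInMeta(util.getConfiguration(), desc, startKeys);
   * </pre>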
   * @param conf
   * @param htd
   * @param startKeys
   * @return list of region info for regions added to meta
   * @throws IOException
   */
  public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
      final HTableDescriptor htd, byte [][] startKeys)
  throws IOException {
    HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
    // add custom ones
    for (int i = 0; i < startKeys.length; i++) {
      int j = (i + 1) % startKeys.length;
      HRegionInfo hri = new HRegionInfo(htd, startKeys[i], startKeys[j]);
      Put put = new Put(hri.getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(hri));
      meta.put(put);
      LOG.info("createMultiRegionsInMeta: inserted " + hri.toString());
      newRegions.add(hri);
    }
    return newRegions;
  }

  /**
   * Returns all rows from the .META. table.
   *
   * @throws IOException When reading the rows fails.
   */
  public List<byte[]> getMetaTableRows() throws IOException {
    // TODO: Redo using MetaReader class
    HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
    List<byte[]> rows = new ArrayList<byte[]>();
    ResultScanner s = t.getScanner(new Scan());
    for (Result result : s) {
      LOG.info("getMetaTableRows: row -> " +
        Bytes.toStringBinary(result.getRow()));
      rows.add(result.getRow());
    }
    s.close();
    return rows;
  }

  /**
   * Returns all rows from the .META. table for a given user table.
   *
   * @throws IOException When reading the rows fails.
   */
  public List<byte[]> getMetaTableRows(byte[] tableName) throws IOException {
    // TODO: Redo using MetaReader.
    HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
    List<byte[]> rows = new ArrayList<byte[]>();
    ResultScanner s = t.getScanner(new Scan());
    for (Result result : s) {
      HRegionInfo info = Writables.getHRegionInfo(
          result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
      HTableDescriptor desc = info.getTableDesc();
      if (Bytes.compareTo(desc.getName(), tableName) == 0) {
        LOG.info("getMetaTableRows: row -> " +
            Bytes.toStringBinary(result.getRow()));
        rows.add(result.getRow());
      }
    }
    s.close();
    return rows;
  }

  /**
   * Tool to get the reference to the region server object that holds the
   * region of the specified user table.
   * It first searches for the meta rows that contain the region of the
   * specified table, then gets the index of that RS, and finally retrieves
   * the RS's reference.
   * @param tableName user table to lookup in .META.
   * @return region server that holds it, null if the row doesn't exist
   * @throws IOException
   */
  public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
      throws IOException {
    List<byte[]> metaRows = getMetaTableRows(tableName);
    if (metaRows == null || metaRows.isEmpty()) {
      return null;
    }
    int index = hbaseCluster.getServerWith(metaRows.get(0));
    return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
  }

  /**
   * Starts a <code>MiniMRCluster</code> with a default number of
   * <code>TaskTracker</code>s.
   *
   * @throws IOException When starting the cluster fails.
   */
  public void startMiniMapReduceCluster() throws IOException {
    startMiniMapReduceCluster(2);
  }

  /**
   * Starts a <code>MiniMRCluster</code>.
   *
   * @param servers  The number of <code>TaskTracker</code>s to start.
   * @throws IOException When starting the cluster fails.
   */
  public void startMiniMapReduceCluster(final int servers) throws IOException {
    LOG.info("Starting mini mapreduce cluster...");
    // These are needed for the new and improved Map/Reduce framework
    Configuration c = getConfiguration();
    System.setProperty("hadoop.log.dir", c.get("hadoop.log.dir"));
    c.set("mapred.output.dir", c.get("hadoop.tmp.dir"));
    mrCluster = new MiniMRCluster(servers,
      FileSystem.get(c).getUri().toString(), 1);
    LOG.info("Mini mapreduce cluster started");
    c.set("mapred.job.tracker",
        mrCluster.createJobConf().get("mapred.job.tracker"));
  }

  /**
   * Stops the previously started <code>MiniMRCluster</code>.
   */
  public void shutdownMiniMapReduceCluster() {
    LOG.info("Stopping mini mapreduce cluster...");
    if (mrCluster != null) {
      mrCluster.shutdown();
    }
    // Restore configuration to point to local jobtracker
    conf.set("mapred.job.tracker", "local");
    LOG.info("Mini mapreduce cluster stopped");
  }

  /**
   * Switches the logger for the given class to DEBUG level.
   *
   * @param clazz  The class for which to switch to debug logging.
   */
  public void enableDebug(Class<?> clazz) {
    Log l = LogFactory.getLog(clazz);
    if (l instanceof Log4JLogger) {
      ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
    } else if (l instanceof Jdk14Logger) {
      ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
    }
  }

  /**
   * Expire the Master's session
   * @throws Exception
   */
  public void expireMasterSession() throws Exception {
    HMaster master = hbaseCluster.getMaster();
    expireSession(master.getZooKeeper(), master);
  }

  /**
   * Expire a region server's session
   * @param index which RS
   * @throws Exception
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = hbaseCluster.getRegionServer(index);
    expireSession(rs.getZooKeeper(), rs);
  }

  /**
   * Expire the given server's ZooKeeper session by connecting a second
   * client with the same session id and password and then closing it.
   * @param nodeZK Watcher whose underlying session is to be expired.
   * @param server The server tied to the session (currently unused).
   * @throws Exception
   */
  public void expireSession(ZooKeeperWatcher nodeZK, Server server)
  throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    int sessionTimeout = 5 * 1000; // 5 seconds
    ZooKeeper zk = nodeZK.getZooKeeper();
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();

    ZooKeeper newZK = new ZooKeeper(quorumServers,
        sessionTimeout, EmptyWatcher.instance, sessionID, password);
    newZK.close();
    final long sleep = sessionTimeout * 5L;
    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) +
      "; sleeping=" + sleep);

    Thread.sleep(sleep);

    // Check the cluster is still usable after expiring the session.
    new HTable(new Configuration(conf), HConstants.META_TABLE_NAME);
  }

  /**
   * Get the HBase cluster.
   *
   * @return hbase cluster
   */
  public MiniHBaseCluster getHBaseCluster() {
    return hbaseCluster;
  }

  /**
   * Returns a HBaseAdmin instance.
   *
   * @return The HBaseAdmin instance.
   * @throws IOException
   */
  public HBaseAdmin getHBaseAdmin()
  throws IOException {
    return new HBaseAdmin(new Configuration(getConfiguration()));
  }

  /**
   * Closes the named region.
   *
   * @param regionName  The region to close.
   * @throws IOException
   */
  public void closeRegion(String regionName) throws IOException {
    closeRegion(Bytes.toBytes(regionName));
  }

  /**
   * Closes the named region.
   *
   * @param regionName  The region to close.
   * @throws IOException
   */
  public void closeRegion(byte[] regionName) throws IOException {
    HBaseAdmin admin = getHBaseAdmin();
    admin.closeRegion(regionName, null);
  }

  /**
   * Closes the region containing the given row.
   *
   * @param row  The row to find the containing region.
   * @param table  The table to find the region.
   * @throws IOException
   */
  public void closeRegionByRow(String row, HTable table) throws IOException {
    closeRegionByRow(Bytes.toBytes(row), table);
  }

  /**
   * Closes the region containing the given row.
   *
   * @param row  The row to find the containing region.
   * @param table  The table to find the region.
   * @throws IOException
   */
  public void closeRegionByRow(byte[] row, HTable table) throws IOException {
    HRegionLocation hrl = table.getRegionLocation(row);
    closeRegion(hrl.getRegionInfo().getRegionName());
  }

  public MiniZooKeeperCluster getZkCluster() {
    return zkCluster;
  }

  /**
   * Set an externally managed zk cluster to use.  A cluster set this way
   * will not be shut down as part of {@link #shutdownMiniCluster()}.
   */
  public void setZkCluster(MiniZooKeeperCluster zkCluster) {
    this.passedZkCluster = true;
    this.zkCluster = zkCluster;
  }

  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }

  public FileSystem getTestFileSystem() throws IOException {
    return FileSystem.get(conf);
  }

  /**
   * @return True if we removed the test dir
   * @throws IOException
   */
  public boolean cleanupTestDir() throws IOException {
    return deleteDir(getTestDir());
  }

  /**
   * @param subdir Test subdir name.
   * @return True if we removed the test dir
   * @throws IOException
   */
  public boolean cleanupTestDir(final String subdir) throws IOException {
    return deleteDir(getTestDir(subdir));
  }

  /**
   * @param dir Directory to delete
   * @return True if we deleted it.
   * @throws IOException
   */
  public boolean deleteDir(final Path dir) throws IOException {
    FileSystem fs = getTestFileSystem();
    if (fs.exists(dir)) {
      return fs.delete(dir, true);
    }
    return false;
  }

  /**
   * Waits until the named table is available, failing the test if the
   * timeout elapses first.
   * @param table Table to wait on.
   * @param timeoutMillis How long to wait, at most, in milliseconds.
   * @throws InterruptedException
   * @throws IOException
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
  throws InterruptedException, IOException {
    HBaseAdmin admin = getHBaseAdmin();
    long startWait = System.currentTimeMillis();
    while (!admin.isTableAvailable(table)) {
      assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table),
          System.currentTimeMillis() - startWait < timeoutMillis);
      Thread.sleep(500);
    }
  }

  /**
   * Make sure that at least the specified number of region servers
   * are running.
   * @param num minimum number of region servers that should be running
   * @return True if we started some servers
   * @throws IOException
   */
  public boolean ensureSomeRegionServersAvailable(final int num)
      throws IOException {
    if (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) {
      // Need at least "num" servers.
      LOG.info("Started new server=" +
        this.getHBaseCluster().startRegionServer());
      return true;
    }
    return false;
  }

  /**
   * Returns a {@link User} based on the passed <code>c</code> configuration
   * but with a different user name.  Use it when getting new instances of
   * FileSystem.  Only works for DistributedFileSystem.
   * @param c Initial configuration
   * @param differentiatingSuffix Suffix to differentiate this user from others.
   * @return A new user with a name differentiated by the suffix; the current
   * user if the filesystem is not distributed.
   * @throws IOException
   */
  public static User getDifferentUser(final Configuration c,
    final String differentiatingSuffix)
  throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    if (!(currentfs instanceof DistributedFileSystem)) {
      return User.getCurrent();
    }
    // Else distributed filesystem.  Make a new instance per daemon.  Below
    // code is taken from the AppendTestUtil over in hdfs.
    String username = User.getCurrent().getName() +
      differentiatingSuffix;
    User user = User.createUserForTesting(c, username,
        new String[]{"supergroup"});
    return user;
  }

  /**
   * Set soft and hard lease limits in the namenode.
   * You'll get an NPE if you call this before you've started a minidfscluster.
   * @param soft Soft limit
   * @param hard Hard limit
   * @throws NoSuchFieldException
   * @throws SecurityException
   * @throws IllegalAccessException
   * @throws IllegalArgumentException
   */
  public void setNameNodeNameSystemLeasePeriod(final int soft, final int hard)
  throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
    // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another.
    // Not available in 0.20 hdfs.  Use reflection to make it happen.

    // private NameNode nameNode;
    Field field = this.dfsCluster.getClass().getDeclaredField("nameNode");
    field.setAccessible(true);
    NameNode nn = (NameNode)field.get(this.dfsCluster);
    nn.namesystem.leaseManager.setLeasePeriod(soft, hard);
  }

  /**
   * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it is
   * hard-coded to 5 and makes tests linger.  Here is the exception you'll see:
   * <pre>
   * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
   * </pre>
   * Any reflection failure is logged and otherwise ignored.
   * @param stream A DFSClient.DFSOutputStream.
   * @param max Maximum number of recovery errors to allow.
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream,
      final int max) {
    try {
      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz: clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      LOG.info("Could not set max recovery field", e);
    }
  }

  /**
   * Wait until <code>countOfRegions</code> rows in .META. have a non-empty
   * info:server column.  This means all regions have been deployed, and the
   * master has been informed and has updated .META. with each region's
   * deployed server.
   * @param countOfRegions How many regions in .META.
   * @throws IOException
   */
  public void waitUntilAllRegionsAssigned(final int countOfRegions)
  throws IOException {
    HTable meta = new HTable(getConfiguration(), HConstants.META_TABLE_NAME);
    while (true) {
      int rows = 0;
      Scan scan = new Scan();
      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
      ResultScanner s = meta.getScanner(scan);
      for (Result r = null; (r = s.next()) != null;) {
        byte [] b =
          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
        if (b == null || b.length <= 0) {
          break;
        }
        rows++;
      }
      s.close();
      // If I get to here and all rows have a Server, then all have been assigned.
      if (rows == countOfRegions) {
        break;
      }
      LOG.info("Found=" + rows);
      Threads.sleep(1000);
    }
  }

  /**
   * Do a small get/scan against one store. This is required because store
   * has no actual methods of querying itself, and relies on StoreScanner.
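   * <p>For example (illustrative; <code>store</code> and <code>row</code>
   * are assumed to exist):
   * <pre>
   * Get get = new Get(row);
   * List&lt;KeyValue&gt; kvs = HBaseTestingUtility.getFromStoreFile(store, get);
   * </pre>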
   */
  public static List<KeyValue> getFromStoreFile(Store store,
                                                Get get) throws IOException {
    ReadWriteConsistencyControl.resetThreadReadPoint();
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
        scan.getFamilyMap().get(store.getFamily().getName()));

    List<KeyValue> result = new ArrayList<KeyValue>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // verify that we are on the row we want:
      KeyValue kv = result.get(0);
      if (!Bytes.equals(kv.getRow(), get.getRow())) {
        result.clear();
      }
    }
    return result;
  }

  /**
   * Do a small get/scan against one store. This is required because store
   * has no actual methods of querying itself, and relies on StoreScanner.
   */
  public static List<KeyValue> getFromStoreFile(Store store,
                                                byte [] row,
                                                NavigableSet<byte[]> columns
                                                ) throws IOException {
    Get get = new Get(row);
    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
    s.put(store.getFamily().getName(), columns);

    return getFromStoreFile(store, get);
  }
}