1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.io.PrintStream;
26  import java.io.File;
27  import java.text.SimpleDateFormat;
28  import java.util.ArrayList;
29  import java.util.Date;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Random;
33  import java.util.TreeMap;
34  import java.util.Arrays;
35  import java.util.regex.Matcher;
36  import java.util.regex.Pattern;
37  import java.lang.reflect.Constructor;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FSDataInputStream;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.hbase.client.Get;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.filter.PageFilter;
54  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
55  import org.apache.hadoop.hbase.filter.Filter;
56  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
57  import org.apache.hadoop.hbase.filter.CompareFilter;
58  import org.apache.hadoop.hbase.filter.BinaryComparator;
59  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.apache.hadoop.hbase.util.FSUtils;
62  import org.apache.hadoop.hbase.util.Hash;
63  import org.apache.hadoop.hbase.util.MurmurHash;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
66  import org.apache.hadoop.hdfs.MiniDFSCluster;
67  import org.apache.hadoop.io.LongWritable;
68  import org.apache.hadoop.io.NullWritable;
69  import org.apache.hadoop.io.Text;
70  import org.apache.hadoop.io.Writable;
71  import org.apache.hadoop.mapreduce.InputSplit;
72  import org.apache.hadoop.mapreduce.Job;
73  import org.apache.hadoop.mapreduce.JobContext;
74  import org.apache.hadoop.mapreduce.Mapper;
75  import org.apache.hadoop.mapreduce.RecordReader;
76  import org.apache.hadoop.mapreduce.TaskAttemptContext;
77  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
78  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
79  import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
80  import org.apache.hadoop.util.LineReader;
81  
82  /**
 * Script used for evaluating HBase performance and scalability.  Runs an HBase
84   * client that steps through one of a set of hardcoded tests or 'experiments'
85   * (e.g. a random reads test, a random writes test, etc.). Pass on the
86   * command-line which test to run and how many clients are participating in
87   * this experiment. Run <code>java PerformanceEvaluation --help</code> to
88   * obtain usage.
89   *
90   * <p>This class sets up and runs the evaluation programs described in
91   * Section 7, <i>Performance Evaluation</i>, of the <a
92   * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
93   * paper, pages 8-10.
94   *
95   * <p>If number of clients > 1, we start up a MapReduce job. Each map task
96   * runs an individual client. Each client does about 1GB of data.
97   */
98  public class PerformanceEvaluation {
  // Logger shared by this class and its nested classes.
  protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());

  // Row size used to derive ROWS_PER_GB (presumably bytes per row -- confirm
  // against the value generator, which is not visible here).
  private static final int ROW_LENGTH = 1000;
  // NOTE: this is 1000 * 1024 * 1024, i.e. slightly less than 2^30.
  private static final int ONE_GB = 1024 * 1024 * 1000;
  // Default total row count (see field R below).
  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;

  // Name of the test table, its single column family, and the qualifier
  // that the scan tests add to their Scan objects.
  public static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");

  // Descriptor returned by getTableDescriptor(): TABLE_NAME with the one
  // FAMILY_NAME family.
  protected static final HTableDescriptor TABLE_DESCRIPTOR;
  static {
    TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
  }

  // Registered commands keyed by command name; filled by addCommandDescriptor().
  protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();

  // Configuration handed to the constructor; also used to build the MapReduce job.
  volatile Configuration conf;
  // NOTE(review): not read anywhere in the visible part of the file; presumably
  // set from the command line to start a mini cluster -- confirm.
  private boolean miniCluster = false;
  // When true, runNIsMoreThanOne() runs the clients as threads in this VM
  // instead of submitting a MapReduce job.
  private boolean nomapred = false;
  // Number of clients.
  private int N = 1;
  // Total number of rows; divided by N to get the per-client row count.
  private int R = ROWS_PER_GB;
  // Options forwarded unchanged to every client.
  private boolean flushCommits = true;
  private boolean writeToWAL = true;

  // Directory under which writeInputFile() creates one timestamped
  // sub-directory per run.
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
  /**
   * Regex to parse lines in input file passed to mapreduce task.
   * The six capture groups are, in order: startRow, perClientRunRows,
   * totalRows, clients, flushCommits and writeToWAL. This must stay in sync
   * with the line format produced by writeInputFile().
   */
  public static final Pattern LINE_PATTERN =
    Pattern.compile("startRow=(\\d+),\\s+" +
        "perClientRunRows=(\\d+),\\s+" +
        "totalRows=(\\d+),\\s+" +
        "clients=(\\d+),\\s+" +
        "flushCommits=(\\w+),\\s+" +
        "writeToWAL=(\\w+)");
136 
137   /**
138    * Enum for map metrics.  Keep it out here rather than inside in the Map
139    * inner-class so we can find associated properties.
140    */
141   protected static enum Counter {
142     /** elapsed time */
143     ELAPSED_TIME,
144     /** number of rows */
145     ROWS}
146 
147 
148   /**
149    * Constructor
150    * @param c Configuration object
151    */
152   public PerformanceEvaluation(final Configuration c) {
153     this.conf = c;
154 
155     addCommandDescriptor(RandomReadTest.class, "randomRead",
156         "Run random read test");
157     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
158         "Run random seek and scan 100 test");
159     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
160         "Run random seek scan with both start and stop row (max 10 rows)");
161     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
162         "Run random seek scan with both start and stop row (max 100 rows)");
163     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
164         "Run random seek scan with both start and stop row (max 1000 rows)");
165     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
166         "Run random seek scan with both start and stop row (max 10000 rows)");
167     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
168         "Run random write test");
169     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
170         "Run sequential read test");
171     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
172         "Run sequential write test");
173     addCommandDescriptor(ScanTest.class, "scan",
174         "Run scan test (read every row)");
175     addCommandDescriptor(FilteredScanTest.class, "filterScan",
176         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
177   }
178 
179   protected void addCommandDescriptor(Class<? extends Test> cmdClass,
180       String name, String description) {
181     CmdDescriptor cmdDescriptor =
182       new CmdDescriptor(cmdClass, name, description);
183     commands.put(name, cmdDescriptor);
184   }
185 
186   /**
187    * Implementations can have their status set.
188    */
189   static interface Status {
190     /**
191      * Sets status
192      * @param msg status message
193      * @throws IOException
194      */
195     void setStatus(final String msg) throws IOException;
196   }
197 
198   /**
199    *  This class works as the InputSplit of Performance Evaluation
200    *  MapReduce InputFormat, and the Record Value of RecordReader.
201    *  Each map task will only read one record from a PeInputSplit,
202    *  the record value is the PeInputSplit itself.
203    */
204   public static class PeInputSplit extends InputSplit implements Writable {
205     private int startRow = 0;
206     private int rows = 0;
207     private int totalRows = 0;
208     private int clients = 0;
209     private boolean flushCommits = false;
210     private boolean writeToWAL = true;
211 
212     public PeInputSplit() {
213       this.startRow = 0;
214       this.rows = 0;
215       this.totalRows = 0;
216       this.clients = 0;
217       this.flushCommits = false;
218       this.writeToWAL = true;
219     }
220 
221     public PeInputSplit(int startRow, int rows, int totalRows, int clients,
222         boolean flushCommits, boolean writeToWAL) {
223       this.startRow = startRow;
224       this.rows = rows;
225       this.totalRows = totalRows;
226       this.clients = clients;
227       this.flushCommits = flushCommits;
228       this.writeToWAL = writeToWAL;
229     }
230 
231     @Override
232     public void readFields(DataInput in) throws IOException {
233       this.startRow = in.readInt();
234       this.rows = in.readInt();
235       this.totalRows = in.readInt();
236       this.clients = in.readInt();
237       this.flushCommits = in.readBoolean();
238       this.writeToWAL = in.readBoolean();
239     }
240 
241     @Override
242     public void write(DataOutput out) throws IOException {
243       out.writeInt(startRow);
244       out.writeInt(rows);
245       out.writeInt(totalRows);
246       out.writeInt(clients);
247       out.writeBoolean(flushCommits);
248       out.writeBoolean(writeToWAL);
249     }
250 
251     @Override
252     public long getLength() throws IOException, InterruptedException {
253       return 0;
254     }
255 
256     @Override
257     public String[] getLocations() throws IOException, InterruptedException {
258       return new String[0];
259     }
260 
261     public int getStartRow() {
262       return startRow;
263     }
264 
265     public int getRows() {
266       return rows;
267     }
268 
269     public int getTotalRows() {
270       return totalRows;
271     }
272 
273     public int getClients() {
274       return clients;
275     }
276 
277     public boolean isFlushCommits() {
278       return flushCommits;
279     }
280 
281     public boolean isWriteToWAL() {
282       return writeToWAL;
283     }
284   }
285 
286   /**
287    *  InputFormat of Performance Evaluation MapReduce job.
   *  It extends FileInputFormat so that its helper methods, such as setInputPaths(), can be reused.
289    */
290   public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
291 
292     @Override
293     public List<InputSplit> getSplits(JobContext job) throws IOException {
294       // generate splits
295       List<InputSplit> splitList = new ArrayList<InputSplit>();
296 
297       for (FileStatus file: listStatus(job)) {
298         Path path = file.getPath();
299         FileSystem fs = path.getFileSystem(job.getConfiguration());
300         FSDataInputStream fileIn = fs.open(path);
301         LineReader in = new LineReader(fileIn, job.getConfiguration());
302         int lineLen = 0;
303         while(true) {
304           Text lineText = new Text();
305           lineLen = in.readLine(lineText);
306           if(lineLen <= 0) {
307           break;
308           }
309           Matcher m = LINE_PATTERN.matcher(lineText.toString());
310           if((m != null) && m.matches()) {
311             int startRow = Integer.parseInt(m.group(1));
312             int rows = Integer.parseInt(m.group(2));
313             int totalRows = Integer.parseInt(m.group(3));
314             int clients = Integer.parseInt(m.group(4));
315             boolean flushCommits = Boolean.parseBoolean(m.group(5));
316             boolean writeToWAL = Boolean.parseBoolean(m.group(6));
317 
318             LOG.debug("split["+ splitList.size() + "] " +
319                      " startRow=" + startRow +
320                      " rows=" + rows +
321                      " totalRows=" + totalRows +
322                      " clients=" + clients +
323                      " flushCommits=" + flushCommits +
324                      " writeToWAL=" + writeToWAL);
325 
326             PeInputSplit newSplit =
327               new PeInputSplit(startRow, rows, totalRows, clients,
328                 flushCommits, writeToWAL);
329             splitList.add(newSplit);
330           }
331         }
332         in.close();
333       }
334 
335       LOG.info("Total # of splits: " + splitList.size());
336       return splitList;
337     }
338 
339     @Override
340     public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
341                             TaskAttemptContext context) {
342       return new PeRecordReader();
343     }
344 
345     public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
346       private boolean readOver = false;
347       private PeInputSplit split = null;
348       private NullWritable key = null;
349       private PeInputSplit value = null;
350 
351       @Override
352       public void initialize(InputSplit split, TaskAttemptContext context)
353                   throws IOException, InterruptedException {
354         this.readOver = false;
355         this.split = (PeInputSplit)split;
356       }
357 
358       @Override
359       public boolean nextKeyValue() throws IOException, InterruptedException {
360         if(readOver) {
361           return false;
362         }
363 
364         key = NullWritable.get();
365         value = (PeInputSplit)split;
366 
367         readOver = true;
368         return true;
369       }
370 
371       @Override
372       public NullWritable getCurrentKey() throws IOException, InterruptedException {
373         return key;
374       }
375 
376       @Override
377       public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
378         return value;
379       }
380 
381       @Override
382       public float getProgress() throws IOException, InterruptedException {
383         if(readOver) {
384           return 1.0f;
385         } else {
386           return 0.0f;
387         }
388       }
389 
390       @Override
391       public void close() throws IOException {
392         // do nothing
393       }
394     }
395   }
396 
397   /**
398    * MapReduce job that runs a performance evaluation client in each map task.
399    */
400   public static class EvaluationMapTask
401       extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
402 
403     /** configuration parameter name that contains the command */
404     public final static String CMD_KEY = "EvaluationMapTask.command";
405     /** configuration parameter name that contains the PE impl */
406     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
407 
408     private Class<? extends Test> cmd;
409     private PerformanceEvaluation pe;
410 
411     @Override
412     protected void setup(Context context) throws IOException, InterruptedException {
413       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
414 
415       // this is required so that extensions of PE are instantiated within the
416       // map reduce task...
417       Class<? extends PerformanceEvaluation> peClass =
418           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
419       try {
420         this.pe = peClass.getConstructor(Configuration.class)
421             .newInstance(context.getConfiguration());
422       } catch (Exception e) {
423         throw new IllegalStateException("Could not instantiate PE instance", e);
424       }
425     }
426 
427     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
428       Class<? extends Type> clazz = null;
429       try {
430         clazz = Class.forName(className).asSubclass(type);
431       } catch (ClassNotFoundException e) {
432         throw new IllegalStateException("Could not find class for name: " + className, e);
433       }
434       return clazz;
435     }
436 
437     protected void map(NullWritable key, PeInputSplit value, final Context context)
438            throws IOException, InterruptedException {
439 
440       Status status = new Status() {
441         public void setStatus(String msg) {
442            context.setStatus(msg);
443         }
444       };
445 
446       // Evaluation task
447       long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
448                                   value.getRows(), value.getTotalRows(),
449                                   value.isFlushCommits(), value.isWriteToWAL(),
450                                   status);
451       // Collect how much time the thing took. Report as map output and
452       // to the ELAPSED_TIME counter.
453       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
454       context.getCounter(Counter.ROWS).increment(value.rows);
455       context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
456       context.progress();
457     }
458   }
459 
460   /*
461    * If table does not already exist, create.
   * @param admin Client used to check whether the table exists and to create it.
463    * @return True if we created the table.
464    * @throws IOException
465    */
466   private boolean checkTable(HBaseAdmin admin) throws IOException {
467     HTableDescriptor tableDescriptor = getTableDescriptor();
468     boolean tableExists = admin.tableExists(tableDescriptor.getName());
469     if (!tableExists) {
470       admin.createTable(tableDescriptor);
471       LOG.info("Table " + tableDescriptor + " created");
472     }
473     return !tableExists;
474   }
475 
476   protected HTableDescriptor getTableDescriptor() {
477     return TABLE_DESCRIPTOR;
478   }
479 
480   /*
481    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
482    * one map per client.  Then run a single reduce to sum the elapsed times.
483    * @param cmd Command to run.
484    * @throws IOException
485    */
486   private void runNIsMoreThanOne(final Class<? extends Test> cmd)
487   throws IOException, InterruptedException, ClassNotFoundException {
488     checkTable(new HBaseAdmin(conf));
489     if (this.nomapred) {
490       doMultipleClients(cmd);
491     } else {
492       doMapReduce(cmd);
493     }
494   }
495 
496   /*
   * Run all clients in this VM, each in its own thread.
498    * @param cmd Command to run.
499    * @throws IOException
500    */
501   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
502     final List<Thread> threads = new ArrayList<Thread>(this.N);
503     final int perClientRows = R/N;
504     for (int i = 0; i < this.N; i++) {
505       Thread t = new Thread (Integer.toString(i)) {
506         @Override
507         public void run() {
508           super.run();
509           PerformanceEvaluation pe = new PerformanceEvaluation(conf);
510           int index = Integer.parseInt(getName());
511           try {
512             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
513                perClientRows, R,
514                 flushCommits, writeToWAL, new Status() {
515                   public void setStatus(final String msg) throws IOException {
516                     LOG.info("client-" + getName() + " " + msg);
517                   }
518                 });
519             LOG.info("Finished " + getName() + " in " + elapsedTime +
520               "ms writing " + perClientRows + " rows");
521           } catch (IOException e) {
522             throw new RuntimeException(e);
523           }
524         }
525       };
526       threads.add(t);
527     }
528     for (Thread t: threads) {
529       t.start();
530     }
531     for (Thread t: threads) {
532       while(t.isAlive()) {
533         try {
534           t.join();
535         } catch (InterruptedException e) {
536           LOG.debug("Interrupted, continuing" + e.toString());
537         }
538       }
539     }
540   }
541 
542   /*
543    * Run a mapreduce job.  Run as many maps as asked-for clients.
   * Before we start up the job, write out an input file with instructions
   * for each client regarding which row it is to start on.
546    * @param cmd Command to run.
547    * @throws IOException
548    */
549   private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
550         InterruptedException, ClassNotFoundException {
551     Path inputDir = writeInputFile(this.conf);
552     this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
553     this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
554     Job job = new Job(this.conf);
555     job.setJarByClass(PerformanceEvaluation.class);
556     job.setJobName("HBase Performance Evaluation");
557 
558     job.setInputFormatClass(PeInputFormat.class);
559     PeInputFormat.setInputPaths(job, inputDir);
560 
561     job.setOutputKeyClass(LongWritable.class);
562     job.setOutputValueClass(LongWritable.class);
563 
564     job.setMapperClass(EvaluationMapTask.class);
565     job.setReducerClass(LongSumReducer.class);
566 
567     job.setNumReduceTasks(1);
568 
569     job.setOutputFormatClass(TextOutputFormat.class);
570     TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
571 
572     TableMapReduceUtil.addDependencyJars(job);
573     job.waitForCompletion(true);
574   }
575 
576   /*
577    * Write input file of offsets-per-client for the mapreduce job.
578    * @param c Configuration
579    * @return Directory that contains file written.
580    * @throws IOException
581    */
582   private Path writeInputFile(final Configuration c) throws IOException {
583     FileSystem fs = FileSystem.get(c);
584     if (!fs.exists(PERF_EVAL_DIR)) {
585       fs.mkdirs(PERF_EVAL_DIR);
586     }
587     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
588     Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
589     fs.mkdirs(subdir);
590     Path inputFile = new Path(subdir, "input.txt");
591     PrintStream out = new PrintStream(fs.create(inputFile));
592     // Make input random.
593     Map<Integer, String> m = new TreeMap<Integer, String>();
594     Hash h = MurmurHash.getInstance();
595     int perClientRows = (this.R / this.N);
596     try {
597       for (int i = 0; i < 10; i++) {
598         for (int j = 0; j < N; j++) {
599           String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
600           ", perClientRunRows=" + (perClientRows / 10) +
601           ", totalRows=" + this.R +
602           ", clients=" + this.N +
603           ", flushCommits=" + this.flushCommits +
604           ", writeToWAL=" + this.writeToWAL;
605           int hash = h.hash(Bytes.toBytes(s));
606           m.put(hash, s);
607         }
608       }
609       for (Map.Entry<Integer, String> e: m.entrySet()) {
610         out.println(e.getValue());
611       }
612     } finally {
613       out.close();
614     }
615     return subdir;
616   }
617 
618   /**
619    * Describes a command.
620    */
621   static class CmdDescriptor {
622     private Class<? extends Test> cmdClass;
623     private String name;
624     private String description;
625 
626     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
627       this.cmdClass = cmdClass;
628       this.name = name;
629       this.description = description;
630     }
631 
632     public Class<? extends Test> getCmdClass() {
633       return cmdClass;
634     }
635 
636     public String getName() {
637       return name;
638     }
639 
640     public String getDescription() {
641       return description;
642     }
643   }
644 
645   /**
646    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test
647    * tests}.  This makes the reflection logic a little easier to understand...
648    */
649   static class TestOptions {
650     private int startRow;
651     private int perClientRunRows;
652     private int totalRows;
653     private byte[] tableName;
654     private boolean flushCommits;
655     private boolean writeToWAL = true;
656 
657     TestOptions() {
658     }
659 
660     TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, boolean flushCommits, boolean writeToWAL) {
661       this.startRow = startRow;
662       this.perClientRunRows = perClientRunRows;
663       this.totalRows = totalRows;
664       this.tableName = tableName;
665       this.flushCommits = flushCommits;
666       this.writeToWAL = writeToWAL;
667     }
668 
669     public int getStartRow() {
670       return startRow;
671     }
672 
673     public int getPerClientRunRows() {
674       return perClientRunRows;
675     }
676 
677     public int getTotalRows() {
678       return totalRows;
679     }
680 
681     public byte[] getTableName() {
682       return tableName;
683     }
684 
685     public boolean isFlushCommits() {
686       return flushCommits;
687     }
688 
689     public boolean isWriteToWAL() {
690       return writeToWAL;
691     }
692   }
693 
694   /*
695    * A test.
696    * Subclass to particularize what happens per row.
697    */
698   static abstract class Test {
699     // Below is make it so when Tests are all running in the one
700     // jvm, that they each have a differently seeded Random.
701     private static final Random randomSeed =
702       new Random(System.currentTimeMillis());
703     private static long nextRandomSeed() {
704       return randomSeed.nextLong();
705     }
706     protected final Random rand = new Random(nextRandomSeed());
707 
708     protected final int startRow;
709     protected final int perClientRunRows;
710     protected final int totalRows;
711     private final Status status;
712     protected byte[] tableName;
713     protected HBaseAdmin admin;
714     protected HTable table;
715     protected volatile Configuration conf;
716     protected boolean flushCommits;
717     protected boolean writeToWAL;
718 
719     /**
720      * Note that all subclasses of this class must provide a public contructor
721      * that has the exact same list of arguments.
722      */
723     Test(final Configuration conf, final TestOptions options, final Status status) {
724       super();
725       this.startRow = options.getStartRow();
726       this.perClientRunRows = options.getPerClientRunRows();
727       this.totalRows = options.getTotalRows();
728       this.status = status;
729       this.tableName = options.getTableName();
730       this.table = null;
731       this.conf = conf;
732       this.flushCommits = options.isFlushCommits();
733       this.writeToWAL = options.isWriteToWAL();
734     }
735 
736     private String generateStatus(final int sr, final int i, final int lr) {
737       return sr + "/" + i + "/" + lr;
738     }
739 
740     protected int getReportingPeriod() {
741       int period = this.perClientRunRows / 10;
742       return period == 0? this.perClientRunRows: period;
743     }
744 
745     void testSetup() throws IOException {
746       this.admin = new HBaseAdmin(conf);
747       this.table = new HTable(conf, tableName);
748       this.table.setAutoFlush(false);
749       this.table.setScannerCaching(30);
750     }
751 
752     void testTakedown()  throws IOException {
753       if (flushCommits) {
754         this.table.flushCommits();
755       }
756     }
757 
758     /*
759      * Run test
760      * @return Elapsed time.
761      * @throws IOException
762      */
763     long test() throws IOException {
764       long elapsedTime;
765       testSetup();
766       long startTime = System.currentTimeMillis();
767       try {
768         testTimed();
769         elapsedTime = System.currentTimeMillis() - startTime;
770       } finally {
771         testTakedown();
772       }
773       return elapsedTime;
774     }
775 
776     /**
777      * Provides an extension point for tests that don't want a per row invocation.
778      */
779     void testTimed() throws IOException {
780       int lastRow = this.startRow + this.perClientRunRows;
781       // Report on completion of 1/10th of total.
782       for (int i = this.startRow; i < lastRow; i++) {
783         testRow(i);
784         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
785           status.setStatus(generateStatus(this.startRow, i, lastRow));
786         }
787       }
788     }
789 
790     /*
791     * Test for individual row.
792     * @param i Row index.
793     */
794     void testRow(final int i) throws IOException {
795     }
796   }
797 
798   @SuppressWarnings("unused")
799   static class RandomSeekScanTest extends Test {
800     RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
801       super(conf, options, status);
802     }
803 
804     @Override
805     void testRow(final int i) throws IOException {
806       Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
807       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
808       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
809       ResultScanner s = this.table.getScanner(scan);
810       //int count = 0;
811       for (Result rr = null; (rr = s.next()) != null;) {
812         // LOG.info("" + count++ + " " + rr.toString());
813       }
814       s.close();
815     }
816 
817     @Override
818     protected int getReportingPeriod() {
819       int period = this.perClientRunRows / 100;
820       return period == 0? this.perClientRunRows: period;
821     }
822 
823   }
824 
825   @SuppressWarnings("unused")
826   static abstract class RandomScanWithRangeTest extends Test {
827     RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
828       super(conf, options, status);
829     }
830 
831     @Override
832     void testRow(final int i) throws IOException {
833       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
834       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
835       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
836       ResultScanner s = this.table.getScanner(scan);
837       int count = 0;
838       for (Result rr = null; (rr = s.next()) != null;) {
839         count++;
840       }
841 
842       if (i % 100 == 0) {
843         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
844             Bytes.toString(startAndStopRow.getFirst()),
845             Bytes.toString(startAndStopRow.getSecond()), count));
846       }
847 
848       s.close();
849     }
850 
851     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
852 
853     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
854       int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
855       int stop = start + maxRange;
856       return new Pair<byte[],byte[]>(format(start), format(stop));
857     }
858 
859     @Override
860     protected int getReportingPeriod() {
861       int period = this.perClientRunRows / 100;
862       return period == 0? this.perClientRunRows: period;
863     }
864   }
865 
866   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
867     RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
868       super(conf, options, status);
869     }
870 
871     @Override
872     protected Pair<byte[], byte[]> getStartAndStopRow() {
873       return generateStartAndStopRows(10);
874     }
875   }
876 
877   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
878     RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
879       super(conf, options, status);
880     }
881 
882     @Override
883     protected Pair<byte[], byte[]> getStartAndStopRow() {
884       return generateStartAndStopRows(100);
885     }
886   }
887 
888   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
889     RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
890       super(conf, options, status);
891     }
892 
893     @Override
894     protected Pair<byte[], byte[]> getStartAndStopRow() {
895       return generateStartAndStopRows(1000);
896     }
897   }
898 
899   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
900     RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
901       super(conf, options, status);
902     }
903 
904     @Override
905     protected Pair<byte[], byte[]> getStartAndStopRow() {
906       return generateStartAndStopRows(10000);
907     }
908   }
909 
910   static class RandomReadTest extends Test {
911     RandomReadTest(Configuration conf, TestOptions options, Status status) {
912       super(conf, options, status);
913     }
914 
915     @Override
916     void testRow(final int i) throws IOException {
917       Get get = new Get(getRandomRow(this.rand, this.totalRows));
918       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
919       this.table.get(get);
920     }
921 
922     @Override
923     protected int getReportingPeriod() {
924       int period = this.perClientRunRows / 100;
925       return period == 0? this.perClientRunRows: period;
926     }
927 
928   }
929 
930   static class RandomWriteTest extends Test {
931     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
932       super(conf, options, status);
933     }
934 
935     @Override
936     void testRow(final int i) throws IOException {
937       byte [] row = getRandomRow(this.rand, this.totalRows);
938       Put put = new Put(row);
939       byte[] value = generateValue(this.rand);
940       put.add(FAMILY_NAME, QUALIFIER_NAME, value);
941       put.setWriteToWAL(writeToWAL);
942       table.put(put);
943     }
944   }
945 
946   static class ScanTest extends Test {
947     private ResultScanner testScanner;
948 
949     ScanTest(Configuration conf, TestOptions options, Status status) {
950       super(conf, options, status);
951     }
952 
953     @Override
954     void testSetup() throws IOException {
955       super.testSetup();
956     }
957 
958     @Override
959     void testTakedown() throws IOException {
960       if (this.testScanner != null) {
961         this.testScanner.close();
962       }
963       super.testTakedown();
964     }
965 
966 
967     @Override
968     void testRow(final int i) throws IOException {
969       if (this.testScanner == null) {
970         Scan scan = new Scan(format(this.startRow));
971         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
972         this.testScanner = table.getScanner(scan);
973       }
974       testScanner.next();
975     }
976 
977   }
978 
979   static class SequentialReadTest extends Test {
980     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
981       super(conf, options, status);
982     }
983 
984     @Override
985     void testRow(final int i) throws IOException {
986       Get get = new Get(format(i));
987       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
988       table.get(get);
989     }
990 
991   }
992 
993   static class SequentialWriteTest extends Test {
994     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
995       super(conf, options, status);
996     }
997 
998     @Override
999     void testRow(final int i) throws IOException {
1000       Put put = new Put(format(i));
1001       byte[] value = generateValue(this.rand);
1002       put.add(FAMILY_NAME, QUALIFIER_NAME, value);
1003       put.setWriteToWAL(writeToWAL);
1004       table.put(put);
1005     }
1006 
1007   }
1008 
1009   static class FilteredScanTest extends Test {
1010     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1011 
1012     FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1013       super(conf, options, status);
1014     }
1015 
1016     @Override
1017     void testRow(int i) throws IOException {
1018       byte[] value = generateValue(this.rand);
1019       Scan scan = constructScan(value);
1020       ResultScanner scanner = null;
1021       try {
1022         scanner = this.table.getScanner(scan);
1023         while (scanner.next() != null) {
1024         }
1025       } finally {
1026         if (scanner != null) scanner.close();
1027       }
1028     }
1029 
1030     protected Scan constructScan(byte[] valuePrefix) throws IOException {
1031       Filter filter = new SingleColumnValueFilter(
1032           FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
1033           new BinaryComparator(valuePrefix)
1034       );
1035       Scan scan = new Scan();
1036       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1037       scan.setFilter(filter);
1038       return scan;
1039     }
1040   }
1041 
1042   /*
1043    * Format passed integer.
1044    * @param number
1045    * @return Returns zero-prefixed 10-byte wide decimal version of passed
1046    * number (Does absolute in case number is negative).
1047    */
1048   public static byte [] format(final int number) {
1049     byte [] b = new byte[10];
1050     int d = Math.abs(number);
1051     for (int i = b.length - 1; i >= 0; i--) {
1052       b[i] = (byte)((d % 10) + '0');
1053       d /= 10;
1054     }
1055     return b;
1056   }
1057 
1058   /*
1059    * This method takes some time and is done inline uploading data.  For
1060    * example, doing the mapfile test, generation of the key and value
1061    * consumes about 30% of CPU time.
1062    * @return Generated random value to insert into a table cell.
1063    */
1064   public static byte[] generateValue(final Random r) {
1065     byte [] b = new byte [ROW_LENGTH];
1066     r.nextBytes(b);
1067     return b;
1068   }
1069 
1070   static byte [] getRandomRow(final Random random, final int totalRows) {
1071     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1072   }
1073 
1074   long runOneClient(final Class<? extends Test> cmd, final int startRow,
1075                     final int perClientRunRows, final int totalRows,
1076                     boolean flushCommits, boolean writeToWAL,
1077                     final Status status)
1078   throws IOException {
1079     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
1080       perClientRunRows + " rows");
1081     long totalElapsedTime = 0;
1082 
1083     Test t = null;
1084     TestOptions options = new TestOptions(startRow, perClientRunRows,
1085         totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL);
1086     try {
1087       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
1088           Configuration.class, TestOptions.class, Status.class);
1089       t = constructor.newInstance(this.conf, options, status);
1090     } catch (NoSuchMethodException e) {
1091       throw new IllegalArgumentException("Invalid command class: " +
1092           cmd.getName() + ".  It does not provide a constructor as described by" +
1093           "the javadoc comment.  Available constructors are: " +
1094           Arrays.toString(cmd.getConstructors()));
1095     } catch (Exception e) {
1096       throw new IllegalStateException("Failed to construct command class", e);
1097     }
1098     totalElapsedTime = t.test();
1099 
1100     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1101       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
1102     return totalElapsedTime;
1103   }
1104 
1105   private void runNIsOne(final Class<? extends Test> cmd) {
1106     Status status = new Status() {
1107       public void setStatus(String msg) throws IOException {
1108         LOG.info(msg);
1109       }
1110     };
1111 
1112     HBaseAdmin admin = null;
1113     try {
1114       admin = new HBaseAdmin(this.conf);
1115       checkTable(admin);
1116       runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL,
1117         status);
1118     } catch (Exception e) {
1119       LOG.error("Failed", e);
1120     }
1121   }
1122 
1123   private void runTest(final Class<? extends Test> cmd) throws IOException,
1124           InterruptedException, ClassNotFoundException {
1125     MiniHBaseCluster hbaseMiniCluster = null;
1126     MiniDFSCluster dfsCluster = null;
1127     MiniZooKeeperCluster zooKeeperCluster = null;
1128     if (this.miniCluster) {
1129       dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
1130       zooKeeperCluster = new MiniZooKeeperCluster();
1131       int zooKeeperPort = zooKeeperCluster.startup(new File(System.getProperty("java.io.tmpdir")));
1132 
1133       // mangle the conf so that the fs parameter points to the minidfs we
1134       // just started up
1135       FileSystem fs = dfsCluster.getFileSystem();
1136       conf.set("fs.default.name", fs.getUri().toString());
1137       conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
1138       Path parentdir = fs.getHomeDirectory();
1139       conf.set(HConstants.HBASE_DIR, parentdir.toString());
1140       fs.mkdirs(parentdir);
1141       FSUtils.setVersion(fs, parentdir);
1142       hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
1143     }
1144 
1145     try {
1146       if (N == 1) {
1147         // If there is only one client and one HRegionServer, we assume nothing
1148         // has been set up at all.
1149         runNIsOne(cmd);
1150       } else {
1151         // Else, run
1152         runNIsMoreThanOne(cmd);
1153       }
1154     } finally {
1155       if(this.miniCluster) {
1156         if (hbaseMiniCluster != null) hbaseMiniCluster.shutdown();
1157         if (zooKeeperCluster != null) zooKeeperCluster.shutdown();
1158         HBaseTestCase.shutdownDfs(dfsCluster);
1159       }
1160     }
1161   }
1162 
1163   protected void printUsage() {
1164     printUsage(null);
1165   }
1166 
1167   protected void printUsage(final String message) {
1168     if (message != null && message.length() > 0) {
1169       System.err.println(message);
1170     }
1171     System.err.println("Usage: java " + this.getClass().getName() + " \\");
1172     System.err.println("  [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
1173     System.err.println();
1174     System.err.println("Options:");
1175     System.err.println(" miniCluster     Run the test on an HBaseMiniCluster");
1176     System.err.println(" nomapred        Run multiple clients using threads " +
1177       "(rather than use mapreduce)");
1178     System.err.println(" rows            Rows each client runs. Default: One million");
1179     System.err.println(" flushCommits    Used to determine if the test should flush the table.  Default: false");
1180     System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
1181     System.err.println();
1182     System.err.println("Command:");
1183     for (CmdDescriptor command : commands.values()) {
1184       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1185     }
1186     System.err.println();
1187     System.err.println("Args:");
1188     System.err.println(" nclients        Integer. Required. Total number of " +
1189       "clients (and HRegionServers)");
1190     System.err.println("                 running: 1 <= value <= 500");
1191     System.err.println("Examples:");
1192     System.err.println(" To run a single evaluation client:");
1193     System.err.println(" $ bin/hbase " + this.getClass().getName()
1194         + " sequentialWrite 1");
1195   }
1196 
1197   private void getArgs(final int start, final String[] args) {
1198     if(start + 1 > args.length) {
1199       throw new IllegalArgumentException("must supply the number of clients");
1200     }
1201     N = Integer.parseInt(args[start]);
1202     if (N < 1) {
1203       throw new IllegalArgumentException("Number of clients must be > 1");
1204     }
1205     // Set total number of rows to write.
1206     this.R = this.R * N;
1207   }
1208 
1209   public int doCommandLine(final String[] args) {
1210     // Process command-line args. TODO: Better cmd-line processing
1211     // (but hopefully something not as painful as cli options).
1212     int errCode = -1;
1213     if (args.length < 1) {
1214       printUsage();
1215       return errCode;
1216     }
1217 
1218     try {
1219       for (int i = 0; i < args.length; i++) {
1220         String cmd = args[i];
1221         if (cmd.equals("-h") || cmd.startsWith("--h")) {
1222           printUsage();
1223           errCode = 0;
1224           break;
1225         }
1226 
1227         final String miniClusterArgKey = "--miniCluster";
1228         if (cmd.startsWith(miniClusterArgKey)) {
1229           this.miniCluster = true;
1230           continue;
1231         }
1232 
1233         final String nmr = "--nomapred";
1234         if (cmd.startsWith(nmr)) {
1235           this.nomapred = true;
1236           continue;
1237         }
1238 
1239         final String rows = "--rows=";
1240         if (cmd.startsWith(rows)) {
1241           this.R = Integer.parseInt(cmd.substring(rows.length()));
1242           continue;
1243         }
1244 
1245         final String flushCommits = "--flushCommits=";
1246         if (cmd.startsWith(flushCommits)) {
1247           this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
1248           continue;
1249         }
1250 
1251         final String writeToWAL = "--writeToWAL=";
1252         if (cmd.startsWith(writeToWAL)) {
1253           this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
1254           continue;
1255         }
1256 
1257         Class<? extends Test> cmdClass = determineCommandClass(cmd);
1258         if (cmdClass != null) {
1259           getArgs(i + 1, args);
1260           runTest(cmdClass);
1261           errCode = 0;
1262           break;
1263         }
1264 
1265         printUsage();
1266         break;
1267       }
1268     } catch (Exception e) {
1269       e.printStackTrace();
1270     }
1271 
1272     return errCode;
1273   }
1274 
1275   private Class<? extends Test> determineCommandClass(String cmd) {
1276     CmdDescriptor descriptor = commands.get(cmd);
1277     return descriptor != null ? descriptor.getCmdClass() : null;
1278   }
1279 
1280   /**
1281    * @param args
1282    */
1283   public static void main(final String[] args) {
1284     Configuration c = HBaseConfiguration.create();
1285     System.exit(new PerformanceEvaluation(c).doCommandLine(args));
1286   }
1287 }