1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.rest;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.io.PrintStream;
26  import java.text.SimpleDateFormat;
27  import java.util.ArrayList;
28  import java.util.Date;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.TreeMap;
33  import java.util.Arrays;
34  import java.util.regex.Matcher;
35  import java.util.regex.Pattern;
36  import java.lang.reflect.Constructor;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.FSDataInputStream;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.client.Get;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.client.ResultScanner;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.filter.PageFilter;
54  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
55  import org.apache.hadoop.hbase.filter.Filter;
56  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
57  import org.apache.hadoop.hbase.filter.CompareFilter;
58  import org.apache.hadoop.hbase.filter.BinaryComparator;
59  import org.apache.hadoop.hbase.rest.client.Client;
60  import org.apache.hadoop.hbase.rest.client.Cluster;
61  import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
62  import org.apache.hadoop.hbase.rest.client.RemoteHTable;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.util.Hash;
65  import org.apache.hadoop.hbase.util.MurmurHash;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.io.LongWritable;
68  import org.apache.hadoop.io.NullWritable;
69  import org.apache.hadoop.io.Text;
70  import org.apache.hadoop.io.Writable;
71  import org.apache.hadoop.mapreduce.InputSplit;
72  import org.apache.hadoop.mapreduce.Job;
73  import org.apache.hadoop.mapreduce.JobContext;
74  import org.apache.hadoop.mapreduce.Mapper;
75  import org.apache.hadoop.mapreduce.RecordReader;
76  import org.apache.hadoop.mapreduce.TaskAttemptContext;
77  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
78  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
79  import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
80  import org.apache.hadoop.util.LineReader;
81  
82  /**
 * Script used for evaluating Stargate performance and scalability. Runs a
 * Stargate (SG) client that steps through one of a set of hardcoded tests or
 * 'experiments' (e.g. a random reads test, a random writes test, etc.). Pass on
 * the command line which test to run and how many clients are participating in
 * this experiment. Run <code>java PerformanceEvaluation --help</code> to
 * obtain usage.
89   * 
90   * <p>This class sets up and runs the evaluation programs described in
91   * Section 7, <i>Performance Evaluation</i>, of the <a
92   * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
93   * paper, pages 8-10.
94   * 
 * <p>If number of clients &gt; 1, we start up a MapReduce job (unless MapReduce
 * is disabled, in which case each client runs in its own thread in this JVM).
 * Each map task runs an individual client. Each client does about 1GB of data.
97   */
98  public class PerformanceEvaluation  {
99    protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
100   
101   private static final int ROW_LENGTH = 1000;
102   private static final int ONE_GB = 1024 * 1024 * 1000;
103   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
104   
105   public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable");
106   public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
107   public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
108 
109   protected static final HTableDescriptor TABLE_DESCRIPTOR;
110   static {
111     TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
112     TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
113   }
114 
115   protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();
116   protected static Cluster cluster = new Cluster();
117   protected static String accessToken = null;
118 
119   volatile Configuration conf;
120   private boolean nomapred = false;
121   private int N = 1;
122   private int R = ROWS_PER_GB;
123   private int B = 100;
124 
125   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
126   /**
127    * Regex to parse lines in input file passed to mapreduce task.
128    */
129   public static final Pattern LINE_PATTERN =
130     Pattern.compile("startRow=(\\d+),\\s+" +
131         "perClientRunRows=(\\d+),\\s+" +
132         "totalRows=(\\d+),\\s+" + 
133         "clients=(\\d+),\\s+" + 
134         "rowsPerPut=(\\d+)");
135 
136   /**
137    * Enum for map metrics.  Keep it out here rather than inside in the Map
138    * inner-class so we can find associated properties.
139    */
140   protected static enum Counter {
141     /** elapsed time */
142     ELAPSED_TIME,
143     /** number of rows */
144     ROWS}
145 
146   /**
147    * Constructor
148    * @param c Configuration object
149    */
150   public PerformanceEvaluation(final Configuration c) {
151     this.conf = c;
152 
153     addCommandDescriptor(RandomReadTest.class, "randomRead",
154         "Run random read test");
155     addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
156         "Run random seek and scan 100 test");
157     addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
158         "Run random seek scan with both start and stop row (max 10 rows)");
159     addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
160         "Run random seek scan with both start and stop row (max 100 rows)");
161     addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
162         "Run random seek scan with both start and stop row (max 1000 rows)");
163     addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
164         "Run random seek scan with both start and stop row (max 10000 rows)");
165     addCommandDescriptor(RandomWriteTest.class, "randomWrite",
166         "Run random write test");
167     addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
168         "Run sequential read test");
169     addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
170         "Run sequential write test");
171     addCommandDescriptor(ScanTest.class, "scan",
172         "Run scan test (read every row)");
173     addCommandDescriptor(FilteredScanTest.class, "filterScan",
174         "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
175   }
176 
177   protected void addCommandDescriptor(Class<? extends Test> cmdClass, 
178       String name, String description) {
179     CmdDescriptor cmdDescriptor = 
180       new CmdDescriptor(cmdClass, name, description);
181     commands.put(name, cmdDescriptor);
182   }
183   
184   /**
185    * Implementations can have their status set.
186    */
187   static interface Status {
188     /**
189      * Sets status
190      * @param msg status message
191      * @throws IOException
192      */
193     void setStatus(final String msg) throws IOException;
194   }
195   
196   /**
197    *  This class works as the InputSplit of Performance Evaluation
198    *  MapReduce InputFormat, and the Record Value of RecordReader. 
199    *  Each map task will only read one record from a PeInputSplit, 
200    *  the record value is the PeInputSplit itself.
201    */
202   public static class PeInputSplit extends InputSplit implements Writable {
203     private int startRow = 0;
204     private int rows = 0;
205     private int totalRows = 0;
206     private int clients = 0;
207     private int rowsPerPut = 1;
208 
209     public PeInputSplit() {
210       this.startRow = 0;
211       this.rows = 0;
212       this.totalRows = 0;
213       this.clients = 0;
214       this.rowsPerPut = 1;
215     }
216 
217     public PeInputSplit(int startRow, int rows, int totalRows, int clients,
218         int rowsPerPut) {
219       this.startRow = startRow;
220       this.rows = rows;
221       this.totalRows = totalRows;
222       this.clients = clients;
223       this.rowsPerPut = 1;
224     }
225     
226     @Override
227     public void readFields(DataInput in) throws IOException {
228       this.startRow = in.readInt();
229       this.rows = in.readInt();
230       this.totalRows = in.readInt();
231       this.clients = in.readInt();
232       this.rowsPerPut = in.readInt();
233     }
234 
235     @Override
236     public void write(DataOutput out) throws IOException {
237       out.writeInt(startRow);
238       out.writeInt(rows);
239       out.writeInt(totalRows);
240       out.writeInt(clients);
241       out.writeInt(rowsPerPut);
242     }
243     
244     @Override
245     public long getLength() throws IOException, InterruptedException {
246       return 0;
247     }
248   
249     @Override
250     public String[] getLocations() throws IOException, InterruptedException {
251       return new String[0];
252     }
253     
254     public int getStartRow() {
255       return startRow;
256     }
257     
258     public int getRows() {
259       return rows;
260     }
261     
262     public int getTotalRows() {
263       return totalRows;
264     }
265     
266     public int getClients() {
267       return clients;
268     }
269 
270     public int getRowsPerPut() {
271       return rowsPerPut;
272     }
273   }
274 
275   /**
276    *  InputFormat of Performance Evaluation MapReduce job.
277    *  It extends from FileInputFormat, want to use it's methods such as setInputPaths(). 
278    */
279   public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
280 
281     @Override
282     public List<InputSplit> getSplits(JobContext job) throws IOException {
283       // generate splits
284       List<InputSplit> splitList = new ArrayList<InputSplit>();
285       
286       for (FileStatus file: listStatus(job)) {
287         Path path = file.getPath();
288         FileSystem fs = path.getFileSystem(job.getConfiguration());
289         FSDataInputStream fileIn = fs.open(path);
290         LineReader in = new LineReader(fileIn, job.getConfiguration());
291         int lineLen = 0;
292         while(true) {
293           Text lineText = new Text();
294           lineLen = in.readLine(lineText);
295           if(lineLen <= 0) {
296           break;
297           }
298           Matcher m = LINE_PATTERN.matcher(lineText.toString());
299           if((m != null) && m.matches()) {
300             int startRow = Integer.parseInt(m.group(1));
301             int rows = Integer.parseInt(m.group(2));
302             int totalRows = Integer.parseInt(m.group(3));
303             int clients = Integer.parseInt(m.group(4));
304             int rowsPerPut = Integer.parseInt(m.group(5));
305 
306             LOG.debug("split["+ splitList.size() + "] " + 
307                      " startRow=" + startRow +
308                      " rows=" + rows +
309                      " totalRows=" + totalRows +
310                      " clients=" + clients +
311                      " rowsPerPut=" + rowsPerPut);
312 
313             PeInputSplit newSplit =
314               new PeInputSplit(startRow, rows, totalRows, clients, rowsPerPut);
315             splitList.add(newSplit);
316           }
317         }
318         in.close();
319       }
320       
321       LOG.info("Total # of splits: " + splitList.size());
322       return splitList;
323     }
324     
325     @Override
326     public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
327                             TaskAttemptContext context) {
328       return new PeRecordReader();
329     }
330     
331     public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
332       private boolean readOver = false;
333       private PeInputSplit split = null;
334       private NullWritable key = null;
335       private PeInputSplit value = null;
336       
337       @Override
338       public void initialize(InputSplit split, TaskAttemptContext context) 
339                   throws IOException, InterruptedException {
340         this.readOver = false;
341         this.split = (PeInputSplit)split;
342       }
343       
344       @Override
345       public boolean nextKeyValue() throws IOException, InterruptedException {
346         if(readOver) {
347           return false;
348         }
349         
350         key = NullWritable.get();
351         value = (PeInputSplit)split;
352         
353         readOver = true;
354         return true;
355       }
356       
357       @Override
358       public NullWritable getCurrentKey() throws IOException, InterruptedException {
359         return key;
360       }
361       
362       @Override
363       public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
364         return value;
365       }
366       
367       @Override
368       public float getProgress() throws IOException, InterruptedException {
369         if(readOver) {
370           return 1.0f;
371         } else {
372           return 0.0f;
373         }
374       }
375       
376       @Override
377       public void close() throws IOException {
378         // do nothing
379       }
380     }
381   }
382   
383   /**
384    * MapReduce job that runs a performance evaluation client in each map task.
385    */
386   public static class EvaluationMapTask 
387       extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
388 
389     /** configuration parameter name that contains the command */
390     public final static String CMD_KEY = "EvaluationMapTask.command";
391     /** configuration parameter name that contains the PE impl */
392     public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
393 
394     private Class<? extends Test> cmd;
395     private PerformanceEvaluation pe;
396 
397     @Override
398     protected void setup(Context context) throws IOException, InterruptedException {
399       this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
400 
401       // this is required so that extensions of PE are instantiated within the
402       // map reduce task...
403       Class<? extends PerformanceEvaluation> peClass =
404           forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
405       try {
406         this.pe = peClass.getConstructor(Configuration.class)
407             .newInstance(context.getConfiguration());
408       } catch (Exception e) {
409         throw new IllegalStateException("Could not instantiate PE instance", e);
410       }
411     }
412 
413     private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
414       Class<? extends Type> clazz = null;
415       try {
416         clazz = Class.forName(className).asSubclass(type);
417       } catch (ClassNotFoundException e) {
418         throw new IllegalStateException("Could not find class for name: " + className, e);
419       }
420       return clazz;
421     }
422 
423     protected void map(NullWritable key, PeInputSplit value, final Context context) 
424            throws IOException, InterruptedException {
425       
426       Status status = new Status() {
427         public void setStatus(String msg) {
428            context.setStatus(msg); 
429         }
430       };
431       
432       // Evaluation task
433       long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
434         value.getRows(), value.getTotalRows(), value.getRowsPerPut(), status);
435       // Collect how much time the thing took. Report as map output and
436       // to the ELAPSED_TIME counter.
437       context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
438       context.getCounter(Counter.ROWS).increment(value.rows);
439       context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
440       context.progress();
441     }
442   }
443   
444   /*
445    * If table does not already exist, create.
446    * @param c Client to use checking.
447    * @return True if we created the table.
448    * @throws IOException
449    */
450   private boolean checkTable() throws IOException {
451     HTableDescriptor tableDescriptor = getTableDescriptor();
452     RemoteAdmin admin =
453       new RemoteAdmin(new Client(cluster), conf, accessToken);
454     if (!admin.isTableAvailable(tableDescriptor.getName())) {
455       admin.createTable(tableDescriptor);
456       return true;
457     }
458     return false;
459   }
460 
461   protected HTableDescriptor getTableDescriptor() {
462     return TABLE_DESCRIPTOR;
463   }
464 
465   /*
466    * We're to run multiple clients concurrently.  Setup a mapreduce job.  Run
467    * one map per client.  Then run a single reduce to sum the elapsed times.
468    * @param cmd Command to run.
469    * @throws IOException
470    */
471   private void runNIsMoreThanOne(final Class<? extends Test> cmd)
472   throws IOException, InterruptedException, ClassNotFoundException {
473     checkTable();
474     if (nomapred) {
475       doMultipleClients(cmd);
476     } else {
477       doMapReduce(cmd);
478     }
479   }
480   
481   /*
482    * Run all clients in this vm each to its own thread.
483    * @param cmd Command to run.
484    * @throws IOException
485    */
486   private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
487     final List<Thread> threads = new ArrayList<Thread>(N);
488     final int perClientRows = R/N;
489     for (int i = 0; i < N; i++) {
490       Thread t = new Thread (Integer.toString(i)) {
491         @Override
492         public void run() {
493           super.run();
494           PerformanceEvaluation pe = new PerformanceEvaluation(conf);
495           int index = Integer.parseInt(getName());
496           try {
497             long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
498               perClientRows, R, B, new Status() {
499                   public void setStatus(final String msg) throws IOException {
500                     LOG.info("client-" + getName() + " " + msg);
501                   }
502                 });
503             LOG.info("Finished " + getName() + " in " + elapsedTime +
504               "ms writing " + perClientRows + " rows");
505           } catch (IOException e) {
506             throw new RuntimeException(e);
507           }
508         }
509       };
510       threads.add(t);
511     }
512     for (Thread t: threads) {
513       t.start();
514     }
515     for (Thread t: threads) {
516       while(t.isAlive()) {
517         try {
518           t.join();
519         } catch (InterruptedException e) {
520           LOG.debug("Interrupted, continuing" + e.toString());
521         }
522       }
523     }
524   }
525   
526   /*
527    * Run a mapreduce job.  Run as many maps as asked-for clients.
528    * Before we start up the job, write out an input file with instruction
529    * per client regards which row they are to start on.
530    * @param cmd Command to run.
531    * @throws IOException
532    */
533   private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
534         InterruptedException, ClassNotFoundException {
535     Path inputDir = writeInputFile(this.conf);
536     this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
537     this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
538     Job job = new Job(this.conf);
539     job.setJarByClass(PerformanceEvaluation.class);
540     job.setJobName("HBase Performance Evaluation");
541     
542     job.setInputFormatClass(PeInputFormat.class);
543     PeInputFormat.setInputPaths(job, inputDir);
544     
545     job.setOutputKeyClass(LongWritable.class);
546     job.setOutputValueClass(LongWritable.class);
547     
548     job.setMapperClass(EvaluationMapTask.class);
549     job.setReducerClass(LongSumReducer.class);
550         
551     job.setNumReduceTasks(1);
552     
553     job.setOutputFormatClass(TextOutputFormat.class);
554     TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
555     
556     job.waitForCompletion(true);
557   }
558   
559   /*
560    * Write input file of offsets-per-client for the mapreduce job.
561    * @param c Configuration
562    * @return Directory that contains file written.
563    * @throws IOException
564    */
565   private Path writeInputFile(final Configuration c) throws IOException {
566     FileSystem fs = FileSystem.get(c);
567     if (!fs.exists(PERF_EVAL_DIR)) {
568       fs.mkdirs(PERF_EVAL_DIR);
569     }
570     SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
571     Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
572     fs.mkdirs(subdir);
573     Path inputFile = new Path(subdir, "input.txt");
574     PrintStream out = new PrintStream(fs.create(inputFile));
575     // Make input random.
576     Map<Integer, String> m = new TreeMap<Integer, String>();
577     Hash h = MurmurHash.getInstance();
578     int perClientRows = (R / N);
579     try {
580       for (int i = 0; i < 10; i++) {
581         for (int j = 0; j < N; j++) {
582           String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
583           ", perClientRunRows=" + (perClientRows / 10) +
584           ", totalRows=" + R +
585           ", clients=" + N +
586           ", rowsPerPut=" + B;
587           int hash = h.hash(Bytes.toBytes(s));
588           m.put(hash, s);
589         }
590       }
591       for (Map.Entry<Integer, String> e: m.entrySet()) {
592         out.println(e.getValue());
593       }
594     } finally {
595       out.close();
596     }
597     return subdir;
598   }
599 
600   /**
601    * Describes a command.
602    */
603   static class CmdDescriptor {
604     private Class<? extends Test> cmdClass;
605     private String name;
606     private String description;
607 
608     CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
609       this.cmdClass = cmdClass;
610       this.name = name;
611       this.description = description;
612     }
613 
614     public Class<? extends Test> getCmdClass() {
615       return cmdClass;
616     }
617 
618     public String getName() {
619       return name;
620     }
621 
622     public String getDescription() {
623       return description;
624     }
625   }
626 
627   /**
628    * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation.Test
629    * tests}.  This makes the reflection logic a little easier to understand...
630    */
631   static class TestOptions {
632     private int startRow;
633     private int perClientRunRows;
634     private int totalRows;
635     private byte[] tableName;
636     private int rowsPerPut;
637 
638     TestOptions() {
639     }
640 
641     TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, int rowsPerPut) {
642       this.startRow = startRow;
643       this.perClientRunRows = perClientRunRows;
644       this.totalRows = totalRows;
645       this.tableName = tableName;
646       this.rowsPerPut = rowsPerPut;
647     }
648 
649     public int getStartRow() {
650       return startRow;
651     }
652 
653     public int getPerClientRunRows() {
654       return perClientRunRows;
655     }
656 
657     public int getTotalRows() {
658       return totalRows;
659     }
660 
661     public byte[] getTableName() {
662       return tableName;
663     }
664 
665     public int getRowsPerPut() {
666       return rowsPerPut;
667     }
668   }
669 
670   /*
671    * A test.
672    * Subclass to particularize what happens per row.
673    */
674   static abstract class Test {
675     // Below is make it so when Tests are all running in the one
676     // jvm, that they each have a differently seeded Random. 
677     private static final Random randomSeed =
678       new Random(System.currentTimeMillis());
679     private static long nextRandomSeed() {
680       return randomSeed.nextLong();
681     }
682     protected final Random rand = new Random(nextRandomSeed());
683 
684     protected final int startRow;
685     protected final int perClientRunRows;
686     protected final int totalRows;
687     protected final Status status;
688     protected byte[] tableName;
689     protected RemoteHTable table;
690     protected volatile Configuration conf;
691 
692     /**
693      * Note that all subclasses of this class must provide a public contructor
694      * that has the exact same list of arguments.
695      */
696     Test(final Configuration conf, final TestOptions options, final Status status) {
697       super();
698       this.startRow = options.getStartRow();
699       this.perClientRunRows = options.getPerClientRunRows();
700       this.totalRows = options.getTotalRows();
701       this.status = status;
702       this.tableName = options.getTableName();
703       this.table = null;
704       this.conf = conf;
705     }
706     
707     protected String generateStatus(final int sr, final int i, final int lr) {
708       return sr + "/" + i + "/" + lr;
709     }
710     
711     protected int getReportingPeriod() {
712       int period = this.perClientRunRows / 10;
713       return period == 0? this.perClientRunRows: period;
714     }
715     
716     void testSetup() throws IOException {
717       this.table = new RemoteHTable(new Client(cluster), conf, tableName,
718         accessToken);
719     }
720 
721     void testTakedown()  throws IOException {
722       this.table.close();
723     }
724     
725     /*
726      * Run test
727      * @return Elapsed time.
728      * @throws IOException
729      */
730     long test() throws IOException {
731       long elapsedTime;
732       testSetup();
733       long startTime = System.currentTimeMillis();
734       try {
735         testTimed();
736         elapsedTime = System.currentTimeMillis() - startTime;
737       } finally {
738         testTakedown();
739       }
740       return elapsedTime;
741     }
742 
743     /**
744      * Provides an extension point for tests that don't want a per row invocation.
745      */
746     void testTimed() throws IOException {
747       int lastRow = this.startRow + this.perClientRunRows;
748       // Report on completion of 1/10th of total.
749       for (int i = this.startRow; i < lastRow; i++) {
750         testRow(i);
751         if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
752           status.setStatus(generateStatus(this.startRow, i, lastRow));
753         }
754       }
755     }
756 
757     /*
758     * Test for individual row.
759     * @param i Row index.
760     */
761     void testRow(final int i) throws IOException {
762     }
763   }
764 
765   @SuppressWarnings("unused")
766   static class RandomSeekScanTest extends Test {
767     RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
768       super(conf, options, status);
769     }
770 
771     @Override
772     void testRow(final int i) throws IOException {
773       Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
774       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
775       scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
776       ResultScanner s = this.table.getScanner(scan);
777       //int count = 0;
778       for (Result rr = null; (rr = s.next()) != null;) {
779         // LOG.info("" + count++ + " " + rr.toString());
780       }
781       s.close();
782     }
783  
784     @Override
785     protected int getReportingPeriod() {
786       int period = this.perClientRunRows / 100;
787       return period == 0? this.perClientRunRows: period;
788     }
789 
790   }
791 
792   @SuppressWarnings("unused")
793   static abstract class RandomScanWithRangeTest extends Test {
794     RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
795       super(conf, options, status);
796     }
797 
798     @Override
799     void testRow(final int i) throws IOException {
800       Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
801       Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
802       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
803       ResultScanner s = this.table.getScanner(scan);
804       int count = 0;
805       for (Result rr = null; (rr = s.next()) != null;) {
806         count++;
807       }
808 
809       if (i % 100 == 0) {
810         LOG.info(String.format("Scan for key range %s - %s returned %s rows",
811             Bytes.toString(startAndStopRow.getFirst()),
812             Bytes.toString(startAndStopRow.getSecond()), count));
813       }
814 
815       s.close();
816     }
817 
818     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
819 
820     protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
821       int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
822       int stop = start + maxRange;
823       return new Pair<byte[],byte[]>(format(start), format(stop));
824     }
825 
826     @Override
827     protected int getReportingPeriod() {
828       int period = this.perClientRunRows / 100;
829       return period == 0? this.perClientRunRows: period;
830     }
831   }
832 
833   static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
834     RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
835       super(conf, options, status);
836     }
837 
838     @Override
839     protected Pair<byte[], byte[]> getStartAndStopRow() {
840       return generateStartAndStopRows(10);
841     }
842   }
843 
844   static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
845     RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
846       super(conf, options, status);
847     }
848 
849     @Override
850     protected Pair<byte[], byte[]> getStartAndStopRow() {
851       return generateStartAndStopRows(100);
852     }
853   }
854 
855   static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
856     RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
857       super(conf, options, status);
858     }
859 
860     @Override
861     protected Pair<byte[], byte[]> getStartAndStopRow() {
862       return generateStartAndStopRows(1000);
863     }
864   }
865 
866   static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
867     RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
868       super(conf, options, status);
869     }
870 
871     @Override
872     protected Pair<byte[], byte[]> getStartAndStopRow() {
873       return generateStartAndStopRows(10000);
874     }
875   }
876 
877   static class RandomReadTest extends Test {
878     RandomReadTest(Configuration conf, TestOptions options, Status status) {
879       super(conf, options, status);
880     }
881 
882     @Override
883     void testRow(final int i) throws IOException {
884       Get get = new Get(getRandomRow(this.rand, this.totalRows));
885       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
886       this.table.get(get);
887     }
888 
889     @Override
890     protected int getReportingPeriod() {
891       int period = this.perClientRunRows / 100;
892       return period == 0? this.perClientRunRows: period;
893     }
894 
895   }
896   
897   static class RandomWriteTest extends Test {
898     int rowsPerPut;
899 
900     RandomWriteTest(Configuration conf, TestOptions options, Status status) {
901       super(conf, options, status);
902       rowsPerPut = options.getRowsPerPut();
903     }
904     
905     @Override
906     void testTimed() throws IOException {
907       int lastRow = this.startRow + this.perClientRunRows;
908       // Report on completion of 1/10th of total.
909       List<Put> puts = new ArrayList<Put>();
910       for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
911         for (int j = 0; j < rowsPerPut; j++) {
912           byte [] row = getRandomRow(this.rand, this.totalRows);
913           Put put = new Put(row);
914           byte[] value = generateValue(this.rand);
915           put.add(FAMILY_NAME, QUALIFIER_NAME, value);
916           puts.add(put);
917           if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
918             status.setStatus(generateStatus(this.startRow, i, lastRow));
919           }
920         }
921         table.put(puts);
922       }
923     }
924   }
925   
926   static class ScanTest extends Test {
927     private ResultScanner testScanner;
928 
929     ScanTest(Configuration conf, TestOptions options, Status status) {
930       super(conf, options, status);
931     }
932     
933     @Override
934     void testSetup() throws IOException {
935       super.testSetup();
936     }
937     
938     @Override
939     void testTakedown() throws IOException {
940       if (this.testScanner != null) {
941         this.testScanner.close();
942       }
943       super.testTakedown();
944     }
945     
946     
947     @Override
948     void testRow(final int i) throws IOException {
949       if (this.testScanner == null) {
950         Scan scan = new Scan(format(this.startRow));
951         scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
952         this.testScanner = table.getScanner(scan);
953       }
954       testScanner.next();
955     }
956 
957   }
958   
959   static class SequentialReadTest extends Test {
960     SequentialReadTest(Configuration conf, TestOptions options, Status status) {
961       super(conf, options, status);
962     }
963     
964     @Override
965     void testRow(final int i) throws IOException {
966       Get get = new Get(format(i));
967       get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
968       table.get(get);
969     }
970 
971   }
972   
973   static class SequentialWriteTest extends Test {
974     int rowsPerPut;
975 
976     SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
977       super(conf, options, status);
978       rowsPerPut = options.getRowsPerPut();
979     }
980 
981     @Override
982     void testTimed() throws IOException {
983       int lastRow = this.startRow + this.perClientRunRows;
984       // Report on completion of 1/10th of total.
985       List<Put> puts = new ArrayList<Put>();
986       for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
987         for (int j = 0; j < rowsPerPut; j++) {
988           Put put = new Put(format(i + j));
989           byte[] value = generateValue(this.rand);
990           put.add(FAMILY_NAME, QUALIFIER_NAME, value);
991           puts.add(put);
992           if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
993             status.setStatus(generateStatus(this.startRow, i, lastRow));
994           }
995         }
996         table.put(puts);
997       }
998     }
999   }
1000 
1001   static class FilteredScanTest extends Test {
1002     protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1003 
1004     FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1005       super(conf, options, status);
1006     }
1007 
1008     @Override
1009     void testRow(int i) throws IOException {
1010       byte[] value = generateValue(this.rand);
1011       Scan scan = constructScan(value);
1012       ResultScanner scanner = null;
1013       try {
1014         scanner = this.table.getScanner(scan);
1015         while (scanner.next() != null) {
1016         }
1017       } finally {
1018         if (scanner != null) scanner.close();
1019       }
1020     }
1021 
1022     protected Scan constructScan(byte[] valuePrefix) throws IOException {
1023       Filter filter = new SingleColumnValueFilter(
1024           FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
1025           new BinaryComparator(valuePrefix)
1026       );
1027       Scan scan = new Scan();
1028       scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1029       scan.setFilter(filter);
1030       return scan;
1031     }
1032   }
1033   
1034   /*
1035    * Format passed integer.
1036    * @param number
1037    * @return Returns zero-prefixed 10-byte wide decimal version of passed
1038    * number (Does absolute in case number is negative).
1039    */
1040   public static byte [] format(final int number) {
1041     byte [] b = new byte[10];
1042     int d = Math.abs(number);
1043     for (int i = b.length - 1; i >= 0; i--) {
1044       b[i] = (byte)((d % 10) + '0');
1045       d /= 10;
1046     }
1047     return b;
1048   }
1049   
1050   /*
1051    * This method takes some time and is done inline uploading data.  For
1052    * example, doing the mapfile test, generation of the key and value
1053    * consumes about 30% of CPU time.
1054    * @return Generated random value to insert into a table cell.
1055    */
1056   public static byte[] generateValue(final Random r) {
1057     byte [] b = new byte [ROW_LENGTH];
1058     r.nextBytes(b);
1059     return b;
1060   }
1061   
1062   static byte [] getRandomRow(final Random random, final int totalRows) {
1063     return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1064   }
1065   
1066   long runOneClient(final Class<? extends Test> cmd, final int startRow,
1067                     final int perClientRunRows, final int totalRows, 
1068                     final int rowsPerPut, final Status status)
1069   throws IOException {
1070     status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
1071       perClientRunRows + " rows");
1072     long totalElapsedTime = 0;
1073 
1074     Test t = null;
1075     TestOptions options = new TestOptions(startRow, perClientRunRows,
1076       totalRows, getTableDescriptor().getName(), rowsPerPut);
1077     try {
1078       Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
1079           Configuration.class, TestOptions.class, Status.class);
1080       t = constructor.newInstance(this.conf, options, status);
1081     } catch (NoSuchMethodException e) {
1082       throw new IllegalArgumentException("Invalid command class: " +
1083           cmd.getName() + ".  It does not provide a constructor as described by" +
1084           "the javadoc comment.  Available constructors are: " +
1085           Arrays.toString(cmd.getConstructors()));
1086     } catch (Exception e) {
1087       throw new IllegalStateException("Failed to construct command class", e);
1088     }
1089     totalElapsedTime = t.test();
1090 
1091     status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1092       "ms at offset " + startRow + " for " + perClientRunRows + " rows");
1093     return totalElapsedTime;
1094   }
1095   
1096   private void runNIsOne(final Class<? extends Test> cmd) {
1097     Status status = new Status() {
1098       public void setStatus(String msg) throws IOException {
1099         LOG.info(msg);
1100       }
1101     };
1102 
1103     try {
1104       checkTable();
1105       runOneClient(cmd, 0, R, R, B, status);
1106     } catch (Exception e) {
1107       LOG.error("Failed", e);
1108     } 
1109   }
1110 
1111   private void runTest(final Class<? extends Test> cmd) throws IOException,
1112           InterruptedException, ClassNotFoundException {
1113     if (N == 1) {
1114       // If there is only one client and one HRegionServer, we assume nothing
1115       // has been set up at all.
1116       runNIsOne(cmd);
1117     } else {
1118       // Else, run 
1119       runNIsMoreThanOne(cmd);
1120     }
1121   }
1122 
1123   protected void printUsage() {
1124     printUsage(null);
1125   }
1126   
1127   protected void printUsage(final String message) {
1128     if (message != null && message.length() > 0) {
1129       System.err.println(message);
1130     }
1131     System.err.println("Usage: java " + this.getClass().getName() + " \\");
1132     System.err.println("  [--option] [--option=value] <command> <nclients>");
1133     System.err.println();
1134     System.err.println("Options:");
1135     System.err.println(" host          String. Specify Stargate endpoint.");
1136     System.err.println(" token         String. API access token.");
1137     System.err.println(" rows          Integer. Rows each client runs. Default: One million");
1138     System.err.println(" rowsPerPut    Integer. Rows each Stargate (multi)Put. Default: 100");
1139     System.err.println(" nomapred      (Flag) Run multiple clients using threads " +
1140       "(rather than use mapreduce)");
1141     System.err.println();
1142     System.err.println("Command:");
1143     for (CmdDescriptor command : commands.values()) {
1144       System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1145     }
1146     System.err.println();
1147     System.err.println("Args:");
1148     System.err.println(" nclients      Integer. Required. Total number of " +
1149       "clients (and HRegionServers)");
1150     System.err.println("               running: 1 <= value <= 500");
1151     System.err.println("Examples:");
1152     System.err.println(" To run a single evaluation client:");
1153     System.err.println(" $ bin/hbase " + this.getClass().getName()
1154         + " sequentialWrite 1");
1155   }
1156 
1157   private void getArgs(final int start, final String[] args) {
1158     if(start + 1 > args.length) {
1159       throw new IllegalArgumentException("must supply the number of clients");
1160     }
1161     N = Integer.parseInt(args[start]);
1162     if (N < 1) {
1163       throw new IllegalArgumentException("Number of clients must be > 1");
1164     }
1165     // Set total number of rows to write.
1166     R = R * N;
1167   }
1168   
1169   public int doCommandLine(final String[] args) {
1170     // Process command-line args. TODO: Better cmd-line processing
1171     // (but hopefully something not as painful as cli options).    
1172     int errCode = -1;
1173     if (args.length < 1) {
1174       printUsage();
1175       return errCode;
1176     }
1177 
1178     try {
1179       for (int i = 0; i < args.length; i++) {
1180         String cmd = args[i];
1181         if (cmd.equals("-h")) {
1182           printUsage();
1183           errCode = 0;
1184           break;
1185         }
1186        
1187         final String nmr = "--nomapred";
1188         if (cmd.startsWith(nmr)) {
1189           nomapred = true;
1190           continue;
1191         }
1192         
1193         final String rows = "--rows=";
1194         if (cmd.startsWith(rows)) {
1195           R = Integer.parseInt(cmd.substring(rows.length()));
1196           continue;
1197         }
1198 
1199         final String rowsPerPut = "--rowsPerPut=";
1200         if (cmd.startsWith(rowsPerPut)) {
1201           this.B = Integer.parseInt(cmd.substring(rowsPerPut.length()));
1202           continue;
1203         }
1204 
1205         final String host = "--host=";
1206         if (cmd.startsWith(host)) {
1207           cluster.add(cmd.substring(host.length()));
1208           continue;
1209         }
1210 
1211         final String token = "--token=";
1212         if (cmd.startsWith(token)) {
1213           accessToken = cmd.substring(token.length());
1214           continue;
1215         }
1216 
1217         Class<? extends Test> cmdClass = determineCommandClass(cmd);
1218         if (cmdClass != null) {
1219           getArgs(i + 1, args);
1220           if (cluster.isEmpty()) {
1221             String s = conf.get("stargate.hostname", "localhost");
1222             if (s.contains(":")) {
1223               cluster.add(s);
1224             } else {
1225               cluster.add(s, conf.getInt("stargate.port", 8080));
1226             }
1227           }
1228           runTest(cmdClass);
1229           errCode = 0;
1230           break;
1231         }
1232     
1233         printUsage();
1234         break;
1235       }
1236     } catch (Exception e) {
1237       e.printStackTrace();
1238     }
1239     
1240     return errCode;
1241   }
1242 
1243   private Class<? extends Test> determineCommandClass(String cmd) {
1244     CmdDescriptor descriptor = commands.get(cmd);
1245     return descriptor != null ? descriptor.getCmdClass() : null;
1246   }
1247 
1248   /**
1249    * @param args
1250    */
1251   public static void main(final String[] args) {
1252     Configuration c = HBaseConfiguration.create();
1253     System.exit(new PerformanceEvaluation(c).doCommandLine(args));
1254   }
1255 }