1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.rest;
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.io.PrintStream;
26 import java.text.SimpleDateFormat;
27 import java.util.ArrayList;
28 import java.util.Date;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Random;
32 import java.util.TreeMap;
33 import java.util.Arrays;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
36 import java.lang.reflect.Constructor;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FSDataInputStream;
42 import org.apache.hadoop.fs.FileStatus;
43 import org.apache.hadoop.fs.FileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.HBaseConfiguration;
46 import org.apache.hadoop.hbase.HColumnDescriptor;
47 import org.apache.hadoop.hbase.HTableDescriptor;
48 import org.apache.hadoop.hbase.client.Get;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.client.Result;
51 import org.apache.hadoop.hbase.client.ResultScanner;
52 import org.apache.hadoop.hbase.client.Scan;
53 import org.apache.hadoop.hbase.filter.PageFilter;
54 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
55 import org.apache.hadoop.hbase.filter.Filter;
56 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
57 import org.apache.hadoop.hbase.filter.CompareFilter;
58 import org.apache.hadoop.hbase.filter.BinaryComparator;
59 import org.apache.hadoop.hbase.rest.client.Client;
60 import org.apache.hadoop.hbase.rest.client.Cluster;
61 import org.apache.hadoop.hbase.rest.client.RemoteAdmin;
62 import org.apache.hadoop.hbase.rest.client.RemoteHTable;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.apache.hadoop.hbase.util.Hash;
65 import org.apache.hadoop.hbase.util.MurmurHash;
66 import org.apache.hadoop.hbase.util.Pair;
67 import org.apache.hadoop.io.LongWritable;
68 import org.apache.hadoop.io.NullWritable;
69 import org.apache.hadoop.io.Text;
70 import org.apache.hadoop.io.Writable;
71 import org.apache.hadoop.mapreduce.InputSplit;
72 import org.apache.hadoop.mapreduce.Job;
73 import org.apache.hadoop.mapreduce.JobContext;
74 import org.apache.hadoop.mapreduce.Mapper;
75 import org.apache.hadoop.mapreduce.RecordReader;
76 import org.apache.hadoop.mapreduce.TaskAttemptContext;
77 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
78 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
79 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
80 import org.apache.hadoop.util.LineReader;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public class PerformanceEvaluation {
/** Logger for the driver; nested classes that declare no logger of their own use it too. */
protected static final Log LOG =
  LogFactory.getLog(PerformanceEvaluation.class.getName());

/** Length in bytes of each value produced by generateValue. */
private static final int ROW_LENGTH = 1000;
/** Byte budget behind the default row count; note the value is 1024 * 1024 * 1000. */
private static final int ONE_GB = 1024 * 1024 * 1000;
/** Default value of R: ONE_GB divided by ROW_LENGTH. */
private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;

/** Name of the table the tests read and write. */
public static final byte [] TABLE_NAME = Bytes.toBytes("TestTable");
/** Column family used by every test. */
public static final byte [] FAMILY_NAME = Bytes.toBytes("info");
/** Column qualifier used by every test. */
public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");

/** Descriptor of the test table: TABLE_NAME with the single family FAMILY_NAME. */
protected static final HTableDescriptor TABLE_DESCRIPTOR;
static {
  TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
  TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
}

/** Registered commands, keyed (and therefore sorted) by command name. */
protected Map<String, CmdDescriptor> commands =
  new TreeMap<String, CmdDescriptor>();
/** REST endpoints used by the clients; filled in by doCommandLine. */
protected static Cluster cluster = new Cluster();
/** Access token passed to the remote admin/table; set from the --token= option. */
protected static String accessToken = null;

/** Configuration handed to the constructor. */
volatile Configuration conf;
/** When true, multiple clients run as threads instead of a MapReduce job. */
private boolean nomapred = false;
/** Number of clients. */
private int N = 1;
/** Total number of rows; getArgs multiplies it by the number of clients. */
private int R = ROWS_PER_GB;
/** Rows per (multi)Put, set from the --rowsPerPut= option. */
private int B = 100;

/** Directory under which each MapReduce run creates its timestamped input directory. */
private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
126
127
128
129 public static final Pattern LINE_PATTERN =
130 Pattern.compile("startRow=(\\d+),\\s+" +
131 "perClientRunRows=(\\d+),\\s+" +
132 "totalRows=(\\d+),\\s+" +
133 "clients=(\\d+),\\s+" +
134 "rowsPerPut=(\\d+)");
135
136
137
138
139
140 protected static enum Counter {
141
142 ELAPSED_TIME,
143
144 ROWS}
145
146
147
148
149
/**
 * Stores the configuration and registers every built-in command under the
 * name accepted on the command line.
 *
 * @param c configuration used by this instance and by the tests it runs
 */
public PerformanceEvaluation(final Configuration c) {
  this.conf = c;

  addCommandDescriptor(
    RandomReadTest.class, "randomRead", "Run random read test");
  addCommandDescriptor(
    RandomSeekScanTest.class, "randomSeekScan", "Run random seek and scan 100 test");
  addCommandDescriptor(
    RandomScanWithRange10Test.class, "scanRange10",
    "Run random seek scan with both start and stop row (max 10 rows)");
  addCommandDescriptor(
    RandomScanWithRange100Test.class, "scanRange100",
    "Run random seek scan with both start and stop row (max 100 rows)");
  addCommandDescriptor(
    RandomScanWithRange1000Test.class, "scanRange1000",
    "Run random seek scan with both start and stop row (max 1000 rows)");
  addCommandDescriptor(
    RandomScanWithRange10000Test.class, "scanRange10000",
    "Run random seek scan with both start and stop row (max 10000 rows)");
  addCommandDescriptor(
    RandomWriteTest.class, "randomWrite", "Run random write test");
  addCommandDescriptor(
    SequentialReadTest.class, "sequentialRead", "Run sequential read test");
  addCommandDescriptor(
    SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
  addCommandDescriptor(
    ScanTest.class, "scan", "Run scan test (read every row)");
  addCommandDescriptor(
    FilteredScanTest.class, "filterScan",
    "Run scan test using a filter to find a specific row based on it's value (make sure to use --rows=20)");
}
176
177 protected void addCommandDescriptor(Class<? extends Test> cmdClass,
178 String name, String description) {
179 CmdDescriptor cmdDescriptor =
180 new CmdDescriptor(cmdClass, name, description);
181 commands.put(name, cmdDescriptor);
182 }
183
184
185
186
187 static interface Status {
188
189
190
191
192
193 void setStatus(final String msg) throws IOException;
194 }
195
196
197
198
199
200
201
202 public static class PeInputSplit extends InputSplit implements Writable {
203 private int startRow = 0;
204 private int rows = 0;
205 private int totalRows = 0;
206 private int clients = 0;
207 private int rowsPerPut = 1;
208
209 public PeInputSplit() {
210 this.startRow = 0;
211 this.rows = 0;
212 this.totalRows = 0;
213 this.clients = 0;
214 this.rowsPerPut = 1;
215 }
216
217 public PeInputSplit(int startRow, int rows, int totalRows, int clients,
218 int rowsPerPut) {
219 this.startRow = startRow;
220 this.rows = rows;
221 this.totalRows = totalRows;
222 this.clients = clients;
223 this.rowsPerPut = 1;
224 }
225
226 @Override
227 public void readFields(DataInput in) throws IOException {
228 this.startRow = in.readInt();
229 this.rows = in.readInt();
230 this.totalRows = in.readInt();
231 this.clients = in.readInt();
232 this.rowsPerPut = in.readInt();
233 }
234
235 @Override
236 public void write(DataOutput out) throws IOException {
237 out.writeInt(startRow);
238 out.writeInt(rows);
239 out.writeInt(totalRows);
240 out.writeInt(clients);
241 out.writeInt(rowsPerPut);
242 }
243
244 @Override
245 public long getLength() throws IOException, InterruptedException {
246 return 0;
247 }
248
249 @Override
250 public String[] getLocations() throws IOException, InterruptedException {
251 return new String[0];
252 }
253
254 public int getStartRow() {
255 return startRow;
256 }
257
258 public int getRows() {
259 return rows;
260 }
261
262 public int getTotalRows() {
263 return totalRows;
264 }
265
266 public int getClients() {
267 return clients;
268 }
269
270 public int getRowsPerPut() {
271 return rowsPerPut;
272 }
273 }
274
275
276
277
278
279 public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {
280
281 @Override
282 public List<InputSplit> getSplits(JobContext job) throws IOException {
283
284 List<InputSplit> splitList = new ArrayList<InputSplit>();
285
286 for (FileStatus file: listStatus(job)) {
287 Path path = file.getPath();
288 FileSystem fs = path.getFileSystem(job.getConfiguration());
289 FSDataInputStream fileIn = fs.open(path);
290 LineReader in = new LineReader(fileIn, job.getConfiguration());
291 int lineLen = 0;
292 while(true) {
293 Text lineText = new Text();
294 lineLen = in.readLine(lineText);
295 if(lineLen <= 0) {
296 break;
297 }
298 Matcher m = LINE_PATTERN.matcher(lineText.toString());
299 if((m != null) && m.matches()) {
300 int startRow = Integer.parseInt(m.group(1));
301 int rows = Integer.parseInt(m.group(2));
302 int totalRows = Integer.parseInt(m.group(3));
303 int clients = Integer.parseInt(m.group(4));
304 int rowsPerPut = Integer.parseInt(m.group(5));
305
306 LOG.debug("split["+ splitList.size() + "] " +
307 " startRow=" + startRow +
308 " rows=" + rows +
309 " totalRows=" + totalRows +
310 " clients=" + clients +
311 " rowsPerPut=" + rowsPerPut);
312
313 PeInputSplit newSplit =
314 new PeInputSplit(startRow, rows, totalRows, clients, rowsPerPut);
315 splitList.add(newSplit);
316 }
317 }
318 in.close();
319 }
320
321 LOG.info("Total # of splits: " + splitList.size());
322 return splitList;
323 }
324
325 @Override
326 public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
327 TaskAttemptContext context) {
328 return new PeRecordReader();
329 }
330
331 public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
332 private boolean readOver = false;
333 private PeInputSplit split = null;
334 private NullWritable key = null;
335 private PeInputSplit value = null;
336
337 @Override
338 public void initialize(InputSplit split, TaskAttemptContext context)
339 throws IOException, InterruptedException {
340 this.readOver = false;
341 this.split = (PeInputSplit)split;
342 }
343
344 @Override
345 public boolean nextKeyValue() throws IOException, InterruptedException {
346 if(readOver) {
347 return false;
348 }
349
350 key = NullWritable.get();
351 value = (PeInputSplit)split;
352
353 readOver = true;
354 return true;
355 }
356
357 @Override
358 public NullWritable getCurrentKey() throws IOException, InterruptedException {
359 return key;
360 }
361
362 @Override
363 public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
364 return value;
365 }
366
367 @Override
368 public float getProgress() throws IOException, InterruptedException {
369 if(readOver) {
370 return 1.0f;
371 } else {
372 return 0.0f;
373 }
374 }
375
376 @Override
377 public void close() throws IOException {
378
379 }
380 }
381 }
382
383
384
385
386 public static class EvaluationMapTask
387 extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {
388
389
390 public final static String CMD_KEY = "EvaluationMapTask.command";
391
392 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
393
394 private Class<? extends Test> cmd;
395 private PerformanceEvaluation pe;
396
397 @Override
398 protected void setup(Context context) throws IOException, InterruptedException {
399 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
400
401
402
403 Class<? extends PerformanceEvaluation> peClass =
404 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
405 try {
406 this.pe = peClass.getConstructor(Configuration.class)
407 .newInstance(context.getConfiguration());
408 } catch (Exception e) {
409 throw new IllegalStateException("Could not instantiate PE instance", e);
410 }
411 }
412
413 private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
414 Class<? extends Type> clazz = null;
415 try {
416 clazz = Class.forName(className).asSubclass(type);
417 } catch (ClassNotFoundException e) {
418 throw new IllegalStateException("Could not find class for name: " + className, e);
419 }
420 return clazz;
421 }
422
423 protected void map(NullWritable key, PeInputSplit value, final Context context)
424 throws IOException, InterruptedException {
425
426 Status status = new Status() {
427 public void setStatus(String msg) {
428 context.setStatus(msg);
429 }
430 };
431
432
433 long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
434 value.getRows(), value.getTotalRows(), value.getRowsPerPut(), status);
435
436
437 context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
438 context.getCounter(Counter.ROWS).increment(value.rows);
439 context.write(new LongWritable(value.startRow), new LongWritable(elapsedTime));
440 context.progress();
441 }
442 }
443
444
445
446
447
448
449
450 private boolean checkTable() throws IOException {
451 HTableDescriptor tableDescriptor = getTableDescriptor();
452 RemoteAdmin admin =
453 new RemoteAdmin(new Client(cluster), conf, accessToken);
454 if (!admin.isTableAvailable(tableDescriptor.getName())) {
455 admin.createTable(tableDescriptor);
456 return true;
457 }
458 return false;
459 }
460
461 protected HTableDescriptor getTableDescriptor() {
462 return TABLE_DESCRIPTOR;
463 }
464
465
466
467
468
469
470
471 private void runNIsMoreThanOne(final Class<? extends Test> cmd)
472 throws IOException, InterruptedException, ClassNotFoundException {
473 checkTable();
474 if (nomapred) {
475 doMultipleClients(cmd);
476 } else {
477 doMapReduce(cmd);
478 }
479 }
480
481
482
483
484
485
486 private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
487 final List<Thread> threads = new ArrayList<Thread>(N);
488 final int perClientRows = R/N;
489 for (int i = 0; i < N; i++) {
490 Thread t = new Thread (Integer.toString(i)) {
491 @Override
492 public void run() {
493 super.run();
494 PerformanceEvaluation pe = new PerformanceEvaluation(conf);
495 int index = Integer.parseInt(getName());
496 try {
497 long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
498 perClientRows, R, B, new Status() {
499 public void setStatus(final String msg) throws IOException {
500 LOG.info("client-" + getName() + " " + msg);
501 }
502 });
503 LOG.info("Finished " + getName() + " in " + elapsedTime +
504 "ms writing " + perClientRows + " rows");
505 } catch (IOException e) {
506 throw new RuntimeException(e);
507 }
508 }
509 };
510 threads.add(t);
511 }
512 for (Thread t: threads) {
513 t.start();
514 }
515 for (Thread t: threads) {
516 while(t.isAlive()) {
517 try {
518 t.join();
519 } catch (InterruptedException e) {
520 LOG.debug("Interrupted, continuing" + e.toString());
521 }
522 }
523 }
524 }
525
526
527
528
529
530
531
532
533 private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
534 InterruptedException, ClassNotFoundException {
535 Path inputDir = writeInputFile(this.conf);
536 this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
537 this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
538 Job job = new Job(this.conf);
539 job.setJarByClass(PerformanceEvaluation.class);
540 job.setJobName("HBase Performance Evaluation");
541
542 job.setInputFormatClass(PeInputFormat.class);
543 PeInputFormat.setInputPaths(job, inputDir);
544
545 job.setOutputKeyClass(LongWritable.class);
546 job.setOutputValueClass(LongWritable.class);
547
548 job.setMapperClass(EvaluationMapTask.class);
549 job.setReducerClass(LongSumReducer.class);
550
551 job.setNumReduceTasks(1);
552
553 job.setOutputFormatClass(TextOutputFormat.class);
554 TextOutputFormat.setOutputPath(job, new Path(inputDir,"outputs"));
555
556 job.waitForCompletion(true);
557 }
558
559
560
561
562
563
564
565 private Path writeInputFile(final Configuration c) throws IOException {
566 FileSystem fs = FileSystem.get(c);
567 if (!fs.exists(PERF_EVAL_DIR)) {
568 fs.mkdirs(PERF_EVAL_DIR);
569 }
570 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
571 Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
572 fs.mkdirs(subdir);
573 Path inputFile = new Path(subdir, "input.txt");
574 PrintStream out = new PrintStream(fs.create(inputFile));
575
576 Map<Integer, String> m = new TreeMap<Integer, String>();
577 Hash h = MurmurHash.getInstance();
578 int perClientRows = (R / N);
579 try {
580 for (int i = 0; i < 10; i++) {
581 for (int j = 0; j < N; j++) {
582 String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows/10))) +
583 ", perClientRunRows=" + (perClientRows / 10) +
584 ", totalRows=" + R +
585 ", clients=" + N +
586 ", rowsPerPut=" + B;
587 int hash = h.hash(Bytes.toBytes(s));
588 m.put(hash, s);
589 }
590 }
591 for (Map.Entry<Integer, String> e: m.entrySet()) {
592 out.println(e.getValue());
593 }
594 } finally {
595 out.close();
596 }
597 return subdir;
598 }
599
600
601
602
603 static class CmdDescriptor {
604 private Class<? extends Test> cmdClass;
605 private String name;
606 private String description;
607
608 CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
609 this.cmdClass = cmdClass;
610 this.name = name;
611 this.description = description;
612 }
613
614 public Class<? extends Test> getCmdClass() {
615 return cmdClass;
616 }
617
618 public String getName() {
619 return name;
620 }
621
622 public String getDescription() {
623 return description;
624 }
625 }
626
627
628
629
630
631 static class TestOptions {
632 private int startRow;
633 private int perClientRunRows;
634 private int totalRows;
635 private byte[] tableName;
636 private int rowsPerPut;
637
638 TestOptions() {
639 }
640
641 TestOptions(int startRow, int perClientRunRows, int totalRows, byte[] tableName, int rowsPerPut) {
642 this.startRow = startRow;
643 this.perClientRunRows = perClientRunRows;
644 this.totalRows = totalRows;
645 this.tableName = tableName;
646 this.rowsPerPut = rowsPerPut;
647 }
648
649 public int getStartRow() {
650 return startRow;
651 }
652
653 public int getPerClientRunRows() {
654 return perClientRunRows;
655 }
656
657 public int getTotalRows() {
658 return totalRows;
659 }
660
661 public byte[] getTableName() {
662 return tableName;
663 }
664
665 public int getRowsPerPut() {
666 return rowsPerPut;
667 }
668 }
669
670
671
672
673
674 static abstract class Test {
675
676
677 private static final Random randomSeed =
678 new Random(System.currentTimeMillis());
679 private static long nextRandomSeed() {
680 return randomSeed.nextLong();
681 }
682 protected final Random rand = new Random(nextRandomSeed());
683
684 protected final int startRow;
685 protected final int perClientRunRows;
686 protected final int totalRows;
687 protected final Status status;
688 protected byte[] tableName;
689 protected RemoteHTable table;
690 protected volatile Configuration conf;
691
692
693
694
695
696 Test(final Configuration conf, final TestOptions options, final Status status) {
697 super();
698 this.startRow = options.getStartRow();
699 this.perClientRunRows = options.getPerClientRunRows();
700 this.totalRows = options.getTotalRows();
701 this.status = status;
702 this.tableName = options.getTableName();
703 this.table = null;
704 this.conf = conf;
705 }
706
707 protected String generateStatus(final int sr, final int i, final int lr) {
708 return sr + "/" + i + "/" + lr;
709 }
710
711 protected int getReportingPeriod() {
712 int period = this.perClientRunRows / 10;
713 return period == 0? this.perClientRunRows: period;
714 }
715
716 void testSetup() throws IOException {
717 this.table = new RemoteHTable(new Client(cluster), conf, tableName,
718 accessToken);
719 }
720
721 void testTakedown() throws IOException {
722 this.table.close();
723 }
724
725
726
727
728
729
730 long test() throws IOException {
731 long elapsedTime;
732 testSetup();
733 long startTime = System.currentTimeMillis();
734 try {
735 testTimed();
736 elapsedTime = System.currentTimeMillis() - startTime;
737 } finally {
738 testTakedown();
739 }
740 return elapsedTime;
741 }
742
743
744
745
746 void testTimed() throws IOException {
747 int lastRow = this.startRow + this.perClientRunRows;
748
749 for (int i = this.startRow; i < lastRow; i++) {
750 testRow(i);
751 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
752 status.setStatus(generateStatus(this.startRow, i, lastRow));
753 }
754 }
755 }
756
757
758
759
760
761 void testRow(final int i) throws IOException {
762 }
763 }
764
765 @SuppressWarnings("unused")
766 static class RandomSeekScanTest extends Test {
767 RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
768 super(conf, options, status);
769 }
770
771 @Override
772 void testRow(final int i) throws IOException {
773 Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
774 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
775 scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
776 ResultScanner s = this.table.getScanner(scan);
777
778 for (Result rr = null; (rr = s.next()) != null;) {
779
780 }
781 s.close();
782 }
783
784 @Override
785 protected int getReportingPeriod() {
786 int period = this.perClientRunRows / 100;
787 return period == 0? this.perClientRunRows: period;
788 }
789
790 }
791
792 @SuppressWarnings("unused")
793 static abstract class RandomScanWithRangeTest extends Test {
794 RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
795 super(conf, options, status);
796 }
797
798 @Override
799 void testRow(final int i) throws IOException {
800 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
801 Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
802 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
803 ResultScanner s = this.table.getScanner(scan);
804 int count = 0;
805 for (Result rr = null; (rr = s.next()) != null;) {
806 count++;
807 }
808
809 if (i % 100 == 0) {
810 LOG.info(String.format("Scan for key range %s - %s returned %s rows",
811 Bytes.toString(startAndStopRow.getFirst()),
812 Bytes.toString(startAndStopRow.getSecond()), count));
813 }
814
815 s.close();
816 }
817
818 protected abstract Pair<byte[],byte[]> getStartAndStopRow();
819
820 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
821 int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
822 int stop = start + maxRange;
823 return new Pair<byte[],byte[]>(format(start), format(stop));
824 }
825
826 @Override
827 protected int getReportingPeriod() {
828 int period = this.perClientRunRows / 100;
829 return period == 0? this.perClientRunRows: period;
830 }
831 }
832
833 static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
834 RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
835 super(conf, options, status);
836 }
837
838 @Override
839 protected Pair<byte[], byte[]> getStartAndStopRow() {
840 return generateStartAndStopRows(10);
841 }
842 }
843
844 static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
845 RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
846 super(conf, options, status);
847 }
848
849 @Override
850 protected Pair<byte[], byte[]> getStartAndStopRow() {
851 return generateStartAndStopRows(100);
852 }
853 }
854
855 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
856 RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
857 super(conf, options, status);
858 }
859
860 @Override
861 protected Pair<byte[], byte[]> getStartAndStopRow() {
862 return generateStartAndStopRows(1000);
863 }
864 }
865
866 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
867 RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
868 super(conf, options, status);
869 }
870
871 @Override
872 protected Pair<byte[], byte[]> getStartAndStopRow() {
873 return generateStartAndStopRows(10000);
874 }
875 }
876
877 static class RandomReadTest extends Test {
878 RandomReadTest(Configuration conf, TestOptions options, Status status) {
879 super(conf, options, status);
880 }
881
882 @Override
883 void testRow(final int i) throws IOException {
884 Get get = new Get(getRandomRow(this.rand, this.totalRows));
885 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
886 this.table.get(get);
887 }
888
889 @Override
890 protected int getReportingPeriod() {
891 int period = this.perClientRunRows / 100;
892 return period == 0? this.perClientRunRows: period;
893 }
894
895 }
896
897 static class RandomWriteTest extends Test {
898 int rowsPerPut;
899
900 RandomWriteTest(Configuration conf, TestOptions options, Status status) {
901 super(conf, options, status);
902 rowsPerPut = options.getRowsPerPut();
903 }
904
905 @Override
906 void testTimed() throws IOException {
907 int lastRow = this.startRow + this.perClientRunRows;
908
909 List<Put> puts = new ArrayList<Put>();
910 for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
911 for (int j = 0; j < rowsPerPut; j++) {
912 byte [] row = getRandomRow(this.rand, this.totalRows);
913 Put put = new Put(row);
914 byte[] value = generateValue(this.rand);
915 put.add(FAMILY_NAME, QUALIFIER_NAME, value);
916 puts.add(put);
917 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
918 status.setStatus(generateStatus(this.startRow, i, lastRow));
919 }
920 }
921 table.put(puts);
922 }
923 }
924 }
925
926 static class ScanTest extends Test {
927 private ResultScanner testScanner;
928
929 ScanTest(Configuration conf, TestOptions options, Status status) {
930 super(conf, options, status);
931 }
932
933 @Override
934 void testSetup() throws IOException {
935 super.testSetup();
936 }
937
938 @Override
939 void testTakedown() throws IOException {
940 if (this.testScanner != null) {
941 this.testScanner.close();
942 }
943 super.testTakedown();
944 }
945
946
947 @Override
948 void testRow(final int i) throws IOException {
949 if (this.testScanner == null) {
950 Scan scan = new Scan(format(this.startRow));
951 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
952 this.testScanner = table.getScanner(scan);
953 }
954 testScanner.next();
955 }
956
957 }
958
959 static class SequentialReadTest extends Test {
960 SequentialReadTest(Configuration conf, TestOptions options, Status status) {
961 super(conf, options, status);
962 }
963
964 @Override
965 void testRow(final int i) throws IOException {
966 Get get = new Get(format(i));
967 get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
968 table.get(get);
969 }
970
971 }
972
973 static class SequentialWriteTest extends Test {
974 int rowsPerPut;
975
976 SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
977 super(conf, options, status);
978 rowsPerPut = options.getRowsPerPut();
979 }
980
981 @Override
982 void testTimed() throws IOException {
983 int lastRow = this.startRow + this.perClientRunRows;
984
985 List<Put> puts = new ArrayList<Put>();
986 for (int i = this.startRow; i < lastRow; i += rowsPerPut) {
987 for (int j = 0; j < rowsPerPut; j++) {
988 Put put = new Put(format(i + j));
989 byte[] value = generateValue(this.rand);
990 put.add(FAMILY_NAME, QUALIFIER_NAME, value);
991 puts.add(put);
992 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
993 status.setStatus(generateStatus(this.startRow, i, lastRow));
994 }
995 }
996 table.put(puts);
997 }
998 }
999 }
1000
1001 static class FilteredScanTest extends Test {
1002 protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());
1003
1004 FilteredScanTest(Configuration conf, TestOptions options, Status status) {
1005 super(conf, options, status);
1006 }
1007
1008 @Override
1009 void testRow(int i) throws IOException {
1010 byte[] value = generateValue(this.rand);
1011 Scan scan = constructScan(value);
1012 ResultScanner scanner = null;
1013 try {
1014 scanner = this.table.getScanner(scan);
1015 while (scanner.next() != null) {
1016 }
1017 } finally {
1018 if (scanner != null) scanner.close();
1019 }
1020 }
1021
1022 protected Scan constructScan(byte[] valuePrefix) throws IOException {
1023 Filter filter = new SingleColumnValueFilter(
1024 FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
1025 new BinaryComparator(valuePrefix)
1026 );
1027 Scan scan = new Scan();
1028 scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
1029 scan.setFilter(filter);
1030 return scan;
1031 }
1032 }
1033
1034
1035
1036
1037
1038
1039
1040 public static byte [] format(final int number) {
1041 byte [] b = new byte[10];
1042 int d = Math.abs(number);
1043 for (int i = b.length - 1; i >= 0; i--) {
1044 b[i] = (byte)((d % 10) + '0');
1045 d /= 10;
1046 }
1047 return b;
1048 }
1049
1050
1051
1052
1053
1054
1055
1056 public static byte[] generateValue(final Random r) {
1057 byte [] b = new byte [ROW_LENGTH];
1058 r.nextBytes(b);
1059 return b;
1060 }
1061
1062 static byte [] getRandomRow(final Random random, final int totalRows) {
1063 return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
1064 }
1065
1066 long runOneClient(final Class<? extends Test> cmd, final int startRow,
1067 final int perClientRunRows, final int totalRows,
1068 final int rowsPerPut, final Status status)
1069 throws IOException {
1070 status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
1071 perClientRunRows + " rows");
1072 long totalElapsedTime = 0;
1073
1074 Test t = null;
1075 TestOptions options = new TestOptions(startRow, perClientRunRows,
1076 totalRows, getTableDescriptor().getName(), rowsPerPut);
1077 try {
1078 Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
1079 Configuration.class, TestOptions.class, Status.class);
1080 t = constructor.newInstance(this.conf, options, status);
1081 } catch (NoSuchMethodException e) {
1082 throw new IllegalArgumentException("Invalid command class: " +
1083 cmd.getName() + ". It does not provide a constructor as described by" +
1084 "the javadoc comment. Available constructors are: " +
1085 Arrays.toString(cmd.getConstructors()));
1086 } catch (Exception e) {
1087 throw new IllegalStateException("Failed to construct command class", e);
1088 }
1089 totalElapsedTime = t.test();
1090
1091 status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
1092 "ms at offset " + startRow + " for " + perClientRunRows + " rows");
1093 return totalElapsedTime;
1094 }
1095
1096 private void runNIsOne(final Class<? extends Test> cmd) {
1097 Status status = new Status() {
1098 public void setStatus(String msg) throws IOException {
1099 LOG.info(msg);
1100 }
1101 };
1102
1103 try {
1104 checkTable();
1105 runOneClient(cmd, 0, R, R, B, status);
1106 } catch (Exception e) {
1107 LOG.error("Failed", e);
1108 }
1109 }
1110
1111 private void runTest(final Class<? extends Test> cmd) throws IOException,
1112 InterruptedException, ClassNotFoundException {
1113 if (N == 1) {
1114
1115
1116 runNIsOne(cmd);
1117 } else {
1118
1119 runNIsMoreThanOne(cmd);
1120 }
1121 }
1122
1123 protected void printUsage() {
1124 printUsage(null);
1125 }
1126
1127 protected void printUsage(final String message) {
1128 if (message != null && message.length() > 0) {
1129 System.err.println(message);
1130 }
1131 System.err.println("Usage: java " + this.getClass().getName() + " \\");
1132 System.err.println(" [--option] [--option=value] <command> <nclients>");
1133 System.err.println();
1134 System.err.println("Options:");
1135 System.err.println(" host String. Specify Stargate endpoint.");
1136 System.err.println(" token String. API access token.");
1137 System.err.println(" rows Integer. Rows each client runs. Default: One million");
1138 System.err.println(" rowsPerPut Integer. Rows each Stargate (multi)Put. Default: 100");
1139 System.err.println(" nomapred (Flag) Run multiple clients using threads " +
1140 "(rather than use mapreduce)");
1141 System.err.println();
1142 System.err.println("Command:");
1143 for (CmdDescriptor command : commands.values()) {
1144 System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
1145 }
1146 System.err.println();
1147 System.err.println("Args:");
1148 System.err.println(" nclients Integer. Required. Total number of " +
1149 "clients (and HRegionServers)");
1150 System.err.println(" running: 1 <= value <= 500");
1151 System.err.println("Examples:");
1152 System.err.println(" To run a single evaluation client:");
1153 System.err.println(" $ bin/hbase " + this.getClass().getName()
1154 + " sequentialWrite 1");
1155 }
1156
1157 private void getArgs(final int start, final String[] args) {
1158 if(start + 1 > args.length) {
1159 throw new IllegalArgumentException("must supply the number of clients");
1160 }
1161 N = Integer.parseInt(args[start]);
1162 if (N < 1) {
1163 throw new IllegalArgumentException("Number of clients must be > 1");
1164 }
1165
1166 R = R * N;
1167 }
1168
1169 public int doCommandLine(final String[] args) {
1170
1171
1172 int errCode = -1;
1173 if (args.length < 1) {
1174 printUsage();
1175 return errCode;
1176 }
1177
1178 try {
1179 for (int i = 0; i < args.length; i++) {
1180 String cmd = args[i];
1181 if (cmd.equals("-h")) {
1182 printUsage();
1183 errCode = 0;
1184 break;
1185 }
1186
1187 final String nmr = "--nomapred";
1188 if (cmd.startsWith(nmr)) {
1189 nomapred = true;
1190 continue;
1191 }
1192
1193 final String rows = "--rows=";
1194 if (cmd.startsWith(rows)) {
1195 R = Integer.parseInt(cmd.substring(rows.length()));
1196 continue;
1197 }
1198
1199 final String rowsPerPut = "--rowsPerPut=";
1200 if (cmd.startsWith(rowsPerPut)) {
1201 this.B = Integer.parseInt(cmd.substring(rowsPerPut.length()));
1202 continue;
1203 }
1204
1205 final String host = "--host=";
1206 if (cmd.startsWith(host)) {
1207 cluster.add(cmd.substring(host.length()));
1208 continue;
1209 }
1210
1211 final String token = "--token=";
1212 if (cmd.startsWith(token)) {
1213 accessToken = cmd.substring(token.length());
1214 continue;
1215 }
1216
1217 Class<? extends Test> cmdClass = determineCommandClass(cmd);
1218 if (cmdClass != null) {
1219 getArgs(i + 1, args);
1220 if (cluster.isEmpty()) {
1221 String s = conf.get("stargate.hostname", "localhost");
1222 if (s.contains(":")) {
1223 cluster.add(s);
1224 } else {
1225 cluster.add(s, conf.getInt("stargate.port", 8080));
1226 }
1227 }
1228 runTest(cmdClass);
1229 errCode = 0;
1230 break;
1231 }
1232
1233 printUsage();
1234 break;
1235 }
1236 } catch (Exception e) {
1237 e.printStackTrace();
1238 }
1239
1240 return errCode;
1241 }
1242
1243 private Class<? extends Test> determineCommandClass(String cmd) {
1244 CmdDescriptor descriptor = commands.get(cmd);
1245 return descriptor != null ? descriptor.getCmdClass() : null;
1246 }
1247
1248
1249
1250
1251 public static void main(final String[] args) {
1252 Configuration c = HBaseConfiguration.create();
1253 System.exit(new PerformanceEvaluation(c).doCommandLine(args));
1254 }
1255 }