package org.apache.hadoop.hbase;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.LineReader;

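/**
 * Performance evaluation tool for HBase. Runs one of the registered tests
 * (sequential/random reads and writes, scans, filtered scans) against the
 * "TestTable" table, either as map tasks of a MapReduce job or, with
 * --nomapred, as concurrent client threads in this JVM, and reports the
 * elapsed time for each client.
 */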
public class PerformanceEvaluation {
  protected static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());

  private static final int ROW_LENGTH = 1000;
  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;

  public static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
  public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");

  protected static final HTableDescriptor TABLE_DESCRIPTOR;
  static {
    TABLE_DESCRIPTOR = new HTableDescriptor(TABLE_NAME);
    TABLE_DESCRIPTOR.addFamily(new HColumnDescriptor(FAMILY_NAME));
  }

  /** Registered test commands, keyed by command name. */
  protected Map<String, CmdDescriptor> commands = new TreeMap<String, CmdDescriptor>();

  volatile Configuration conf;
  private boolean miniCluster = false;
  private boolean nomapred = false;
  private int N = 1;
  private int R = ROWS_PER_GB;
  private boolean flushCommits = true;
  private boolean writeToWAL = true;

  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");

  /** Regex that matches one line of the generated MapReduce input file. */
  public static final Pattern LINE_PATTERN =
    Pattern.compile("startRow=(\\d+),\\s+" +
        "perClientRunRows=(\\d+),\\s+" +
        "totalRows=(\\d+),\\s+" +
        "clients=(\\d+),\\s+" +
        "flushCommits=(\\w+),\\s+" +
        "writeToWAL=(\\w+)");

  /** Counters maintained by the evaluation map tasks. */
  protected static enum Counter {
    /** Elapsed time in milliseconds. */
    ELAPSED_TIME,
    /** Number of rows processed. */
    ROWS
  }

  /**
   * Constructor.
   * @param c Configuration object
   */
  public PerformanceEvaluation(final Configuration c) {
    this.conf = c;

    addCommandDescriptor(RandomReadTest.class, "randomRead",
      "Run random read test");
    addCommandDescriptor(RandomSeekScanTest.class, "randomSeekScan",
      "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
      "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
      "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
      "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
      "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
      "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
      "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
      "Run sequential write test");
    addCommandDescriptor(ScanTest.class, "scan",
      "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
      "Run scan test using a filter to find a specific row based on its value (make sure to use --rows=20)");
  }

  protected void addCommandDescriptor(Class<? extends Test> cmdClass,
      String name, String description) {
    CmdDescriptor cmdDescriptor =
      new CmdDescriptor(cmdClass, name, description);
    commands.put(name, cmdDescriptor);
  }

  /**
   * Implementations can have their status set.
   */
  static interface Status {
    /**
     * Sets status
     * @param msg status message
     * @throws IOException
     */
    void setStatus(final String msg) throws IOException;
  }

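  /**
   * InputSplit handed to each evaluation map task. Carries the parameters for
   * one client: start row, rows per client, total rows, client count, and the
   * flushCommits/writeToWAL settings.
   */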
  public static class PeInputSplit extends InputSplit implements Writable {
    private int startRow = 0;
    private int rows = 0;
    private int totalRows = 0;
    private int clients = 0;
    private boolean flushCommits = false;
    private boolean writeToWAL = true;

    public PeInputSplit() {
      this.startRow = 0;
      this.rows = 0;
      this.totalRows = 0;
      this.clients = 0;
      this.flushCommits = false;
      this.writeToWAL = true;
    }

    public PeInputSplit(int startRow, int rows, int totalRows, int clients,
        boolean flushCommits, boolean writeToWAL) {
      this.startRow = startRow;
      this.rows = rows;
      this.totalRows = totalRows;
      this.clients = clients;
      this.flushCommits = flushCommits;
      this.writeToWAL = writeToWAL;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.startRow = in.readInt();
      this.rows = in.readInt();
      this.totalRows = in.readInt();
      this.clients = in.readInt();
      this.flushCommits = in.readBoolean();
      this.writeToWAL = in.readBoolean();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(startRow);
      out.writeInt(rows);
      out.writeInt(totalRows);
      out.writeInt(clients);
      out.writeBoolean(flushCommits);
      out.writeBoolean(writeToWAL);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return 0;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return new String[0];
    }

    public int getStartRow() {
      return startRow;
    }

    public int getRows() {
      return rows;
    }

    public int getTotalRows() {
      return totalRows;
    }

    public int getClients() {
      return clients;
    }

    public boolean isFlushCommits() {
      return flushCommits;
    }

    public boolean isWriteToWAL() {
      return writeToWAL;
    }
  }

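  /**
   * InputFormat of the performance evaluation MapReduce job. Parses the
   * generated input file with {@link #LINE_PATTERN} and emits one
   * {@link PeInputSplit} per matching line.
   */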
  public static class PeInputFormat extends FileInputFormat<NullWritable, PeInputSplit> {

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
      // Generate one split per matching line of the job input file.
      List<InputSplit> splitList = new ArrayList<InputSplit>();

      for (FileStatus file : listStatus(job)) {
        Path path = file.getPath();
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        FSDataInputStream fileIn = fs.open(path);
        LineReader in = new LineReader(fileIn, job.getConfiguration());
        int lineLen = 0;
        while (true) {
          Text lineText = new Text();
          lineLen = in.readLine(lineText);
          if (lineLen <= 0) {
            break;
          }
          Matcher m = LINE_PATTERN.matcher(lineText.toString());
          if (m.matches()) {
            int startRow = Integer.parseInt(m.group(1));
            int rows = Integer.parseInt(m.group(2));
            int totalRows = Integer.parseInt(m.group(3));
            int clients = Integer.parseInt(m.group(4));
            boolean flushCommits = Boolean.parseBoolean(m.group(5));
            boolean writeToWAL = Boolean.parseBoolean(m.group(6));

            LOG.debug("split[" + splitList.size() + "] " +
                " startRow=" + startRow +
                " rows=" + rows +
                " totalRows=" + totalRows +
                " clients=" + clients +
                " flushCommits=" + flushCommits +
                " writeToWAL=" + writeToWAL);

            PeInputSplit newSplit =
              new PeInputSplit(startRow, rows, totalRows, clients,
                flushCommits, writeToWAL);
            splitList.add(newSplit);
          }
        }
        in.close();
      }

      LOG.info("Total # of splits: " + splitList.size());
      return splitList;
    }

    @Override
    public RecordReader<NullWritable, PeInputSplit> createRecordReader(InputSplit split,
        TaskAttemptContext context) {
      return new PeRecordReader();
    }

    /** Record reader that hands back the split itself as the single value. */
    public static class PeRecordReader extends RecordReader<NullWritable, PeInputSplit> {
      private boolean readOver = false;
      private PeInputSplit split = null;
      private NullWritable key = null;
      private PeInputSplit value = null;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        this.readOver = false;
        this.split = (PeInputSplit) split;
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (readOver) {
          return false;
        }

        key = NullWritable.get();
        value = split;

        readOver = true;
        return true;
      }

      @Override
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
      }

      @Override
      public PeInputSplit getCurrentValue() throws IOException, InterruptedException {
        return value;
      }

      @Override
      public float getProgress() throws IOException, InterruptedException {
        if (readOver) {
          return 1.0f;
        } else {
          return 0.0f;
        }
      }

      @Override
      public void close() throws IOException {
        // Nothing to close.
      }
    }
  }

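  /**
   * Mapper that runs one evaluation client. The command class and the
   * PerformanceEvaluation implementation are read from the job configuration
   * under {@link #CMD_KEY} and {@link #PE_KEY}.
   */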
  public static class EvaluationMapTask
      extends Mapper<NullWritable, PeInputSplit, LongWritable, LongWritable> {

    /** Configuration parameter name that contains the command. */
    public final static String CMD_KEY = "EvaluationMapTask.command";
    /** Configuration parameter name that contains the PE implementation. */
    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";

    private Class<? extends Test> cmd;
    private PerformanceEvaluation pe;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);

      // Instantiate the configured PerformanceEvaluation class reflectively so
      // subclasses of this tool can plug in their own implementation.
      Class<? extends PerformanceEvaluation> peClass =
          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
      try {
        this.pe = peClass.getConstructor(Configuration.class)
            .newInstance(context.getConfiguration());
      } catch (Exception e) {
        throw new IllegalStateException("Could not instantiate PE instance", e);
      }
    }

    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
      Class<? extends Type> clazz = null;
      try {
        clazz = Class.forName(className).asSubclass(type);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Could not find class for name: " + className, e);
      }
      return clazz;
    }

    @Override
    protected void map(NullWritable key, PeInputSplit value, final Context context)
        throws IOException, InterruptedException {

      Status status = new Status() {
        public void setStatus(String msg) {
          context.setStatus(msg);
        }
      };

      // Evaluate the command for the row range described by this split.
      long elapsedTime = this.pe.runOneClient(this.cmd, value.getStartRow(),
          value.getRows(), value.getTotalRows(),
          value.isFlushCommits(), value.isWriteToWAL(),
          status);

      // Report elapsed time and row count as counters and as map output.
      context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime);
      context.getCounter(Counter.ROWS).increment(value.getRows());
      context.write(new LongWritable(value.getStartRow()), new LongWritable(elapsedTime));
      context.progress();
    }
  }

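  /**
   * If the test table does not already exist, create it.
   * @param admin Client to use to check table existence and to create it.
   * @return True if this call created the table.
   * @throws IOException
   */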
  private boolean checkTable(HBaseAdmin admin) throws IOException {
    HTableDescriptor tableDescriptor = getTableDescriptor();
    boolean tableExists = admin.tableExists(tableDescriptor.getName());
    if (!tableExists) {
      admin.createTable(tableDescriptor);
      LOG.info("Table " + tableDescriptor + " created");
    }
    return !tableExists;
  }

  protected HTableDescriptor getTableDescriptor() {
    return TABLE_DESCRIPTOR;
  }

  /*
   * Run the test with more than one client. Depending on --nomapred this is
   * done either with local threads or with a MapReduce job (one map per
   * client).
   * @param cmd Command class to run.
   */
  private void runNIsMoreThanOne(final Class<? extends Test> cmd)
      throws IOException, InterruptedException, ClassNotFoundException {
    checkTable(new HBaseAdmin(conf));
    if (this.nomapred) {
      doMultipleClients(cmd);
    } else {
      doMapReduce(cmd);
    }
  }

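  /*
   * Run all clients in this VM, each in its own thread.
   * @param cmd Command class to run.
   * @throws IOException
   */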
  private void doMultipleClients(final Class<? extends Test> cmd) throws IOException {
    final List<Thread> threads = new ArrayList<Thread>(this.N);
    final int perClientRows = R / N;
    for (int i = 0; i < this.N; i++) {
      Thread t = new Thread(Integer.toString(i)) {
        @Override
        public void run() {
          super.run();
          PerformanceEvaluation pe = new PerformanceEvaluation(conf);
          int index = Integer.parseInt(getName());
          try {
            long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
                perClientRows, R,
                flushCommits, writeToWAL, new Status() {
                  public void setStatus(final String msg) throws IOException {
                    LOG.info("client-" + getName() + " " + msg);
                  }
                });
            LOG.info("Finished " + getName() + " in " + elapsedTime +
                "ms writing " + perClientRows + " rows");
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
      };
      threads.add(t);
    }
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      while (t.isAlive()) {
        try {
          t.join();
        } catch (InterruptedException e) {
          LOG.debug("Interrupted, continuing: " + e.toString());
        }
      }
    }
  }

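  /*
   * Run a MapReduce job: one map per client split, with a single
   * LongSumReducer aggregating the elapsed times reported by the maps.
   * @param cmd Command class to run.
   */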
  private void doMapReduce(final Class<? extends Test> cmd) throws IOException,
      InterruptedException, ClassNotFoundException {
    Path inputDir = writeInputFile(this.conf);
    this.conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    this.conf.set(EvaluationMapTask.PE_KEY, getClass().getName());
    Job job = new Job(this.conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation");

    job.setInputFormatClass(PeInputFormat.class);
    PeInputFormat.setInputPaths(job, inputDir);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir, "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    job.waitForCompletion(true);
  }

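  /*
   * Write the input file for the MapReduce job. Each client's range is split
   * into ten sub-ranges (so the job gets N * 10 splits) and the lines are
   * written in order of their MurmurHash, which shuffles them.
   * @param c Configuration
   * @return Directory that contains the generated input file.
   * @throws IOException
   */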
  private Path writeInputFile(final Configuration c) throws IOException {
    FileSystem fs = FileSystem.get(c);
    if (!fs.exists(PERF_EVAL_DIR)) {
      fs.mkdirs(PERF_EVAL_DIR);
    }
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path subdir = new Path(PERF_EVAL_DIR, formatter.format(new Date()));
    fs.mkdirs(subdir);
    Path inputFile = new Path(subdir, "input.txt");
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Sort the generated lines by hash so the input order is pseudo-random.
    Map<Integer, String> m = new TreeMap<Integer, String>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (this.R / this.N);
    try {
      for (int i = 0; i < 10; i++) {
        for (int j = 0; j < N; j++) {
          String s = "startRow=" + ((j * perClientRows) + (i * (perClientRows / 10))) +
            ", perClientRunRows=" + (perClientRows / 10) +
            ", totalRows=" + this.R +
            ", clients=" + this.N +
            ", flushCommits=" + this.flushCommits +
            ", writeToWAL=" + this.writeToWAL;
          int hash = h.hash(Bytes.toBytes(s));
          m.put(hash, s);
        }
      }
      for (Map.Entry<Integer, String> e : m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return subdir;
  }

  /**
   * Describes a command: its name, the class that implements it, and a short
   * description for the usage message.
   */
  static class CmdDescriptor {
    private Class<? extends Test> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends Test> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends Test> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }

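  /**
   * Wraps the options passed to one test instance: the row range to work on,
   * the table name, and the flushCommits/writeToWAL flags.
   */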
  static class TestOptions {
    private int startRow;
    private int perClientRunRows;
    private int totalRows;
    private byte[] tableName;
    private boolean flushCommits;
    private boolean writeToWAL = true;

    TestOptions() {
    }

    TestOptions(int startRow, int perClientRunRows, int totalRows,
        byte[] tableName, boolean flushCommits, boolean writeToWAL) {
      this.startRow = startRow;
      this.perClientRunRows = perClientRunRows;
      this.totalRows = totalRows;
      this.tableName = tableName;
      this.flushCommits = flushCommits;
      this.writeToWAL = writeToWAL;
    }

    public int getStartRow() {
      return startRow;
    }

    public int getPerClientRunRows() {
      return perClientRunRows;
    }

    public int getTotalRows() {
      return totalRows;
    }

    public byte[] getTableName() {
      return tableName;
    }

    public boolean isFlushCommits() {
      return flushCommits;
    }

    public boolean isWriteToWAL() {
      return writeToWAL;
    }
  }

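  /*
   * A test. Subclasses override testRow() to run a single unit of work
   * against the table; testTimed() drives testRow() over this client's row
   * range, and test() adds setup, teardown, and timing.
   */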
  static abstract class Test {
    // Seed each Test instance's Random from a shared Random so tests running
    // in the same JVM use different sequences.
    private static final Random randomSeed =
      new Random(System.currentTimeMillis());
    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }
    protected final Random rand = new Random(nextRandomSeed());

    protected final int startRow;
    protected final int perClientRunRows;
    protected final int totalRows;
    private final Status status;
    protected byte[] tableName;
    protected HBaseAdmin admin;
    protected HTable table;
    protected volatile Configuration conf;
    protected boolean flushCommits;
    protected boolean writeToWAL;

    /**
     * Subclasses must provide a constructor taking
     * (Configuration, TestOptions, Status); it is looked up reflectively in
     * runOneClient().
     */
    Test(final Configuration conf, final TestOptions options, final Status status) {
      super();
      this.startRow = options.getStartRow();
      this.perClientRunRows = options.getPerClientRunRows();
      this.totalRows = options.getTotalRows();
      this.status = status;
      this.tableName = options.getTableName();
      this.table = null;
      this.conf = conf;
      this.flushCommits = options.isFlushCommits();
      this.writeToWAL = options.isWriteToWAL();
    }

    private String generateStatus(final int sr, final int i, final int lr) {
      return sr + "/" + i + "/" + lr;
    }

    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 10;
      return period == 0 ? this.perClientRunRows : period;
    }

    void testSetup() throws IOException {
      this.admin = new HBaseAdmin(conf);
      this.table = new HTable(conf, tableName);
      this.table.setAutoFlush(false);
      this.table.setScannerCaching(30);
    }

    void testTakedown() throws IOException {
      if (flushCommits) {
        this.table.flushCommits();
      }
    }

    /*
     * Run the test: set up, time the calls made by testTimed(), tear down.
     * @return Elapsed time in milliseconds.
     * @throws IOException
     */
    long test() throws IOException {
      long elapsedTime;
      testSetup();
      long startTime = System.currentTimeMillis();
      try {
        testTimed();
        elapsedTime = System.currentTimeMillis() - startTime;
      } finally {
        testTakedown();
      }
      return elapsedTime;
    }

    /**
     * The timed part of the test: calls testRow() for every row in this
     * client's range, reporting status every getReportingPeriod() rows.
     */
    void testTimed() throws IOException {
      int lastRow = this.startRow + this.perClientRunRows;
      for (int i = this.startRow; i < lastRow; i++) {
        testRow(i);
        if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
          status.setStatus(generateStatus(this.startRow, i, lastRow));
        }
      }
    }

    /*
     * Test for an individual row. Overridden by each concrete test.
     * @param i Row index.
     */
    void testRow(final int i) throws IOException {
    }
  }

  @SuppressWarnings("unused")
  static class RandomSeekScanTest extends Test {
    RandomSeekScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Scan scan = new Scan(getRandomRow(this.rand, this.totalRows));
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      scan.setFilter(new WhileMatchFilter(new PageFilter(120)));
      ResultScanner s = this.table.getScanner(scan);
      // Just consume the results.
      for (Result rr = null; (rr = s.next()) != null;) {
      }
      s.close();
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  @SuppressWarnings("unused")
  static abstract class RandomScanWithRangeTest extends Test {
    RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
      Scan scan = new Scan(startAndStopRow.getFirst(), startAndStopRow.getSecond());
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      ResultScanner s = this.table.getScanner(scan);
      int count = 0;
      for (Result rr = null; (rr = s.next()) != null;) {
        count++;
      }

      if (i % 100 == 0) {
        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
            Bytes.toString(startAndStopRow.getFirst()),
            Bytes.toString(startAndStopRow.getSecond()), count));
      }

      s.close();
    }

    protected abstract Pair<byte[], byte[]> getStartAndStopRow();

    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
      int start = this.rand.nextInt(Integer.MAX_VALUE) % totalRows;
      int stop = start + maxRange;
      return new Pair<byte[], byte[]>(format(start), format(stop));
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
    RandomScanWithRange10Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10);
    }
  }

  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
    RandomScanWithRange100Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(100);
    }
  }

  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
    RandomScanWithRange1000Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(1000);
    }
  }

  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
    RandomScanWithRange10000Test(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10000);
    }
  }

  static class RandomReadTest extends Test {
    RandomReadTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Get get = new Get(getRandomRow(this.rand, this.totalRows));
      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      this.table.get(get);
    }

    @Override
    protected int getReportingPeriod() {
      int period = this.perClientRunRows / 100;
      return period == 0 ? this.perClientRunRows : period;
    }
  }

  static class RandomWriteTest extends Test {
    RandomWriteTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      byte[] row = getRandomRow(this.rand, this.totalRows);
      Put put = new Put(row);
      byte[] value = generateValue(this.rand);
      put.add(FAMILY_NAME, QUALIFIER_NAME, value);
      put.setWriteToWAL(writeToWAL);
      table.put(put);
    }
  }

  static class ScanTest extends Test {
    private ResultScanner testScanner;

    ScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testSetup() throws IOException {
      super.testSetup();
    }

    @Override
    void testTakedown() throws IOException {
      if (this.testScanner != null) {
        this.testScanner.close();
      }
      super.testTakedown();
    }

    @Override
    void testRow(final int i) throws IOException {
      if (this.testScanner == null) {
        Scan scan = new Scan(format(this.startRow));
        scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
        this.testScanner = table.getScanner(scan);
      }
      testScanner.next();
    }
  }

  static class SequentialReadTest extends Test {
    SequentialReadTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Get get = new Get(format(i));
      get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      table.get(get);
    }
  }

  static class SequentialWriteTest extends Test {
    SequentialWriteTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(final int i) throws IOException {
      Put put = new Put(format(i));
      byte[] value = generateValue(this.rand);
      put.add(FAMILY_NAME, QUALIFIER_NAME, value);
      put.setWriteToWAL(writeToWAL);
      table.put(put);
    }
  }

  static class FilteredScanTest extends Test {
    protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName());

    FilteredScanTest(Configuration conf, TestOptions options, Status status) {
      super(conf, options, status);
    }

    @Override
    void testRow(int i) throws IOException {
      byte[] value = generateValue(this.rand);
      Scan scan = constructScan(value);
      ResultScanner scanner = null;
      try {
        scanner = this.table.getScanner(scan);
        while (scanner.next() != null) {
          // Just consume the results.
        }
      } finally {
        if (scanner != null) scanner.close();
      }
    }

    protected Scan constructScan(byte[] valuePrefix) throws IOException {
      Filter filter = new SingleColumnValueFilter(
          FAMILY_NAME, QUALIFIER_NAME, CompareFilter.CompareOp.EQUAL,
          new BinaryComparator(valuePrefix)
      );
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
      scan.setFilter(filter);
      return scan;
    }
  }

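  /*
   * Format the passed integer as a fixed-width, zero-padded 10-byte row key.
   * @param number Row number to format.
   * @return 10-byte ASCII representation of the absolute value of number.
   */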
  public static byte[] format(final int number) {
    byte[] b = new byte[10];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return b;
  }

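  /*
   * Generate a random value the width of one row (ROW_LENGTH bytes).
   * @param r Random to use.
   * @return Newly allocated array of random bytes.
   */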
  public static byte[] generateValue(final Random r) {
    byte[] b = new byte[ROW_LENGTH];
    r.nextBytes(b);
    return b;
  }

  static byte[] getRandomRow(final Random random, final int totalRows) {
    return format(random.nextInt(Integer.MAX_VALUE) % totalRows);
  }

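  /*
   * Run one client of the named command over the given row range and report
   * the elapsed time.
   * @param cmd Test command class to instantiate and run.
   * @param startRow First row this client works on.
   * @param perClientRunRows Number of rows this client works on.
   * @param totalRows Total rows across all clients.
   * @return Elapsed time of the run in milliseconds.
   * @throws IOException
   */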
  long runOneClient(final Class<? extends Test> cmd, final int startRow,
      final int perClientRunRows, final int totalRows,
      boolean flushCommits, boolean writeToWAL,
      final Status status)
      throws IOException {
    status.setStatus("Start " + cmd + " at offset " + startRow + " for " +
        perClientRunRows + " rows");
    long totalElapsedTime = 0;

    Test t = null;
    TestOptions options = new TestOptions(startRow, perClientRunRows,
        totalRows, getTableDescriptor().getName(), flushCommits, writeToWAL);
    try {
      Constructor<? extends Test> constructor = cmd.getDeclaredConstructor(
          Configuration.class, TestOptions.class, Status.class);
      t = constructor.newInstance(this.conf, options, status);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Invalid command class: " +
          cmd.getName() + ". It does not provide a constructor as described by " +
          "the javadoc comment. Available constructors are: " +
          Arrays.toString(cmd.getConstructors()));
    } catch (Exception e) {
      throw new IllegalStateException("Failed to construct command class", e);
    }
    totalElapsedTime = t.test();

    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
        "ms at offset " + startRow + " for " + perClientRunRows + " rows");
    return totalElapsedTime;
  }

  private void runNIsOne(final Class<? extends Test> cmd) {
    Status status = new Status() {
      public void setStatus(String msg) throws IOException {
        LOG.info(msg);
      }
    };

    HBaseAdmin admin = null;
    try {
      admin = new HBaseAdmin(this.conf);
      checkTable(admin);
      runOneClient(cmd, 0, this.R, this.R, this.flushCommits, this.writeToWAL,
          status);
    } catch (Exception e) {
      LOG.error("Failed", e);
    }
  }

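  /*
   * Run the given test. If --miniCluster was specified, spin up mini DFS,
   * mini ZooKeeper, and mini HBase clusters first and shut them down when the
   * test finishes.
   */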
  private void runTest(final Class<? extends Test> cmd) throws IOException,
      InterruptedException, ClassNotFoundException {
    MiniHBaseCluster hbaseMiniCluster = null;
    MiniDFSCluster dfsCluster = null;
    MiniZooKeeperCluster zooKeeperCluster = null;
    if (this.miniCluster) {
      dfsCluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
      zooKeeperCluster = new MiniZooKeeperCluster();
      int zooKeeperPort = zooKeeperCluster.startup(new File(System.getProperty("java.io.tmpdir")));

      // Point the configuration at the mini DFS and mini ZooKeeper instances
      // we just started, then bring up a mini HBase cluster on top of them.
      FileSystem fs = dfsCluster.getFileSystem();
      conf.set("fs.default.name", fs.getUri().toString());
      conf.set("hbase.zookeeper.property.clientPort", Integer.toString(zooKeeperPort));
      Path parentdir = fs.getHomeDirectory();
      conf.set(HConstants.HBASE_DIR, parentdir.toString());
      fs.mkdirs(parentdir);
      FSUtils.setVersion(fs, parentdir);
      hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
    }

    try {
      if (N == 1) {
        // Single client: run it directly in this JVM.
        runNIsOne(cmd);
      } else {
        // Multiple clients: threads or a MapReduce job.
        runNIsMoreThanOne(cmd);
      }
    } finally {
      if (this.miniCluster) {
        if (hbaseMiniCluster != null) hbaseMiniCluster.shutdown();
        if (zooKeeperCluster != null) zooKeeperCluster.shutdown();
        HBaseTestCase.shutdownDfs(dfsCluster);
      }
    }
  }

  protected void printUsage() {
    printUsage(null);
  }

  protected void printUsage(final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: java " + this.getClass().getName() + " \\");
    System.err.println("  [--miniCluster] [--nomapred] [--rows=ROWS] " +
        "[--flushCommits=BOOL] [--writeToWAL=BOOL] <command> <nclients>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" miniCluster     Run the test on an HBaseMiniCluster");
    System.err.println(" nomapred        Run multiple clients using threads " +
        "(rather than use mapreduce)");
    System.err.println(" rows            Rows each client runs. Default: One million");
    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
        "Default: true");
    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
    System.err.println();
    System.err.println("Command:");
    for (CmdDescriptor command : commands.values()) {
      System.err.println(String.format(" %-15s %s", command.getName(), command.getDescription()));
    }
    System.err.println();
    System.err.println("Args:");
    System.err.println(" nclients        Integer. Required. Total number of " +
        "clients (and HRegionServers)");
    System.err.println("                 running: 1 <= value <= 500");
    System.err.println("Examples:");
    System.err.println(" To run a single evaluation client:");
    System.err.println(" $ bin/hbase " + this.getClass().getName()
        + " sequentialWrite 1");
  }

  private void getArgs(final int start, final String[] args) {
    if (start + 1 > args.length) {
      throw new IllegalArgumentException("must supply the number of clients");
    }
    N = Integer.parseInt(args[start]);
    if (N < 1) {
      throw new IllegalArgumentException("Number of clients must be at least 1");
    }
    // R is rows per client; scale it by the number of clients to get the total.
    this.R = this.R * N;
  }

  public int doCommandLine(final String[] args) {
    // Parse command-line options, then run the requested test command.
    int errCode = -1;
    if (args.length < 1) {
      printUsage();
      return errCode;
    }

    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage();
          errCode = 0;
          break;
        }

        final String miniClusterArgKey = "--miniCluster";
        if (cmd.startsWith(miniClusterArgKey)) {
          this.miniCluster = true;
          continue;
        }

        final String nmr = "--nomapred";
        if (cmd.startsWith(nmr)) {
          this.nomapred = true;
          continue;
        }

        final String rows = "--rows=";
        if (cmd.startsWith(rows)) {
          this.R = Integer.parseInt(cmd.substring(rows.length()));
          continue;
        }

        final String flushCommits = "--flushCommits=";
        if (cmd.startsWith(flushCommits)) {
          this.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
          continue;
        }

        final String writeToWAL = "--writeToWAL=";
        if (cmd.startsWith(writeToWAL)) {
          this.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
          continue;
        }

        Class<? extends Test> cmdClass = determineCommandClass(cmd);
        if (cmdClass != null) {
          getArgs(i + 1, args);
          runTest(cmdClass);
          errCode = 0;
          break;
        }

        printUsage();
        break;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    return errCode;
  }

  private Class<? extends Test> determineCommandClass(String cmd) {
    CmdDescriptor descriptor = commands.get(cmd);
    return descriptor != null ? descriptor.getCmdClass() : null;
  }

  /**
   * @param args Command-line arguments; see printUsage() for details.
   */
  public static void main(final String[] args) {
    Configuration c = HBaseConfiguration.create();
    System.exit(new PerformanceEvaluation(c).doCommandLine(args));
  }
}