1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.io.IOException;
23 import java.util.Random;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.commons.math.random.RandomData;
28 import org.apache.commons.math.random.RandomDataImpl;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33 import org.apache.hadoop.io.MapFile;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.io.WritableComparable;
36
37
38
39
40
41
42 public class MapFilePerformanceEvaluation {
43 protected final Configuration conf;
44 private static final int ROW_LENGTH = 10;
45 private static final int ROW_COUNT = 100000;
46
47 static final Log LOG =
48 LogFactory.getLog(MapFilePerformanceEvaluation.class.getName());
49
50
51
52
/**
 * Creates an evaluation bound to the given configuration.
 *
 * @param c configuration stored in {@link #conf}; it is not validated here
 */
public MapFilePerformanceEvaluation(final Configuration c) {
  // The implicit superclass constructor call is all that is needed besides this.
  conf = c;
}
57
58 static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
59 String v = Integer.toString(i);
60 w.set(Bytes.toBytes("0000000000".substring(v.length()) + v));
61 return w;
62 }
63
64 private void runBenchmarks() throws Exception {
65 final FileSystem fs = FileSystem.get(this.conf);
66 final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
67 if (fs.exists(mf)) {
68 fs.delete(mf, true);
69 }
70 runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
71 ROW_COUNT);
72
73 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
74 public void run() {
75 try {
76 runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
77 ROW_COUNT);
78 } catch (Exception e) {
79 e.printStackTrace();
80 }
81 }
82 });
83 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
84 public void run() {
85 try {
86 runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
87 ROW_COUNT);
88 } catch (Exception e) {
89 e.printStackTrace();
90 }
91 }
92 });
93 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
94 public void run() {
95 try {
96 runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
97 ROW_COUNT);
98 } catch (Exception e) {
99 e.printStackTrace();
100 }
101 }
102 });
103 PerformanceEvaluationCommons.concurrentReads(new Runnable() {
104 public void run() {
105 try {
106 runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
107 ROW_COUNT);
108 } catch (Exception e) {
109 e.printStackTrace();
110 }
111 }
112 });
113 }
114
115 protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
116 throws Exception {
117 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
118 rowCount + " rows.");
119 long elapsedTime = benchmark.run();
120 LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
121 rowCount + " rows took " + elapsedTime + "ms.");
122 }
123
124 static abstract class RowOrientedBenchmark {
125
126 protected final Configuration conf;
127 protected final FileSystem fs;
128 protected final Path mf;
129 protected final int totalRows;
130
131 public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
132 int totalRows) {
133 this.conf = conf;
134 this.fs = fs;
135 this.mf = mf;
136 this.totalRows = totalRows;
137 }
138
139 void setUp() throws Exception {
140
141 }
142
143 abstract void doRow(int i) throws Exception;
144
145 protected int getReportingPeriod() {
146 return this.totalRows / 10;
147 }
148
149 void tearDown() throws Exception {
150
151 }
152
153
154
155
156
157
158 long run() throws Exception {
159 long elapsedTime;
160 setUp();
161 long startTime = System.currentTimeMillis();
162 try {
163 for (int i = 0; i < totalRows; i++) {
164 if (i > 0 && i % getReportingPeriod() == 0) {
165 LOG.info("Processed " + i + " rows.");
166 }
167 doRow(i);
168 }
169 elapsedTime = System.currentTimeMillis() - startTime;
170 } finally {
171 tearDown();
172 }
173 return elapsedTime;
174 }
175
176 }
177
178 static class SequentialWriteBenchmark extends RowOrientedBenchmark {
179
180 protected MapFile.Writer writer;
181 private Random random = new Random();
182 private byte[] bytes = new byte[ROW_LENGTH];
183 private ImmutableBytesWritable key = new ImmutableBytesWritable();
184 private ImmutableBytesWritable value = new ImmutableBytesWritable();
185
186 public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
187 int totalRows) {
188 super(conf, fs, mf, totalRows);
189 }
190
191 @Override
192 void setUp() throws Exception {
193 writer = new MapFile.Writer(conf, fs, mf.toString(),
194 ImmutableBytesWritable.class, ImmutableBytesWritable.class);
195 }
196
197 @Override
198 void doRow(int i) throws Exception {
199 value.set(generateValue());
200 writer.append(format(i, key), value);
201 }
202
203 private byte[] generateValue() {
204 random.nextBytes(bytes);
205 return bytes;
206 }
207
208 @Override
209 protected int getReportingPeriod() {
210 return this.totalRows;
211 }
212
213 @Override
214 void tearDown() throws Exception {
215 writer.close();
216 }
217
218 }
219
220 static abstract class ReadBenchmark extends RowOrientedBenchmark {
221 ImmutableBytesWritable key = new ImmutableBytesWritable();
222 ImmutableBytesWritable value = new ImmutableBytesWritable();
223
224 protected MapFile.Reader reader;
225
226 public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
227 int totalRows) {
228 super(conf, fs, mf, totalRows);
229 }
230
231 @Override
232 void setUp() throws Exception {
233 reader = new MapFile.Reader(fs, mf.toString(), conf);
234 }
235
236 @Override
237 void tearDown() throws Exception {
238 reader.close();
239 }
240
241 }
242
243 static class SequentialReadBenchmark extends ReadBenchmark {
244 ImmutableBytesWritable verify = new ImmutableBytesWritable();
245
246 public SequentialReadBenchmark(Configuration conf, FileSystem fs,
247 Path mf, int totalRows) {
248 super(conf, fs, mf, totalRows);
249 }
250
251 @Override
252 void doRow(int i) throws Exception {
253 this.reader.next(key, value);
254 PerformanceEvaluationCommons.assertKey(this.key.get(),
255 format(i, this.verify).get());
256 PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, value.getSize());
257 }
258
259 @Override
260 protected int getReportingPeriod() {
261 return this.totalRows;
262 }
263
264 }
265
266 static class UniformRandomReadBenchmark extends ReadBenchmark {
267
268 private Random random = new Random();
269
270 public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
271 Path mf, int totalRows) {
272 super(conf, fs, mf, totalRows);
273 }
274
275 @Override
276 void doRow(int i) throws Exception {
277 ImmutableBytesWritable k = getRandomRow();
278 ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
279 PerformanceEvaluationCommons.assertValueSize(r.getSize(), ROW_LENGTH);
280 }
281
282 private ImmutableBytesWritable getRandomRow() {
283 return format(random.nextInt(totalRows), key);
284 }
285
286 }
287
288 static class UniformRandomSmallScan extends ReadBenchmark {
289 private Random random = new Random();
290
291 public UniformRandomSmallScan(Configuration conf, FileSystem fs,
292 Path mf, int totalRows) {
293 super(conf, fs, mf, totalRows/10);
294 }
295
296 @Override
297 void doRow(int i) throws Exception {
298 ImmutableBytesWritable ibw = getRandomRow();
299 WritableComparable<?> wc = this.reader.getClosest(ibw, this.value);
300 if (wc == null) {
301 throw new NullPointerException();
302 }
303 PerformanceEvaluationCommons.assertKey(ibw.get(),
304 ((ImmutableBytesWritable)wc).get());
305
306 for (int ii = 0; ii < 29; ii++) {
307 this.reader.next(this.key, this.value);
308 PerformanceEvaluationCommons.assertValueSize(this.value.getSize(), ROW_LENGTH);
309 }
310 }
311
312 private ImmutableBytesWritable getRandomRow() {
313 return format(random.nextInt(totalRows), key);
314 }
315 }
316
317 static class GaussianRandomReadBenchmark extends ReadBenchmark {
318 private RandomData randomData = new RandomDataImpl();
319
320 public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
321 Path mf, int totalRows) {
322 super(conf, fs, mf, totalRows);
323 }
324
325 @Override
326 void doRow(int i) throws Exception {
327 ImmutableBytesWritable k = getGaussianRandomRow();
328 ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
329 PerformanceEvaluationCommons.assertValueSize(r.getSize(), ROW_LENGTH);
330 }
331
332 private ImmutableBytesWritable getGaussianRandomRow() {
333 int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
334 (double)totalRows / 10.0);
335 return format(r, key);
336 }
337
338 }
339
340
341
342
343
344
345 public static void main(String[] args) throws Exception {
346 new MapFilePerformanceEvaluation(HBaseConfiguration.create()).
347 runBenchmarks();
348 }
349 }