1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.io.IOException;
23  import java.util.Random;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.commons.math.random.RandomData;
28  import org.apache.commons.math.random.RandomDataImpl;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileSystem;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33  import org.apache.hadoop.io.MapFile;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.io.WritableComparable;
36  
37  /**
38   * <p>
39   * This class runs performance benchmarks for {@link MapFile}.
40   * </p>
41   */
42  public class MapFilePerformanceEvaluation {
43    protected final Configuration conf;
44    private static final int ROW_LENGTH = 10;
45    private static final int ROW_COUNT = 100000;
46  
47    static final Log LOG =
48      LogFactory.getLog(MapFilePerformanceEvaluation.class.getName());
49  
50    /**
51     * @param c
52     */
53    public MapFilePerformanceEvaluation(final Configuration c) {
54      super();
55      this.conf = c;
56    }
57  
58    static ImmutableBytesWritable format(final int i, ImmutableBytesWritable w) {
59      String v = Integer.toString(i);
60      w.set(Bytes.toBytes("0000000000".substring(v.length()) + v));
61      return w;
62    }
63  
64    private void runBenchmarks() throws Exception {
65      final FileSystem fs = FileSystem.get(this.conf);
66      final Path mf = fs.makeQualified(new Path("performanceevaluation.mapfile"));
67      if (fs.exists(mf)) {
68        fs.delete(mf, true);
69      }
70      runBenchmark(new SequentialWriteBenchmark(conf, fs, mf, ROW_COUNT),
71          ROW_COUNT);
72  
73      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
74        public void run() {
75          try {
76            runBenchmark(new UniformRandomSmallScan(conf, fs, mf, ROW_COUNT),
77              ROW_COUNT);
78          } catch (Exception e) {
79            e.printStackTrace();
80          }
81        }
82      });
83      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
84        public void run() {
85          try {
86            runBenchmark(new UniformRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
87                ROW_COUNT);
88          } catch (Exception e) {
89            e.printStackTrace();
90          }
91        }
92      });
93      PerformanceEvaluationCommons.concurrentReads(new Runnable() {
94        public void run() {
95          try {
96            runBenchmark(new GaussianRandomReadBenchmark(conf, fs, mf, ROW_COUNT),
97                ROW_COUNT);
98          } catch (Exception e) {
99            e.printStackTrace();
100         }
101       }
102     });
103     PerformanceEvaluationCommons.concurrentReads(new Runnable() {
104       public void run() {
105         try {
106           runBenchmark(new SequentialReadBenchmark(conf, fs, mf, ROW_COUNT),
107               ROW_COUNT);
108         } catch (Exception e) {
109           e.printStackTrace();
110         }
111       }
112     });
113   }
114 
115   protected void runBenchmark(RowOrientedBenchmark benchmark, int rowCount)
116     throws Exception {
117     LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
118         rowCount + " rows.");
119     long elapsedTime = benchmark.run();
120     LOG.info("Running " + benchmark.getClass().getSimpleName() + " for " +
121         rowCount + " rows took " + elapsedTime + "ms.");
122   }
123 
124   static abstract class RowOrientedBenchmark {
125 
126     protected final Configuration conf;
127     protected final FileSystem fs;
128     protected final Path mf;
129     protected final int totalRows;
130 
131     public RowOrientedBenchmark(Configuration conf, FileSystem fs, Path mf,
132         int totalRows) {
133       this.conf = conf;
134       this.fs = fs;
135       this.mf = mf;
136       this.totalRows = totalRows;
137     }
138 
139     void setUp() throws Exception {
140       // do nothing
141     }
142 
143     abstract void doRow(int i) throws Exception;
144 
145     protected int getReportingPeriod() {
146       return this.totalRows / 10;
147     }
148 
149     void tearDown() throws Exception {
150       // do nothing
151     }
152 
153     /**
154      * Run benchmark
155      * @return elapsed time.
156      * @throws Exception
157      */
158     long run() throws Exception {
159       long elapsedTime;
160       setUp();
161       long startTime = System.currentTimeMillis();
162       try {
163         for (int i = 0; i < totalRows; i++) {
164           if (i > 0 && i % getReportingPeriod() == 0) {
165             LOG.info("Processed " + i + " rows.");
166           }
167           doRow(i);
168         }
169         elapsedTime = System.currentTimeMillis() - startTime;
170       } finally {
171         tearDown();
172       }
173       return elapsedTime;
174     }
175 
176   }
177 
178   static class SequentialWriteBenchmark extends RowOrientedBenchmark {
179 
180     protected MapFile.Writer writer;
181     private Random random = new Random();
182     private byte[] bytes = new byte[ROW_LENGTH];
183     private ImmutableBytesWritable key = new ImmutableBytesWritable();
184     private ImmutableBytesWritable value = new ImmutableBytesWritable();
185 
186     public SequentialWriteBenchmark(Configuration conf, FileSystem fs, Path mf,
187         int totalRows) {
188       super(conf, fs, mf, totalRows);
189     }
190 
191     @Override
192     void setUp() throws Exception {
193       writer = new MapFile.Writer(conf, fs, mf.toString(),
194         ImmutableBytesWritable.class, ImmutableBytesWritable.class);
195     }
196 
197     @Override
198     void doRow(int i) throws Exception {
199       value.set(generateValue());
200       writer.append(format(i, key), value);
201     }
202 
203     private byte[] generateValue() {
204       random.nextBytes(bytes);
205       return bytes;
206     }
207 
208     @Override
209     protected int getReportingPeriod() {
210       return this.totalRows; // don't report progress
211     }
212 
213     @Override
214     void tearDown() throws Exception {
215       writer.close();
216     }
217 
218   }
219 
220   static abstract class ReadBenchmark extends RowOrientedBenchmark {
221     ImmutableBytesWritable key = new ImmutableBytesWritable();
222     ImmutableBytesWritable value = new ImmutableBytesWritable();
223 
224     protected MapFile.Reader reader;
225 
226     public ReadBenchmark(Configuration conf, FileSystem fs, Path mf,
227         int totalRows) {
228       super(conf, fs, mf, totalRows);
229     }
230 
231     @Override
232     void setUp() throws Exception {
233       reader = new MapFile.Reader(fs, mf.toString(), conf);
234     }
235 
236     @Override
237     void tearDown() throws Exception {
238       reader.close();
239     }
240 
241   }
242 
243   static class SequentialReadBenchmark extends ReadBenchmark {
244     ImmutableBytesWritable verify = new ImmutableBytesWritable();
245 
246     public SequentialReadBenchmark(Configuration conf, FileSystem fs,
247         Path mf, int totalRows) {
248       super(conf, fs, mf, totalRows);
249     }
250 
251     @Override
252     void doRow(int i) throws Exception {
253       this.reader.next(key, value);
254       PerformanceEvaluationCommons.assertKey(this.key.get(),
255         format(i, this.verify).get());
256       PerformanceEvaluationCommons.assertValueSize(ROW_LENGTH, value.getSize());
257     }
258 
259     @Override
260     protected int getReportingPeriod() {
261       return this.totalRows; // don't report progress
262     }
263 
264   }
265 
266   static class UniformRandomReadBenchmark extends ReadBenchmark {
267 
268     private Random random = new Random();
269 
270     public UniformRandomReadBenchmark(Configuration conf, FileSystem fs,
271         Path mf, int totalRows) {
272       super(conf, fs, mf, totalRows);
273     }
274 
275     @Override
276     void doRow(int i) throws Exception {
277       ImmutableBytesWritable k = getRandomRow();
278       ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
279       PerformanceEvaluationCommons.assertValueSize(r.getSize(), ROW_LENGTH);
280     }
281 
282     private ImmutableBytesWritable getRandomRow() {
283       return format(random.nextInt(totalRows), key);
284     }
285 
286   }
287 
288   static class UniformRandomSmallScan extends ReadBenchmark {
289     private Random random = new Random();
290 
291     public UniformRandomSmallScan(Configuration conf, FileSystem fs,
292         Path mf, int totalRows) {
293       super(conf, fs, mf, totalRows/10);
294     }
295 
296     @Override
297     void doRow(int i) throws Exception {
298       ImmutableBytesWritable ibw = getRandomRow();
299       WritableComparable<?> wc = this.reader.getClosest(ibw, this.value);
300       if (wc == null) {
301         throw new NullPointerException();
302       }
303       PerformanceEvaluationCommons.assertKey(ibw.get(),
304         ((ImmutableBytesWritable)wc).get());
305       // TODO: Verify we're getting right values.
306       for (int ii = 0; ii < 29; ii++) {
307         this.reader.next(this.key, this.value);
308         PerformanceEvaluationCommons.assertValueSize(this.value.getSize(), ROW_LENGTH);
309       }
310     }
311 
312     private ImmutableBytesWritable getRandomRow() {
313       return format(random.nextInt(totalRows), key);
314     }
315   }
316 
317   static class GaussianRandomReadBenchmark extends ReadBenchmark {
318     private RandomData randomData = new RandomDataImpl();
319 
320     public GaussianRandomReadBenchmark(Configuration conf, FileSystem fs,
321         Path mf, int totalRows) {
322       super(conf, fs, mf, totalRows);
323     }
324 
325     @Override
326     void doRow(int i) throws Exception {
327       ImmutableBytesWritable k = getGaussianRandomRow();
328       ImmutableBytesWritable r = (ImmutableBytesWritable)reader.get(k, value);
329       PerformanceEvaluationCommons.assertValueSize(r.getSize(), ROW_LENGTH);
330     }
331 
332     private ImmutableBytesWritable getGaussianRandomRow() {
333       int r = (int) randomData.nextGaussian((double)totalRows / 2.0,
334           (double)totalRows / 10.0);
335       return format(r, key);
336     }
337 
338   }
339 
340   /**
341    * @param args
342    * @throws Exception
343    * @throws IOException
344    */
345   public static void main(String[] args) throws Exception {
346     new MapFilePerformanceEvaluation(HBaseConfiguration.create()).
347       runBenchmarks();
348   }
349 }