/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Ignore;
import org.junit.Test;

import com.google.common.collect.Lists;

/**
 * Test case that uses multiple threads to read and write multi-family rows
 * into a table, verifying that reads never see partially-completed writes.
 *
 * This can run as a JUnit test, or via its main() method, which runs against
 * a real cluster (e.g. for testing with failures, region movement, etc).
 */
public class BROKE_TODO_FIX_TestAcidGuarantees {
  protected static final Log LOG = LogFactory.getLog(BROKE_TODO_FIX_TestAcidGuarantees.class);
  public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
  public static final byte [] FAMILY_A = Bytes.toBytes("A");
  public static final byte [] FAMILY_B = Bytes.toBytes("B");
  public static final byte [] FAMILY_C = Bytes.toBytes("C");
  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");

  public static final byte[][] FAMILIES = new byte[][] {
    FAMILY_A, FAMILY_B, FAMILY_C };

  private HBaseTestingUtility util;

  public static int NUM_COLS_TO_CHECK = 50;

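  /**
   * Creates the test table with all three column families, ignoring the
   * exception if the table already exists.
   */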
  private void createTableIfMissing()
    throws IOException {
    try {
      util.createTable(TABLE_NAME, FAMILIES);
    } catch (TableExistsException tee) {
      // Table is already there -- nothing to do.
    }
  }

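  /**
   * Configures the testing utility with a small memstore flush size so that
   * flushes (and the resulting scanner reseeks) happen frequently during the test.
   */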
  public BROKE_TODO_FIX_TestAcidGuarantees() {
    // Set small flush size for minicluster so we exercise reseeking scanners
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
    util = new HBaseTestingUtility(conf);
  }

  /**
   * Thread that does random full-row writes into a table.
   */
  public static class AtomicityWriter extends RepeatingTestThread {
    Random rand = new Random();
    byte data[] = new byte[10];
    byte targetRows[][];
    byte targetFamilies[][];
    HTable table;
    AtomicLong numWritten = new AtomicLong();

    public AtomicityWriter(TestContext ctx, byte targetRows[][],
                           byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetRows = targetRows;
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }
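    /**
     * Picks one of the target rows at random and writes a single random value
     * into every column of every target family as one atomic Put.
     */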
    public void doAnAction() throws Exception {
      // Pick a random row to write into
      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
      Put p = new Put(targetRow);
      rand.nextBytes(data);

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte qualifier[] = Bytes.toBytes("col" + i);
          p.add(family, qualifier, data);
        }
      }
      table.put(p);
      numWritten.getAndIncrement();
    }
  }

  /**
   * Thread that does single-row reads in a table, looking for partially
   * completed rows.
   */
  public static class AtomicGetReader extends RepeatingTestThread {
    byte targetRow[];
    byte targetFamilies[][];
    HTable table;
    int numVerified = 0;
    AtomicLong numRead = new AtomicLong();

    public AtomicGetReader(TestContext ctx, byte targetRow[],
                           byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetRow = targetRow;
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }

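    /**
     * Gets the target row and verifies that every column in every family holds
     * the same value; a mismatch means a writer's Put was observed only partially.
     */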
    public void doAnAction() throws Exception {
      Get g = new Get(targetRow);
      Result res = table.get(g);
      byte[] gotValue = null;
      if (res.getRow() == null) {
        // Trying to verify but we didn't find the row - the writing
        // thread probably just hasn't started writing yet, so we can
        // ignore this action
        return;
      }

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte qualifier[] = Bytes.toBytes("col" + i);
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
            gotFailure(gotValue, res);
          }
          numVerified++;
          gotValue = thisValue;
        }
      }
      numRead.getAndIncrement();
    }

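    /**
     * Throws a RuntimeException describing the mismatch, including a dump of
     * every KeyValue in the offending Result.
     */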
    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numVerified).append("! ");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append(" Got:\n");
      for (KeyValue kv : res.list()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(kv.getValue()));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    HTable table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();

    public AtomicScanReader(TestContext ctx,
                            byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }

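    /**
     * Runs one full scan over the target families and verifies that, within
     * each row, every column holds the same value.
     */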
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);
      try {
        for (Result res : scanner) {
          byte[] gotValue = null;

          for (byte[] family : targetFamilies) {
            for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
              byte qualifier[] = Bytes.toBytes("col" + i);
              byte thisValue[] = res.getValue(family, qualifier);
              if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
                gotFailure(gotValue, res);
              }
              gotValue = thisValue;
            }
          }
          numRowsScanned.getAndIncrement();
        }
      } finally {
        // Release the scanner even if verification fails partway through
        scanner.close();
      }
      numScans.getAndIncrement();
    }

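    /**
     * Throws a RuntimeException describing the mismatch, including a dump of
     * every KeyValue in the offending row.
     */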
    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numRowsScanned).append("! ");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append(" Got:\n");
      for (KeyValue kv : res.list()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(kv.getValue()));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }


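  /**
   * Runs the concurrent workload for {@code millisToRun} milliseconds with the
   * given numbers of writer, getter, and scanner threads against
   * {@code numUniqueRows} distinct rows, then logs per-thread statistics.
   */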
  public void runTestAtomicity(long millisToRun,
      int numWriters,
      int numGetters,
      int numScanners,
      int numUniqueRows) throws Exception {
    createTableIfMissing();
    TestContext ctx = new TestContext(util.getConfiguration());

    byte rows[][] = new byte[numUniqueRows][];
    for (int i = 0; i < numUniqueRows; i++) {
      rows[i] = Bytes.toBytes("test_row_" + i);
    }

    List<AtomicityWriter> writers = Lists.newArrayList();
    for (int i = 0; i < numWriters; i++) {
      AtomicityWriter writer = new AtomicityWriter(
          ctx, rows, FAMILIES);
      writers.add(writer);
      ctx.addThread(writer);
    }

    List<AtomicGetReader> getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
      AtomicGetReader getter = new AtomicGetReader(
          ctx, rows[i % numUniqueRows], FAMILIES);
      getters.add(getter);
      ctx.addThread(getter);
    }

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Finished test. Writers:");
    for (AtomicityWriter writer : writers) {
      LOG.info("  wrote " + writer.numWritten.get());
    }
    LOG.info("Readers:");
    for (AtomicGetReader reader : getters) {
      LOG.info("  read " + reader.numRead.get());
    }
    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

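  // Each of the JUnit tests below spins up a single-node minicluster and runs
  // the workload against it for 20 seconds.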
  @Test
  public void testGetAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
      runTestAtomicity(20000, 5, 5, 0, 3);
    } finally {
      util.shutdownMiniCluster();
    }
  }

  @Test
  @Ignore("Currently not passing - see HBASE-2670")
  public void testScanAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
      runTestAtomicity(20000, 5, 0, 5, 3);
    } finally {
      util.shutdownMiniCluster();
    }
  }

  @Test
  @Ignore("Currently not passing - see HBASE-2670")
  public void testMixedAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
      runTestAtomicity(20000, 5, 2, 2, 3);
    } finally {
      util.shutdownMiniCluster();
    }
  }

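  /**
   * Command-line entry point: runs the mixed workload for five minutes against
   * whatever cluster the default HBase configuration points at.
   */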
  public static void main(String args[]) throws Exception {
    Configuration c = HBaseConfiguration.create();
    BROKE_TODO_FIX_TestAcidGuarantees test = new BROKE_TODO_FIX_TestAcidGuarantees();
    test.setConf(c);
    test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
  }

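  /**
   * Re-creates the testing utility around an externally supplied configuration
   * (used by {@link #main(String[])} to target a real cluster).
   */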
  private void setConf(Configuration c) {
    util = new HBaseTestingUtility(c);
  }
}