/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;


/**
 * Test compactions
 */
public class TestCompaction extends HBaseTestCase {
  static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
  private HRegion r = null;
  private Path compactionDir = null;
  private Path regionCompactionDir = null;
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
  private final byte[] col1, col2;

  private MiniDFSCluster cluster;

  /** constructor */
  public TestCompaction() throws Exception {
    super();

    // Set cache flush size to 1MB
    conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
    this.cluster = null;
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    // Increment the least significant character so we get to the next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
  }

  @Override
  public void setUp() throws Exception {
    this.cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
    // Make the hbase rootdir match the minidfs we just spun up
    this.conf.set(HConstants.HBASE_DIR,
      this.cluster.getFileSystem().getHomeDirectory().toString());
    super.setUp();
    HTableDescriptor htd = createTableDescriptor(getName());
    this.r = createNewHRegion(htd, null, null);
  }

  @Override
  public void tearDown() throws Exception {
    HLog hlog = r.getLog();
    this.r.close();
    hlog.closeAndDelete();
    if (this.cluster != null) {
      shutdownDfs(cluster);
    }
    super.tearDown();
  }

  /**
   * Test that a major compaction produces no output when all cells are
   * expired or deleted.  Make sure a scanner over the region returns the
   * right answer in this case - and that it just basically works.
   * @throws IOException
   */
  public void testMajorCompactingToNoOutput() throws IOException {
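    // Create one more store file than the compaction threshold so a
    // compaction is clearly warranted.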
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Now delete everything.
    InternalScanner s = r.getScanner(new Scan());
    do {
      List<KeyValue> results = new ArrayList<KeyValue>();
      boolean result = s.next(results);
      if (!results.isEmpty()) {
        r.delete(new Delete(results.get(0).getRow()), null, false);
      }
      if (!result) break;
    } while(true);
    // Flush
    r.flushcache();
    // Major compact.
    r.compactStores(true);
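    // Everything was deleted and then flushed, so the major compaction
    // should leave nothing behind; a fresh scan must return no rows.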
    s = r.getScanner(new Scan());
    int counter = 0;
    do {
      List<KeyValue> results = new ArrayList<KeyValue>();
      boolean result = s.next(results);
      if (!result) break;
      counter++;
    } while(true);
    assertEquals(0, counter);
  }

  /**
   * Run compactions and memstore flushes.
   * Assert deletes get cleaned up.
   * @throws Exception
   */
  public void testMajorCompaction() throws Exception {
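    // Create one more store file than the compaction threshold, then add a
    // further round of edits so every column has several versions.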
    createStoreFile(r);
    for (int i = 0; i < compactionThreshold; i++) {
      createStoreFile(r);
    }
    // Add more content.
    addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));

    // Now there are about 5 versions of each column.
    // The default is that only 3 (MAXVERSIONS) versions are allowed per column.
    //
    // Assert == 3 when we ask for versions.
    Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    r.flushcache();
    r.compactStores(true);

    // Look at the second row.
    // Increment the least significant character so we get to the next row.
    byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    secondRowBytes[START_KEY_BYTES.length - 1]++;

    // Always 3 versions when that is the configured maximum.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Now add deletes to memstore and then flush it.  That will put us over
    // the compaction threshold of 3 store files.  Compacting these store files
    // should result in a compacted store file that has no references to the
    // deleted row.
    Delete delete = new Delete(secondRowBytes, System.currentTimeMillis(), null);
    byte [][] famAndQf = {COLUMN_FAMILY, null};
    delete.deleteFamily(famAndQf[0]);
    r.delete(delete, null, true);

    // Assert deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertTrue("Second row should have been deleted", result.isEmpty());

    r.flushcache();

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertTrue("Second row should have been deleted", result.isEmpty());

    // Add a bit of data and flush.  Start adding at 'bbb'.
    createSmallerStoreFile(this.r);
    r.flushcache();
    // Assert that the second row is still deleted.
    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Force major compaction.
    r.compactStores(true);
    assertEquals(1, r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size());

    result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
    assertTrue("Second row should still be deleted", result.isEmpty());

    // Make sure the store files do have some 'aaa' keys in them -- exactly 3.
    // Also, that compacted store files do not have any secondRowBytes because
    // they were deleted.
    verifyCounts(3, 0);

    // Multiple versions are allowed for an entry, so the delete isn't enough.
    // Lower the TTL and let it expire to ensure that all our entries have been wiped.
    final int ttlInSeconds = 1;
    for (Store store: this.r.stores.values()) {
      store.ttl = ttlInSeconds * 1000;
    }
    Thread.sleep(ttlInSeconds * 1000);

    r.compactStores(true);
    int count = count();
    assertTrue("Should not see anything after TTL has expired", count == 0);
  }

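  /*
   * The following tests exercise minor compaction with different flavors of
   * Delete (whole row, column, column family, specific versions).  They all
   * delegate to testMinorCompactionWithDelete() below.
   */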
  public void testMinorCompactionWithDeleteRow() throws Exception {
    Delete deleteRow = new Delete(secondRowBytes);
    testMinorCompactionWithDelete(deleteRow);
  }

  public void testMinorCompactionWithDeleteColumn1() throws Exception {
    Delete dc = new Delete(secondRowBytes);
    /* Delete all timestamps in the column. */
    dc.deleteColumns(fam2, col2);
    testMinorCompactionWithDelete(dc);
  }

  public void testMinorCompactionWithDeleteColumn2() throws Exception {
    Delete dc = new Delete(secondRowBytes);
    dc.deleteColumn(fam2, col2);
    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
     * We only delete the latest version. One might expect to see only
     * versions 1 and 2. HBase differs, and gives us 0, 1 and 2.
     * This is okay as well. Since there was no compaction done before the
     * delete, version 0 seems to stay on.
     */
    //testMinorCompactionWithDelete(dc, 2);
    testMinorCompactionWithDelete(dc, 3);
  }

  public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
    Delete deleteCF = new Delete(secondRowBytes);
    deleteCF.deleteFamily(fam2);
    testMinorCompactionWithDelete(deleteCF);
  }

  public void testMinorCompactionWithDeleteVersion1() throws Exception {
    Delete deleteVersion = new Delete(secondRowBytes);
    deleteVersion.deleteColumns(fam2, col2, 2);
    /* compactionThreshold is 3. The table has 4 versions: 0, 1, 2, and 3.
     * We delete versions 0 ... 2. So, we still have one remaining.
     */
    testMinorCompactionWithDelete(deleteVersion, 1);
  }

  public void testMinorCompactionWithDeleteVersion2() throws Exception {
    Delete deleteVersion = new Delete(secondRowBytes);
    deleteVersion.deleteColumn(fam2, col2, 1);
    /*
     * The table has 4 versions: 0, 1, 2, and 3.
     * Version 0 does not count.
     * We delete version 1.
     * Should have 2 remaining.
     */
    testMinorCompactionWithDelete(deleteVersion, 2);
  }

  /*
   * A helper function to test the minor compaction algorithm. We check that
   * the delete markers are left behind. Takes a delete as an argument, which
   * can be any kind of delete (row, column, column family, etc.) that
   * essentially deletes row2 and column2. row1 and column1 should be untouched.
   */
  private void testMinorCompactionWithDelete(Delete delete) throws Exception {
    testMinorCompactionWithDelete(delete, 0);
  }

  private void testMinorCompactionWithDelete(Delete delete,
      int expectedResultsAfterDelete) throws Exception {
    HRegionIncommon loader = new HRegionIncommon(r);
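    // Write compactionThreshold + 1 rounds of edits, flushing after each one,
    // so every store ends up with more files than the compaction threshold.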
    for (int i = 0; i < compactionThreshold + 1; i++) {
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      r.flushcache();
    }

    Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Now add deletes to memstore and then flush it.  That will put us over
    // the compaction threshold of 3 store files.  Compacting these store files
    // should result in a compacted store file that has no references to the
    // deleted row.
    r.delete(delete, null, true);

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    r.flushcache();
    // should not change anything.
    // Let us check again

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // do a compaction
    Store store2 = this.r.stores.get(fam2);
    int numFiles1 = store2.getStorefiles().size();
    assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
    store2.compactRecent(compactionThreshold);   // = 3
    int numFiles2 = store2.getStorefiles().size();
    // Check that we did compact
    assertTrue("Number of store files should go down", numFiles1 > numFiles2);
    // Check that it was a minor compaction.
    assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);

    // Make sure that we have only deleted family2 from secondRowBytes
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // but we still have firstrow
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
  }

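  /**
   * Scan the raw store files of the test column family and assert how many
   * cells remain for the first row and for the second row.
   */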
  private void verifyCounts(int countRow1, int countRow2) throws Exception {
    int count1 = 0;
    int count2 = 0;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      scanner.seekTo();
      do {
        byte [] row = scanner.getKeyValue().getRow();
        if (Bytes.equals(row, STARTROW)) {
          count1++;
        } else if (Bytes.equals(row, secondRowBytes)) {
          count2++;
        }
      } while (scanner.next());
    }
    assertEquals(countRow1, count1);
    assertEquals(countRow2, count2);
  }

  /**
   * Verify that you can stop a long-running compaction
   * (used during RS shutdown).
   * @throws Exception
   */
  public void testInterruptCompaction() throws Exception {
    assertEquals(0, count());

    // Lower the number of bytes written between close-checks for this test.
    int origWI = Store.closeCheckInterval;
    Store.closeCheckInterval = 10*1000; // 10 KB

    try {
      // Create a couple of store files w/ 15KB (over the 10KB interval).
      int jmax = (int) Math.ceil(15.0/compactionThreshold);
      byte [] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        HRegionIncommon loader = new HRegionIncommon(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        for (int j = 0; j < jmax; j++) {
          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        loader.flushcache();
      }

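      // Spy on the region so that compaction preparation turns off writes,
      // simulating a close request arriving while the compaction runs.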
      HRegion spyR = spy(r);
      doAnswer(new Answer() {
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // Force a minor compaction, but not before requesting a stop.
      spyR.compactStores();

      // Ensure that the compaction stopped and all old files are intact,
      Store s = r.stores.get(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15*1000);
      // and that no new store files persisted past compactStores().
      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
      assertEquals(0, ls.length);

    } finally {
      // Don't mess up future tests.
      r.writestate.writesEnabled = true;
      Store.closeCheckInterval = origWI;

      // Delete all the data so later tests start from a clean slate.
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte [][] famAndQf = {COLUMN_FAMILY, null};
        delete.deleteFamily(famAndQf[0]);
        r.delete(delete, null, true);
      }
      r.flushcache();

      // Multiple versions are allowed for an entry, so the delete isn't enough.
      // Lower the TTL and let it expire to ensure that all our entries have been wiped.
      final int ttlInSeconds = 1;
      for (Store store: this.r.stores.values()) {
        store.ttl = ttlInSeconds * 1000;
      }
      Thread.sleep(ttlInSeconds * 1000);

      r.compactStores(true);
      assertEquals(0, count());
    }
  }

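  /** Count all cells across the store files of the test column family. */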
  private int count() throws IOException {
    int count = 0;
    for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while(scanner.next());
    }
    return count;
  }

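  /** Load the region with a round of edits and flush, producing one new store file. */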
  private void createStoreFile(final HRegion region) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    addContent(loader, Bytes.toString(COLUMN_FAMILY));
    loader.flushcache();
  }

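  /** Load rows starting at 'bbb' and flush, producing a smaller store file. */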
  private void createSmallerStoreFile(final HRegion region) throws IOException {
    HRegionIncommon loader = new HRegionIncommon(region);
    addContent(loader, Bytes.toString(COLUMN_FAMILY), "bbb".getBytes(), null);
    loader.flushcache();
  }
}