1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseTestCase;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.regionserver.wal.HLog;
35 import org.apache.hadoop.hbase.client.Delete;
36 import org.apache.hadoop.hbase.client.Get;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Result;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hdfs.MiniDFSCluster;
43
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import static org.mockito.Mockito.doAnswer;
47 import static org.mockito.Mockito.spy;
48
49
50
51
52
public class TestCompaction extends HBaseTestCase {
  static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
  // Region under test; created fresh in setUp() and closed in tearDown().
  private HRegion r = null;
  private Path compactionDir = null;
  private Path regionCompactionDir = null;
  // Row/family fixtures come from HBaseTestCase (fam1, fam2, START_KEY, ...).
  private static final byte [] COLUMN_FAMILY = fam1;
  private final byte [] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte [] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  // Number of store files that triggers a compaction; read from conf in ctor.
  private int compactionThreshold;
  // Three consecutive row keys derived from START_KEY (see constructor).
  private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
  final private byte[] col1, col2;

  // Mini HDFS cluster backing the region's store files; torn down per test.
  private MiniDFSCluster cluster;
67
  /** Constructor: tunes memstore config and precomputes row/column fixtures. */
  public TestCompaction() throws Exception {
    super();

    // Small flush size so loading content produces several store files.
    conf.setInt("hbase.hregion.memstore.flush.size", 1024*1024);
    conf.setInt("hbase.hregion.memstore.block.multiplier", 100);
    this.cluster = null;
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    firstRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    // Increment the least significant character so we get the next row key.
    // NOTE(review): indexes with START_KEY_BYTES.length while the array came
    // from START_KEY -- assumes both have the same length; confirm in
    // HBaseTestCase.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes[START_KEY_BYTES.length - 1]++;
    col1 = "column1".getBytes(HConstants.UTF8_ENCODING);
    col2 = "column2".getBytes(HConstants.UTF8_ENCODING);
  }
87
  @Override
  public void setUp() throws Exception {
    // Fresh 2-datanode mini DFS cluster per test so store files live on HDFS.
    this.cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
    // Point the hbase root dir at the mini DFS we just started.
    this.conf.set(HConstants.HBASE_DIR,
      this.cluster.getFileSystem().getHomeDirectory().toString());
    super.setUp();
    HTableDescriptor htd = createTableDescriptor(getName());
    this.r = createNewHRegion(htd, null, null);
  }
98
99 @Override
100 public void tearDown() throws Exception {
101 HLog hlog = r.getLog();
102 this.r.close();
103 hlog.closeAndDelete();
104 if (this.cluster != null) {
105 shutdownDfs(cluster);
106 }
107 super.tearDown();
108 }
109
110
111
112
113
114
115
116 public void testMajorCompactingToNoOutput() throws IOException {
117 createStoreFile(r);
118 for (int i = 0; i < compactionThreshold; i++) {
119 createStoreFile(r);
120 }
121
122 InternalScanner s = r.getScanner(new Scan());
123 do {
124 List<KeyValue> results = new ArrayList<KeyValue>();
125 boolean result = s.next(results);
126 r.delete(new Delete(results.get(0).getRow()), null, false);
127 if (!result) break;
128 } while(true);
129
130 r.flushcache();
131
132 r.compactStores(true);
133 s = r.getScanner(new Scan());
134 int counter = 0;
135 do {
136 List<KeyValue> results = new ArrayList<KeyValue>();
137 boolean result = s.next(results);
138 if (!result) break;
139 counter++;
140 } while(true);
141 assertEquals(0, counter);
142 }
143
144
145
146
147
148
149 public void testMajorCompaction() throws Exception {
150 createStoreFile(r);
151 for (int i = 0; i < compactionThreshold; i++) {
152 createStoreFile(r);
153 }
154
155 addContent(new HRegionIncommon(r), Bytes.toString(COLUMN_FAMILY));
156
157
158
159
160
161 Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
162 assertEquals(compactionThreshold, result.size());
163
164 r.flushcache();
165 r.compactStores(true);
166
167
168
169 byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);
170 secondRowBytes[START_KEY_BYTES.length - 1]++;
171
172
173 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null);
174 assertEquals(compactionThreshold, result.size());
175
176
177
178
179
180
181 Delete delete = new Delete(secondRowBytes, System.currentTimeMillis(), null);
182 byte [][] famAndQf = {COLUMN_FAMILY, null};
183 delete.deleteFamily(famAndQf[0]);
184 r.delete(delete, null, true);
185
186
187 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
188 assertTrue("Second row should have been deleted", result.isEmpty());
189
190 r.flushcache();
191
192 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
193 assertTrue("Second row should have been deleted", result.isEmpty());
194
195
196 createSmallerStoreFile(this.r);
197 r.flushcache();
198
199 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
200 assertTrue("Second row should still be deleted", result.isEmpty());
201
202
203 r.compactStores(true);
204 assertEquals(r.getStore(COLUMN_FAMILY_TEXT).getStorefiles().size(), 1);
205
206 result = r.get(new Get(secondRowBytes).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null );
207 assertTrue("Second row should still be deleted", result.isEmpty());
208
209
210
211
212 verifyCounts(3,0);
213
214
215
216 final int ttlInSeconds = 1;
217 for (Store store: this.r.stores.values()) {
218 store.ttl = ttlInSeconds * 1000;
219 }
220 Thread.sleep(ttlInSeconds * 1000);
221
222 r.compactStores(true);
223 int count = count();
224 assertTrue("Should not see anything after TTL has expired", count == 0);
225 }
226
227 public void testMinorCompactionWithDeleteRow() throws Exception {
228 Delete deleteRow = new Delete(secondRowBytes);
229 testMinorCompactionWithDelete(deleteRow);
230 }
231 public void testMinorCompactionWithDeleteColumn1() throws Exception {
232 Delete dc = new Delete(secondRowBytes);
233
234 dc.deleteColumns(fam2, col2);
235 testMinorCompactionWithDelete(dc);
236 }
237 public void testMinorCompactionWithDeleteColumn2() throws Exception {
238 Delete dc = new Delete(secondRowBytes);
239 dc.deleteColumn(fam2, col2);
240
241
242
243
244
245
246
247 testMinorCompactionWithDelete(dc, 3);
248 }
249 public void testMinorCompactionWithDeleteColumnFamily() throws Exception {
250 Delete deleteCF = new Delete(secondRowBytes);
251 deleteCF.deleteFamily(fam2);
252 testMinorCompactionWithDelete(deleteCF);
253 }
254 public void testMinorCompactionWithDeleteVersion1() throws Exception {
255 Delete deleteVersion = new Delete(secondRowBytes);
256 deleteVersion.deleteColumns(fam2, col2, 2);
257
258
259
260 testMinorCompactionWithDelete(deleteVersion, 1);
261 }
262 public void testMinorCompactionWithDeleteVersion2() throws Exception {
263 Delete deleteVersion = new Delete(secondRowBytes);
264 deleteVersion.deleteColumn(fam2, col2, 1);
265
266
267
268
269
270
271 testMinorCompactionWithDelete(deleteVersion, 2);
272 }
273
274
275
276
277
278
279
  /**
   * Runs the minor-compaction delete scenario expecting the deleted cell to
   * be completely gone afterwards (zero surviving versions).
   */
  private void testMinorCompactionWithDelete(Delete delete) throws Exception {
    testMinorCompactionWithDelete(delete, 0);
  }
  /**
   * Core minor-compaction-with-delete scenario: loads compactionThreshold+1
   * flushed store files covering four family/column combinations, applies
   * {@code delete} to the second row, minor-compacts fam2's store, and checks
   * at every stage that the delete is honored and unrelated data is intact.
   *
   * @param delete delete to apply to the second row (built by the callers)
   * @param expectedResultsAfterDelete number of versions of fam2:col2
   *   expected to remain visible after the delete (0 == fully removed)
   */
  private void testMinorCompactionWithDelete(Delete delete, int expectedResultsAfterDelete) throws Exception {
    HRegionIncommon loader = new HRegionIncommon(r);
    for (int i = 0; i < compactionThreshold + 1; i++) {
      // Each pass writes another version of every cell and flushes, giving
      // compactionThreshold+1 store files per family. The trailing i is
      // presumably the version/timestamp argument -- confirm against
      // HBaseTestCase.addContent.
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam1), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col1), firstRowBytes, thirdRowBytes, i);
      addContent(loader, Bytes.toString(fam2), Bytes.toString(col2), firstRowBytes, thirdRowBytes, i);
      r.flushcache();
    }

    // Sanity: only compactionThreshold versions are visible per cell.
    Result result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Apply the caller-supplied delete to the second row.
    r.delete(delete, null, true);

    // Delete takes effect immediately (from memstore)...
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    // ...while the first row is untouched.
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    r.flushcache();

    // Same two checks must hold after the tombstones are flushed to disk.
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());

    // Minor-compact fam2's store: file count must drop, but because it is a
    // minor (not major) compaction more than one file must remain.
    Store store2 = this.r.stores.get(fam2);
    int numFiles1 = store2.getStorefiles().size();
    assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold);
    store2.compactRecent(compactionThreshold);
    int numFiles2 = store2.getStorefiles().size();
    assertTrue("Number of store files should go down", numFiles1 > numFiles2);
    assertTrue("Was not supposed to be a major compaction", numFiles2 > 1);

    // Delete must still be honored after the minor compaction.
    result = r.get(new Get(secondRowBytes).addColumn(fam2, col2).setMaxVersions(100), null);
    assertEquals(expectedResultsAfterDelete, result.size());
    result = r.get(new Get(firstRowBytes).addColumn(fam1, col1).setMaxVersions(100), null);
    assertEquals(compactionThreshold, result.size());
  }
340
341 private void verifyCounts(int countRow1, int countRow2) throws Exception {
342 int count1 = 0;
343 int count2 = 0;
344 for (StoreFile f: this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
345 HFileScanner scanner = f.getReader().getScanner(false, false);
346 scanner.seekTo();
347 do {
348 byte [] row = scanner.getKeyValue().getRow();
349 if (Bytes.equals(row, STARTROW)) {
350 count1++;
351 } else if(Bytes.equals(row, secondRowBytes)) {
352 count2++;
353 }
354 } while(scanner.next());
355 }
356 assertEquals(countRow1,count1);
357 assertEquals(countRow2,count2);
358 }
359
360
361
362
363
364
  /**
   * Verifies that a compaction aborts cleanly when the region's writestate is
   * disabled mid-compaction: the store files are left untouched and no
   * partial files remain in the region's tmp directory.
   * @throws Exception
   */
  public void testInterruptCompaction() throws Exception {
    assertEquals(0, count());

    // Shrink the close-check interval so the compaction notices the disabled
    // writestate promptly; global static, restored in the finally block.
    int origWI = Store.closeCheckInterval;
    Store.closeCheckInterval = 10*1000;

    try {
      // Load padded rows so the store exceeds ~15KB across the files.
      int jmax = (int) Math.ceil(15.0/compactionThreshold);
      byte [] pad = new byte[1000];
      for (int i = 0; i < compactionThreshold; i++) {
        HRegionIncommon loader = new HRegionIncommon(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        for (int j = 0; j < jmax; j++) {
          p.add(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        loader.flushcache();
      }

      // Spy the region so that, right as compaction prep runs, writes are
      // disabled -- forcing the subsequent compaction to abort.
      HRegion spyR = spy(r);
      doAnswer(new Answer() {
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // Kick off the compaction that should be interrupted.
      spyR.compactStores();

      // The aborted compaction must leave the store file set untouched...
      Store s = r.stores.get(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15*1000);

      // ...and leave nothing behind in the region's tmp dir.
      FileStatus[] ls = cluster.getFileSystem().listStatus(r.getTmpDir());
      assertEquals(0, ls.length);

    } finally {
      // Restore the global state mutated above.
      r.writestate.writesEnabled = true;
      Store.closeCheckInterval = origWI;

      // Delete all the rows we loaded so the final count() starts clean.
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte [][] famAndQf = {COLUMN_FAMILY, null};
        delete.deleteFamily(famAndQf[0]);
        r.delete(delete, null, true);
      }
      r.flushcache();

      // Expire everything via a 1-second TTL, then major compact so the
      // tombstones and data are physically removed before counting.
      final int ttlInSeconds = 1;
      for (Store store: this.r.stores.values()) {
        store.ttl = ttlInSeconds * 1000;
      }
      Thread.sleep(ttlInSeconds * 1000);

      r.compactStores(true);
      assertEquals(0, count());
    }
  }
432
433 private int count() throws IOException {
434 int count = 0;
435 for (StoreFile f: this.r.stores.
436 get(COLUMN_FAMILY_TEXT).getStorefiles()) {
437 HFileScanner scanner = f.getReader().getScanner(false, false);
438 if (!scanner.seekTo()) {
439 continue;
440 }
441 do {
442 count++;
443 } while(scanner.next());
444 }
445 return count;
446 }
447
448 private void createStoreFile(final HRegion region) throws IOException {
449 HRegionIncommon loader = new HRegionIncommon(region);
450 addContent(loader, Bytes.toString(COLUMN_FAMILY));
451 loader.flushcache();
452 }
453
454 private void createSmallerStoreFile(final HRegion region) throws IOException {
455 HRegionIncommon loader = new HRegionIncommon(region);
456 addContent(loader, Bytes.toString(COLUMN_FAMILY), ("" +
457 "bbb").getBytes(), null);
458 loader.flushcache();
459 }
460 }