1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.security.PrivilegedExceptionAction;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseConfiguration;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.client.Get;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.io.hfile.HFile;
46 import org.apache.hadoop.hbase.regionserver.FlushRequester;
47 import org.apache.hadoop.hbase.regionserver.HRegion;
48 import org.apache.hadoop.hbase.regionserver.Store;
49 import org.apache.hadoop.hbase.security.User;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdge;
52 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53 import org.junit.After;
54 import org.junit.AfterClass;
55 import org.junit.Before;
56 import org.junit.BeforeClass;
57 import org.junit.Test;
58
59
60
61
62 public class TestWALReplay {
63 public static final Log LOG = LogFactory.getLog(TestWALReplay.class);
64 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
65 private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
66 private Path hbaseRootDir = null;
67 private Path oldLogDir;
68 private Path logDir;
69 private FileSystem fs;
70 private Configuration conf;
71
72 @BeforeClass
73 public static void setUpBeforeClass() throws Exception {
74 Configuration conf = TEST_UTIL.getConfiguration();
75 conf.setBoolean("dfs.support.append", true);
76
77 conf.setInt("dfs.client.block.recovery.retries", 2);
78 TEST_UTIL.startMiniDFSCluster(3);
79 TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
80 Path hbaseRootDir =
81 TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
82 LOG.info("hbase.rootdir=" + hbaseRootDir);
83 conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
84 }
85
86 @AfterClass
87 public static void tearDownAfterClass() throws Exception {
88 TEST_UTIL.shutdownMiniDFSCluster();
89 }
90
91 @Before
92 public void setUp() throws Exception {
93 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
94 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
95 this.hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
96 this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
97 this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
98 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
99 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
100 }
101 }
102
103 @After
104 public void tearDown() throws Exception {
105 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
106 }
107
108
109
110
111 private void deleteDir(final Path p) throws IOException {
112 if (this.fs.exists(p)) {
113 if (!this.fs.delete(p, true)) {
114 throw new IOException("Failed remove of " + p);
115 }
116 }
117 }
118
119
120
121
122
123
124 @Test
125 public void test2727() throws Exception {
126
127
128 final String tableNameStr = "test2727";
129 HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
130 Path basedir = new Path(hbaseRootDir, tableNameStr);
131 deleteDir(basedir);
132 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
133
134 final byte [] tableName = Bytes.toBytes(tableNameStr);
135 final byte [] rowName = tableName;
136
137 HLog wal1 = createWAL(this.conf);
138
139 final int countPerFamily = 1000;
140 for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
141 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal1);
142 }
143 wal1.close();
144 runWALSplit(this.conf);
145
146 HLog wal2 = createWAL(this.conf);
147
148 wal2.setSequenceNumber(wal1.getSequenceNumber());
149
150 for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
151 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal2);
152 }
153 wal2.close();
154 runWALSplit(this.conf);
155
156 HLog wal3 = createWAL(this.conf);
157 wal3.setSequenceNumber(wal2.getSequenceNumber());
158 try {
159 final HRegion region = new HRegion(basedir, wal3, this.fs, this.conf, hri,
160 null);
161 long seqid = region.initialize();
162 assertTrue(seqid > wal3.getSequenceNumber());
163
164
165 region.close();
166 } finally {
167 wal3.closeAndDelete();
168 }
169 }
170
171
172
173
174
175
176
177
178
179
180 @Test
181 public void testRegionMadeOfBulkLoadedFilesOnly()
182 throws IOException, SecurityException, IllegalArgumentException,
183 NoSuchFieldException, IllegalAccessException, InterruptedException {
184 final String tableNameStr = "testReplayEditsWrittenViaHRegion";
185 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
186 final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
187 deleteDir(basedir);
188 HLog wal = createWAL(this.conf);
189 HRegion region = HRegion.openHRegion(hri, wal, this.conf);
190 Path f = new Path(basedir, "hfile");
191 HFile.Writer writer = new HFile.Writer(this.fs, f);
192 byte [] family = hri.getTableDesc().getFamilies().iterator().next().getName();
193 byte [] row = Bytes.toBytes(tableNameStr);
194 writer.append(new KeyValue(row, family, family, row));
195 writer.close();
196 region.bulkLoadHFile(f.toString(), family);
197
198 region.put((new Put(row)).add(family, family, family));
199 wal.sync();
200
201
202 final Configuration newConf = HBaseConfiguration.create(this.conf);
203 User user = HBaseTestingUtility.getDifferentUser(newConf,
204 tableNameStr);
205 user.runAs(new PrivilegedExceptionAction() {
206 public Object run() throws Exception {
207 runWALSplit(newConf);
208 HLog wal2 = createWAL(newConf);
209 HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
210 newConf, hri, null);
211 long seqid2 = region2.initialize();
212 assertTrue(seqid2 > -1);
213
214
215 region2.close();
216 wal2.closeAndDelete();
217 return null;
218 }
219 });
220 }
221
222
223
224
225
226
227
228
229
230
231 @Test
232 public void testReplayEditsWrittenViaHRegion()
233 throws IOException, SecurityException, IllegalArgumentException,
234 NoSuchFieldException, IllegalAccessException, InterruptedException {
235 final String tableNameStr = "testReplayEditsWrittenViaHRegion";
236 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
237 final Path basedir = new Path(this.hbaseRootDir, tableNameStr);
238 deleteDir(basedir);
239 final byte[] rowName = Bytes.toBytes(tableNameStr);
240 final int countPerFamily = 10;
241
242
243
244
245 HLog wal = createWAL(this.conf);
246 HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, null);
247 long seqid = region.initialize();
248
249 wal.setSequenceNumber(seqid);
250 boolean first = true;
251 for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
252 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
253 if (first ) {
254
255 region.flushcache();
256 first = false;
257 }
258 }
259
260 final Get g = new Get(rowName);
261 Result result = region.get(g, null);
262 assertEquals(countPerFamily * hri.getTableDesc().getFamilies().size(),
263 result.size());
264
265
266
267 region.close();
268 wal.close();
269 runWALSplit(this.conf);
270 HLog wal2 = createWAL(this.conf);
271 HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, null) {
272 @Override
273 protected boolean restoreEdit(Store s, KeyValue kv) {
274 super.restoreEdit(s, kv);
275 throw new RuntimeException("Called when it should not have been!");
276 }
277 };
278 long seqid2 = region2.initialize();
279
280 wal2.setSequenceNumber(seqid2);
281 assertTrue(seqid + result.size() < seqid2);
282
283
284
285
286 for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
287 addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
288 }
289
290 final Result result2 = region2.get(g, null);
291 assertEquals(2 * result.size(), result2.size());
292 wal2.sync();
293
294
295 HBaseTestingUtility.setMaxRecoveryErrorCount(wal2.getOutputStream(), 1);
296 final Configuration newConf = HBaseConfiguration.create(this.conf);
297 User user = HBaseTestingUtility.getDifferentUser(newConf,
298 tableNameStr);
299 user.runAs(new PrivilegedExceptionAction() {
300 public Object run() throws Exception {
301 runWALSplit(newConf);
302 FileSystem newFS = FileSystem.get(newConf);
303
304 HLog wal3 = createWAL(newConf);
305 final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
306 HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, null) {
307 @Override
308 protected boolean restoreEdit(Store s, KeyValue kv) {
309 boolean b = super.restoreEdit(s, kv);
310 countOfRestoredEdits.incrementAndGet();
311 return b;
312 }
313 };
314 long seqid3 = region3.initialize();
315
316 wal3.setSequenceNumber(seqid3);
317 Result result3 = region3.get(g, null);
318
319 assertEquals(result2.size(), result3.size());
320 assertEquals(hri.getTableDesc().getFamilies().size() * countPerFamily,
321 countOfRestoredEdits.get());
322
323
324 region3.close();
325 wal3.closeAndDelete();
326 return null;
327 }
328 });
329 }
330
331
332
333
334
335
336 @Test
337 public void testReplayEditsWrittenIntoWAL() throws Exception {
338 final String tableNameStr = "testReplayEditsWrittenIntoWAL";
339 final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableNameStr);
340 final Path basedir = new Path(hbaseRootDir, tableNameStr);
341 deleteDir(basedir);
342 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
343 final HLog wal = createWAL(this.conf);
344 final byte[] tableName = Bytes.toBytes(tableNameStr);
345 final byte[] rowName = tableName;
346 final byte[] regionName = hri.getEncodedNameAsBytes();
347
348
349 final int countPerFamily = 1000;
350 for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
351 addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal);
352 }
353
354
355 long logSeqId = wal.startCacheFlush();
356 wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
357
358
359 WALEdit edit = new WALEdit();
360 long now = ee.currentTimeMillis();
361 edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
362 now, rowName));
363 wal.append(hri, tableName, edit, now);
364
365
366 edit = new WALEdit();
367 now = ee.currentTimeMillis();
368 edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
369 KeyValue.Type.DeleteFamily));
370 wal.append(hri, tableName, edit, now);
371
372
373 wal.sync();
374
375
376 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
377
378
379 final Configuration newConf = HBaseConfiguration.create(this.conf);
380 User user = HBaseTestingUtility.getDifferentUser(newConf,
381 ".replay.wal.secondtime");
382 user.runAs(new PrivilegedExceptionAction(){
383 public Object run() throws Exception {
384 runWALSplit(newConf);
385 FileSystem newFS = FileSystem.get(newConf);
386
387 newConf.setInt("hbase.hregion.memstore.flush.size", 1024 * 100);
388
389 HLog newWal = createWAL(newConf);
390 final AtomicInteger flushcount = new AtomicInteger(0);
391 try {
392 final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri,
393 null) {
394 protected boolean internalFlushcache(HLog wal, long myseqid)
395 throws IOException {
396 boolean b = super.internalFlushcache(wal, myseqid);
397 flushcount.incrementAndGet();
398 return b;
399 };
400 };
401 long seqid = region.initialize();
402
403 assertTrue(flushcount.get() > 0);
404 assertTrue(seqid > wal.getSequenceNumber());
405
406 Get get = new Get(rowName);
407 Result result = region.get(get, -1);
408
409 assertEquals(countPerFamily * (hri.getTableDesc().getFamilies().size() - 1),
410 result.size());
411 region.close();
412 } finally {
413 newWal.closeAndDelete();
414 }
415 return null;
416 }
417 });
418 }
419
420
421
422 class TestFlusher implements FlushRequester {
423 private int count = 0;
424 private HRegion r;
425
426 @Override
427 public void requestFlush(HRegion region) {
428 count++;
429 try {
430 r.flushcache();
431 } catch (IOException e) {
432 throw new RuntimeException("Exception flushing", e);
433 }
434 }
435 }
436
437 private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
438 final byte [] rowName, final byte [] family,
439 final int count, EnvironmentEdge ee, final HLog wal)
440 throws IOException {
441 String familyStr = Bytes.toString(family);
442 for (int j = 0; j < count; j++) {
443 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
444 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
445 WALEdit edit = new WALEdit();
446 edit.add(new KeyValue(rowName, family, qualifierBytes,
447 ee.currentTimeMillis(), columnBytes));
448 wal.append(hri, tableName, edit, ee.currentTimeMillis());
449 }
450 }
451
452 private void addRegionEdits (final byte [] rowName, final byte [] family,
453 final int count, EnvironmentEdge ee, final HRegion r,
454 final String qualifierPrefix)
455 throws IOException {
456 for (int j = 0; j < count; j++) {
457 byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
458 Put p = new Put(rowName);
459 p.add(family, qualifier, ee.currentTimeMillis(), rowName);
460 r.put(p);
461 }
462 }
463
464
465
466
467
468
469 private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
470 HTableDescriptor htd = new HTableDescriptor(tableName);
471 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
472 htd.addFamily(a);
473 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
474 htd.addFamily(b);
475 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
476 htd.addFamily(c);
477 return new HRegionInfo(htd, null, null, false);
478 }
479
480
481
482
483
484
485
486
487 private Path runWALSplit(final Configuration c) throws IOException {
488 FileSystem fs = FileSystem.get(c);
489 HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
490 this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
491 List<Path> splits = logSplitter.splitLog();
492
493 assertEquals(1, splits.size());
494
495 assertTrue(fs.exists(splits.get(0)));
496 LOG.info("Split file=" + splits.get(0));
497 return splits.get(0);
498 }
499
500
501
502
503
504
505 private HLog createWAL(final Configuration c) throws IOException {
506 HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
507
508
509 HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
510 return wal;
511 }
512 }