/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/** JUnit test case for HLog */
public class TestHLog {
  private static final Log LOG = LogFactory.getLog(TestHLog.class);
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  private static Configuration conf;
  private static FileSystem fs;
  private static Path dir;
  private static MiniDFSCluster cluster;
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Path hbaseDir;
  private static Path oldLogDir;

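  /**
   * Before each test, wipe everything under the DFS root so every test starts
   * against an empty filesystem.
   */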
  @Before
  public void setUp() throws Exception {
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus entry : entries) {
      fs.delete(entry.getPath(), true);
    }
  }

  @After
  public void tearDown() throws Exception {
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.socket.timeout", 5000);
    TEST_UTIL.getConfiguration().setInt(
        "ipc.client.connection.maxidletime", 500);
    // faster failover with cluster.shutdown();fs.close() idiom
    TEST_UTIL.getConfiguration()
        .setInt("ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt(
        "dfs.client.block.recovery.retries", 1);
    TEST_UTIL.startMiniCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();
    fs = cluster.getFileSystem();

    hbaseDir = new Path(TEST_UTIL.getConfiguration().get("hbase.rootdir"));
    oldLogDir = new Path(hbaseDir, ".oldlogs");
    dir = new Path(hbaseDir, getName());
  }

  private static String getName() {
    // Used to name the test table and the directories created under hbase.rootdir.
    return "TestHLog";
  }

  /**
   * Just write multiple logs then split.  Before fix for HADOOP-2283, this
   * would fail.
   * @throws IOException
   */
  @Test
  public void testSplit() throws IOException {
    final byte [] tableName = Bytes.toBytes(getName());
    final byte [] rowName = tableName;
    Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
    HLog log = new HLog(fs, logdir, oldLogDir, conf);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[howmany];
    Path tabledir = new Path(hbaseDir, getName());
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = new HRegionInfo(new HTableDescriptor(tableName),
          Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      LOG.info("Created region directory " +
          new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    // Add edits for three regions.
    try {
      for (int ii = 0; ii < howmany; ii++) {
        for (int i = 0; i < howmany; i++) {
          for (int j = 0; j < howmany; j++) {
            WALEdit edit = new WALEdit();
            byte [] family = Bytes.toBytes("column");
            byte [] qualifier = Bytes.toBytes(Integer.toString(j));
            byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
            edit.add(new KeyValue(rowName, family, qualifier,
                System.currentTimeMillis(), column));
            LOG.info("Region " + i + ": " + edit);
            log.append(infos[i], tableName, edit,
                System.currentTimeMillis());
          }
        }
        log.rollWriter();
      }
      log.close();
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, logdir, oldLogDir, fs);
      List<Path> splits = logSplitter.splitLog();
      verifySplits(splits, howmany);
      log = null;
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
    }
  }

  /**
   * Test new HDFS-265 sync.
   * @throws Exception
   */
  @Test
  public void Broken_testSync() throws Exception {
    byte [] bytes = Bytes.toBytes(getName());
    // First verify that using streams all works.
    Path p = new Path(dir, getName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(bytes);
    out.sync();
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte [] buffer = new byte [1024];
    int read = in.read(buffer);
    assertEquals(bytes.length, read);
    out.close();
    in.close();
    Path subdir = new Path(dir, "hlogdir");
    HLog wal = new HLog(fs, subdir, oldLogDir, conf);
    final int total = 20;

    HRegionInfo info = new HRegionInfo(new HTableDescriptor(bytes),
        null, null, false);

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
    // Now call sync and try reading.  Opening a Reader before you sync just
    // gives you an EOFException.
    wal.sync();
    // Open a Reader.
    Path walPath = wal.computeFilename();
    HLog.Reader reader = HLog.getReader(fs, walPath, conf);
    int count = 0;
    HLog.Entry entry = new HLog.Entry();
    while ((entry = reader.next(entry)) != null) count++;
    assertEquals(total, count);
    reader.close();
    // Check that opening a Reader works on a file that has had a sync done
    // on it, even with more unsynced edits appended afterwards.
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
    reader = HLog.getReader(fs, walPath, conf);
    count = 0;
    while ((entry = reader.next(entry)) != null) count++;
    assertTrue(count >= total);
    reader.close();
    // If I sync, should see double the edits.
    wal.sync();
    reader = HLog.getReader(fs, walPath, conf);
    count = 0;
    while ((entry = reader.next(entry)) != null) count++;
    assertEquals(total * 2, count);
    reader.close();
    // Now do a test that ensures stuff works when we go over block boundary,
    // especially that we return good length on file.
    final byte [] value = new byte[1025 * 1024];  // Make a value just over 1MB.
    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
    // Now I should have written out lots of blocks.  Sync then read.
    wal.sync();
    reader = HLog.getReader(fs, walPath, conf);
    count = 0;
    while ((entry = reader.next(entry)) != null) count++;
    assertEquals(total * 3, count);
    reader.close();
    // Close the WAL and ensure a Reader still gets the right length afterwards.
    wal.close();
    reader = HLog.getReader(fs, walPath, conf);
    count = 0;
    while ((entry = reader.next(entry)) != null) count++;
    assertEquals(total * 3, count);
    reader.close();
  }

  /**
   * Test the findMemstoresWithEditsEqualOrOlderThan method.
   * @throws IOException
   */
  @Test
  public void testFindMemstoresWithEditsEqualOrOlderThan() throws IOException {
    Map<byte [], Long> regionsToSeqids = new HashMap<byte [], Long>();
    for (int i = 0; i < 10; i++) {
      Long l = Long.valueOf(i);
      regionsToSeqids.put(l.toString().getBytes(), l);
    }
    byte [][] regions =
      HLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
    assertEquals(2, regions.length);
    assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
        Bytes.equals(regions[0], "1".getBytes()));
    regions = HLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
    int count = 4;
    assertEquals(count, regions.length);
    // Regions returned are not ordered.
    for (int i = 0; i < count; i++) {
      assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
        Bytes.equals(regions[i], "1".getBytes()) ||
        Bytes.equals(regions[i], "2".getBytes()) ||
        Bytes.equals(regions[i], "3".getBytes()));
    }
  }

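  /**
   * Open each split file and check that it holds edits for exactly one region,
   * that sequence numbers are strictly increasing, and that the expected number
   * of edits is present.
   */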
  private void verifySplits(List<Path> splits, final int howmany)
  throws IOException {
    assertEquals(howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      HLog.Reader reader = HLog.getReader(fs, splits.get(i), conf);
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        HLog.Entry entry = new HLog.Entry();
        while ((entry = reader.next(entry)) != null) {
          HLogKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany * howmany, count);
      } finally {
        reader.close();
      }
    }
  }

  // For this test to pass, requires:
  // 1. HDFS-200 (append support)
  // 2. HDFS-988 (SafeMode should freeze file operations
  //              [FSNamesystem.nextGenerationStampForBlock])
  // 3. HDFS-142 (on restart, maintain pendingCreates)
  @Test
  public void testAppendClose() throws Exception {
    byte [] tableName = Bytes.toBytes(getName());
    HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
    Path subdir = new Path(dir, "hlogdir");
    Path archdir = new Path(dir, "hlogdir_archive");
    HLog wal = new HLog(fs, subdir, archdir, conf);
    final int total = 20;

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
    }
    // Now call sync to send the data to HDFS datanodes
    wal.sync();
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = wal.computeFilename();

    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
    try {
      cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
      cluster.shutdown();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.close();
      } catch (IOException e) {
        LOG.info(e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }

      // Workaround a strange issue with Hadoop's RPC system - if we don't
      // sleep here, the new datanodes will pick up a cached IPC connection to
      // the old (dead) NN and fail to start. Sleeping 2 seconds goes past
      // the idle time threshold configured in the conf above
      Thread.sleep(2000);

      cluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null, null, null, null);
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("START second instance.");
    }

    // set the lease period to be 1 second so that the
    // namenode triggers lease recovery upon append request
    Method setLeasePeriod = cluster.getClass()
      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster,
        new Object[]{new Long(1000), new Long(1000)});
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e);
    }

    // Now try recovering the log, like the HMaster would do
    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

    class RecoverLogThread extends Thread {
      public Exception exception = null;
      public void run() {
        try {
          FSUtils.recoverFileLease(recoveredFs, walPath, rlConf);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    // Timeout after 60 sec. Without correct patches, would be an infinite loop
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for HLog.recoverLog()");
    }

    if (t.exception != null) {
      throw t.exception;
    }

    // Make sure you can read all the content
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, walPath, conf);
    int count = 0;
    HLogKey key = HLog.newKey(conf);
    WALEdit val = new WALEdit();
    while (reader.next(key, val)) {
      count++;
      assertTrue("Should be one KeyValue per WALEdit",
          val.getKeyValues().size() == 1);
    }
    assertEquals(total, count);
    reader.close();
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   * @throws IOException
   */
  @Test
  public void testEditAdd() throws IOException {
    final int COL_COUNT = 10;
    final byte [] tableName = Bytes.toBytes("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog.Reader reader = null;
    HLog log = new HLog(fs, dir, oldLogDir, conf);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
          row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      log.append(info, tableName, cols, System.currentTimeMillis());
      long logSeqId = log.startCacheFlush();
      log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
          info.isMetaRegion());
      log.close();
      Path filename = log.computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLog.getReader(fs, filename, conf);
      // Above we added all columns on a single row so we only read one
      // entry in the below... that's why we have '1'.
      for (int i = 0; i < 1; i++) {
        HLog.Entry entry = reader.next(null);
        if (entry == null) break;
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(Bytes.equals(tableName, key.getTablename()));
        KeyValue kv = val.getKeyValues().get(0);
        assertTrue(Bytes.equals(row, kv.getRow()));
        assertEquals((byte)(i + '0'), kv.getValue()[0]);
        System.out.println(key + " " + val);
      }
      HLog.Entry entry = null;
      while ((entry = reader.next(null)) != null) {
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        // Assert only one more row... the meta flushed row.
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(Bytes.equals(tableName, key.getTablename()));
        KeyValue kv = val.getKeyValues().get(0);
        assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
        assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
            val.getKeyValues().get(0).getValue()));
        System.out.println(key + " " + val);
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests appending a batch of edits, completing a cache flush, and reading
   * the log back.
   * @throws IOException
   */
  @Test
  public void testAppend() throws IOException {
    final int COL_COUNT = 10;
    final byte [] tableName = Bytes.toBytes("tablename");
    final byte [] row = Bytes.toBytes("row");
    Reader reader = null;
    HLog log = new HLog(fs, dir, oldLogDir, conf);
    try {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      log.append(hri, tableName, cols, System.currentTimeMillis());
      long logSeqId = log.startCacheFlush();
      log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
      log.close();
      Path filename = log.computeFilename();
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLog.getReader(fs, filename, conf);
      HLog.Entry entry = reader.next();
      assertEquals(COL_COUNT, entry.getEdit().size());
      int idx = 0;
      for (KeyValue val : entry.getEdit().getKeyValues()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
            entry.getKey().getEncodedRegionName()));
        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
        assertEquals((byte)(idx + '0'), val.getValue()[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }

      // Get next row... the meta flushed row.
      entry = reader.next();
      assertEquals(1, entry.getEdit().size());
      for (KeyValue val : entry.getEdit().getKeyValues()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
            entry.getKey().getEncodedRegionName()));
        assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
        assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
        assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
        assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
            val.getValue()));
        System.out.println(entry.getKey() + " " + val);
      }
    } finally {
      if (log != null) {
        log.closeAndDelete();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Test that a registered observer visits entries before they are appended,
   * and stops being notified once unregistered.
   * @throws Exception
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final byte [] tableName = Bytes.toBytes("tablename");
    final byte [] row = Bytes.toBytes("row");
    HLog log = new HLog(fs, dir, oldLogDir, conf);
    DumbWALObserver visitor = new DumbWALObserver();
    log.registerWALActionsListener(visitor);
    long timestamp = System.currentTimeMillis();
    HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    for (int i = 0; i < COL_COUNT; i++) {
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[]{(byte) (i + '0')}));
      log.append(hri, tableName, cols, System.currentTimeMillis());
    }
    assertEquals(COL_COUNT, visitor.increments);
    log.unregisterWALActionsListener(visitor);
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"),
        Bytes.toBytes(Integer.toString(11)),
        timestamp, new byte[]{(byte) (11 + '0')}));
    log.append(hri, tableName, cols, System.currentTimeMillis());
    // The observer was unregistered, so the count should not have changed.
    assertEquals(COL_COUNT, visitor.increments);
  }

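  /**
   * Verify that rolling the writer keeps log files that still hold unflushed
   * edits and only removes them once the owning regions have flushed
   * (see HBASE-3198).
   * @throws Exception
   */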
  @Test
  public void testLogCleaning() throws Exception {
    LOG.info("testLogCleaning");
    final byte [] tableName = Bytes.toBytes("testLogCleaning");
    final byte [] tableName2 = Bytes.toBytes("testLogCleaning2");

    HLog log = new HLog(fs, dir, oldLogDir, conf);
    HRegionInfo hri = new HRegionInfo(new HTableDescriptor(tableName),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    HRegionInfo hri2 = new HRegionInfo(new HTableDescriptor(tableName2),
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

    // Add a single edit and make sure that rolling won't remove the file
    // Before HBASE-3198 it used to delete it
    addEdits(log, hri, tableName, 1);
    log.rollWriter();
    assertEquals(1, log.getNumLogFiles());

    // See if there's anything wrong with more than 1 edit
    addEdits(log, hri, tableName, 2);
    log.rollWriter();
    assertEquals(2, log.getNumLogFiles());

    // Now mix edits from 2 regions, still no flushing
    addEdits(log, hri, tableName, 1);
    addEdits(log, hri2, tableName2, 1);
    addEdits(log, hri, tableName, 1);
    addEdits(log, hri2, tableName2, 1);
    log.rollWriter();
    assertEquals(3, log.getNumLogFiles());

    // Flush the first region, we expect to see the first two files getting
    // archived
    long seqId = log.startCacheFlush();
    log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
    log.rollWriter();
    assertEquals(2, log.getNumLogFiles());

    // Flush the second region, which removes all the remaining output files
    // since the oldest was completely flushed and the two others only contain
    // flush information
    seqId = log.startCacheFlush();
    log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
    log.rollWriter();
    assertEquals(0, log.getNumLogFiles());
  }

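  /**
   * Append the given number of trivial edits for the region to the log,
   * each under a fresh timestamp.
   */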
  private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
                        int times) throws IOException {
    final byte [] row = Bytes.toBytes("row");
    for (int i = 0; i < times; i++) {
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, row, row, timestamp, row));
      log.append(hri, tableName, cols, timestamp);
    }
  }

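  /**
   * Simple WALObserver that counts how many entries it is asked to visit
   * before they are written; the other callbacks are no-ops.
   */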
  static class DumbWALObserver implements WALObserver {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
                                         WALEdit logEdit) {
      increments++;
    }

    @Override
    public void logRolled(Path newFile) {
      // not interested
    }

    @Override
    public void logRollRequested() {
      // not interested
    }

    @Override
    public void logCloseRequested() {
      // not interested
    }
  }
}