/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertTrue;
/**
 * Test log deletion as logs are rolled.
 */
public class TestLogRolling {
  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
  private HRegionServer server;
  private HLog log;
  private String tableName;
  private byte[] value;
  private static FileSystem fs;
  private static MiniDFSCluster dfsCluster;
  private static HBaseAdmin admin;
  private static MiniHBaseCluster cluster;
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // verbose logging on classes that are touched in these tests
  {
    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
  }

  public TestLogRolling() {
    this.server = null;
    this.log = null;
    this.tableName = null;
    this.value = null;

    // Build a cell value of at least 1000 bytes; large cells make memstore
    // flushes (flush size is 8192 bytes) and region splits happen after
    // only a few writes.
    String className = this.getClass().getName();
    StringBuilder v = new StringBuilder(className);
    while (v.length() < 1000) {
      v.append(className);
    }
    value = Bytes.toBytes(v.toString());
  }

  // Edit the configuration here, before the HDFS and HBase mini-clusters
  // are started.
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    /**** configuration for testLogRolling ****/
    // Force a region split after every 768KB
    TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 768L * 1024L);

    // Roll the log after every 32 writes
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);

    // For less frequently updated regions, flush after every 2 optional flush intervals
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);

    // Flush the memstore after every 8192 bytes
    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);

    // Increase the amount of time between client retries
    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);

    // Reduce thread wake frequency so that other threads can get
    // a chance to run.
    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);

    /**** configuration for testLogRollOnDatanodeDeath ****/
    // make sure log.hflush() calls syncFs() to open a pipeline
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // lower the namenode & datanode heartbeat so the namenode
    // quickly detects datanode failures
    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    // the namenode might still try to choose the recently-dead datanode
    // for a pipeline, so retry pipeline creation multiple times
    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
    TEST_UTIL.startMiniCluster(2);

    cluster = TEST_UTIL.getHBaseCluster();
    dfsCluster = TEST_UTIL.getDFSCluster();
    fs = TEST_UTIL.getTestFileSystem();
    admin = TEST_UTIL.getHBaseAdmin();
  }

  @AfterClass
  public static void tearDownAfterClass() throws IOException {
    TEST_UTIL.cleanupTestDir();
    TEST_UTIL.shutdownMiniCluster();
  }

  private void startAndWriteData() throws IOException {
    // When the META table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
    this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
    this.log = server.getWAL();

    // Create the test table and open it
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

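    // The new table's region is not necessarily on the first region server,
    // so look up the server that actually hosts it and use that server's WAL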
    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
      Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
      put.add(HConstants.CATALOG_FAMILY, null, value);
      table.put(put);
      if (i % 32 == 0) {
        // After every 32 writes sleep to let the log roller run
        try {
          Thread.sleep(2000);
        } catch (InterruptedException e) {
          // continue
        }
      }
    }
  }

  /**
   * Tests that logs are deleted
   * @throws IOException
   * @throws FailedLogCloseException
   */
  @Test
  public void testLogRolling() throws FailedLogCloseException, IOException {
    this.tableName = getName();
    startAndWriteData();
    LOG.info("after writing there are " + log.getNumLogFiles() + " log files");

    // flush all regions
    List<HRegion> regions =
      new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }
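
    // Flushing persists every edit the old log files cover, so once the log
    // rolls those files are obsolete and become eligible for deletion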

    // Now roll the log
    log.rollWriter();

    int count = log.getNumLogFiles();
    LOG.info("after flushing all regions and rolling logs there are " +
        count + " log files");
    assertTrue(("actual count: " + count), count <= 2);
  }

  private static String getName() {
    return "TestLogRolling";
  }

  void writeData(HTable table, int rownum) throws IOException {
    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
    put.add(HConstants.CATALOG_FAMILY, null, value);
    table.put(put);

    // sleep to let the log roller run (if it needs to)
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // continue
    }
  }

  /**
   * Returns the current HDFS pipeline of the log's output stream.
   */
  DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
      IllegalAccessException, InvocationTargetException {
    OutputStream stm = log.getOutputStream();
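    // The DFS output stream does not expose its pipeline through a public
    // API here, so find its getPipeline() method reflectively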
    Method getPipeline = null;
    for (Method m : stm.getClass().getDeclaredMethods()) {
      if (m.getName().endsWith("getPipeline")) {
        getPipeline = m;
        getPipeline.setAccessible(true);
        break;
      }
    }

    assertTrue("Need DFSOutputStream.getPipeline() for this test",
        null != getPipeline);
    Object repl = getPipeline.invoke(stm, new Object[] {} /* NO_ARGS */);
    return (DatanodeInfo[]) repl;
  }

  /**
   * Tests that logs are rolled upon detecting datanode death
   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
   * @throws IOException
   * @throws InterruptedException
   * @throws InvocationTargetException
   * @throws IllegalAccessException
   * @throws IllegalArgumentException
   */
  @Test
  public void testLogRollOnDatanodeDeath() throws IOException,
      InterruptedException, IllegalArgumentException, IllegalAccessException,
      InvocationTargetException {
    assertTrue("This test requires HLog file replication.",
      fs.getDefaultReplication() > 1);
    LOG.info("Replication=" + fs.getDefaultReplication());
    // When the META table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it; use a distinct name so this test
    // cannot collide with the table created by testLogRolling
    String tableName = getName() + "1";
    HTableDescriptor desc = new HTableDescriptor(tableName);
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();

    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
        .isAppendSupported(TEST_UTIL.getConfiguration()));

    // Add a datanode so replication can stay at the default level
    // after we kill one of the existing datanodes
    dfsCluster
        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
    dfsCluster.waitActive();
    assertTrue(dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);

    writeData(table, 2);

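    // With autoflush on, each put below goes straight to the region server,
    // so every write results in an immediate WAL append and sync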
    table.setAutoFlush(true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
        curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
    DatanodeInfo[] pipeline = getPipeline(log);
    assertTrue(pipeline.length == fs.getDefaultReplication());

    // kill a datanode in the pipeline to force a log roll on the next sync()
    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);
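    // Give HDFS time to detect the datanode failure (the heartbeat and
    // recheck intervals were lowered in setUpBeforeClass)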
    Thread.sleep(10000);
    // this write should succeed, but trigger a log roll
    writeData(table, 2);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
        newFilenum > oldFilenum && newFilenum > curTime);

    // write some more log data (this should go through the new output
    // stream created by the roll)
    writeData(table, 3);
    assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
    assertTrue("New log file should have the default replication", log
        .getLogReplication() == fs.getDefaultReplication());
  }
}