1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.IOException;
23 import java.io.OutputStream;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.commons.logging.impl.Log4JLogger;
32
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HTableDescriptor;
38 import org.apache.hadoop.hbase.MiniHBaseCluster;
39 import org.apache.hadoop.hbase.regionserver.HRegionServer;
40 import org.apache.hadoop.hbase.regionserver.HRegion;
41 import org.apache.hadoop.hbase.client.HBaseAdmin;
42 import org.apache.hadoop.hbase.client.HTable;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.FSUtils;
46 import org.apache.hadoop.hdfs.DFSClient;
47 import org.apache.hadoop.hdfs.MiniDFSCluster;
48
49 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
50 import org.apache.hadoop.hdfs.server.datanode.DataNode;
51 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
52 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
53 import org.apache.log4j.Level;
54 import org.junit.AfterClass;
55 import org.junit.BeforeClass;
56 import org.junit.Test;
57
58 import static org.junit.Assert.assertTrue;
59
60
61
62
63 public class TestLogRolling {
64 private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
65 private HRegionServer server;
66 private HLog log;
67 private String tableName;
68 private byte[] value;
69 private static FileSystem fs;
70 private static MiniDFSCluster dfsCluster;
71 private static HBaseAdmin admin;
72 private static MiniHBaseCluster cluster;
73 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
74
75
76 {
77 ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
78 ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
79 ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
80 ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
81 ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
82 ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
83 ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
84 }
85
86
87
88
89
90 public TestLogRolling() {
91
92 super();
93 this.server = null;
94 this.log = null;
95 this.tableName = null;
96 this.value = null;
97
98 String className = this.getClass().getName();
99 StringBuilder v = new StringBuilder(className);
100 while (v.length() < 1000) {
101 v.append(className);
102 }
103 value = Bytes.toBytes(v.toString());
104 }
105
106
107
108 @BeforeClass
109 public static void setUpBeforeClass() throws Exception {
110
111
112 TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 768L * 1024L);
113
114
115 TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
116
117
118 TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
119
120
121 TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);
122
123
124 TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
125
126
127
128 TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
129
130
131
132 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
133
134
135 TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
136 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
137
138
139 TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
140 TEST_UTIL.startMiniCluster(2);
141
142 cluster = TEST_UTIL.getHBaseCluster();
143 dfsCluster = TEST_UTIL.getDFSCluster();
144 fs = TEST_UTIL.getTestFileSystem();
145 admin = TEST_UTIL.getHBaseAdmin();
146 }
147
148 @AfterClass
149 public static void tearDownAfterClass() throws IOException {
150 TEST_UTIL.cleanupTestDir();
151 TEST_UTIL.shutdownMiniCluster();
152 }
153
154 private void startAndWriteData() throws IOException {
155
156 new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
157 this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
158 this.log = server.getWAL();
159
160
161 HTableDescriptor desc = new HTableDescriptor(tableName);
162 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
163 admin.createTable(desc);
164 HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
165
166 server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
167 this.log = server.getWAL();
168 for (int i = 1; i <= 256; i++) {
169 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
170 put.add(HConstants.CATALOG_FAMILY, null, value);
171 table.put(put);
172 if (i % 32 == 0) {
173
174 try {
175 Thread.sleep(2000);
176 } catch (InterruptedException e) {
177
178 }
179 }
180 }
181 }
182
183
184
185
186
187
188 @Test
189 public void testLogRolling() throws FailedLogCloseException, IOException {
190 this.tableName = getName();
191 startAndWriteData();
192 LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
193
194
195
196 List<HRegion> regions =
197 new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
198 for (HRegion r: regions) {
199 r.flushcache();
200 }
201
202
203 log.rollWriter();
204
205 int count = log.getNumLogFiles();
206 LOG.info("after flushing all regions and rolling logs there are " +
207 log.getNumLogFiles() + " log files");
208 assertTrue(("actual count: " + count), count <= 2);
209 }
210
211 private static String getName() {
212 return "TestLogRolling";
213 }
214
215 void writeData(HTable table, int rownum) throws IOException {
216 Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
217 put.add(HConstants.CATALOG_FAMILY, null, value);
218 table.put(put);
219
220
221 try {
222 Thread.sleep(2000);
223 } catch (InterruptedException e) {
224
225 }
226 }
227
228
229
230
231 DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
232 IllegalAccessException, InvocationTargetException {
233 OutputStream stm = log.getOutputStream();
234 Method getPipeline = null;
235 for (Method m : stm.getClass().getDeclaredMethods()) {
236 if (m.getName().endsWith("getPipeline")) {
237 getPipeline = m;
238 getPipeline.setAccessible(true);
239 break;
240 }
241 }
242
243 assertTrue("Need DFSOutputStream.getPipeline() for this test",
244 null != getPipeline);
245 Object repl = getPipeline.invoke(stm, new Object[] {}
246 return (DatanodeInfo[]) repl;
247 }
248
249
250
251
252
253
254
255
256
257
258 @Test
259 public void testLogRollOnDatanodeDeath() throws IOException,
260 InterruptedException, IllegalArgumentException, IllegalAccessException,
261 InvocationTargetException {
262 assertTrue("This test requires HLog file replication.",
263 fs.getDefaultReplication() > 1);
264 LOG.info("Replication=" + fs.getDefaultReplication());
265
266 new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
267
268 this.server = cluster.getRegionServer(0);
269 this.log = server.getWAL();
270
271
272 String tableName = getName();
273 HTableDescriptor desc = new HTableDescriptor(tableName);
274 desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
275
276 if (admin.tableExists(tableName)) {
277 admin.disableTable(tableName);
278 admin.deleteTable(tableName);
279 }
280 admin.createTable(desc);
281 HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
282
283 server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
284 this.log = server.getWAL();
285
286 assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
287
288 assertTrue("Need append support for this test", FSUtils
289 .isAppendSupported(TEST_UTIL.getConfiguration()));
290
291
292 dfsCluster
293 .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
294 dfsCluster.waitActive();
295 assertTrue(dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);
296
297 writeData(table, 2);
298
299 table.setAutoFlush(true);
300
301 long curTime = System.currentTimeMillis();
302 long oldFilenum = log.getFilenum();
303 assertTrue("Log should have a timestamp older than now",
304 curTime > oldFilenum && oldFilenum != -1);
305
306 assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
307 DatanodeInfo[] pipeline = getPipeline(log);
308 assertTrue(pipeline.length == fs.getDefaultReplication());
309
310
311 assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);
312 Thread.sleep(10000);
313
314 writeData(table, 2);
315 long newFilenum = log.getFilenum();
316
317 assertTrue("Missing datanode should've triggered a log roll",
318 newFilenum > oldFilenum && newFilenum > curTime);
319
320
321 writeData(table, 3);
322 assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
323 assertTrue("New log file should have the default replication", log
324 .getLogReplication() == fs.getDefaultReplication());
325 }
326 }