1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver;
22
23 import java.io.IOException;
24 import java.lang.ref.SoftReference;
25 import java.security.PrivilegedExceptionAction;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.NavigableSet;
31 import java.util.concurrent.ConcurrentSkipListSet;
32
33 import junit.framework.TestCase;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FSDataOutputStream;
39 import org.apache.hadoop.fs.FileStatus;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.FileUtil;
42 import org.apache.hadoop.fs.FilterFileSystem;
43 import org.apache.hadoop.fs.LocalFileSystem;
44 import org.apache.hadoop.fs.Path;
45 import org.apache.hadoop.hbase.HBaseConfiguration;
46 import org.apache.hadoop.hbase.HBaseTestingUtility;
47 import org.apache.hadoop.hbase.HColumnDescriptor;
48 import org.apache.hadoop.hbase.HConstants;
49 import org.apache.hadoop.hbase.HRegionInfo;
50 import org.apache.hadoop.hbase.HTableDescriptor;
51 import org.apache.hadoop.hbase.KeyValue;
52 import org.apache.hadoop.hbase.client.Get;
53 import org.apache.hadoop.hbase.regionserver.wal.HLog;
54 import org.apache.hadoop.hbase.security.User;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
57 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
58 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
59 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
60
61 import com.google.common.base.Joiner;
62
63
64
65
66 public class TestStore extends TestCase {
67 public static final Log LOG = LogFactory.getLog(TestStore.class);
68
69 Store store;
70 byte [] table = Bytes.toBytes("table");
71 byte [] family = Bytes.toBytes("family");
72
73 byte [] row = Bytes.toBytes("row");
74 byte [] row2 = Bytes.toBytes("row2");
75 byte [] qf1 = Bytes.toBytes("qf1");
76 byte [] qf2 = Bytes.toBytes("qf2");
77 byte [] qf3 = Bytes.toBytes("qf3");
78 byte [] qf4 = Bytes.toBytes("qf4");
79 byte [] qf5 = Bytes.toBytes("qf5");
80 byte [] qf6 = Bytes.toBytes("qf6");
81
82 NavigableSet<byte[]> qualifiers =
83 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
84
85 List<KeyValue> expected = new ArrayList<KeyValue>();
86 List<KeyValue> result = new ArrayList<KeyValue>();
87
88 long id = System.currentTimeMillis();
89 Get get = new Get(row);
90
91 private static final String DIR = HBaseTestingUtility.getTestDir() + "/TestStore/";
92
93
94
95
96
97 @Override
98 public void setUp() throws IOException {
99 qualifiers.add(qf1);
100 qualifiers.add(qf3);
101 qualifiers.add(qf5);
102
103 Iterator<byte[]> iter = qualifiers.iterator();
104 while(iter.hasNext()){
105 byte [] next = iter.next();
106 expected.add(new KeyValue(row, family, next, 1, (byte[])null));
107 get.addColumn(family, next);
108 }
109 }
110
111 private void init(String methodName) throws IOException {
112 init(methodName, HBaseConfiguration.create());
113 }
114
115 private void init(String methodName, Configuration conf)
116 throws IOException {
117
118 Path basedir = new Path(DIR+methodName);
119 Path logdir = new Path(DIR+methodName+"/logs");
120 Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
121 HColumnDescriptor hcd = new HColumnDescriptor(family);
122 FileSystem fs = FileSystem.get(conf);
123
124 fs.delete(logdir, true);
125
126 HTableDescriptor htd = new HTableDescriptor(table);
127 htd.addFamily(hcd);
128 HRegionInfo info = new HRegionInfo(htd, null, null, false);
129 HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
130 HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
131
132 store = new Store(basedir, region, hcd, fs, conf);
133 }
134
135
136
137
138
139
140
141
142
143
144 public void testEmptyStoreFile() throws IOException {
145 init(this.getName());
146
147 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
148 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
149 flush(1);
150
151
152 StoreFile f = this.store.getStorefiles().get(0);
153 Path storedir = f.getPath().getParent();
154 long seqid = f.getMaxSequenceId();
155 Configuration c = HBaseConfiguration.create();
156 FileSystem fs = FileSystem.get(c);
157 StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
158 StoreFile.DEFAULT_BLOCKSIZE_SMALL);
159 w.appendMetadata(seqid + 1, false);
160 w.close();
161 this.store.close();
162
163 this.store = new Store(storedir.getParent().getParent(),
164 this.store.getHRegion(),
165 this.store.getFamily(), fs, c);
166 System.out.println(this.store.getHRegionInfo().getEncodedName());
167 assertEquals(2, this.store.getStorefilesCount());
168
169 result = HBaseTestingUtility.getFromStoreFile(store,
170 get.getRow(),
171 qualifiers);
172 assertEquals(1, result.size());
173 }
174
175
176
177
178
179 public void testGet_FromMemStoreOnly() throws IOException {
180 init(this.getName());
181
182
183 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
184 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
185 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
186 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
187 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
188 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
189
190
191 result = HBaseTestingUtility.getFromStoreFile(store,
192 get.getRow(), qualifiers);
193
194
195 assertCheck();
196 }
197
198
199
200
201
202 public void testGet_FromFilesOnly() throws IOException {
203 init(this.getName());
204
205
206 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
207 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
208
209 flush(1);
210
211
212 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
213 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
214
215 flush(2);
216
217
218 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
219 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
220
221 flush(3);
222
223
224 result = HBaseTestingUtility.getFromStoreFile(store,
225 get.getRow(),
226 qualifiers);
227
228
229
230 Collections.sort(result, KeyValue.COMPARATOR);
231
232
233 assertCheck();
234 }
235
236
237
238
239
240 public void testGet_FromMemStoreAndFiles() throws IOException {
241 init(this.getName());
242
243
244 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
245 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
246
247 flush(1);
248
249
250 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
251 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
252
253 flush(2);
254
255
256 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
257 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
258
259
260 result = HBaseTestingUtility.getFromStoreFile(store,
261 get.getRow(), qualifiers);
262
263
264 Collections.sort(result, KeyValue.COMPARATOR);
265
266
267 assertCheck();
268 }
269
270 private void flush(int storeFilessize) throws IOException{
271 this.store.snapshot();
272 flushStore(store, id++);
273 assertEquals(storeFilessize, this.store.getStorefiles().size());
274 assertEquals(0, this.store.memstore.kvset.size());
275 }
276
277 private void assertCheck() {
278 assertEquals(expected.size(), result.size());
279 for(int i=0; i<expected.size(); i++) {
280 assertEquals(expected.get(i), result.get(i));
281 }
282 }
283
284
285
286
287
288
289
290 public void testIncrementColumnValue_ICVDuringFlush()
291 throws IOException, InterruptedException {
292 init(this.getName());
293
294 long oldValue = 1L;
295 long newValue = 3L;
296 this.store.add(new KeyValue(row, family, qf1,
297 System.currentTimeMillis(),
298 Bytes.toBytes(oldValue)));
299
300
301 this.store.snapshot();
302
303
304 this.store.add(new KeyValue(row, family, qf2,
305 System.currentTimeMillis(),
306 Bytes.toBytes(oldValue)));
307
308
309 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
310
311
312 assertTrue(ret > 0);
313
314
315 flushStore(store, id++);
316 assertEquals(1, this.store.getStorefiles().size());
317
318 assertEquals(2, this.store.memstore.kvset.size());
319
320
321 Get get = new Get(row);
322 get.addColumn(family, qf1);
323 get.setMaxVersions();
324 List<KeyValue> results = new ArrayList<KeyValue>();
325
326 results = HBaseTestingUtility.getFromStoreFile(store, get);
327 assertEquals(2, results.size());
328
329 long ts1 = results.get(0).getTimestamp();
330 long ts2 = results.get(1).getTimestamp();
331
332 assertTrue(ts1 > ts2);
333
334 assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
335 assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
336 }
337
338 public void testICV_negMemstoreSize() throws IOException {
339 init(this.getName());
340
341 long time = 100;
342 ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
343 ee.setValue(time);
344 EnvironmentEdgeManagerTestHelper.injectEdge(ee);
345 long newValue = 3L;
346 long size = 0;
347
348
349 size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
350 System.currentTimeMillis(),
351 Bytes.toBytes(newValue)));
352 size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
353 System.currentTimeMillis(),
354 Bytes.toBytes(newValue)));
355 size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
356 System.currentTimeMillis(),
357 Bytes.toBytes(newValue)));
358 size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
359 System.currentTimeMillis(),
360 Bytes.toBytes(newValue)));
361 size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
362 System.currentTimeMillis(),
363 Bytes.toBytes(newValue)));
364
365
366 for ( int i = 0 ; i < 10000 ; ++i) {
367 newValue++;
368
369 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
370 long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
371
372 if (ret != 0) System.out.println("ret: " + ret);
373 if (ret2 != 0) System.out.println("ret2: " + ret2);
374
375 assertTrue("ret: " + ret, ret >= 0);
376 size += ret;
377 assertTrue("ret2: " + ret2, ret2 >= 0);
378 size += ret2;
379
380
381 if (i % 1000 == 0)
382 ee.setValue(++time);
383 }
384
385 long computedSize=0;
386 for (KeyValue kv : this.store.memstore.kvset) {
387 long kvsize = this.store.memstore.heapSizeChange(kv, true);
388
389 computedSize += kvsize;
390 }
391 assertEquals(computedSize, size);
392 }
393
394 public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
395 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
396 EnvironmentEdgeManagerTestHelper.injectEdge(mee);
397 init(this.getName());
398
399 long oldValue = 1L;
400 long newValue = 3L;
401 this.store.add(new KeyValue(row, family, qf1,
402 EnvironmentEdgeManager.currentTimeMillis(),
403 Bytes.toBytes(oldValue)));
404
405
406 this.store.snapshot();
407
408
409 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
410
411
412 assertTrue(ret > 0);
413
414
415 flushStore(store, id++);
416 assertEquals(1, this.store.getStorefiles().size());
417 assertEquals(1, this.store.memstore.kvset.size());
418
419
420 newValue += 1;
421 this.store.updateColumnValue(row, family, qf1, newValue);
422
423
424 newValue += 1;
425 this.store.updateColumnValue(row, family, qf1, newValue);
426
427
428
429
430
431 Get get = new Get(row);
432 get.addColumn(family, qf1);
433 get.setMaxVersions();
434 List<KeyValue> results = new ArrayList<KeyValue>();
435
436 results = HBaseTestingUtility.getFromStoreFile(store, get);
437 assertEquals(2, results.size());
438
439 long ts1 = results.get(0).getTimestamp();
440 long ts2 = results.get(1).getTimestamp();
441
442 assertTrue(ts1 > ts2);
443 assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
444 assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
445
446 mee.setValue(2);
447 newValue += 1;
448 this.store.updateColumnValue(row, family, qf1, newValue);
449
450 results = HBaseTestingUtility.getFromStoreFile(store, get);
451 assertEquals(2, results.size());
452
453 ts1 = results.get(0).getTimestamp();
454 ts2 = results.get(1).getTimestamp();
455
456 assertTrue(ts1 > ts2);
457 assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
458 assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
459 }
460
461 public void testHandleErrorsInFlush() throws Exception {
462 LOG.info("Setting up a faulty file system that cannot write");
463
464 final Configuration conf = HBaseConfiguration.create();
465 User user = User.createUserForTesting(conf,
466 "testhandleerrorsinflush", new String[]{"foo"});
467
468 conf.setClass("fs.file.impl", FaultyFileSystem.class,
469 FileSystem.class);
470 user.runAs(new PrivilegedExceptionAction<Object>() {
471 public Object run() throws Exception {
472
473 FileSystem fs = FileSystem.get(conf);
474 assertEquals(FaultyFileSystem.class, fs.getClass());
475
476
477 init(getName(), conf);
478
479 LOG.info("Adding some data");
480 store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
481 store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
482 store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
483
484 LOG.info("Before flush, we should have no files");
485 FileStatus[] files = fs.listStatus(store.getHomedir());
486 Path[] paths = FileUtil.stat2Paths(files);
487 System.err.println("Got paths: " + Joiner.on(",").join(paths));
488 assertEquals(0, paths.length);
489
490
491 try {
492 LOG.info("Flushing");
493 flush(1);
494 fail("Didn't bubble up IOE!");
495 } catch (IOException ioe) {
496 assertTrue(ioe.getMessage().contains("Fault injected"));
497 }
498
499 LOG.info("After failed flush, we should still have no files!");
500 files = fs.listStatus(store.getHomedir());
501 paths = FileUtil.stat2Paths(files);
502 System.err.println("Got paths: " + Joiner.on(",").join(paths));
503 assertEquals(0, paths.length);
504 return null;
505 }
506 });
507 }
508
509
510 static class FaultyFileSystem extends FilterFileSystem {
511 List<SoftReference<FaultyOutputStream>> outStreams =
512 new ArrayList<SoftReference<FaultyOutputStream>>();
513 private long faultPos = 200;
514
515 public FaultyFileSystem() {
516 super(new LocalFileSystem());
517 System.err.println("Creating faulty!");
518 }
519
520 @Override
521 public FSDataOutputStream create(Path p) throws IOException {
522 return new FaultyOutputStream(super.create(p), faultPos);
523 }
524
525 }
526
527 static class FaultyOutputStream extends FSDataOutputStream {
528 volatile long faultPos = Long.MAX_VALUE;
529
530 public FaultyOutputStream(FSDataOutputStream out,
531 long faultPos) throws IOException {
532 super(out, null);
533 this.faultPos = faultPos;
534 }
535
536 @Override
537 public void write(byte[] buf, int offset, int length) throws IOException {
538 System.err.println("faulty stream write at pos " + getPos());
539 injectFault();
540 super.write(buf, offset, length);
541 }
542
543 private void injectFault() throws IOException {
544 if (getPos() >= faultPos) {
545 throw new IOException("Fault injected");
546 }
547 }
548 }
549
550
551
552 private static void flushStore(Store store, long id) throws IOException {
553 StoreFlusher storeFlusher = store.getStoreFlusher(id);
554 storeFlusher.prepare();
555 storeFlusher.flushCache();
556 storeFlusher.commit();
557 }
558
559
560
561
562
563
564
565
566
567
568
569 List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
570 byte[] qualifier, byte[] family) {
571 List<KeyValue> kvList = new ArrayList<KeyValue>();
572 for (int i=1;i<=numRows;i++) {
573 byte[] b = Bytes.toBytes(i);
574 for (long timestamp: timestamps) {
575 kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
576 }
577 }
578 return kvList;
579 }
580
581
582
583
584
585 public void testMultipleTimestamps() throws IOException {
586 int numRows = 1;
587 long[] timestamps1 = new long[] {1,5,10,20};
588 long[] timestamps2 = new long[] {30,80};
589
590 init(this.getName());
591
592 List<KeyValue> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
593 for (KeyValue kv : kvList1) {
594 this.store.add(kv);
595 }
596
597 this.store.snapshot();
598 flushStore(store, id++);
599
600 List<KeyValue> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
601 for(KeyValue kv : kvList2) {
602 this.store.add(kv);
603 }
604
605 List<KeyValue> result;
606 Get get = new Get(Bytes.toBytes(1));
607 get.addColumn(family,qf1);
608
609 get.setTimeRange(0,15);
610 result = HBaseTestingUtility.getFromStoreFile(store, get);
611 assertTrue(result.size()>0);
612
613 get.setTimeRange(40,90);
614 result = HBaseTestingUtility.getFromStoreFile(store, get);
615 assertTrue(result.size()>0);
616
617 get.setTimeRange(10,45);
618 result = HBaseTestingUtility.getFromStoreFile(store, get);
619 assertTrue(result.size()>0);
620
621 get.setTimeRange(80,145);
622 result = HBaseTestingUtility.getFromStoreFile(store, get);
623 assertTrue(result.size()>0);
624
625 get.setTimeRange(1,2);
626 result = HBaseTestingUtility.getFromStoreFile(store, get);
627 assertTrue(result.size()>0);
628
629 get.setTimeRange(90,200);
630 result = HBaseTestingUtility.getFromStoreFile(store, get);
631 assertTrue(result.size()==0);
632 }
633 }