1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.lang.ref.SoftReference;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NavigableSet;
31  import java.util.concurrent.ConcurrentSkipListSet;
32  
33  import junit.framework.TestCase;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataOutputStream;
39  import org.apache.hadoop.fs.FileStatus;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.FileUtil;
42  import org.apache.hadoop.fs.FilterFileSystem;
43  import org.apache.hadoop.fs.LocalFileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HBaseTestingUtility;
47  import org.apache.hadoop.hbase.HColumnDescriptor;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.KeyValue;
52  import org.apache.hadoop.hbase.client.Get;
53  import org.apache.hadoop.hbase.regionserver.wal.HLog;
54  import org.apache.hadoop.hbase.security.User;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
57  import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
58  import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
59  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
60  
61  import com.google.common.base.Joiner;
62  
63  /**
64   * Test class fosr the Store
65   */
66  public class TestStore extends TestCase {
67    public static final Log LOG = LogFactory.getLog(TestStore.class);
68  
69    Store store;
70    byte [] table = Bytes.toBytes("table");
71    byte [] family = Bytes.toBytes("family");
72  
73    byte [] row = Bytes.toBytes("row");
74    byte [] row2 = Bytes.toBytes("row2");
75    byte [] qf1 = Bytes.toBytes("qf1");
76    byte [] qf2 = Bytes.toBytes("qf2");
77    byte [] qf3 = Bytes.toBytes("qf3");
78    byte [] qf4 = Bytes.toBytes("qf4");
79    byte [] qf5 = Bytes.toBytes("qf5");
80    byte [] qf6 = Bytes.toBytes("qf6");
81  
82    NavigableSet<byte[]> qualifiers =
83      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
84  
85    List<KeyValue> expected = new ArrayList<KeyValue>();
86    List<KeyValue> result = new ArrayList<KeyValue>();
87  
88    long id = System.currentTimeMillis();
89    Get get = new Get(row);
90  
91    private static final String DIR = HBaseTestingUtility.getTestDir() + "/TestStore/";
92  
93    /**
94     * Setup
95     * @throws IOException
96     */
97    @Override
98    public void setUp() throws IOException {
99      qualifiers.add(qf1);
100     qualifiers.add(qf3);
101     qualifiers.add(qf5);
102 
103     Iterator<byte[]> iter = qualifiers.iterator();
104     while(iter.hasNext()){
105       byte [] next = iter.next();
106       expected.add(new KeyValue(row, family, next, 1, (byte[])null));
107       get.addColumn(family, next);
108     }
109   }
110 
111   private void init(String methodName) throws IOException {
112     init(methodName, HBaseConfiguration.create());
113   }
114 
115   private void init(String methodName, Configuration conf)
116   throws IOException {
117     //Setting up a Store
118     Path basedir = new Path(DIR+methodName);
119     Path logdir = new Path(DIR+methodName+"/logs");
120     Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
121     HColumnDescriptor hcd = new HColumnDescriptor(family);
122     FileSystem fs = FileSystem.get(conf);
123 
124     fs.delete(logdir, true);
125 
126     HTableDescriptor htd = new HTableDescriptor(table);
127     htd.addFamily(hcd);
128     HRegionInfo info = new HRegionInfo(htd, null, null, false);
129     HLog hlog = new HLog(fs, logdir, oldLogDir, conf);
130     HRegion region = new HRegion(basedir, hlog, fs, conf, info, null);
131 
132     store = new Store(basedir, region, hcd, fs, conf);
133   }
134 
135 
136   //////////////////////////////////////////////////////////////////////////////
137   // Get tests
138   //////////////////////////////////////////////////////////////////////////////
139 
140   /**
141    * Test for hbase-1686.
142    * @throws IOException
143    */
144   public void testEmptyStoreFile() throws IOException {
145     init(this.getName());
146     // Write a store file.
147     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
148     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
149     flush(1);
150     // Now put in place an empty store file.  Its a little tricky.  Have to
151     // do manually with hacked in sequence id.
152     StoreFile f = this.store.getStorefiles().get(0);
153     Path storedir = f.getPath().getParent();
154     long seqid = f.getMaxSequenceId();
155     Configuration c = HBaseConfiguration.create();
156     FileSystem fs = FileSystem.get(c);
157     StoreFile.Writer w = StoreFile.createWriter(fs, storedir,
158         StoreFile.DEFAULT_BLOCKSIZE_SMALL);
159     w.appendMetadata(seqid + 1, false);
160     w.close();
161     this.store.close();
162     // Reopen it... should pick up two files
163     this.store = new Store(storedir.getParent().getParent(),
164       this.store.getHRegion(),
165       this.store.getFamily(), fs, c);
166     System.out.println(this.store.getHRegionInfo().getEncodedName());
167     assertEquals(2, this.store.getStorefilesCount());
168 
169     result = HBaseTestingUtility.getFromStoreFile(store,
170         get.getRow(),
171         qualifiers);
172     assertEquals(1, result.size());
173   }
174 
175   /**
176    * Getting data from memstore only
177    * @throws IOException
178    */
179   public void testGet_FromMemStoreOnly() throws IOException {
180     init(this.getName());
181 
182     //Put data in memstore
183     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
184     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
185     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
186     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
187     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
188     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
189 
190     //Get
191     result = HBaseTestingUtility.getFromStoreFile(store,
192         get.getRow(), qualifiers);
193 
194     //Compare
195     assertCheck();
196   }
197 
198   /**
199    * Getting data from files only
200    * @throws IOException
201    */
202   public void testGet_FromFilesOnly() throws IOException {
203     init(this.getName());
204 
205     //Put data in memstore
206     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
207     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
208     //flush
209     flush(1);
210 
211     //Add more data
212     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
213     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
214     //flush
215     flush(2);
216 
217     //Add more data
218     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
219     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
220     //flush
221     flush(3);
222 
223     //Get
224     result = HBaseTestingUtility.getFromStoreFile(store,
225         get.getRow(),
226         qualifiers);
227     //this.store.get(get, qualifiers, result);
228 
229     //Need to sort the result since multiple files
230     Collections.sort(result, KeyValue.COMPARATOR);
231 
232     //Compare
233     assertCheck();
234   }
235 
236   /**
237    * Getting data from memstore and files
238    * @throws IOException
239    */
240   public void testGet_FromMemStoreAndFiles() throws IOException {
241     init(this.getName());
242 
243     //Put data in memstore
244     this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
245     this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
246     //flush
247     flush(1);
248 
249     //Add more data
250     this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
251     this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
252     //flush
253     flush(2);
254 
255     //Add more data
256     this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
257     this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
258 
259     //Get
260     result = HBaseTestingUtility.getFromStoreFile(store,
261         get.getRow(), qualifiers);
262 
263     //Need to sort the result since multiple files
264     Collections.sort(result, KeyValue.COMPARATOR);
265 
266     //Compare
267     assertCheck();
268   }
269 
270   private void flush(int storeFilessize) throws IOException{
271     this.store.snapshot();
272     flushStore(store, id++);
273     assertEquals(storeFilessize, this.store.getStorefiles().size());
274     assertEquals(0, this.store.memstore.kvset.size());
275   }
276 
277   private void assertCheck() {
278     assertEquals(expected.size(), result.size());
279     for(int i=0; i<expected.size(); i++) {
280       assertEquals(expected.get(i), result.get(i));
281     }
282   }
283 
284   //////////////////////////////////////////////////////////////////////////////
285   // IncrementColumnValue tests
286   //////////////////////////////////////////////////////////////////////////////
287   /*
288    * test the internal details of how ICV works, especially during a flush scenario.
289    */
290   public void testIncrementColumnValue_ICVDuringFlush()
291       throws IOException, InterruptedException {
292     init(this.getName());
293 
294     long oldValue = 1L;
295     long newValue = 3L;
296     this.store.add(new KeyValue(row, family, qf1,
297         System.currentTimeMillis(),
298         Bytes.toBytes(oldValue)));
299 
300     // snapshot the store.
301     this.store.snapshot();
302 
303     // add other things:
304     this.store.add(new KeyValue(row, family, qf2,
305         System.currentTimeMillis(),
306         Bytes.toBytes(oldValue)));
307 
308     // update during the snapshot.
309     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
310 
311     // memstore should have grown by some amount.
312     assertTrue(ret > 0);
313 
314     // then flush.
315     flushStore(store, id++);
316     assertEquals(1, this.store.getStorefiles().size());
317     // from the one we inserted up there, and a new one
318     assertEquals(2, this.store.memstore.kvset.size());
319 
320     // how many key/values for this row are there?
321     Get get = new Get(row);
322     get.addColumn(family, qf1);
323     get.setMaxVersions(); // all versions.
324     List<KeyValue> results = new ArrayList<KeyValue>();
325 
326     results = HBaseTestingUtility.getFromStoreFile(store, get);
327     assertEquals(2, results.size());
328 
329     long ts1 = results.get(0).getTimestamp();
330     long ts2 = results.get(1).getTimestamp();
331 
332     assertTrue(ts1 > ts2);
333 
334     assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
335     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
336   }
337 
338   public void testICV_negMemstoreSize()  throws IOException {
339       init(this.getName());
340 
341     long time = 100;
342     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
343     ee.setValue(time);
344     EnvironmentEdgeManagerTestHelper.injectEdge(ee);
345     long newValue = 3L;
346     long size = 0;
347 
348 
349     size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
350         System.currentTimeMillis(),
351         Bytes.toBytes(newValue)));
352     size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
353         System.currentTimeMillis(),
354         Bytes.toBytes(newValue)));
355     size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
356         System.currentTimeMillis(),
357         Bytes.toBytes(newValue)));
358     size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
359         System.currentTimeMillis(),
360         Bytes.toBytes(newValue)));
361     size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
362         System.currentTimeMillis(),
363         Bytes.toBytes(newValue)));
364 
365 
366     for ( int i = 0 ; i < 10000 ; ++i) {
367       newValue++;
368 
369       long ret = this.store.updateColumnValue(row, family, qf1, newValue);
370       long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
371 
372       if (ret != 0) System.out.println("ret: " + ret);
373       if (ret2 != 0) System.out.println("ret2: " + ret2);
374 
375       assertTrue("ret: " + ret, ret >= 0);
376       size += ret;
377       assertTrue("ret2: " + ret2, ret2 >= 0);
378       size += ret2;
379 
380 
381       if (i % 1000 == 0)
382         ee.setValue(++time);
383     }
384 
385     long computedSize=0;
386     for (KeyValue kv : this.store.memstore.kvset) {
387       long kvsize = this.store.memstore.heapSizeChange(kv, true);
388       //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize());
389       computedSize += kvsize;
390     }
391     assertEquals(computedSize, size);
392   }
393 
394   public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
395     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
396     EnvironmentEdgeManagerTestHelper.injectEdge(mee);
397     init(this.getName());
398 
399     long oldValue = 1L;
400     long newValue = 3L;
401     this.store.add(new KeyValue(row, family, qf1,
402         EnvironmentEdgeManager.currentTimeMillis(),
403         Bytes.toBytes(oldValue)));
404 
405     // snapshot the store.
406     this.store.snapshot();
407 
408     // update during the snapshot, the exact same TS as the Put (lololol)
409     long ret = this.store.updateColumnValue(row, family, qf1, newValue);
410 
411     // memstore should have grown by some amount.
412     assertTrue(ret > 0);
413 
414     // then flush.
415     flushStore(store, id++);
416     assertEquals(1, this.store.getStorefiles().size());
417     assertEquals(1, this.store.memstore.kvset.size());
418 
419     // now increment again:
420     newValue += 1;
421     this.store.updateColumnValue(row, family, qf1, newValue);
422 
423     // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again:
424     newValue += 1;
425     this.store.updateColumnValue(row, family, qf1, newValue);
426 
427     // the second TS should be TS=2 or higher., even though 'time=1' right now.
428 
429 
430     // how many key/values for this row are there?
431     Get get = new Get(row);
432     get.addColumn(family, qf1);
433     get.setMaxVersions(); // all versions.
434     List<KeyValue> results = new ArrayList<KeyValue>();
435 
436     results = HBaseTestingUtility.getFromStoreFile(store, get);
437     assertEquals(2, results.size());
438 
439     long ts1 = results.get(0).getTimestamp();
440     long ts2 = results.get(1).getTimestamp();
441 
442     assertTrue(ts1 > ts2);
443     assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
444     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
445 
446     mee.setValue(2); // time goes up slightly
447     newValue += 1;
448     this.store.updateColumnValue(row, family, qf1, newValue);
449 
450     results = HBaseTestingUtility.getFromStoreFile(store, get);
451     assertEquals(2, results.size());
452 
453     ts1 = results.get(0).getTimestamp();
454     ts2 = results.get(1).getTimestamp();
455 
456     assertTrue(ts1 > ts2);
457     assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
458     assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
459   }
460 
461   public void testHandleErrorsInFlush() throws Exception {
462     LOG.info("Setting up a faulty file system that cannot write");
463 
464     final Configuration conf = HBaseConfiguration.create();
465     User user = User.createUserForTesting(conf,
466         "testhandleerrorsinflush", new String[]{"foo"});
467     // Inject our faulty LocalFileSystem
468     conf.setClass("fs.file.impl", FaultyFileSystem.class,
469         FileSystem.class);
470     user.runAs(new PrivilegedExceptionAction<Object>() {
471       public Object run() throws Exception {
472         // Make sure it worked (above is sensitive to caching details in hadoop core)
473         FileSystem fs = FileSystem.get(conf);
474         assertEquals(FaultyFileSystem.class, fs.getClass());
475 
476         // Initialize region
477         init(getName(), conf);
478 
479         LOG.info("Adding some data");
480         store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
481         store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
482         store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
483 
484         LOG.info("Before flush, we should have no files");
485         FileStatus[] files = fs.listStatus(store.getHomedir());
486         Path[] paths = FileUtil.stat2Paths(files);
487         System.err.println("Got paths: " + Joiner.on(",").join(paths));
488         assertEquals(0, paths.length);
489 
490         //flush
491         try {
492           LOG.info("Flushing");
493           flush(1);
494           fail("Didn't bubble up IOE!");
495         } catch (IOException ioe) {
496           assertTrue(ioe.getMessage().contains("Fault injected"));
497         }
498 
499         LOG.info("After failed flush, we should still have no files!");
500         files = fs.listStatus(store.getHomedir());
501         paths = FileUtil.stat2Paths(files);
502         System.err.println("Got paths: " + Joiner.on(",").join(paths));
503         assertEquals(0, paths.length);
504         return null;
505       }
506     });
507   }
508 
509 
510   static class FaultyFileSystem extends FilterFileSystem {
511     List<SoftReference<FaultyOutputStream>> outStreams =
512       new ArrayList<SoftReference<FaultyOutputStream>>();
513     private long faultPos = 200;
514 
515     public FaultyFileSystem() {
516       super(new LocalFileSystem());
517       System.err.println("Creating faulty!");
518     }
519 
520     @Override
521     public FSDataOutputStream create(Path p) throws IOException {
522       return new FaultyOutputStream(super.create(p), faultPos);
523     }
524 
525   }
526 
527   static class FaultyOutputStream extends FSDataOutputStream {
528     volatile long faultPos = Long.MAX_VALUE;
529 
530     public FaultyOutputStream(FSDataOutputStream out,
531         long faultPos) throws IOException {
532       super(out, null);
533       this.faultPos = faultPos;
534     }
535 
536     @Override
537     public void write(byte[] buf, int offset, int length) throws IOException {
538       System.err.println("faulty stream write at pos " + getPos());
539       injectFault();
540       super.write(buf, offset, length);
541     }
542 
543     private void injectFault() throws IOException {
544       if (getPos() >= faultPos) {
545         throw new IOException("Fault injected");
546       }
547     }
548   }
549 
550 
551 
552   private static void flushStore(Store store, long id) throws IOException {
553     StoreFlusher storeFlusher = store.getStoreFlusher(id);
554     storeFlusher.prepare();
555     storeFlusher.flushCache();
556     storeFlusher.commit();
557   }
558 
559 
560 
561   /**
562    * Generate a list of KeyValues for testing based on given parameters
563    * @param timestamps
564    * @param numRows
565    * @param qualifier
566    * @param family
567    * @return
568    */
569   List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
570       byte[] qualifier, byte[] family) {
571     List<KeyValue> kvList = new ArrayList<KeyValue>();
572     for (int i=1;i<=numRows;i++) {
573       byte[] b = Bytes.toBytes(i);
574       for (long timestamp: timestamps) {
575         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
576       }
577     }
578     return kvList;
579   }
580 
581   /**
582    * Test to ensure correctness when using Stores with multiple timestamps
583    * @throws IOException
584    */
585   public void testMultipleTimestamps() throws IOException {
586     int numRows = 1;
587     long[] timestamps1 = new long[] {1,5,10,20};
588     long[] timestamps2 = new long[] {30,80};
589 
590     init(this.getName());
591 
592     List<KeyValue> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
593     for (KeyValue kv : kvList1) {
594       this.store.add(kv);
595     }
596 
597     this.store.snapshot();
598     flushStore(store, id++);
599 
600     List<KeyValue> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
601     for(KeyValue kv : kvList2) {
602       this.store.add(kv);
603     }
604 
605     List<KeyValue> result;
606     Get get = new Get(Bytes.toBytes(1));
607     get.addColumn(family,qf1);
608 
609     get.setTimeRange(0,15);
610     result = HBaseTestingUtility.getFromStoreFile(store, get);
611     assertTrue(result.size()>0);
612 
613     get.setTimeRange(40,90);
614     result = HBaseTestingUtility.getFromStoreFile(store, get);
615     assertTrue(result.size()>0);
616 
617     get.setTimeRange(10,45);
618     result = HBaseTestingUtility.getFromStoreFile(store, get);
619     assertTrue(result.size()>0);
620 
621     get.setTimeRange(80,145);
622     result = HBaseTestingUtility.getFromStoreFile(store, get);
623     assertTrue(result.size()>0);
624 
625     get.setTimeRange(1,2);
626     result = HBaseTestingUtility.getFromStoreFile(store, get);
627     assertTrue(result.size()>0);
628 
629     get.setTimeRange(90,200);
630     result = HBaseTestingUtility.getFromStoreFile(store, get);
631     assertTrue(result.size()==0);
632   }
633 }