1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.filter;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.KeyValue;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
38  import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
39  import org.apache.hadoop.hbase.regionserver.HRegion;
40  import org.apache.hadoop.hbase.regionserver.InternalScanner;
41  import org.apache.hadoop.hbase.util.Bytes;
42  
43  import junit.framework.TestCase;
44  
45  public class TestDependentColumnFilter extends TestCase {
46    private final Log LOG = LogFactory.getLog(this.getClass());
47    private static final byte[][] ROWS = {
48  	  Bytes.toBytes("test1"),Bytes.toBytes("test2")
49    };
50    private static final byte[][] FAMILIES = {
51  	  Bytes.toBytes("familyOne"),Bytes.toBytes("familyTwo")
52    };
53    private static final long STAMP_BASE = System.currentTimeMillis();
54    private static final long[] STAMPS = {
55  	  STAMP_BASE-100, STAMP_BASE-200, STAMP_BASE-300
56    };
57    private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
58    private static final byte[][] BAD_VALS = {
59  	  Bytes.toBytes("bad1"), Bytes.toBytes("bad2"), Bytes.toBytes("bad3")
60    };
61    private static final byte[] MATCH_VAL = Bytes.toBytes("match");
62    private HBaseTestingUtility testUtil;
63  
64    List<KeyValue> testVals;
65    private HRegion region;
66  
67    @Override
68    protected void setUp() throws Exception {
69      super.setUp();
70  
71      testUtil = new HBaseTestingUtility();
72  
73      testVals = makeTestVals();
74  
75      HTableDescriptor htd = new HTableDescriptor(getName());
76      htd.addFamily(new HColumnDescriptor(FAMILIES[0]));
77      htd.addFamily(new HColumnDescriptor(FAMILIES[1]));
78      HRegionInfo info = new HRegionInfo(htd, null, null, false);
79      this.region = HRegion.createHRegion(info, testUtil.getTestDir(), testUtil.getConfiguration());
80      addData();
81    }
82  
83    @Override
84    protected void tearDown() throws Exception {
85      super.tearDown();
86      this.region.close();
87    }
88  
89    private void addData() throws IOException {
90      Put put = new Put(ROWS[0]);
91      // add in an entry for each stamp, with 2 as a "good" value
92      put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
93      put.add(FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]);
94      put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
95      // add in entries for stamps 0 and 2.
96      // without a value check both will be "accepted"
97      // with one 2 will be accepted(since the corresponding ts entry
98      // has a matching value
99      put.add(FAMILIES[1], QUALIFIER, STAMPS[0], BAD_VALS[0]);
100     put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
101 
102     this.region.put(put);
103 
104     put = new Put(ROWS[1]);
105     put.add(FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]);
106     // there is no corresponding timestamp for this so it should never pass
107     put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
108     // if we reverse the qualifiers this one should pass
109     put.add(FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL);
110     // should pass
111     put.add(FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]);
112 
113     this.region.put(put);
114   }
115 
116   private List<KeyValue> makeTestVals() {
117 	List<KeyValue> testVals = new ArrayList<KeyValue>();
118 	testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[0], BAD_VALS[0]));
119 	testVals.add(new KeyValue(ROWS[0], FAMILIES[0], QUALIFIER, STAMPS[1], BAD_VALS[1]));
120 	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[1], BAD_VALS[2]));
121 	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[0], MATCH_VAL));
122 	testVals.add(new KeyValue(ROWS[0], FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]));
123 
124 	return testVals;
125   }
126 
127   /**
128    * This shouldn't be confused with TestFilter#verifyScan
129    * as expectedKeys is not the per row total, but the scan total
130    *
131    * @param s
132    * @param expectedRows
133    * @param expectedCells
134    * @throws IOException
135    */
136   private void verifyScan(Scan s, long expectedRows, long expectedCells)
137   throws IOException {
138     InternalScanner scanner = this.region.getScanner(s);
139     List<KeyValue> results = new ArrayList<KeyValue>();
140     int i = 0;
141     int cells = 0;
142     for (boolean done = true; done; i++) {
143       done = scanner.next(results);
144       Arrays.sort(results.toArray(new KeyValue[results.size()]),
145           KeyValue.COMPARATOR);
146       LOG.info("counter=" + i + ", " + results);
147       if (results.isEmpty()) break;
148       cells += results.size();
149       assertTrue("Scanned too many rows! Only expected " + expectedRows +
150           " total but already scanned " + (i+1), expectedRows > i);
151       assertTrue("Expected " + expectedCells + " cells total but " +
152           "already scanned " + cells, expectedCells >= cells);
153       results.clear();
154     }
155     assertEquals("Expected " + expectedRows + " rows but scanned " + i +
156         " rows", expectedRows, i);
157     assertEquals("Expected " + expectedCells + " cells but scanned " + cells +
158             " cells", expectedCells, cells);
159   }
160 
161   /**
162    * Test scans using a DependentColumnFilter
163    */
164   public void testScans() throws Exception {
165     Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
166 
167     Scan scan = new Scan();
168     scan.setFilter(filter);
169     scan.setMaxVersions(Integer.MAX_VALUE);
170 
171     verifyScan(scan, 2, 8);
172 
173     // drop the filtering cells
174     filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true);
175     scan = new Scan();
176     scan.setFilter(filter);
177     scan.setMaxVersions(Integer.MAX_VALUE);
178 
179     verifyScan(scan, 2, 3);
180 
181     // include a comparator operation
182     filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, false,
183         CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
184     scan = new Scan();
185     scan.setFilter(filter);
186     scan.setMaxVersions(Integer.MAX_VALUE);
187 
188     /*
189      * expecting to get the following 3 cells
190      * row 0
191      *   put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
192      *   put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
193      * row 1
194      *   put.add(FAMILIES[0], QUALIFIER, STAMPS[2], MATCH_VAL);
195      */
196     verifyScan(scan, 2, 3);
197 
198     // include a comparator operation and drop comparator
199     filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER, true,
200         CompareOp.EQUAL, new BinaryComparator(MATCH_VAL));
201     scan = new Scan();
202     scan.setFilter(filter);
203     scan.setMaxVersions(Integer.MAX_VALUE);
204 
205     /*
206      * expecting to get the following 1 cell
207      * row 0
208      *   put.add(FAMILIES[1], QUALIFIER, STAMPS[2], BAD_VALS[2]);
209      */
210     verifyScan(scan, 1, 1);
211 
212   }
213 
214   /**
215    * Test that the filter correctly drops rows without a corresponding timestamp
216    *
217    * @throws Exception
218    */
219   public void testFilterDropping() throws Exception {
220     Filter filter = new DependentColumnFilter(FAMILIES[0], QUALIFIER);
221     List<KeyValue> accepted = new ArrayList<KeyValue>();
222     for(KeyValue val : testVals) {
223       if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
224         accepted.add(val);
225       }
226     }
227     assertEquals("check all values accepted from filterKeyValue", 5, accepted.size());
228 
229     filter.filterRow(accepted);
230     assertEquals("check filterRow(List<KeyValue>) dropped cell without corresponding column entry", 4, accepted.size());
231 
232     // start do it again with dependent column dropping on
233     filter = new DependentColumnFilter(FAMILIES[1], QUALIFIER, true);
234     accepted.clear();
235     for(KeyValue val : testVals) {
236         if(filter.filterKeyValue(val) == ReturnCode.INCLUDE) {
237           accepted.add(val);
238         }
239       }
240       assertEquals("check the filtering column cells got dropped", 2, accepted.size());
241 
242       filter.filterRow(accepted);
243       assertEquals("check cell retention", 2, accepted.size());
244   }
245 }