1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.apache.hadoop.hbase.HBaseTestCase;
30  import org.apache.hadoop.hbase.KeyValue;
31  import org.apache.hadoop.hbase.util.Bytes;
32  
33  
34  public class TestKeyValueHeap extends HBaseTestCase {
35    private static final boolean PRINT = false;
36  
37    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
38  
39    private byte[] row1;
40    private byte[] fam1;
41    private byte[] col1;
42    private byte[] data;
43  
44    private byte[] row2;
45    private byte[] fam2;
46    private byte[] col2;
47  
48    private byte[] col3;
49    private byte[] col4;
50    private byte[] col5;
51  
52    public void setUp() throws Exception {
53      super.setUp();
54      data = Bytes.toBytes("data");
55      row1 = Bytes.toBytes("row1");
56      fam1 = Bytes.toBytes("fam1");
57      col1 = Bytes.toBytes("col1");
58      row2 = Bytes.toBytes("row2");
59      fam2 = Bytes.toBytes("fam2");
60      col2 = Bytes.toBytes("col2");
61      col3 = Bytes.toBytes("col3");
62      col4 = Bytes.toBytes("col4");
63      col5 = Bytes.toBytes("col5");
64    }
65  
66    public void testSorted() throws IOException{
67      //Cases that need to be checked are:
68      //1. The "smallest" KeyValue is in the same scanners as current
69      //2. Current scanner gets empty
70  
71      List<KeyValue> l1 = new ArrayList<KeyValue>();
72      l1.add(new KeyValue(row1, fam1, col5, data));
73      l1.add(new KeyValue(row2, fam1, col1, data));
74      l1.add(new KeyValue(row2, fam1, col2, data));
75      scanners.add(new Scanner(l1));
76  
77      List<KeyValue> l2 = new ArrayList<KeyValue>();
78      l2.add(new KeyValue(row1, fam1, col1, data));
79      l2.add(new KeyValue(row1, fam1, col2, data));
80      scanners.add(new Scanner(l2));
81  
82      List<KeyValue> l3 = new ArrayList<KeyValue>();
83      l3.add(new KeyValue(row1, fam1, col3, data));
84      l3.add(new KeyValue(row1, fam1, col4, data));
85      l3.add(new KeyValue(row1, fam2, col1, data));
86      l3.add(new KeyValue(row1, fam2, col2, data));
87      l3.add(new KeyValue(row2, fam1, col3, data));
88      scanners.add(new Scanner(l3));
89  
90      List<KeyValue> expected = new ArrayList<KeyValue>();
91      expected.add(new KeyValue(row1, fam1, col1, data));
92      expected.add(new KeyValue(row1, fam1, col2, data));
93      expected.add(new KeyValue(row1, fam1, col3, data));
94      expected.add(new KeyValue(row1, fam1, col4, data));
95      expected.add(new KeyValue(row1, fam1, col5, data));
96      expected.add(new KeyValue(row1, fam2, col1, data));
97      expected.add(new KeyValue(row1, fam2, col2, data));
98      expected.add(new KeyValue(row2, fam1, col1, data));
99      expected.add(new KeyValue(row2, fam1, col2, data));
100     expected.add(new KeyValue(row2, fam1, col3, data));
101 
102     //Creating KeyValueHeap
103     KeyValueHeap kvh =
104       new KeyValueHeap(scanners, KeyValue.COMPARATOR);
105 
106     List<KeyValue> actual = new ArrayList<KeyValue>();
107     while(kvh.peek() != null){
108       actual.add(kvh.next());
109     }
110 
111     assertEquals(expected.size(), actual.size());
112     for(int i=0; i<expected.size(); i++){
113       assertEquals(expected.get(i), actual.get(i));
114       if(PRINT){
115         System.out.println("expected " +expected.get(i)+
116             "\nactual   " +actual.get(i) +"\n");
117       }
118     }
119 
120     //Check if result is sorted according to Comparator
121     for(int i=0; i<actual.size()-1; i++){
122       int ret = KeyValue.COMPARATOR.compare(actual.get(i), actual.get(i+1));
123       assertTrue(ret < 0);
124     }
125 
126   }
127 
128   public void testSeek() throws IOException {
129     //Cases:
130     //1. Seek KeyValue that is not in scanner
131     //2. Check that smallest that is returned from a seek is correct
132 
133     List<KeyValue> l1 = new ArrayList<KeyValue>();
134     l1.add(new KeyValue(row1, fam1, col5, data));
135     l1.add(new KeyValue(row2, fam1, col1, data));
136     l1.add(new KeyValue(row2, fam1, col2, data));
137     scanners.add(new Scanner(l1));
138 
139     List<KeyValue> l2 = new ArrayList<KeyValue>();
140     l2.add(new KeyValue(row1, fam1, col1, data));
141     l2.add(new KeyValue(row1, fam1, col2, data));
142     scanners.add(new Scanner(l2));
143 
144     List<KeyValue> l3 = new ArrayList<KeyValue>();
145     l3.add(new KeyValue(row1, fam1, col3, data));
146     l3.add(new KeyValue(row1, fam1, col4, data));
147     l3.add(new KeyValue(row1, fam2, col1, data));
148     l3.add(new KeyValue(row1, fam2, col2, data));
149     l3.add(new KeyValue(row2, fam1, col3, data));
150     scanners.add(new Scanner(l3));
151 
152     List<KeyValue> expected = new ArrayList<KeyValue>();
153     expected.add(new KeyValue(row2, fam1, col1, data));
154 
155     //Creating KeyValueHeap
156     KeyValueHeap kvh =
157       new KeyValueHeap(scanners, KeyValue.COMPARATOR);
158 
159     KeyValue seekKv = new KeyValue(row2, fam1, null, null);
160     kvh.seek(seekKv);
161 
162     List<KeyValue> actual = new ArrayList<KeyValue>();
163     actual.add(kvh.peek());
164 
165     assertEquals(expected.size(), actual.size());
166     for(int i=0; i<expected.size(); i++){
167       assertEquals(expected.get(i), actual.get(i));
168       if(PRINT){
169         System.out.println("expected " +expected.get(i)+
170             "\nactual   " +actual.get(i) +"\n");
171       }
172     }
173 
174   }
175 
176   public void testScannerLeak() throws IOException {
177     // Test for unclosed scanners (HBASE-1927)
178 
179     List<KeyValue> l1 = new ArrayList<KeyValue>();
180     l1.add(new KeyValue(row1, fam1, col5, data));
181     l1.add(new KeyValue(row2, fam1, col1, data));
182     l1.add(new KeyValue(row2, fam1, col2, data));
183     scanners.add(new Scanner(l1));
184 
185     List<KeyValue> l2 = new ArrayList<KeyValue>();
186     l2.add(new KeyValue(row1, fam1, col1, data));
187     l2.add(new KeyValue(row1, fam1, col2, data));
188     scanners.add(new Scanner(l2));
189 
190     List<KeyValue> l3 = new ArrayList<KeyValue>();
191     l3.add(new KeyValue(row1, fam1, col3, data));
192     l3.add(new KeyValue(row1, fam1, col4, data));
193     l3.add(new KeyValue(row1, fam2, col1, data));
194     l3.add(new KeyValue(row1, fam2, col2, data));
195     l3.add(new KeyValue(row2, fam1, col3, data));
196     scanners.add(new Scanner(l3));
197 
198     List<KeyValue> l4 = new ArrayList<KeyValue>();
199     scanners.add(new Scanner(l4));
200 
201     //Creating KeyValueHeap
202     KeyValueHeap kvh = new KeyValueHeap(scanners, KeyValue.COMPARATOR);
203 
204     while(kvh.next() != null);
205 
206     for(KeyValueScanner scanner : scanners) {
207       assertTrue(((Scanner)scanner).isClosed());
208     }
209   }
210 
211   private static class Scanner implements KeyValueScanner {
212     private Iterator<KeyValue> iter;
213     private KeyValue current;
214     private boolean closed = false;
215 
216     public Scanner(List<KeyValue> list) {
217       Collections.sort(list, KeyValue.COMPARATOR);
218       iter = list.iterator();
219       if(iter.hasNext()){
220         current = iter.next();
221       }
222     }
223 
224     public KeyValue peek() {
225       return current;
226     }
227 
228     public KeyValue next() {
229       KeyValue oldCurrent = current;
230       if(iter.hasNext()){
231         current = iter.next();
232       } else {
233         current = null;
234       }
235       return oldCurrent;
236     }
237 
238     public void close(){
239       closed = true;
240     }
241 
242     public boolean isClosed() {
243       return closed;
244     }
245 
246     public boolean seek(KeyValue seekKv) {
247       while(iter.hasNext()){
248         KeyValue next = iter.next();
249         int ret = KeyValue.COMPARATOR.compare(next, seekKv);
250         if(ret >= 0){
251           current = next;
252           return true;
253         }
254       }
255       return false;
256     }
257 
258     @Override
259     public boolean reseek(KeyValue key) throws IOException {
260       return seek(key);
261     }
262 
263     @Override
264     public long getSequenceID() {
265       return 0;
266     }
267   }
268 
269 }