/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

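/**
 * Verify that {@link Scan#setTimeRange(long, long)} is honored by a MapReduce
 * job: {@link ProcessTimeRangeMapper} sees only the cells whose timestamps
 * fall inside [MINSTAMP, MAXSTAMP) and flips their values to {@code true},
 * and the final scan checks that exactly those cells were flipped.
 */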
public class TestTimeRangeMapRed extends HBaseClusterTestCase {

  private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);

  private static final byte [] KEY = Bytes.toBytes("row1");
  private static final NavigableMap<Long, Boolean> TIMESTAMP =
    new TreeMap<Long, Boolean>();
  static {
    TIMESTAMP.put(1245620000L, false);
    TIMESTAMP.put(1245620005L, true); // include
    TIMESTAMP.put(1245620010L, true); // include
    TIMESTAMP.put(1245620055L, true); // include
    TIMESTAMP.put(1245620100L, true); // include
    TIMESTAMP.put(1245620150L, false);
    TIMESTAMP.put(1245620250L, false);
  }
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1; // The upper bound of a time range is exclusive, so add one to include 1245620100.

  static final byte[] TABLE_NAME = Bytes.toBytes("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  protected HTableDescriptor desc;
  protected HTable table;

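  /** Configure the log and output directories before the MiniMRCluster starts. */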
  public TestTimeRangeMapRed() {
    super();
    System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
    conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
    this.setOpenMetaTable(true);
  }

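  /** Create the test table with a single family that keeps unlimited versions. */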
  @Override
  public void setUp() throws Exception {
    super.setUp();
    desc = new HTableDescriptor(TABLE_NAME);
    HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
    col.setMaxVersions(Integer.MAX_VALUE);
    desc.addFamily(col);
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(desc);
    table = new HTable(conf, desc.getName());
  }

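  /**
   * Mapper that receives only the cells the time-range scan lets through
   * and writes each one back with the value {@code true}, marking it as seen.
   */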
  private static class ProcessTimeRangeMapper
  extends TableMapper<ImmutableBytesWritable, MapWritable>
  implements Configurable {

    private Configuration conf = null;
    private HTable table = null;

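    /** Overwrite every cell the scan delivered, at its original timestamp, with {@code true}. */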
    @Override
    public void map(ImmutableBytesWritable key, Result result,
        Context context)
    throws IOException {
      List<Long> tsList = new ArrayList<Long>();
      for (KeyValue kv : result.sorted()) {
        tsList.add(kv.getTimestamp());
      }

      for (Long ts : tsList) {
        Put put = new Put(key.get());
        put.add(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
        table.put(put);
      }
      table.flushCommits();
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        table = new HTable(HBaseConfiguration.create(conf), TABLE_NAME);
      } catch (IOException e) {
        // Fail the task rather than swallow the error; the mapper cannot
        // do anything useful without a table handle.
        throw new RuntimeException(e);
      }
    }

  }

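  /** Seed the table, run the map-only job over the restricted time range, then verify the results. */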
  public void testTimeRangeMapRed()
  throws IOException, InterruptedException, ClassNotFoundException {
    prepareTest();
    runTestOnTable();
    verify();
  }

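  /** Write one cell per timestamp in {@link #TIMESTAMP}, all with the value {@code false}. */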
  private void prepareTest() throws IOException {
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.add(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      table.put(put);
    }
    table.flushCommits();
  }

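  /**
   * Run {@link ProcessTimeRangeMapper} over a scan limited to
   * [MINSTAMP, MAXSTAMP) on a two-node MiniMRCluster.
   */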
  private void runTestOnTable()
  throws IOException, InterruptedException, ClassNotFoundException {
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
    Job job = null;
    try {
      job = new Job(conf, "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.setMaxVersions();
      TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME),
        scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
      // Let the IOException propagate instead of swallowing it; a silent
      // failure here would let the test pass without the job ever running.
      assertTrue("Job failed", job.waitForCompletion(true));
    } finally {
      mrCluster.shutdown();
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

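  /**
   * Scan the latest version of every cell and assert that the value matches
   * the expected flag in {@link #TIMESTAMP}: {@code true} only for timestamps
   * inside [MINSTAMP, MAXSTAMP).
   */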
  private void verify() throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.setMaxVersions(1);
    ResultScanner scanner = table.getScanner(scan);
    for (Result r: scanner) {
      for (KeyValue kv : r.sorted()) {
        log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(kv.getFamily())
            + "\t" + Bytes.toString(kv.getQualifier())
            + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(kv.getValue()));
        assertEquals(TIMESTAMP.get(kv.getTimestamp()), (Boolean)Bytes.toBoolean(kv.getValue()));
      }
    }
    scanner.close();
  }

}