/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

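/**
 * Test that a MapReduce job driven by a time-range Scan only surfaces cell
 * versions whose timestamps fall inside [MINSTAMP, MAXSTAMP).
 */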
public class TestTimeRangeMapRed extends HBaseClusterTestCase {

  private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);

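  // One row with seven versions of the same cell. The boolean records
  // whether that version's timestamp lies inside the scanned time range.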
  private static final byte [] KEY = Bytes.toBytes("row1");
  private static final NavigableMap<Long, Boolean> TIMESTAMP =
    new TreeMap<Long, Boolean>();
  static {
    TIMESTAMP.put(1245620000L, false);
    TIMESTAMP.put(1245620005L, true);
    TIMESTAMP.put(1245620010L, true);
    TIMESTAMP.put(1245620055L, true);
    TIMESTAMP.put(1245620100L, true);
    TIMESTAMP.put(1245620150L, false);
    TIMESTAMP.put(1245620250L, false);
  }
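  // Scan.setTimeRange(min, max) is inclusive of min and exclusive of max,
  // so MAXSTAMP is bumped by one to keep timestamp 1245620100 in range.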
  static final long MINSTAMP = 1245620005;
  static final long MAXSTAMP = 1245620100 + 1;

  static final byte[] TABLE_NAME = Bytes.toBytes("table123");
  static final byte[] FAMILY_NAME = Bytes.toBytes("text");
  static final byte[] COLUMN_NAME = Bytes.toBytes("input");

  protected HTableDescriptor desc;
  protected HTable table;

  public TestTimeRangeMapRed() {
    super();
    System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
    conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
    this.setOpenMetaTable(true);
  }

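  // Create a table whose single family retains an unbounded number of
  // versions, so every timestamped put in the fixture survives.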
  @Override
  public void setUp() throws Exception {
    super.setUp();
    desc = new HTableDescriptor(TABLE_NAME);
    HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
    col.setMaxVersions(Integer.MAX_VALUE);
    desc.addFamily(col);
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.createTable(desc);
    table = new HTable(conf, desc.getName());
  }

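  /**
   * Mapper that records what the time-range scan let through: for every
   * version it receives, it writes {@code true} back to the same cell at
   * the same timestamp, leaving out-of-range versions untouched.
   */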
  private static class ProcessTimeRangeMapper
  extends TableMapper<ImmutableBytesWritable, MapWritable>
  implements Configurable {

    private Configuration conf = null;
    private HTable table = null;

    @Override
    public void map(ImmutableBytesWritable key, Result result,
        Context context)
    throws IOException {
      List<Long> tsList = new ArrayList<Long>();
      for (KeyValue kv : result.sorted()) {
        tsList.add(kv.getTimestamp());
      }

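      // Flag every version the time-range scan surfaced by rewriting the
      // same cell, at the same timestamp, with the value true.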
      for (Long ts : tsList) {
        Put put = new Put(key.get());
        put.add(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
        table.put(put);
      }
      table.flushCommits();
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration configuration) {
      this.conf = configuration;
      try {
        table = new HTable(HBaseConfiguration.create(conf), TABLE_NAME);
      } catch (IOException e) {
        // Fail fast: swallowing the exception here would only resurface
        // later as a NullPointerException in map().
        throw new RuntimeException(e);
      }
    }
  }

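  /**
   * End-to-end flow: load seven versions of one cell, run the time-range
   * job, then check which versions were flipped to true.
   */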
  public void testTimeRangeMapRed()
  throws IOException, InterruptedException, ClassNotFoundException {
    prepareTest();
    runTestOnTable();
    verify();
  }

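  // Seed the single row: every timestamped version starts out false.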
  private void prepareTest() throws IOException {
    for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
      Put put = new Put(KEY);
      put.add(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
      table.put(put);
    }
    table.flushCommits();
  }

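  // Run the mapper-only job against a mini MR cluster, scanning just the
  // [MINSTAMP, MAXSTAMP) slice of the fixture column.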
  private void runTestOnTable()
  throws IOException, InterruptedException, ClassNotFoundException {
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
    Job job = null;
    try {
      job = new Job(conf, "test123");
      job.setOutputFormatClass(NullOutputFormat.class);
      job.setNumReduceTasks(0);
      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_NAME);
      scan.setTimeRange(MINSTAMP, MAXSTAMP);
      scan.setMaxVersions();
      TableMapReduceUtil.initTableMapperJob(Bytes.toString(TABLE_NAME),
        scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
      // Let failures propagate: the old catch-and-print here could let the
      // test pass without the job ever having run.
      assertTrue("Job failed to complete", job.waitForCompletion(true));
    } finally {
      mrCluster.shutdown();
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

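  // Read the column back and check each returned version's flag against
  // the expected map: true inside the time range, false outside it.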
  private void verify() throws IOException {
    Scan scan = new Scan();
    scan.addColumn(FAMILY_NAME, COLUMN_NAME);
    scan.setMaxVersions(1);
    ResultScanner scanner = table.getScanner(scan);
    for (Result r : scanner) {
      for (KeyValue kv : r.sorted()) {
        log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(kv.getFamily())
            + "\t" + Bytes.toString(kv.getQualifier())
            + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(kv.getValue()));
        assertEquals(TIMESTAMP.get(kv.getTimestamp()),
          (Boolean) Bytes.toBoolean(kv.getValue()));
      }
    }
    scanner.close();
  }

}