1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configurable;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.client.Delete;
31 import org.apache.hadoop.hbase.client.HConnectionManager;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
35 import org.apache.hadoop.io.Writable;
36 import org.apache.hadoop.mapreduce.JobContext;
37 import org.apache.hadoop.mapreduce.OutputCommitter;
38 import org.apache.hadoop.mapreduce.OutputFormat;
39 import org.apache.hadoop.mapreduce.RecordWriter;
40 import org.apache.hadoop.mapreduce.TaskAttemptContext;
41
42
43
44
45
46
47
48
49 public class TableOutputFormat<KEY> extends OutputFormat<KEY, Writable>
50 implements Configurable {
51
52 private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
53
54
55 public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
56
57
58
59
60
61
62
63 public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
64
65
66 public static final String
67 REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
68
69 public static final String
70 REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
71
72
73 private Configuration conf = null;
74
75 private HTable table;
76
77
78
79
80
81
82 protected static class TableRecordWriter<KEY>
83 extends RecordWriter<KEY, Writable> {
84
85
86 private HTable table;
87
88
89
90
91
92
93 public TableRecordWriter(HTable table) {
94 this.table = table;
95 }
96
97
98
99
100
101
102
103
104 @Override
105 public void close(TaskAttemptContext context)
106 throws IOException {
107 table.flushCommits();
108
109
110
111
112
113 HConnectionManager.deleteAllConnections(true);
114 }
115
116
117
118
119
120
121
122
123
124 @Override
125 public void write(KEY key, Writable value)
126 throws IOException {
127 if (value instanceof Put) this.table.put(new Put((Put)value));
128 else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
129 else throw new IOException("Pass a Delete or a Put");
130 }
131 }
132
133
134
135
136
137
138
139
140
141
142 @Override
143 public RecordWriter<KEY, Writable> getRecordWriter(
144 TaskAttemptContext context)
145 throws IOException, InterruptedException {
146 return new TableRecordWriter<KEY>(this.table);
147 }
148
149
150
151
152
153
154
155
156
157 @Override
158 public void checkOutputSpecs(JobContext context) throws IOException,
159 InterruptedException {
160
161
162 }
163
164
165
166
167
168
169
170
171
172
173 @Override
174 public OutputCommitter getOutputCommitter(TaskAttemptContext context)
175 throws IOException, InterruptedException {
176 return new TableOutputCommitter();
177 }
178
179 public Configuration getConf() {
180 return conf;
181 }
182
183 @Override
184 public void setConf(Configuration otherConf) {
185 this.conf = HBaseConfiguration.create(otherConf);
186 String tableName = this.conf.get(OUTPUT_TABLE);
187 String address = this.conf.get(QUORUM_ADDRESS);
188 String serverClass = this.conf.get(REGION_SERVER_CLASS);
189 String serverImpl = this.conf.get(REGION_SERVER_IMPL);
190 try {
191 if (address != null) {
192 ZKUtil.applyClusterKeyToConf(this.conf, address);
193 }
194 if (serverClass != null) {
195 this.conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
196 this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
197 }
198 this.table = new HTable(this.conf, tableName);
199 this.table.setAutoFlush(false);
200 LOG.info("Created table instance for " + tableName);
201 } catch(IOException e) {
202 LOG.error(e);
203 }
204 }
205 }