package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Hadoop MapReduce output format that writes to one or more HBase tables. The
 * key is the name of the destination table and the value must be either a
 * {@link Put} or a {@link Delete}. Tables are opened lazily and cached, one
 * {@link HTable} per table name, for the lifetime of the record writer.
 */
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Writable> {

  /** Configuration property controlling whether writes go through the write-ahead log. */
  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";

  /** Property value meaning that the write-ahead log is used. */
  public static final boolean WAL_ON = true;

  /** Property value meaning that the write-ahead log is skipped. */
  public static final boolean WAL_OFF = false;
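
  /*
   * Illustrative usage sketch (not part of the original class): a driver might wire this
   * output format into a job roughly as below. The job name, the mapper class, and the
   * table name "table1" are assumptions made for the example only; the map or reduce
   * tasks must emit the destination table name as the key and a Put or Delete as the value.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   // Optionally trade durability for speed by skipping the write-ahead log.
   *   conf.setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_OFF);
   *   Job job = new Job(conf, "multi-table-writer");
   *   job.setMapperClass(MyMultiTableMapper.class);   // hypothetical mapper
   *   job.setNumReduceTasks(0);
   *   job.setOutputFormatClass(MultiTableOutputFormat.class);
   *   job.setOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setOutputValueClass(Writable.class);
   *
   *   // Inside the mapper, a record destined for "table1" would be emitted as:
   *   context.write(new ImmutableBytesWritable(Bytes.toBytes("table1")), put);
   */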

  /**
   * Record writer that routes each incoming {@link Put} or {@link Delete} to
   * the table named by its key.
   */
  protected static class MultiTableRecordWriter extends
      RecordWriter<ImmutableBytesWritable, Writable> {
    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
    Map<ImmutableBytesWritable, HTable> tables;
    Configuration conf;
    boolean useWriteAheadLogging;

    /**
     * @param conf the HBase configuration used to open tables
     * @param useWriteAheadLogging whether mutations should be written to the WAL
     */
    public MultiTableRecordWriter(Configuration conf,
        boolean useWriteAheadLogging) {
      LOG.debug("Created new MultiTableRecordWriter with WAL "
          + (useWriteAheadLogging ? "on" : "off"));
      this.tables = new HashMap<ImmutableBytesWritable, HTable>();
      this.conf = conf;
      this.useWriteAheadLogging = useWriteAheadLogging;
    }

    /**
     * Returns the table for the given name, opening it on first use and caching
     * it afterwards. Auto-flush is disabled so writes are buffered until
     * {@link #close(TaskAttemptContext)} flushes them.
     *
     * @param tableName name of the table to write to
     * @return the cached {@link HTable} instance
     * @throws IOException if the table cannot be opened
     */
    HTable getTable(ImmutableBytesWritable tableName) throws IOException {
      if (!tables.containsKey(tableName)) {
        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
        HTable table = new HTable(conf, tableName.get());
        table.setAutoFlush(false);
        tables.put(tableName, table);
      }
      return tables.get(tableName);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
      // Flush all buffered mutations before the task completes.
      for (HTable table : tables.values()) {
        table.flushCommits();
      }
    }

    /**
     * Writes a single action to the table named by the key.
     *
     * @param tableName name of the destination table
     * @param action a {@link Put} or {@link Delete} to apply
     * @throws IOException if the write fails
     * @throws IllegalArgumentException if the action is neither a Put nor a Delete
     */
    @Override
    public void write(ImmutableBytesWritable tableName, Writable action) throws IOException {
      HTable table = getTable(tableName);
      if (action instanceof Put) {
        Put put = new Put((Put) action);
        put.setWriteToWAL(useWriteAheadLogging);
        table.put(put);
      } else if (action instanceof Delete) {
        Delete delete = new Delete((Delete) action);
        table.delete(delete);
      } else {
        throw new IllegalArgumentException("action must be either Delete or Put");
      }
    }
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    // No output checks are performed; tables are looked up lazily when records are written.
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public RecordWriter<ImmutableBytesWritable, Writable> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
        conf.getBoolean(WAL_PROPERTY, WAL_ON));
  }

}