1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapred;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.mapred.FileAlreadyExistsException;
32 import org.apache.hadoop.mapred.InvalidJobConfException;
33 import org.apache.hadoop.mapred.JobConf;
34 import org.apache.hadoop.mapred.FileOutputFormat;
35 import org.apache.hadoop.mapred.RecordWriter;
36 import org.apache.hadoop.mapred.Reporter;
37 import org.apache.hadoop.util.Progressable;
38
39
40
41
42 @Deprecated
43 public class TableOutputFormat extends
44 FileOutputFormat<ImmutableBytesWritable, Put> {
45
46
47 public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
48 private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
49
50
51
52
53
54 protected static class TableRecordWriter
55 implements RecordWriter<ImmutableBytesWritable, Put> {
56 private HTable m_table;
57
58
59
60
61
62
63 public TableRecordWriter(HTable table) {
64 m_table = table;
65 }
66
67 public void close(Reporter reporter)
68 throws IOException {
69 m_table.flushCommits();
70 }
71
72 public void write(ImmutableBytesWritable key,
73 Put value) throws IOException {
74 m_table.put(new Put(value));
75 }
76 }
77
78 @Override
79 @SuppressWarnings("unchecked")
80 public RecordWriter getRecordWriter(FileSystem ignored,
81 JobConf job, String name, Progressable progress) throws IOException {
82
83
84
85 String tableName = job.get(OUTPUT_TABLE);
86 HTable table = null;
87 try {
88 table = new HTable(HBaseConfiguration.create(job), tableName);
89 } catch(IOException e) {
90 LOG.error(e);
91 throw e;
92 }
93 table.setAutoFlush(false);
94 return new TableRecordWriter(table);
95 }
96
97 @Override
98 public void checkOutputSpecs(FileSystem ignored, JobConf job)
99 throws FileAlreadyExistsException, InvalidJobConfException, IOException {
100
101 String tableName = job.get(OUTPUT_TABLE);
102 if(tableName == null) {
103 throw new IOException("Must specify table name");
104 }
105 }
106 }