View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapred;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31  import org.apache.hadoop.mapred.FileAlreadyExistsException;
32  import org.apache.hadoop.mapred.InvalidJobConfException;
33  import org.apache.hadoop.mapred.JobConf;
34  import org.apache.hadoop.mapred.FileOutputFormat;
35  import org.apache.hadoop.mapred.RecordWriter;
36  import org.apache.hadoop.mapred.Reporter;
37  import org.apache.hadoop.util.Progressable;
38  
39  /**
40   * Convert Map/Reduce output and write it to an HBase table
41   */
42  @Deprecated
43  public class TableOutputFormat extends
44  FileOutputFormat<ImmutableBytesWritable, Put> {
45  
46    /** JobConf parameter that specifies the output table */
47    public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
48    private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
49  
50    /**
51     * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
52     * and write to an HBase table
53     */
54    protected static class TableRecordWriter
55      implements RecordWriter<ImmutableBytesWritable, Put> {
56      private HTable m_table;
57  
58      /**
59       * Instantiate a TableRecordWriter with the HBase HClient for writing.
60       *
61       * @param table
62       */
63      public TableRecordWriter(HTable table) {
64        m_table = table;
65      }
66  
67      public void close(Reporter reporter)
68        throws IOException {
69        m_table.flushCommits();
70      }
71  
72      public void write(ImmutableBytesWritable key,
73          Put value) throws IOException {
74        m_table.put(new Put(value));
75      }
76    }
77  
78    @Override
79    @SuppressWarnings("unchecked")
80    public RecordWriter getRecordWriter(FileSystem ignored,
81        JobConf job, String name, Progressable progress) throws IOException {
82  
83      // expecting exactly one path
84  
85      String tableName = job.get(OUTPUT_TABLE);
86      HTable table = null;
87      try {
88        table = new HTable(HBaseConfiguration.create(job), tableName);
89      } catch(IOException e) {
90        LOG.error(e);
91        throw e;
92      }
93      table.setAutoFlush(false);
94      return new TableRecordWriter(table);
95    }
96  
97    @Override
98    public void checkOutputSpecs(FileSystem ignored, JobConf job)
99    throws FileAlreadyExistsException, InvalidJobConfException, IOException {
100 
101     String tableName = job.get(OUTPUT_TABLE);
102     if(tableName == null) {
103       throw new IOException("Must specify table name");
104     }
105   }
106 }