View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.client.HTable;
27  import org.apache.hadoop.hbase.client.Result;
28  import org.apache.hadoop.hbase.client.ResultScanner;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31  import org.apache.hadoop.util.StringUtils;
32  
33  /**
34   * Iterate over an HBase table data, return (ImmutableBytesWritable, Result)
35   * pairs.
36   */
37  public class TableRecordReaderImpl {
38  
39  
40    static final Log LOG = LogFactory.getLog(TableRecordReader.class);
41  
42    private ResultScanner scanner = null;
43    private Scan scan = null;
44    private HTable htable = null;
45    private byte[] lastRow = null;
46    private ImmutableBytesWritable key = null;
47    private Result value = null;
48  
49    /**
50     * Restart from survivable exceptions by creating a new scanner.
51     *
52     * @param firstRow  The first row to start at.
53     * @throws IOException When restarting fails.
54     */
55    public void restart(byte[] firstRow) throws IOException {
56      Scan newScan = new Scan(scan);
57      newScan.setStartRow(firstRow);
58      this.scanner = this.htable.getScanner(newScan);
59    }
60  
61    /**
62     * Build the scanner. Not done in constructor to allow for extension.
63     *
64     * @throws IOException When restarting the scan fails.
65     */
66    public void init() throws IOException {
67      restart(scan.getStartRow());
68    }
69  
70    /**
71     * Sets the HBase table.
72     *
73     * @param htable  The {@link HTable} to scan.
74     */
75    public void setHTable(HTable htable) {
76      this.htable = htable;
77    }
78  
79    /**
80     * Sets the scan defining the actual details like columns etc.
81     *
82     * @param scan  The scan to set.
83     */
84    public void setScan(Scan scan) {
85      this.scan = scan;
86    }
87  
88    /**
89     * Closes the split.
90     *
91     *
92     */
93    public void close() {
94      this.scanner.close();
95    }
96  
97    /**
98     * Returns the current key.
99     *
100    * @return The current key.
101    * @throws IOException
102    * @throws InterruptedException When the job is aborted.
103    */
104   public ImmutableBytesWritable getCurrentKey() throws IOException,
105       InterruptedException {
106     return key;
107   }
108 
109   /**
110    * Returns the current value.
111    *
112    * @return The current value.
113    * @throws IOException When the value is faulty.
114    * @throws InterruptedException When the job is aborted.
115    */
116   public Result getCurrentValue() throws IOException, InterruptedException {
117     return value;
118   }
119 
120 
121   /**
122    * Positions the record reader to the next record.
123    *
124    * @return <code>true</code> if there was another record.
125    * @throws IOException When reading the record failed.
126    * @throws InterruptedException When the job was aborted.
127    */
128   public boolean nextKeyValue() throws IOException, InterruptedException {
129     if (key == null) key = new ImmutableBytesWritable();
130     if (value == null) value = new Result();
131     try {
132       value = this.scanner.next();
133     } catch (IOException e) {
134       LOG.debug("recovered from " + StringUtils.stringifyException(e));
135       if (lastRow == null) {
136         LOG.warn("We are restarting the first next() invocation," +
137             " if your mapper's restarted a few other times like this" +
138             " then you should consider killing this job and investigate" +
139             " why it's taking so long.");
140         lastRow = scan.getStartRow();
141       }
142       restart(lastRow);
143       scanner.next();    // skip presumed already mapped row
144       value = scanner.next();
145     }
146     if (value != null && value.size() > 0) {
147       key.set(value.getRow());
148       lastRow = key.get();
149       return true;
150     }
151     return false;
152   }
153 
154   /**
155    * The current progress of the record reader through its data.
156    *
157    * @return A number between 0.0 and 1.0, the fraction of the data read.
158    */
159   public float getProgress() {
160     // Depends on the total number of tuples
161     return 0;
162   }
163 
164 }