/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;
21  
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;

import org.apache.hadoop.util.StringUtils;

39  /**
40   * Iterate over an HBase table data, return (Text, RowResult) pairs
41   */
42  public class TableRecordReaderImpl {
43    static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
44  
45    private byte [] startRow;
46    private byte [] endRow;
47    private byte [] lastRow;
48    private Filter trrRowFilter;
49    private ResultScanner scanner;
50    private HTable htable;
51    private byte [][] trrInputColumns;
52  
53    /**
54     * Restart from survivable exceptions by creating a new scanner.
55     *
56     * @param firstRow
57     * @throws IOException
58     */
59    public void restart(byte[] firstRow) throws IOException {
60      if ((endRow != null) && (endRow.length > 0)) {
61        if (trrRowFilter != null) {
62          Scan scan = new Scan(firstRow, endRow);
63          scan.addColumns(trrInputColumns);
64          scan.setFilter(trrRowFilter);
65          scan.setCacheBlocks(false);
66          this.scanner = this.htable.getScanner(scan);
67        } else {
68          LOG.debug("TIFB.restart, firstRow: " +
69              Bytes.toStringBinary(firstRow) + ", endRow: " +
70              Bytes.toStringBinary(endRow));
71          Scan scan = new Scan(firstRow, endRow);
72          scan.addColumns(trrInputColumns);
73          this.scanner = this.htable.getScanner(scan);
74        }
75      } else {
76        LOG.debug("TIFB.restart, firstRow: " +
77            Bytes.toStringBinary(firstRow) + ", no endRow");
78  
79        Scan scan = new Scan(firstRow);
80        scan.addColumns(trrInputColumns);
81  //      scan.setFilter(trrRowFilter);
82        this.scanner = this.htable.getScanner(scan);
83      }
84    }
85  
86    /**
87     * Build the scanner. Not done in constructor to allow for extension.
88     *
89     * @throws IOException
90     */
91    public void init() throws IOException {
92      restart(startRow);
93    }
94  
95    byte[] getStartRow() {
96      return this.startRow;
97    }
98    /**
99     * @param htable the {@link HTable} to scan.
100    */
101   public void setHTable(HTable htable) {
102     this.htable = htable;
103   }
104 
105   /**
106    * @param inputColumns the columns to be placed in {@link Result}.
107    */
108   public void setInputColumns(final byte [][] inputColumns) {
109     this.trrInputColumns = inputColumns;
110   }
111 
112   /**
113    * @param startRow the first row in the split
114    */
115   public void setStartRow(final byte [] startRow) {
116     this.startRow = startRow;
117   }
118 
119   /**
120    *
121    * @param endRow the last row in the split
122    */
123   public void setEndRow(final byte [] endRow) {
124     this.endRow = endRow;
125   }
126 
127   /**
128    * @param rowFilter the {@link Filter} to be used.
129    */
130   public void setRowFilter(Filter rowFilter) {
131     this.trrRowFilter = rowFilter;
132   }
133 
134   public void close() {
135     this.scanner.close();
136   }
137 
138   /**
139    * @return ImmutableBytesWritable
140    *
141    * @see org.apache.hadoop.mapred.RecordReader#createKey()
142    */
143   public ImmutableBytesWritable createKey() {
144     return new ImmutableBytesWritable();
145   }
146 
147   /**
148    * @return RowResult
149    *
150    * @see org.apache.hadoop.mapred.RecordReader#createValue()
151    */
152   public Result createValue() {
153     return new Result();
154   }
155 
156   public long getPos() {
157     // This should be the ordinal tuple in the range;
158     // not clear how to calculate...
159     return 0;
160   }
161 
162   public float getProgress() {
163     // Depends on the total number of tuples and getPos
164     return 0;
165   }
166 
167   /**
168    * @param key HStoreKey as input key.
169    * @param value MapWritable as input value
170    * @return true if there was more data
171    * @throws IOException
172    */
173   public boolean next(ImmutableBytesWritable key, Result value)
174   throws IOException {
175     Result result;
176     try {
177       result = this.scanner.next();
178     } catch (UnknownScannerException e) {
179       LOG.debug("recovered from " + StringUtils.stringifyException(e));
180       restart(lastRow);
181       this.scanner.next();    // skip presumed already mapped row
182       result = this.scanner.next();
183     }
184 
185     if (result != null && result.size() > 0) {
186       key.set(result.getRow());
187       lastRow = key.get();
188       Writables.copyWritable(result, value);
189       return true;
190     }
191     return false;
192   }
193 }