View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapred;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.HBaseConfiguration;
27  import org.apache.hadoop.hbase.client.HTable;
28  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29  import org.apache.hadoop.hbase.util.Bytes;
30  import org.apache.hadoop.mapred.JobConf;
31  import org.apache.hadoop.mapred.Partitioner;
32  
33  
34  /**
35   * This is used to partition the output keys into groups of keys.
36   * Keys are grouped according to the regions that currently exist
37   * so that each reducer fills a single region so load is distributed.
38   *
39   * @param <K2>
40   * @param <V2>
41   */
42  @Deprecated
43  public class HRegionPartitioner<K2,V2>
44  implements Partitioner<ImmutableBytesWritable, V2> {
45    private final Log LOG = LogFactory.getLog(TableInputFormat.class);
46    private HTable table;
47    private byte[][] startKeys;
48  
49    public void configure(JobConf job) {
50      try {
51        this.table = new HTable(HBaseConfiguration.create(job),
52          job.get(TableOutputFormat.OUTPUT_TABLE));
53      } catch (IOException e) {
54        LOG.error(e);
55      }
56  
57      try {
58        this.startKeys = this.table.getStartKeys();
59      } catch (IOException e) {
60        LOG.error(e);
61      }
62    }
63  
64    public int getPartition(ImmutableBytesWritable key,
65        V2 value, int numPartitions) {
66      byte[] region = null;
67      // Only one region return 0
68      if (this.startKeys.length == 1){
69        return 0;
70      }
71      try {
72        // Not sure if this is cached after a split so we could have problems
73        // here if a region splits while mapping
74        region = table.getRegionLocation(key.get()).getRegionInfo().getStartKey();
75      } catch (IOException e) {
76        LOG.error(e);
77      }
78      for (int i = 0; i < this.startKeys.length; i++){
79        if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
80          if (i >= numPartitions-1){
81            // cover if we have less reduces then regions.
82            return (Integer.toString(i).hashCode()
83                & Integer.MAX_VALUE) % numPartitions;
84          }
85          return i;
86        }
87      }
88      // if above fails to find start key that match we need to return something
89      return 0;
90    }
91  }