1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configurable;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
27 import org.apache.hadoop.hbase.util.Base64;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.apache.hadoop.mapreduce.Partitioner;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE>
47 implements Configurable {
48 private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class);
49
50 @Deprecated
51 public static final String START = "hbase.simpletotalorder.start";
52 @Deprecated
53 public static final String END = "hbase.simpletotalorder.end";
54
55 static final String START_BASE64 = "hbase.simpletotalorder.start.base64";
56 static final String END_BASE64 = "hbase.simpletotalorder.end.base64";
57
58 private Configuration c;
59 private byte [] startkey;
60 private byte [] endkey;
61 private byte [][] splits;
62 private int lastReduces = -1;
63
64 public static void setStartKey(Configuration conf, byte[] startKey) {
65 conf.set(START_BASE64, Base64.encodeBytes(startKey));
66 }
67
68 public static void setEndKey(Configuration conf, byte[] endKey) {
69 conf.set(END_BASE64, Base64.encodeBytes(endKey));
70 }
71
72 @SuppressWarnings("deprecation")
73 static byte[] getStartKey(Configuration conf) {
74 return getKeyFromConf(conf, START_BASE64, START);
75 }
76
77 @SuppressWarnings("deprecation")
78 static byte[] getEndKey(Configuration conf) {
79 return getKeyFromConf(conf, END_BASE64, END);
80 }
81
82 private static byte[] getKeyFromConf(Configuration conf,
83 String base64Key, String deprecatedKey) {
84 String encoded = conf.get(base64Key);
85 if (encoded != null) {
86 return Base64.decode(encoded);
87 }
88 String oldStyleVal = conf.get(deprecatedKey);
89 if (oldStyleVal == null) {
90 return null;
91 }
92 LOG.warn("Using deprecated configuration " + deprecatedKey +
93 " - please use static accessor methods instead.");
94 return Bytes.toBytes(oldStyleVal);
95 }
96
97 @Override
98 public int getPartition(final ImmutableBytesWritable key, final VALUE value,
99 final int reduces) {
100 if (reduces == 1) return 0;
101 if (this.lastReduces != reduces) {
102 this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1);
103 for (int i = 0; i < splits.length; i++) {
104 LOG.info(Bytes.toStringBinary(splits[i]));
105 }
106 }
107 int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(),
108 key.getLength(), Bytes.BYTES_RAWCOMPARATOR);
109
110 if (pos < 0) {
111 pos++;
112 pos *= -1;
113 if (pos == 0) {
114
115 throw new RuntimeException("Key outside start/stop range: " +
116 key.toString());
117 }
118 pos--;
119 }
120 return pos;
121 }
122
123 @Override
124 public Configuration getConf() {
125 return this.c;
126 }
127
128 @Override
129 public void setConf(Configuration conf) {
130 this.c = conf;
131 this.startkey = getStartKey(conf);
132 this.endkey = getEndKey(conf);
133 if (startkey == null || endkey == null) {
134 throw new RuntimeException(this.getClass() + " not configured");
135 }
136 LOG.info("startkey=" + Bytes.toStringBinary(startkey) +
137 ", endkey=" + Bytes.toStringBinary(endkey));
138 }
139 }