/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.hadoopbackport;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
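
/**
 * Partitioner that effects a total order over its output by reading a
 * sorted keyset of split points from an externally generated SequenceFile
 * and routing each key to the reduce whose range contains it.
 *
 * A minimal usage sketch; the partition file path and the sampling step
 * are illustrative only, not part of this class:
 * <pre>
 *   Job job = new Job(conf);
 *   job.setPartitionerClass(TotalOrderPartitioner.class);
 *   // the file must hold exactly (numReduceTasks - 1) sorted keys,
 *   // e.g. written by an InputSampler or by HFileOutputFormat setup code
 *   TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
 *       new Path("/tmp/partitions_example"));
 * </pre>
 */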
public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
    extends Partitioner<K,V> implements Configurable {

  private Node partitions;
  public static final String DEFAULT_PATH = "_partition.lst";
  public static final String PARTITIONER_PATH =
    "mapreduce.totalorderpartitioner.path";
  public static final String MAX_TRIE_DEPTH =
    "mapreduce.totalorderpartitioner.trie.maxdepth";
  public static final String NATURAL_ORDER =
    "mapreduce.totalorderpartitioner.naturalorder";
  Configuration conf;

  public TotalOrderPartitioner() { }

  /**
   * Read the partition keyset from the partition file and build the search
   * structure used by getPartition. If the map output key class is a
   * {@link BinaryComparable} and {@link #NATURAL_ORDER} is not set to false,
   * a byte trie limited to the depth configured by {@link #MAX_TRIE_DEPTH}
   * (default 200) is built over the split points; otherwise the keyset is
   * searched with a binary search using the job's sort comparator. The keys
   * in the partition file must be sorted, and there must be exactly
   * (number of reduces - 1) of them.
   */
  @SuppressWarnings("unchecked")
  public void setConf(Configuration conf) {
    try {
      this.conf = conf;
      String parts = getPartitionFile(conf);
      final Path partFile = new Path(parts);
      final FileSystem fs = (DEFAULT_PATH.equals(parts))
        ? FileSystem.getLocal(conf)
        : partFile.getFileSystem(conf);

      Job job = new Job(conf);
      Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
      K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
      if (splitPoints.length != job.getNumReduceTasks() - 1) {
        throw new IOException("Wrong number of partitions in keyset:"
            + splitPoints.length);
      }
      RawComparator<K> comparator =
        (RawComparator<K>) job.getSortComparator();
      for (int i = 0; i < splitPoints.length - 1; ++i) {
        if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
          throw new IOException("Split points are out of order");
        }
      }
      boolean natOrder =
        conf.getBoolean(NATURAL_ORDER, true);
      if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
        partitions = buildTrie((BinaryComparable[])splitPoints, 0,
            splitPoints.length, new byte[0],
            // The trie represents runs of split-less nodes reentrantly and
            // gives any node with a single split point its own leaf, so the
            // only reason for a depth limit is to guard against stack
            // overflow or trie bloat in the pathological case where the
            // split points are long and differ only near the end. The
            // default limit is therefore large but not huge.
            conf.getInt(MAX_TRIE_DEPTH, 200));
      } else {
        partitions = new BinarySearchNode(splitPoints, comparator);
      }
    } catch (IOException e) {
      throw new IllegalArgumentException("Can't read partitions file", e);
    }
  }

  public Configuration getConf() {
    return conf;
  }

  // By construction we know whether the key type is memcmp-able and hence
  // whether partitions is the byte trie or the binary-search node.
  @SuppressWarnings("unchecked")
  public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
  }

  /**
   * Set the path to the SequenceFile storing the sorted partition keyset.
   * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
   * keys in the SequenceFile.
   */
  public static void setPartitionFile(Configuration conf, Path p) {
    conf.set(PARTITIONER_PATH, p.toString());
  }

  /**
   * Get the path to the SequenceFile storing the sorted partition keyset.
   * @see #setPartitionFile(Configuration,Path)
   */
  public static String getPartitionFile(Configuration conf) {
    return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
  }

  /**
   * Interface to the partitioner to locate a key in the partition keyset.
   */
  interface Node<T> {
    /**
     * Locate the partition for a key: given sorted split points
     * K_0..K_{n-1}, partition i holds keys in [K_{i-1}, K_i), with
     * implicit K_{-1} = -inf and K_n = +inf.
     */
    int findPartition(T key);
  }

  /**
   * Base class for trie nodes. If the key type is memcmp-able, the
   * partitioner builds a byte trie over the split points, limited to the
   * configured maximum trie depth ({@link #MAX_TRIE_DEPTH}).
   */
  static abstract class TrieNode implements Node<BinaryComparable> {
    private final int level;
    TrieNode(int level) {
      this.level = level;
    }
    int getLevel() {
      return level;
    }
  }

  /**
   * For key types that are not {@link BinaryComparable}, or when natural
   * ordering is disabled via {@link #NATURAL_ORDER}, search the partition
   * keyset with a binary search using the job's sort comparator.
   */
  class BinarySearchNode implements Node<K> {
    private final K[] splitPoints;
    private final RawComparator<K> comparator;
    BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
      this.splitPoints = splitPoints;
      this.comparator = comparator;
    }
    public int findPartition(K key) {
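      // Arrays.binarySearch returns the index of an exact match, or
      // (-(insertion point) - 1) when the key falls between split points.
      // Adding 1 and negating a negative result therefore yields the index
      // of the partition whose range contains the key.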
      final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

  /**
   * An inner trie node that contains 256 children based on the next
   * character.
   */
  class InnerTrieNode extends TrieNode {
    private TrieNode[] child = new TrieNode[256];

    InnerTrieNode(int level) {
      super(level);
    }
    public int findPartition(BinaryComparable key) {
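      // A key that has run out of bytes at this depth sorts before any
      // longer key with the same prefix, so it takes the same branch as
      // byte value 0; otherwise descend on the unsigned next byte.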
      int level = getLevel();
      if (key.getLength() <= level) {
        return child[0].findPartition(key);
      }
      return child[0xFF & key.getBytes()[level]].findPartition(key);
    }
  }

  /**
   * @param level       the tree depth at this node
   * @param splitPoints the full split point vector, which holds
   *                    the split point or points this leaf node
   *                    should contain
   * @param lower       first INcluded element of splitPoints
   * @param upper       first EXcluded element of splitPoints
   * @return a leaf node. They come in three kinds: no split points
   *         [and findPartition returns a canned index], one split
   *         point [and we compare with a single comparand], or more
   *         than one [and we do the composite binary search].
   */
  private TrieNode LeafTrieNodeFactory(int level, BinaryComparable[] splitPoints,
      int lower, int upper) {
    switch (upper - lower) {
    case 0:
      return new UnsplitTrieNode(level, lower);

    case 1:
      return new SinglySplitTrieNode(level, splitPoints, lower);

    default:
      return new LeafTrieNode(level, splitPoints, lower, upper);
    }
  }

  /**
   * A leaf trie node that scans for the key between lower..upper.
   *
   * We don't generate many of these, since we usually continue trie-ing
   * when more than one split point remains at a level, and we make
   * different objects for nodes with 0 or 1 split point.
   */
  private class LeafTrieNode extends TrieNode {
    final int lower;
    final int upper;
    final BinaryComparable[] splitPoints;
    LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
      super(level);
      this.lower = lower;
      this.upper = upper;
      this.splitPoints = splitPoints;
    }
    public int findPartition(BinaryComparable key) {
      final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
      return (pos < 0) ? -pos : pos;
    }
  }

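  /**
   * A trie node whose range contains no split points: every key that
   * reaches it belongs to the same partition, so it returns a canned index.
   */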
  private class UnsplitTrieNode extends TrieNode {
    final int result;

    UnsplitTrieNode(int level, int value) {
      super(level);
      this.result = value;
    }

    public int findPartition(BinaryComparable key) {
      return result;
    }
  }

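  /**
   * A leaf with exactly one split point: a single comparison against that
   * split point decides between the two adjacent partitions.
   */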
  private class SinglySplitTrieNode extends TrieNode {
    final int lower;
    final BinaryComparable mySplitPoint;

    SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
      super(level);
      this.lower = lower;
      this.mySplitPoint = splitPoints[lower];
    }

    public int findPartition(BinaryComparable key) {
      return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
    }
  }

  /**
   * Read the cut points from the given SequenceFile.
   * @param fs the file system holding the partition file
   * @param p the path to read
   * @param keyClass the map output key class
   * @param conf the job configuration
   * @return the split points, in file order
   * @throws IOException if the partition file cannot be read
   */
  @SuppressWarnings("unchecked")
  private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
      Configuration conf) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
    ArrayList<K> parts = new ArrayList<K>();
    K key = ReflectionUtils.newInstance(keyClass, conf);
    NullWritable value = NullWritable.get();
    while (reader.next(key, value)) {
      parts.add(key);
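      // The previous instance is now owned by the list, so a fresh key must
      // be created for the next call to reader.next().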
      key = ReflectionUtils.newInstance(keyClass, conf);
    }
    reader.close();
    return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  }

  /**
   * Carries a trie node that can be re-used. Two adjacent trie node slots
   * that contain no split points can share the same unsplit trie node,
   * even if they are not on the same level. See buildTrieRec, below.
   */
  private class CarriedTrieNodeRef {
    TrieNode content;

    CarriedTrieNodeRef() {
      content = null;
    }
  }

  /**
   * Given a sorted set of cut points, build a trie that will find the
   * correct partition quickly.
   * @param splits the list of cut points
   * @param lower the lower bound of partitions 0..numPartitions-1
   * @param upper the upper bound of partitions 0..numPartitions-1
   * @param prefix the prefix that we have already checked against
   * @param maxDepth the maximum depth we will build a trie for
   * @return the trie node that will divide the splits correctly
   */
  private TrieNode buildTrie(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth) {
    return buildTrieRec(
        splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
  }

  /**
   * This is the core of buildTrie. Its signature differs from buildTrie
   * because it threads a CarriedTrieNodeRef through the recursion.
   *
   * The CarriedTrieNodeRef lets us re-use the same UnsplitTrieNode for
   * every consecutive stretch of trie slots that contains no split points:
   * whenever a recursive call produces an unsplit leaf (lower == upper),
   * the node is remembered in the ref and handed back verbatim to the next
   * such call instead of allocating a fresh identical node. Any call that
   * covers at least one split point clears the carried reference.
   *
   * @param splits the sorted split point vector
   * @param lower first included element of splits for this subtree
   * @param upper first excluded element of splits for this subtree
   * @param prefix the bytes already consumed on the path to this node
   * @param maxDepth the maximum trie depth to build
   * @param ref the carried (re-usable) unsplit trie node, if any
   * @return a trie node dividing splits[lower..upper) correctly
   */
  private TrieNode buildTrieRec(BinaryComparable[] splits, int lower,
      int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
    final int depth = prefix.length;
    // We generate leaves for a single split point as well as for no
    // split points.
    if (depth >= maxDepth || lower >= upper - 1) {
      // If we have two consecutive requests for an unsplit trie node, we
      // can deliver the same one the second time.
      if (lower == upper && ref.content != null) {
        return ref.content;
      }
      TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
      ref.content = lower == upper ? result : null;
      return result;
    }
    InnerTrieNode result = new InnerTrieNode(depth);
    byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);

    int currentBound = lower;
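    // For each possible next byte value ch, advance currentBound across the
    // split points whose byte at this depth equals ch, and build the child
    // subtree for exactly that range; trial is the prefix extended by ch.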
    for (int ch = 0; ch < 0xFF; ++ch) {
      trial[depth] = (byte) (ch + 1);
      lower = currentBound;
      while (currentBound < upper) {
        if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
          break;
        }
        currentBound += 1;
      }
      trial[depth] = (byte) ch;
      result.child[0xFF & ch]
        = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
    }
    // Pick up the rest: everything from currentBound to upper starts with
    // prefix + 0xFF.
    trial[depth] = (byte) 0xFF;
    result.child[0xFF]
      = buildTrieRec(splits, currentBound, upper, trial, maxDepth, ref);

    return result;
  }
}