1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.lang.Thread.UncaughtExceptionHandler;
23 import java.util.concurrent.Executors;
24
25 import org.apache.hadoop.hbase.Server;
26
27 import com.google.common.util.concurrent.ThreadFactoryBuilder;
28
29
30
31
32
33
34
35
36
37 public abstract class BulkAssigner {
38 final Server server;
39
40
41
42
43 public BulkAssigner(final Server server) {
44 this.server = server;
45 }
46
47
48
49
50 protected String getThreadNamePrefix() {
51 return this.server.getServerName() + "-" + this.getClass().getName();
52 }
53
54 protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
55 return new UncaughtExceptionHandler() {
56 @Override
57 public void uncaughtException(Thread t, Throwable e) {
58
59 server.abort("Uncaught exception in " + t.getName(), e);
60 }
61 };
62 }
63
64 protected int getThreadCount() {
65 return this.server.getConfiguration().
66 getInt("hbase.bulk.assignment.threadpool.size", 20);
67 }
68
69 protected long getTimeoutOnRIT() {
70 return this.server.getConfiguration().
71 getLong("hbase.bulk.assignment.waiton.empty.rit", 5 * 60 * 1000);
72 }
73
74 protected abstract void populatePool(final java.util.concurrent.ExecutorService pool);
75
76 public boolean bulkAssign() throws InterruptedException {
77 return bulkAssign(true);
78 }
79
80
81
82
83
84
85
86 public boolean bulkAssign(boolean sync) throws InterruptedException {
87 boolean result = false;
88 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
89 builder.setDaemon(true);
90 builder.setNameFormat(getThreadNamePrefix() + "-%1$d");
91 builder.setUncaughtExceptionHandler(getUncaughtExceptionHandler());
92 int threadCount = getThreadCount();
93 java.util.concurrent.ExecutorService pool =
94 Executors.newFixedThreadPool(threadCount, builder.build());
95 try {
96 populatePool(pool);
97
98
99 if (sync) result = waitUntilDone(getTimeoutOnRIT());
100 } finally {
101
102 pool.shutdown();
103 }
104 return result;
105 }
106
107
108
109
110
111
112
113 protected abstract boolean waitUntilDone(final long timeout)
114 throws InterruptedException;
115 }