1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master.handler;
21
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.HRegionInfo;
29 import org.apache.hadoop.hbase.Server;
30 import org.apache.hadoop.hbase.TableNotFoundException;
31 import org.apache.hadoop.hbase.catalog.CatalogTracker;
32 import org.apache.hadoop.hbase.catalog.MetaReader;
33 import org.apache.hadoop.hbase.executor.EventHandler;
34 import org.apache.hadoop.hbase.master.AssignmentManager;
35 import org.apache.hadoop.hbase.master.BulkAssigner;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.zookeeper.KeeperException;
38
39
40
41
42 public class EnableTableHandler extends EventHandler {
43 private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
44 private final byte [] tableName;
45 private final String tableNameStr;
46 private final AssignmentManager assignmentManager;
47 private final CatalogTracker ct;
48
49 public EnableTableHandler(Server server, byte [] tableName,
50 CatalogTracker catalogTracker, AssignmentManager assignmentManager)
51 throws TableNotFoundException, IOException {
52 super(server, EventType.C_M_ENABLE_TABLE);
53 this.tableName = tableName;
54 this.tableNameStr = Bytes.toString(tableName);
55 this.ct = catalogTracker;
56 this.assignmentManager = assignmentManager;
57
58 if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
59 throw new TableNotFoundException(Bytes.toString(tableName));
60 }
61 }
62
63 @Override
64 public void process() {
65 try {
66 LOG.info("Attemping to enable the table " + this.tableNameStr);
67 handleEnableTable();
68 } catch (IOException e) {
69 LOG.error("Error trying to enable the table " + this.tableNameStr, e);
70 } catch (KeeperException e) {
71 LOG.error("Error trying to enable the table " + this.tableNameStr, e);
72 }
73 }
74
75 private void handleEnableTable() throws IOException, KeeperException {
76 if (this.assignmentManager.getZKTable().isEnabledTable(this.tableNameStr)) {
77 LOG.info("Table " + tableNameStr + " is already enabled; skipping enable");
78 return;
79 }
80
81
82
83
84 this.assignmentManager.getZKTable().setEnablingTable(this.tableNameStr);
85 boolean done = false;
86 while (true) {
87
88
89 List<HRegionInfo> regionsInMeta =
90 MetaReader.getTableRegions(this.ct, tableName, true);
91 int countOfRegionsInTable = regionsInMeta.size();
92 List<HRegionInfo> regions = regionsToAssign(regionsInMeta);
93 if (regions.size() == 0) {
94 done = true;
95 break;
96 }
97 LOG.info("Table has " + countOfRegionsInTable + " regions of which " +
98 regions.size() + " are online.");
99 BulkEnabler bd = new BulkEnabler(this.server, regions,
100 countOfRegionsInTable);
101 try {
102 if (bd.bulkAssign()) {
103 done = true;
104 break;
105 }
106 } catch (InterruptedException e) {
107 LOG.warn("Enable was interrupted");
108
109 Thread.currentThread().interrupt();
110 break;
111 }
112 }
113
114 if (done) this.assignmentManager.getZKTable().setEnabledTable(this.tableNameStr);
115 LOG.info("Enabled table is done=" + done);
116 }
117
118
119
120
121
122
123
124 private List<HRegionInfo> regionsToAssign(final List<HRegionInfo> regionsInMeta)
125 throws IOException {
126 final List<HRegionInfo> onlineRegions =
127 this.assignmentManager.getRegionsOfTable(tableName);
128 regionsInMeta.removeAll(onlineRegions);
129 return regionsInMeta;
130 }
131
132
133
134
135 class BulkEnabler extends BulkAssigner {
136 private final List<HRegionInfo> regions;
137
138 private final int countOfRegionsInTable;
139
140 BulkEnabler(final Server server, final List<HRegionInfo> regions,
141 final int countOfRegionsInTable) {
142 super(server);
143 this.regions = regions;
144 this.countOfRegionsInTable = countOfRegionsInTable;
145 }
146
147 @Override
148 protected void populatePool(ExecutorService pool) {
149 for (HRegionInfo region: regions) {
150 if (assignmentManager.isRegionInTransition(region) != null) continue;
151 final HRegionInfo hri = region;
152 pool.execute(new Runnable() {
153 public void run() {
154 assignmentManager.assign(hri, true);
155 }
156 });
157 }
158 }
159
160 @Override
161 protected boolean waitUntilDone(long timeout)
162 throws InterruptedException {
163 long startTime = System.currentTimeMillis();
164 long remaining = timeout;
165 List<HRegionInfo> regions = null;
166 while (!server.isStopped() && remaining > 0) {
167 Thread.sleep(waitingTimeForEvents);
168 regions = assignmentManager.getRegionsOfTable(tableName);
169 if (isDone(regions)) break;
170 remaining = timeout - (System.currentTimeMillis() - startTime);
171 }
172 return isDone(regions);
173 }
174
175 private boolean isDone(final List<HRegionInfo> regions) {
176 return regions != null && regions.size() >= this.countOfRegionsInTable;
177 }
178 }
179 }