1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.handler;
21
22 import java.io.IOException;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.Server;
29 import org.apache.hadoop.hbase.executor.EventHandler;
30 import org.apache.hadoop.hbase.regionserver.HRegion;
31 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32 import org.apache.hadoop.hbase.util.CancelableProgressable;
33 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
34 import org.apache.zookeeper.KeeperException;
35
36
37
38
39
40
41 public class OpenRegionHandler extends EventHandler {
42 private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
43
44 private final RegionServerServices rsServices;
45
46 private final HRegionInfo regionInfo;
47
48
49
50
51 private volatile int version = -1;
52
53 public OpenRegionHandler(final Server server,
54 final RegionServerServices rsServices, HRegionInfo regionInfo) {
55 this(server, rsServices, regionInfo, EventType.M_RS_OPEN_REGION);
56 }
57
58 protected OpenRegionHandler(final Server server,
59 final RegionServerServices rsServices, final HRegionInfo regionInfo,
60 EventType eventType) {
61 super(server, eventType);
62 this.rsServices = rsServices;
63 this.regionInfo = regionInfo;
64 }
65
66 public HRegionInfo getRegionInfo() {
67 return regionInfo;
68 }
69
70 @Override
71 public void process() throws IOException {
72 try {
73 final String name = regionInfo.getRegionNameAsString();
74 LOG.debug("Processing open of " + name);
75 if (this.server.isStopped() || this.rsServices.isStopping()) {
76 LOG.info("Server stopping or stopped, skipping open of " + name);
77 return;
78 }
79 final String encodedName = regionInfo.getEncodedName();
80
81
82 HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
83 if (region != null) {
84 LOG.warn("Attempted open of " + name +
85 " but already online on this server");
86 return;
87 }
88
89
90
91 if (!transitionZookeeperOfflineToOpening(encodedName)) {
92 LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
93 encodedName);
94 return;
95 }
96
97
98
99 region = openRegion();
100 if (region == null) return;
101 boolean failed = true;
102 if (tickleOpening("post_region_open")) {
103 if (updateMeta(region)) failed = false;
104 }
105
106 if (failed || this.server.isStopped() ||
107 this.rsServices.isStopping()) {
108 cleanupFailedOpen(region);
109 return;
110 }
111
112 if (!transitionToOpened(region)) {
113 cleanupFailedOpen(region);
114 return;
115 }
116
117
118 LOG.debug("Opened " + name);
119 } finally {
120 this.rsServices.getRegionsInTransitionInRS().
121 remove(this.regionInfo.getEncodedNameAsBytes());
122 }
123 }
124
125
126
127
128
129
130
131
132 private boolean updateMeta(final HRegion r) {
133 if (this.server.isStopped() || this.rsServices.isStopping()) {
134 return false;
135 }
136
137
138 final AtomicBoolean signaller = new AtomicBoolean(false);
139 PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
140 this.server, this.rsServices, signaller);
141 t.start();
142 int assignmentTimeout = this.server.getConfiguration().
143 getInt("hbase.master.assignment.timeoutmonitor.period", 10000);
144
145
146 long timeout = assignmentTimeout * 10;
147 long now = System.currentTimeMillis();
148 long endTime = now + timeout;
149
150
151 long period = Math.max(1, assignmentTimeout/ 3);
152 long lastUpdate = now;
153 while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
154 !this.rsServices.isStopping() && (endTime > now)) {
155 long elapsed = now - lastUpdate;
156 if (elapsed > period) {
157
158 lastUpdate = now;
159 tickleOpening("post_open_deploy");
160 }
161 synchronized (signaller) {
162 try {
163 signaller.wait(period);
164 } catch (InterruptedException e) {
165
166 }
167 }
168 now = System.currentTimeMillis();
169 }
170
171
172 if (t.isAlive()) {
173 if (!signaller.get()) {
174
175 LOG.debug("Interrupting thread " + t);
176 t.interrupt();
177 }
178 try {
179 t.join();
180 } catch (InterruptedException ie) {
181 LOG.warn("Interrupted joining " +
182 r.getRegionInfo().getRegionNameAsString(), ie);
183 Thread.currentThread().interrupt();
184 }
185 }
186
187
188 return !t.interrupted() && t.getException() == null;
189 }
190
191
192
193
194
195
196 static class PostOpenDeployTasksThread extends Thread {
197 private Exception exception = null;
198 private final Server server;
199 private final RegionServerServices services;
200 private final HRegion region;
201 private final AtomicBoolean signaller;
202
203 PostOpenDeployTasksThread(final HRegion region, final Server server,
204 final RegionServerServices services, final AtomicBoolean signaller) {
205 super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
206 this.setDaemon(true);
207 this.server = server;
208 this.services = services;
209 this.region = region;
210 this.signaller = signaller;
211 }
212
213 public void run() {
214 try {
215 this.services.postOpenDeployTasks(this.region,
216 this.server.getCatalogTracker(), false);
217 } catch (Exception e) {
218 LOG.warn("Exception running postOpenDeployTasks; region=" +
219 this.region.getRegionInfo().getEncodedName(), e);
220 this.exception = e;
221 }
222
223 this.signaller.set(true);
224 synchronized (this.signaller) {
225 this.signaller.notify();
226 }
227 }
228
229
230
231
232 Exception getException() {
233 return this.exception;
234 }
235 }
236
237
238
239
240
241
242 private boolean transitionToOpened(final HRegion r) throws IOException {
243 boolean result = false;
244 HRegionInfo hri = r.getRegionInfo();
245 final String name = hri.getRegionNameAsString();
246
247 try {
248 if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
249 this.server.getServerName(), this.version) == -1) {
250 LOG.warn("Completed the OPEN of region " + name +
251 " but when transitioning from " +
252 " OPENING to OPENED got a version mismatch, someone else clashed " +
253 "so now unassigning -- closing region");
254 } else {
255 result = true;
256 }
257 } catch (KeeperException e) {
258 LOG.error("Failed transitioning node " + name +
259 " from OPENING to OPENED -- closing region", e);
260 }
261 return result;
262 }
263
264
265
266
267 HRegion openRegion() {
268 HRegion region = null;
269 try {
270
271
272 region = HRegion.openHRegion(this.regionInfo, this.rsServices.getWAL(),
273 this.server.getConfiguration(), this.rsServices.getFlushRequester(),
274 new CancelableProgressable() {
275 public boolean progress() {
276
277
278
279 return tickleOpening("open_region_progress");
280 }
281 });
282 } catch (IOException e) {
283
284
285 LOG.error("Failed open of region=" +
286 this.regionInfo.getRegionNameAsString(), e);
287 }
288 return region;
289 }
290
291 private void cleanupFailedOpen(final HRegion region) throws IOException {
292 if (region != null) region.close();
293 this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
294 }
295
296
297
298
299
300
301
302 boolean transitionZookeeperOfflineToOpening(final String encodedName) {
303
304 try {
305
306 this.version =
307 ZKAssign.transitionNodeOpening(server.getZooKeeper(),
308 regionInfo, server.getServerName());
309 } catch (KeeperException e) {
310 LOG.error("Error transition from OFFLINE to OPENING for region=" +
311 encodedName, e);
312 }
313 boolean b = isGoodVersion();
314 if (!b) {
315 LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
316 encodedName);
317 }
318 return b;
319 }
320
321
322
323
324
325
326
327 boolean tickleOpening(final String context) {
328
329 if (!isGoodVersion()) return false;
330 String encodedName = this.regionInfo.getEncodedName();
331 try {
332 this.version =
333 ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
334 this.regionInfo, this.server.getServerName(), this.version);
335 } catch (KeeperException e) {
336 server.abort("Exception refreshing OPENING; region=" + encodedName +
337 ", context=" + context, e);
338 this.version = -1;
339 }
340 boolean b = isGoodVersion();
341 if (!b) {
342 LOG.warn("Failed refreshing OPENING; region=" + encodedName +
343 ", context=" + context);
344 }
345 return b;
346 }
347
348 private boolean isGoodVersion() {
349 return this.version != -1;
350 }
351 }