View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.handler;
21  
22  import java.io.IOException;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.HRegionInfo;
28  import org.apache.hadoop.hbase.Server;
29  import org.apache.hadoop.hbase.executor.EventHandler;
30  import org.apache.hadoop.hbase.regionserver.HRegion;
31  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
32  import org.apache.hadoop.hbase.util.CancelableProgressable;
33  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
34  import org.apache.zookeeper.KeeperException;
35  
36  /**
37   * Handles opening of a region on a region server.
38   * <p>
39   * This is executed after receiving an OPEN RPC from the master or client.
40   */
41  public class OpenRegionHandler extends EventHandler {
42    private static final Log LOG = LogFactory.getLog(OpenRegionHandler.class);
43  
44    private final RegionServerServices rsServices;
45  
46    private final HRegionInfo regionInfo;
47  
48    // We get version of our znode at start of open process and monitor it across
49    // the total open. We'll fail the open if someone hijacks our znode; we can
50    // tell this has happened if version is not as expected.
51    private volatile int version = -1;
52  
53    public OpenRegionHandler(final Server server,
54        final RegionServerServices rsServices, HRegionInfo regionInfo) {
55      this(server, rsServices, regionInfo, EventType.M_RS_OPEN_REGION);
56    }
57  
58    protected OpenRegionHandler(final Server server,
59        final RegionServerServices rsServices, final HRegionInfo regionInfo,
60        EventType eventType) {
61      super(server, eventType);
62      this.rsServices = rsServices;
63      this.regionInfo = regionInfo;
64    }
65  
66    public HRegionInfo getRegionInfo() {
67      return regionInfo;
68    }
69  
70    @Override
71    public void process() throws IOException {
72      try {
73        final String name = regionInfo.getRegionNameAsString();
74        LOG.debug("Processing open of " + name);
75        if (this.server.isStopped() || this.rsServices.isStopping()) {
76          LOG.info("Server stopping or stopped, skipping open of " + name);
77          return;
78        }
79        final String encodedName = regionInfo.getEncodedName();
80  
81        // Check that this region is not already online
82        HRegion region = this.rsServices.getFromOnlineRegions(encodedName);
83        if (region != null) {
84          LOG.warn("Attempted open of " + name +
85            " but already online on this server");
86          return;
87        }
88  
89        // If fails, just return.  Someone stole the region from under us.
90        // Calling transitionZookeeperOfflineToOpening initalizes this.version.
91        if (!transitionZookeeperOfflineToOpening(encodedName)) {
92          LOG.warn("Region was hijacked? It no longer exists, encodedName=" +
93            encodedName);
94          return;
95        }
96  
97        // Open region.  After a successful open, failures in subsequent
98        // processing needs to do a close as part of cleanup.
99        region = openRegion();
100       if (region == null) return;
101       boolean failed = true;
102       if (tickleOpening("post_region_open")) {
103         if (updateMeta(region)) failed = false;
104       }
105 
106       if (failed || this.server.isStopped() ||
107           this.rsServices.isStopping()) {
108         cleanupFailedOpen(region);
109         return;
110       }
111 
112       if (!transitionToOpened(region)) {
113         cleanupFailedOpen(region);
114         return;
115       }
116 
117       // Done!  Successful region open
118       LOG.debug("Opened " + name);
119     } finally {
120       this.rsServices.getRegionsInTransitionInRS().
121           remove(this.regionInfo.getEncodedNameAsBytes());
122     }
123   }
124 
125   /**
126    * Update ZK, ROOT or META.  This can take a while if for example the
127    * .META. is not available -- if server hosting .META. crashed and we are
128    * waiting on it to come back -- so run in a thread and keep updating znode
129    * state meantime so master doesn't timeout our region-in-transition.
130    * Caller must cleanup region if this fails.
131    */
132   private boolean updateMeta(final HRegion r) {
133     if (this.server.isStopped() || this.rsServices.isStopping()) {
134       return false;
135     }
136     // Object we do wait/notify on.  Make it boolean.  If set, we're done.
137     // Else, wait.
138     final AtomicBoolean signaller = new AtomicBoolean(false);
139     PostOpenDeployTasksThread t = new PostOpenDeployTasksThread(r,
140       this.server, this.rsServices, signaller);
141     t.start();
142     int assignmentTimeout = this.server.getConfiguration().
143       getInt("hbase.master.assignment.timeoutmonitor.period", 10000);
144     // Total timeout for meta edit.  If we fail adding the edit then close out
145     // the region and let it be assigned elsewhere.
146     long timeout = assignmentTimeout * 10;
147     long now = System.currentTimeMillis();
148     long endTime = now + timeout;
149     // Let our period at which we update OPENING state to be be 1/3rd of the
150     // regions-in-transition timeout period.
151     long period = Math.max(1, assignmentTimeout/ 3);
152     long lastUpdate = now;
153     while (!signaller.get() && t.isAlive() && !this.server.isStopped() &&
154         !this.rsServices.isStopping() && (endTime > now)) {
155       long elapsed = now - lastUpdate;
156       if (elapsed > period) {
157         // Only tickle OPENING if postOpenDeployTasks is taking some time.
158         lastUpdate = now;
159         tickleOpening("post_open_deploy");
160       }
161       synchronized (signaller) {
162         try {
163           signaller.wait(period);
164         } catch (InterruptedException e) {
165           // Go to the loop check.
166         }
167       }
168       now = System.currentTimeMillis();
169     }
170     // Is thread still alive?  We may have left above loop because server is
171     // stopping or we timed out the edit.  Is so, interrupt it.
172     if (t.isAlive()) {
173       if (!signaller.get()) {
174         // Thread still running; interrupt
175         LOG.debug("Interrupting thread " + t);
176         t.interrupt();
177       }
178       try {
179         t.join();
180       } catch (InterruptedException ie) {
181         LOG.warn("Interrupted joining " +
182           r.getRegionInfo().getRegionNameAsString(), ie);
183         Thread.currentThread().interrupt();
184       }
185     }
186     // Was there an exception opening the region?  This should trigger on
187     // InterruptedException too.  If so, we failed.
188     return !t.interrupted() && t.getException() == null;
189   }
190 
191   /**
192    * Thread to run region post open tasks.  Call {@link #getException()} after
193    * the thread finishes to check for exceptions running
194    * {@link RegionServerServices#postOpenDeployTasks(HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker, boolean)}.
195    */
196   static class PostOpenDeployTasksThread extends Thread {
197     private Exception exception = null;
198     private final Server server;
199     private final RegionServerServices services;
200     private final HRegion region;
201     private final AtomicBoolean signaller;
202 
203     PostOpenDeployTasksThread(final HRegion region, final Server server,
204         final RegionServerServices services, final AtomicBoolean signaller) {
205       super("PostOpenDeployTasks:" + region.getRegionInfo().getEncodedName());
206       this.setDaemon(true);
207       this.server = server;
208       this.services = services;
209       this.region = region;
210       this.signaller = signaller;
211     }
212 
213     public void run() {
214       try {
215         this.services.postOpenDeployTasks(this.region,
216           this.server.getCatalogTracker(), false);
217       } catch (Exception e) {
218         LOG.warn("Exception running postOpenDeployTasks; region=" +
219           this.region.getRegionInfo().getEncodedName(), e);
220         this.exception = e;
221       }
222       // We're done.  Set flag then wake up anyone waiting on thread to complete.
223       this.signaller.set(true);
224       synchronized (this.signaller) {
225         this.signaller.notify();
226       }
227     }
228 
229     /**
230      * @return Null or the run exception; call this method after thread is done.
231      */
232     Exception getException() {
233       return this.exception;
234     }
235   }
236 
237   /**
238    * @param r Region we're working on.
239    * @return Transition znode to OPENED state.
240    * @throws IOException 
241    */
242   private boolean transitionToOpened(final HRegion r) throws IOException {
243     boolean result = false;
244     HRegionInfo hri = r.getRegionInfo();
245     final String name = hri.getRegionNameAsString();
246     // Finally, Transition ZK node to OPENED
247     try {
248       if (ZKAssign.transitionNodeOpened(this.server.getZooKeeper(), hri,
249           this.server.getServerName(), this.version) == -1) {
250         LOG.warn("Completed the OPEN of region " + name +
251           " but when transitioning from " +
252           " OPENING to OPENED got a version mismatch, someone else clashed " +
253           "so now unassigning -- closing region");
254       } else {
255         result = true;
256       }
257     } catch (KeeperException e) {
258       LOG.error("Failed transitioning node " + name +
259         " from OPENING to OPENED -- closing region", e);
260     }
261     return result;
262   }
263 
264   /**
265    * @return Instance of HRegion if successful open else null.
266    */
267   HRegion openRegion() {
268     HRegion region = null;
269     try {
270       // Instantiate the region.  This also periodically tickles our zk OPENING
271       // state so master doesn't timeout this region in transition.
272       region = HRegion.openHRegion(this.regionInfo, this.rsServices.getWAL(),
273         this.server.getConfiguration(), this.rsServices.getFlushRequester(),
274         new CancelableProgressable() {
275           public boolean progress() {
276             // We may lose the znode ownership during the open.  Currently its
277             // too hard interrupting ongoing region open.  Just let it complete
278             // and check we still have the znode after region open.
279             return tickleOpening("open_region_progress");
280           }
281         });
282     } catch (IOException e) {
283       // We failed open.  Let our znode expire in regions-in-transition and
284       // Master will assign elsewhere.  Presumes nothing to close.
285       LOG.error("Failed open of region=" +
286         this.regionInfo.getRegionNameAsString(), e);
287     }
288     return region;
289   }
290 
291   private void cleanupFailedOpen(final HRegion region) throws IOException {
292     if (region != null) region.close();
293     this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName());
294   }
295 
296   /**
297    * Transition ZK node from OFFLINE to OPENING.
298    * @param encodedName Name of the znode file (Region encodedName is the znode
299    * name).
300    * @return True if successful transition.
301    */
302   boolean transitionZookeeperOfflineToOpening(final String encodedName) {
303     // TODO: should also handle transition from CLOSED?
304     try {
305       // Initialize the znode version.
306       this.version =
307         ZKAssign.transitionNodeOpening(server.getZooKeeper(),
308           regionInfo, server.getServerName());
309     } catch (KeeperException e) {
310       LOG.error("Error transition from OFFLINE to OPENING for region=" +
311         encodedName, e);
312     }
313     boolean b = isGoodVersion();
314     if (!b) {
315       LOG.warn("Failed transition from OFFLINE to OPENING for region=" +
316         encodedName);
317     }
318     return b;
319   }
320 
321   /**
322    * Update our OPENING state in zookeeper.
323    * Do this so master doesn't timeout this region-in-transition.
324    * @param context Some context to add to logs if failure
325    * @return True if successful transition.
326    */
327   boolean tickleOpening(final String context) {
328     // If previous checks failed... do not try again.
329     if (!isGoodVersion()) return false;
330     String encodedName = this.regionInfo.getEncodedName();
331     try {
332       this.version =
333         ZKAssign.retransitionNodeOpening(server.getZooKeeper(),
334           this.regionInfo, this.server.getServerName(), this.version);
335     } catch (KeeperException e) {
336       server.abort("Exception refreshing OPENING; region=" + encodedName +
337         ", context=" + context, e);
338       this.version = -1;
339     }
340     boolean b = isGoodVersion();
341     if (!b) {
342       LOG.warn("Failed refreshing OPENING; region=" + encodedName +
343         ", context=" + context);
344     }
345     return b;
346   }
347 
348   private boolean isGoodVersion() {
349     return this.version != -1;
350   }
351 }