1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  
23  import java.io.IOException;
24  import java.util.Collection;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.HRegionInfo;
33  import org.apache.hadoop.hbase.MiniHBaseCluster;
34  import org.apache.hadoop.hbase.client.HTable;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.client.Result;
37  import org.apache.hadoop.hbase.client.ResultScanner;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.executor.EventHandler;
40  import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
41  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
42  import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo;
43  import org.apache.hadoop.hbase.regionserver.HRegionServer;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Threads;
46  import org.apache.hadoop.hbase.util.Writables;
47  import org.junit.AfterClass;
48  import org.junit.Assert;
49  import org.junit.Before;
50  import org.junit.BeforeClass;
51  import org.junit.Test;
52  import static org.junit.Assert.assertTrue;
53  
54  /**
55   * Test open and close of regions using zk.
56   */
57  public class TestZKBasedOpenCloseRegion {
58    private static final Log LOG = LogFactory.getLog(TestZKBasedOpenCloseRegion.class);
59    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60    private static final String TABLENAME = "TestZKBasedOpenCloseRegion";
61    private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
62      Bytes.toBytes("b"), Bytes.toBytes("c")};
63    private static int countOfRegions;
64  
65    @BeforeClass public static void beforeAllTests() throws Exception {
66      Configuration c = TEST_UTIL.getConfiguration();
67      c.setBoolean("dfs.support.append", true);
68      c.setInt("hbase.regionserver.info.port", 0);
69      TEST_UTIL.startMiniCluster(2);
70      TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
71      HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
72      countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
73      waitUntilAllRegionsAssigned();
74      addToEachStartKey(countOfRegions);
75    }
76  
77    @AfterClass public static void afterAllTests() throws IOException {
78      TEST_UTIL.shutdownMiniCluster();
79    }
80  
81    @Before public void setup() throws IOException {
82      if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
83        // Need at least two servers.
84        LOG.info("Started new server=" +
85          TEST_UTIL.getHBaseCluster().startRegionServer());
86  
87      }
88      waitUntilAllRegionsAssigned();
89    }
90  
91    /**
92     * Test we reopen a region once closed.
93     * @throws Exception
94     */
95    @Test (timeout=300000) public void testReOpenRegion()
96    throws Exception {
97      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
98      LOG.info("Number of region servers = " +
99        cluster.getLiveRegionServerThreads().size());
100 
101     int rsIdx = 0;
102     HRegionServer regionServer =
103       TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
104     HRegionInfo hri = getNonMetaRegion(regionServer.getOnlineRegions());
105     LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
106 
107     AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
108     AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
109 
110     EventHandlerListener closeListener =
111       new ReopenEventListener(hri.getRegionNameAsString(),
112           closeEventProcessed, EventType.RS_ZK_REGION_CLOSED);
113     cluster.getMaster().executorService.
114       registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener);
115 
116     EventHandlerListener openListener =
117       new ReopenEventListener(hri.getRegionNameAsString(),
118           reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
119     cluster.getMaster().executorService.
120       registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
121 
122     LOG.info("Unassign " + hri.getRegionNameAsString());
123     cluster.getMaster().assignmentManager.unassign(hri);
124 
125     while (!closeEventProcessed.get()) {
126       Threads.sleep(100);
127     }
128 
129     while (!reopenEventProcessed.get()) {
130       Threads.sleep(100);
131     }
132 
133     LOG.info("Done with testReOpenRegion");
134   }
135 
136   private HRegionInfo getNonMetaRegion(final Collection<HRegionInfo> regions) {
137     HRegionInfo hri = null;
138     for (HRegionInfo i: regions) {
139       LOG.info(i.getRegionNameAsString());
140       if (!i.isMetaRegion()) {
141         hri = i;
142         break;
143       }
144     }
145     return hri;
146   }
147 
148   public static class ReopenEventListener implements EventHandlerListener {
149     private static final Log LOG = LogFactory.getLog(ReopenEventListener.class);
150     String regionName;
151     AtomicBoolean eventProcessed;
152     EventType eventType;
153 
154     public ReopenEventListener(String regionName,
155         AtomicBoolean eventProcessed, EventType eventType) {
156       this.regionName = regionName;
157       this.eventProcessed = eventProcessed;
158       this.eventType = eventType;
159     }
160 
161     @Override
162     public void beforeProcess(EventHandler event) {
163       if(event.getEventType() == eventType) {
164         LOG.info("Received " + eventType + " and beginning to process it");
165       }
166     }
167 
168     @Override
169     public void afterProcess(EventHandler event) {
170       LOG.info("afterProcess(" + event + ")");
171       if(event.getEventType() == eventType) {
172         LOG.info("Finished processing " + eventType);
173         String regionName = "";
174         if(eventType == EventType.RS_ZK_REGION_OPENED) {
175           TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
176           regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
177         } else if(eventType == EventType.RS_ZK_REGION_CLOSED) {
178           TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
179           regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
180         }
181         if(this.regionName.equals(regionName)) {
182           eventProcessed.set(true);
183         }
184         synchronized(eventProcessed) {
185           eventProcessed.notifyAll();
186         }
187       }
188     }
189   }
190 
191   /**
192    * This test shows how a region won't be able to be assigned to a RS
193    * if it's already "processing" it.
194    * @throws Exception
195    */
196   @Test
197   public void testRSAlreadyProcessingRegion() throws Exception {
198     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
199 
200     HRegionServer hr0 =
201         cluster.getLiveRegionServerThreads().get(0).getRegionServer();
202     HRegionServer hr1 =
203         cluster.getLiveRegionServerThreads().get(1).getRegionServer();
204     HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions());
205 
206     // fake that hr1 is processing the region
207     hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes());
208 
209     AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
210     EventHandlerListener openListener =
211       new ReopenEventListener(hri.getRegionNameAsString(),
212           reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
213     cluster.getMaster().executorService.
214       registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
215 
216     // now ask the master to move the region to hr1, will fail
217     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
218         Bytes.toBytes(hr1.getServerName()));
219 
220     while (!reopenEventProcessed.get()) {
221       Threads.sleep(100);
222     }
223 
224     // make sure the region came back
225     assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
226 
227     // remove the block and reset the boolean
228     hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
229     reopenEventProcessed.set(false);
230 
231     // move the region again, but this time it will work
232     TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
233         Bytes.toBytes(hr1.getServerName()));
234 
235     while (!reopenEventProcessed.get()) {
236       Threads.sleep(100);
237     }
238 
239     // make sure the region has moved from the original RS
240     assertTrue(hr0.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
241 
242   }
243 
244   @Test (timeout=300000) public void testCloseRegion()
245   throws Exception {
246     LOG.info("Running testCloseRegion");
247     MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
248     LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
249 
250     int rsIdx = 0;
251     HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
252     HRegionInfo hri = getNonMetaRegion(regionServer.getOnlineRegions());
253     LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
254 
255     AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
256     EventHandlerListener listener =
257       new CloseRegionEventListener(hri.getRegionNameAsString(),
258           closeEventProcessed);
259     cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener);
260 
261     cluster.getMaster().assignmentManager.unassign(hri);
262 
263     while (!closeEventProcessed.get()) {
264       Threads.sleep(100);
265     }
266     LOG.info("Done with testCloseRegion");
267   }
268 
269   public static class CloseRegionEventListener implements EventHandlerListener {
270     private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
271     String regionToClose;
272     AtomicBoolean closeEventProcessed;
273 
274     public CloseRegionEventListener(String regionToClose,
275         AtomicBoolean closeEventProcessed) {
276       this.regionToClose = regionToClose;
277       this.closeEventProcessed = closeEventProcessed;
278     }
279 
280     @Override
281     public void afterProcess(EventHandler event) {
282       LOG.info("afterProcess(" + event + ")");
283       if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) {
284         LOG.info("Finished processing CLOSE REGION");
285         TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
286         if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) {
287           LOG.info("Setting closeEventProcessed flag");
288           closeEventProcessed.set(true);
289         } else {
290           LOG.info("Region to close didn't match");
291         }
292       }
293     }
294 
295     @Override
296     public void beforeProcess(EventHandler event) {
297       if(event.getEventType() == EventType.M_RS_CLOSE_REGION) {
298         LOG.info("Received CLOSE RPC and beginning to process it");
299       }
300     }
301   }
302 
303 
304   private static void waitUntilAllRegionsAssigned()
305   throws IOException {
306     HTable meta = new HTable(TEST_UTIL.getConfiguration(),
307       HConstants.META_TABLE_NAME);
308     while (true) {
309       int rows = 0;
310       Scan scan = new Scan();
311       scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
312       ResultScanner s = meta.getScanner(scan);
313       for (Result r = null; (r = s.next()) != null;) {
314         byte [] b =
315           r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
316         if (b == null || b.length <= 0) {
317           break;
318         }
319         rows++;
320       }
321       s.close();
322       // If I get to here and all rows have a Server, then all have been assigned.
323       if (rows >= countOfRegions) {
324         break;
325       }
326       LOG.info("Found=" + rows);
327       Threads.sleep(1000);
328     }
329   }
330 
331   /*
332    * Add to each of the regions in .META. a value.  Key is the startrow of the
333    * region (except its 'aaa' for first region).  Actual value is the row name.
334    * @param expected
335    * @return
336    * @throws IOException
337    */
338   private static int addToEachStartKey(final int expected) throws IOException {
339     HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
340     HTable meta = new HTable(TEST_UTIL.getConfiguration(),
341         HConstants.META_TABLE_NAME);
342     int rows = 0;
343     Scan scan = new Scan();
344     scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
345     ResultScanner s = meta.getScanner(scan);
346     for (Result r = null; (r = s.next()) != null;) {
347       byte [] b =
348         r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
349       if (b == null || b.length <= 0) {
350         break;
351       }
352       HRegionInfo hri = Writables.getHRegionInfo(b);
353       // If start key, add 'aaa'.
354       byte [] row = getStartKey(hri);
355       Put p = new Put(row);
356       p.add(getTestFamily(), getTestQualifier(), row);
357       t.put(p);
358       rows++;
359     }
360     s.close();
361     Assert.assertEquals(expected, rows);
362     return rows;
363   }
364 
365   private static byte [] getStartKey(final HRegionInfo hri) {
366     return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
367         Bytes.toBytes("aaa"): hri.getStartKey();
368   }
369 
370   private static byte [] getTestFamily() {
371     return FAMILIES[0];
372   }
373 
374   private static byte [] getTestQualifier() {
375     return getTestFamily();
376   }
377 
378   public static void main(String args[]) throws Exception {
379     TestZKBasedOpenCloseRegion.beforeAllTests();
380 
381     TestZKBasedOpenCloseRegion test = new TestZKBasedOpenCloseRegion();
382     test.setup();
383     test.testCloseRegion();
384 
385     TestZKBasedOpenCloseRegion.afterAllTests();
386   }
387 }