1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.MiniHBaseCluster;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.Put;
36 import org.apache.hadoop.hbase.client.Result;
37 import org.apache.hadoop.hbase.client.ResultScanner;
38 import org.apache.hadoop.hbase.client.Scan;
39 import org.apache.hadoop.hbase.executor.EventHandler;
40 import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
41 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
42 import org.apache.hadoop.hbase.master.handler.TotesHRegionInfo;
43 import org.apache.hadoop.hbase.regionserver.HRegionServer;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.Threads;
46 import org.apache.hadoop.hbase.util.Writables;
47 import org.junit.AfterClass;
48 import org.junit.Assert;
49 import org.junit.Before;
50 import org.junit.BeforeClass;
51 import org.junit.Test;
52 import static org.junit.Assert.assertTrue;
53
54
55
56
57 public class TestZKBasedOpenCloseRegion {
58 private static final Log LOG = LogFactory.getLog(TestZKBasedOpenCloseRegion.class);
59 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60 private static final String TABLENAME = "TestZKBasedOpenCloseRegion";
61 private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
62 Bytes.toBytes("b"), Bytes.toBytes("c")};
63 private static int countOfRegions;
64
65 @BeforeClass public static void beforeAllTests() throws Exception {
66 Configuration c = TEST_UTIL.getConfiguration();
67 c.setBoolean("dfs.support.append", true);
68 c.setInt("hbase.regionserver.info.port", 0);
69 TEST_UTIL.startMiniCluster(2);
70 TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
71 HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
72 countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
73 waitUntilAllRegionsAssigned();
74 addToEachStartKey(countOfRegions);
75 }
76
77 @AfterClass public static void afterAllTests() throws IOException {
78 TEST_UTIL.shutdownMiniCluster();
79 }
80
81 @Before public void setup() throws IOException {
82 if (TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size() < 2) {
83
84 LOG.info("Started new server=" +
85 TEST_UTIL.getHBaseCluster().startRegionServer());
86
87 }
88 waitUntilAllRegionsAssigned();
89 }
90
91
92
93
94
95 @Test (timeout=300000) public void testReOpenRegion()
96 throws Exception {
97 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
98 LOG.info("Number of region servers = " +
99 cluster.getLiveRegionServerThreads().size());
100
101 int rsIdx = 0;
102 HRegionServer regionServer =
103 TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
104 HRegionInfo hri = getNonMetaRegion(regionServer.getOnlineRegions());
105 LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
106
107 AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
108 AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
109
110 EventHandlerListener closeListener =
111 new ReopenEventListener(hri.getRegionNameAsString(),
112 closeEventProcessed, EventType.RS_ZK_REGION_CLOSED);
113 cluster.getMaster().executorService.
114 registerListener(EventType.RS_ZK_REGION_CLOSED, closeListener);
115
116 EventHandlerListener openListener =
117 new ReopenEventListener(hri.getRegionNameAsString(),
118 reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
119 cluster.getMaster().executorService.
120 registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
121
122 LOG.info("Unassign " + hri.getRegionNameAsString());
123 cluster.getMaster().assignmentManager.unassign(hri);
124
125 while (!closeEventProcessed.get()) {
126 Threads.sleep(100);
127 }
128
129 while (!reopenEventProcessed.get()) {
130 Threads.sleep(100);
131 }
132
133 LOG.info("Done with testReOpenRegion");
134 }
135
136 private HRegionInfo getNonMetaRegion(final Collection<HRegionInfo> regions) {
137 HRegionInfo hri = null;
138 for (HRegionInfo i: regions) {
139 LOG.info(i.getRegionNameAsString());
140 if (!i.isMetaRegion()) {
141 hri = i;
142 break;
143 }
144 }
145 return hri;
146 }
147
148 public static class ReopenEventListener implements EventHandlerListener {
149 private static final Log LOG = LogFactory.getLog(ReopenEventListener.class);
150 String regionName;
151 AtomicBoolean eventProcessed;
152 EventType eventType;
153
154 public ReopenEventListener(String regionName,
155 AtomicBoolean eventProcessed, EventType eventType) {
156 this.regionName = regionName;
157 this.eventProcessed = eventProcessed;
158 this.eventType = eventType;
159 }
160
161 @Override
162 public void beforeProcess(EventHandler event) {
163 if(event.getEventType() == eventType) {
164 LOG.info("Received " + eventType + " and beginning to process it");
165 }
166 }
167
168 @Override
169 public void afterProcess(EventHandler event) {
170 LOG.info("afterProcess(" + event + ")");
171 if(event.getEventType() == eventType) {
172 LOG.info("Finished processing " + eventType);
173 String regionName = "";
174 if(eventType == EventType.RS_ZK_REGION_OPENED) {
175 TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
176 regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
177 } else if(eventType == EventType.RS_ZK_REGION_CLOSED) {
178 TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
179 regionName = hriCarrier.getHRegionInfo().getRegionNameAsString();
180 }
181 if(this.regionName.equals(regionName)) {
182 eventProcessed.set(true);
183 }
184 synchronized(eventProcessed) {
185 eventProcessed.notifyAll();
186 }
187 }
188 }
189 }
190
191
192
193
194
195
196 @Test
197 public void testRSAlreadyProcessingRegion() throws Exception {
198 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
199
200 HRegionServer hr0 =
201 cluster.getLiveRegionServerThreads().get(0).getRegionServer();
202 HRegionServer hr1 =
203 cluster.getLiveRegionServerThreads().get(1).getRegionServer();
204 HRegionInfo hri = getNonMetaRegion(hr0.getOnlineRegions());
205
206
207 hr1.getRegionsInTransitionInRS().add(hri.getEncodedNameAsBytes());
208
209 AtomicBoolean reopenEventProcessed = new AtomicBoolean(false);
210 EventHandlerListener openListener =
211 new ReopenEventListener(hri.getRegionNameAsString(),
212 reopenEventProcessed, EventType.RS_ZK_REGION_OPENED);
213 cluster.getMaster().executorService.
214 registerListener(EventType.RS_ZK_REGION_OPENED, openListener);
215
216
217 TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
218 Bytes.toBytes(hr1.getServerName()));
219
220 while (!reopenEventProcessed.get()) {
221 Threads.sleep(100);
222 }
223
224
225 assertTrue(hr1.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
226
227
228 hr1.getRegionsInTransitionInRS().remove(hri.getEncodedNameAsBytes());
229 reopenEventProcessed.set(false);
230
231
232 TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
233 Bytes.toBytes(hr1.getServerName()));
234
235 while (!reopenEventProcessed.get()) {
236 Threads.sleep(100);
237 }
238
239
240 assertTrue(hr0.getOnlineRegion(hri.getEncodedNameAsBytes()) == null);
241
242 }
243
244 @Test (timeout=300000) public void testCloseRegion()
245 throws Exception {
246 LOG.info("Running testCloseRegion");
247 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
248 LOG.info("Number of region servers = " + cluster.getLiveRegionServerThreads().size());
249
250 int rsIdx = 0;
251 HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
252 HRegionInfo hri = getNonMetaRegion(regionServer.getOnlineRegions());
253 LOG.debug("Asking RS to close region " + hri.getRegionNameAsString());
254
255 AtomicBoolean closeEventProcessed = new AtomicBoolean(false);
256 EventHandlerListener listener =
257 new CloseRegionEventListener(hri.getRegionNameAsString(),
258 closeEventProcessed);
259 cluster.getMaster().executorService.registerListener(EventType.RS_ZK_REGION_CLOSED, listener);
260
261 cluster.getMaster().assignmentManager.unassign(hri);
262
263 while (!closeEventProcessed.get()) {
264 Threads.sleep(100);
265 }
266 LOG.info("Done with testCloseRegion");
267 }
268
269 public static class CloseRegionEventListener implements EventHandlerListener {
270 private static final Log LOG = LogFactory.getLog(CloseRegionEventListener.class);
271 String regionToClose;
272 AtomicBoolean closeEventProcessed;
273
274 public CloseRegionEventListener(String regionToClose,
275 AtomicBoolean closeEventProcessed) {
276 this.regionToClose = regionToClose;
277 this.closeEventProcessed = closeEventProcessed;
278 }
279
280 @Override
281 public void afterProcess(EventHandler event) {
282 LOG.info("afterProcess(" + event + ")");
283 if(event.getEventType() == EventType.RS_ZK_REGION_CLOSED) {
284 LOG.info("Finished processing CLOSE REGION");
285 TotesHRegionInfo hriCarrier = (TotesHRegionInfo)event;
286 if (regionToClose.equals(hriCarrier.getHRegionInfo().getRegionNameAsString())) {
287 LOG.info("Setting closeEventProcessed flag");
288 closeEventProcessed.set(true);
289 } else {
290 LOG.info("Region to close didn't match");
291 }
292 }
293 }
294
295 @Override
296 public void beforeProcess(EventHandler event) {
297 if(event.getEventType() == EventType.M_RS_CLOSE_REGION) {
298 LOG.info("Received CLOSE RPC and beginning to process it");
299 }
300 }
301 }
302
303
304 private static void waitUntilAllRegionsAssigned()
305 throws IOException {
306 HTable meta = new HTable(TEST_UTIL.getConfiguration(),
307 HConstants.META_TABLE_NAME);
308 while (true) {
309 int rows = 0;
310 Scan scan = new Scan();
311 scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
312 ResultScanner s = meta.getScanner(scan);
313 for (Result r = null; (r = s.next()) != null;) {
314 byte [] b =
315 r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
316 if (b == null || b.length <= 0) {
317 break;
318 }
319 rows++;
320 }
321 s.close();
322
323 if (rows >= countOfRegions) {
324 break;
325 }
326 LOG.info("Found=" + rows);
327 Threads.sleep(1000);
328 }
329 }
330
331
332
333
334
335
336
337
338 private static int addToEachStartKey(final int expected) throws IOException {
339 HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
340 HTable meta = new HTable(TEST_UTIL.getConfiguration(),
341 HConstants.META_TABLE_NAME);
342 int rows = 0;
343 Scan scan = new Scan();
344 scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
345 ResultScanner s = meta.getScanner(scan);
346 for (Result r = null; (r = s.next()) != null;) {
347 byte [] b =
348 r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
349 if (b == null || b.length <= 0) {
350 break;
351 }
352 HRegionInfo hri = Writables.getHRegionInfo(b);
353
354 byte [] row = getStartKey(hri);
355 Put p = new Put(row);
356 p.add(getTestFamily(), getTestQualifier(), row);
357 t.put(p);
358 rows++;
359 }
360 s.close();
361 Assert.assertEquals(expected, rows);
362 return rows;
363 }
364
365 private static byte [] getStartKey(final HRegionInfo hri) {
366 return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
367 Bytes.toBytes("aaa"): hri.getStartKey();
368 }
369
370 private static byte [] getTestFamily() {
371 return FAMILIES[0];
372 }
373
374 private static byte [] getTestQualifier() {
375 return getTestFamily();
376 }
377
378 public static void main(String args[]) throws Exception {
379 TestZKBasedOpenCloseRegion.beforeAllTests();
380
381 TestZKBasedOpenCloseRegion test = new TestZKBasedOpenCloseRegion();
382 test.setup();
383 test.testCloseRegion();
384
385 TestZKBasedOpenCloseRegion.afterAllTests();
386 }
387 }