1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.catalog;
21  
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.net.ConnectException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import junit.framework.Assert;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.Abortable;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.HRegionInfo;
38  import org.apache.hadoop.hbase.HServerAddress;
39  import org.apache.hadoop.hbase.HServerInfo;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
42  import org.apache.hadoop.hbase.client.Get;
43  import org.apache.hadoop.hbase.client.HConnection;
44  import org.apache.hadoop.hbase.client.Result;
45  import org.apache.hadoop.hbase.ipc.HRegionInterface;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.Threads;
48  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
49  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50  import org.apache.hadoop.util.Progressable;
51  import org.apache.zookeeper.KeeperException;
52  import org.junit.After;
53  import org.junit.AfterClass;
54  import org.junit.Before;
55  import org.junit.BeforeClass;
56  import org.junit.Test;
57  import org.mockito.Matchers;
58  import org.mockito.Mockito;
59  
60  /**
61   * Test {@link CatalogTracker}
62   */
63  public class TestCatalogTracker {
64    private static final Log LOG = LogFactory.getLog(TestCatalogTracker.class);
65    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
66    private static final HServerAddress HSA =
67      new HServerAddress("example.org:1234");
68    private ZooKeeperWatcher watcher;
69    private Abortable abortable;
70  
71    @BeforeClass public static void beforeClass() throws Exception {
72      UTIL.startMiniZKCluster();
73    }
74  
75    @AfterClass public static void afterClass() throws IOException {
76      UTIL.getZkCluster().shutdown();
77    }
78  
79    @Before public void before() throws IOException {
80      this.abortable = new Abortable() {
81        @Override
82        public void abort(String why, Throwable e) {
83          LOG.info(why, e);
84        }
85      };
86      this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(),
87        this.getClass().getSimpleName(), this.abortable);
88    }
89  
90    @After public void after() {
91      this.watcher.close();
92    }
93  
94    private CatalogTracker constructAndStartCatalogTracker()
95    throws IOException, InterruptedException {
96      return constructAndStartCatalogTracker(null);
97    }
98  
99    private CatalogTracker constructAndStartCatalogTracker(final HConnection c)
100   throws IOException, InterruptedException {
101     CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable);
102     ct.start();
103     return ct;
104   }
105 
106   /**
107    * Test that we get notification if .META. moves.
108    * @throws IOException 
109    * @throws InterruptedException 
110    * @throws KeeperException 
111    */
112   @Test public void testThatIfMETAMovesWeAreNotified()
113   throws IOException, InterruptedException, KeeperException {
114     HConnection connection = Mockito.mock(HConnection.class);
115     constructAndStartCatalogTracker(connection);
116     try {
117       RootLocationEditor.setRootLocation(this.watcher,
118         new HServerAddress("example.com:1234"));
119     } finally {
120       // Clean out root location or later tests will be confused... they presume
121       // start fresh in zk.
122       RootLocationEditor.deleteRootLocation(this.watcher);
123     }
124   }
125 
126   /**
127    * Test interruptable while blocking wait on root and meta.
128    * @throws IOException
129    * @throws InterruptedException
130    */
131   @Test public void testInterruptWaitOnMetaAndRoot()
132   throws IOException, InterruptedException {
133     final CatalogTracker ct = constructAndStartCatalogTracker();
134     HServerAddress hsa = ct.getRootLocation();
135     Assert.assertNull(hsa);
136     HServerAddress meta = ct.getMetaLocation();
137     Assert.assertNull(meta);
138     Thread t = new Thread() {
139       @Override
140       public void run() {
141         try {
142           ct.waitForMeta();
143         } catch (InterruptedException e) {
144           throw new RuntimeException("Interrupted", e);
145         }
146       }
147     };
148     t.start();
149     while (!t.isAlive()) Threads.sleep(1);
150     Threads.sleep(1);
151     assertTrue(t.isAlive());
152     ct.stop();
153     // Join the thread... should exit shortly.
154     t.join();
155   }
156 
157   @Test public void testGetMetaServerConnectionFails()
158   throws IOException, InterruptedException, KeeperException {
159     HConnection connection = Mockito.mock(HConnection.class);
160     ConnectException connectException =
161       new ConnectException("Connection refused");
162     final HRegionInterface implementation =
163       Mockito.mock(HRegionInterface.class);
164     Mockito.when(implementation.get((byte [])Mockito.any(), (Get)Mockito.any())).
165       thenThrow(connectException);
166     Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())).
167       thenReturn(implementation);
168     Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false));
169     final CatalogTracker ct = constructAndStartCatalogTracker(connection);
170     try {
171       RootLocationEditor.setRootLocation(this.watcher,
172         new HServerAddress("example.com:1234"));
173       Assert.assertFalse(ct.verifyMetaRegionLocation(100));
174     } finally {
175       // Clean out root location or later tests will be confused... they presume
176       // start fresh in zk.
177       RootLocationEditor.deleteRootLocation(this.watcher);
178     }
179   }
180 
181   /**
182    * Test get of root region fails properly if nothing to connect to.
183    * @throws IOException
184    * @throws InterruptedException
185    * @throws KeeperException
186    */
187   @Test
188   public void testVerifyRootRegionLocationFails()
189   throws IOException, InterruptedException, KeeperException {
190     HConnection connection = Mockito.mock(HConnection.class);
191     ConnectException connectException =
192       new ConnectException("Connection refused");
193     final HRegionInterface implementation =
194       Mockito.mock(HRegionInterface.class);
195     Mockito.when(implementation.getRegionInfo((byte [])Mockito.any())).
196       thenThrow(connectException);
197     Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())).
198       thenReturn(implementation);
199     Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false));
200     final CatalogTracker ct = constructAndStartCatalogTracker(connection);
201     try {
202       RootLocationEditor.setRootLocation(this.watcher,
203         new HServerAddress("example.com:1234"));
204       Assert.assertFalse(ct.verifyRootRegionLocation(100));
205     } finally {
206       // Clean out root location or later tests will be confused... they presume
207       // start fresh in zk.
208       RootLocationEditor.deleteRootLocation(this.watcher);
209     }
210   }
211 
212   @Test (expected = NotAllMetaRegionsOnlineException.class)
213   public void testTimeoutWaitForRoot()
214   throws IOException, InterruptedException {
215     final CatalogTracker ct = constructAndStartCatalogTracker();
216     ct.waitForRoot(100);
217   }
218 
219   @Test (expected = NotAllMetaRegionsOnlineException.class)
220   public void testTimeoutWaitForMeta()
221   throws IOException, InterruptedException {
222     final CatalogTracker ct = constructAndStartCatalogTracker();
223     ct.waitForMeta(100);
224   }
225 
226   /**
227    * Test waiting on root w/ no timeout specified.
228    * @throws IOException
229    * @throws InterruptedException
230    * @throws KeeperException
231    */
232   @Test public void testNoTimeoutWaitForRoot()
233   throws IOException, InterruptedException, KeeperException {
234     final CatalogTracker ct = constructAndStartCatalogTracker();
235     HServerAddress hsa = ct.getRootLocation();
236     Assert.assertNull(hsa);
237 
238     // Now test waiting on root location getting set.
239     Thread t = new WaitOnMetaThread(ct);
240     startWaitAliveThenWaitItLives(t, 1000);
241     // Set a root location.
242     hsa = setRootLocation();
243     // Join the thread... should exit shortly.
244     t.join();
245     // Now root is available.
246     Assert.assertTrue(ct.getRootLocation().equals(hsa));
247   }
248 
249   private HServerAddress setRootLocation() throws KeeperException {
250     RootLocationEditor.setRootLocation(this.watcher, HSA);
251     return HSA;
252   }
253 
254   /**
255    * Test waiting on meta w/ no timeout specified.
256    * @throws IOException
257    * @throws InterruptedException
258    * @throws KeeperException
259    */
260   @Test public void testNoTimeoutWaitForMeta()
261   throws IOException, InterruptedException, KeeperException {
262     // Mock an HConnection and a HRegionInterface implementation.  Have the
263     // HConnection return the HRI.  Have the HRI return a few mocked up responses
264     // to make our test work.
265     HConnection connection = Mockito.mock(HConnection.class);
266     HRegionInterface  mockHRI = Mockito.mock(HRegionInterface.class);
267     // Make the HRI return an answer no matter how Get is called.  Same for
268     // getHRegionInfo.  Thats enough for this test.
269     Mockito.when(connection.getHRegionConnection((HServerAddress)Mockito.any(), Mockito.anyBoolean())).
270       thenReturn(mockHRI);
271 
272     final CatalogTracker ct = constructAndStartCatalogTracker(connection);
273     HServerAddress hsa = ct.getMetaLocation();
274     Assert.assertNull(hsa);
275 
276     // Now test waiting on meta location getting set.
277     Thread t = new WaitOnMetaThread(ct) {
278       @Override
279       void doWaiting() throws InterruptedException {
280         this.ct.waitForMeta();
281       }
282     };
283     startWaitAliveThenWaitItLives(t, 1000);
284 
285     // Now the ct is up... set into the mocks some answers that make it look
286     // like things have been getting assigned.  Make it so we'll return a
287     // location (no matter what the Get is).  Same for getHRegionInfo -- always
288     // just return the meta region.
289     List<KeyValue> kvs = new ArrayList<KeyValue>();
290     kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
291       HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
292       Bytes.toBytes(HSA.toString())));
293     final Result result = new Result(kvs);
294     Mockito.when(mockHRI.get((byte [])Mockito.any(), (Get)Mockito.any())).
295       thenReturn(result);
296     Mockito.when(mockHRI.getRegionInfo((byte [])Mockito.any())).
297       thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
298     // This should trigger wake up of meta wait (Its the removal of the meta
299     // region unassigned node that triggers catalogtrackers that a meta has
300     // been assigned.
301     String node = ct.getMetaNodeTracker().getNode();
302     ZKUtil.createAndFailSilent(this.watcher, node);
303     MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO,
304       new HServerInfo(HSA, -1, "example.com"));
305     ZKUtil.deleteNode(this.watcher, node);
306     // Join the thread... should exit shortly.
307     t.join();
308     // Now meta is available.
309     Assert.assertTrue(ct.getMetaLocation().equals(HSA));
310   }
311 
312   private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
313     t.start();
314     while(!t.isAlive()) {
315       // Wait
316     }
317     // Wait one second.
318     Threads.sleep(ms);
319     Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
320   }
321 
322   class CountingProgressable implements Progressable {
323     final AtomicInteger counter = new AtomicInteger(0);
324     @Override
325     public void progress() {
326       this.counter.incrementAndGet();
327     }
328   }
329 
330   /**
331    * Wait on META.
332    * Default is wait on -ROOT-.
333    */
334   class WaitOnMetaThread extends Thread {
335     final CatalogTracker ct;
336 
337     WaitOnMetaThread(final CatalogTracker ct) {
338       super("WaitOnMeta");
339       this.ct = ct;
340     }
341 
342     @Override
343     public void run() {
344       try {
345         doWaiting();
346       } catch (InterruptedException e) {
347         throw new RuntimeException("Failed wait", e);
348       }
349       LOG.info("Exiting " + getName());
350     }
351 
352     void doWaiting() throws InterruptedException {
353       this.ct.waitForRoot();
354     }
355   }
356 }