1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.executor;
21  
22  import static org.junit.Assert.assertEquals;
23  
24  import java.io.IOException;
25  import java.util.concurrent.ThreadPoolExecutor;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
34  import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
35  import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
36  import org.junit.Test;
37  import static org.mockito.Mockito.*;
38  
39  public class TestExecutorService {
40    private static final Log LOG = LogFactory.getLog(TestExecutorService.class);
41  
42    @Test
43    public void testExecutorService() throws Exception {
44      int maxThreads = 5;
45      int maxTries = 10;
46      int sleepInterval = 10;
47  
48      Server mockedServer = mock(Server.class);
49      when(mockedServer.getConfiguration()).thenReturn(HBaseConfiguration.create());
50  
51      // Start an executor service pool with max 5 threads
52      ExecutorService executorService = new ExecutorService("unit_test");
53      executorService.startExecutorService(
54        ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
55  
56      Executor executor =
57        executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
58      ThreadPoolExecutor pool = executor.threadPoolExecutor;
59  
60      // Assert no threads yet
61      assertEquals(0, pool.getPoolSize());
62  
63      AtomicBoolean lock = new AtomicBoolean(true);
64      AtomicInteger counter = new AtomicInteger(0);
65  
66      // Submit maxThreads executors.
67      for (int i = 0; i < maxThreads; i++) {
68        executorService.submit(
69          new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
70              lock, counter));
71      }
72  
73      // The TestEventHandler will increment counter when it starts.
74      int tries = 0;
75      while (counter.get() < maxThreads && tries < maxTries) {
76        LOG.info("Waiting for all event handlers to start...");
77        Thread.sleep(sleepInterval);
78        tries++;
79      }
80  
81      // Assert that pool is at max threads.
82      assertEquals(maxThreads, counter.get());
83      assertEquals(maxThreads, pool.getPoolSize());
84  
85      // Now interrupt the running Executor
86      synchronized (lock) {
87        lock.set(false);
88        lock.notifyAll();
89      }
90  
91      // Executor increments counter again on way out so.... test that happened.
92      while (counter.get() < (maxThreads * 2) && tries < maxTries) {
93        System.out.println("Waiting for all event handlers to finish...");
94        Thread.sleep(sleepInterval);
95        tries++;
96      }
97  
98      assertEquals(maxThreads * 2, counter.get());
99      assertEquals(maxThreads, pool.getPoolSize());
100 
101     // Add more than the number of threads items.
102     // Make sure we don't get RejectedExecutionException.
103     for (int i = 0; i < (2 * maxThreads); i++) {
104       executorService.submit(
105         new TestEventHandler(mockedServer, EventType.M_SERVER_SHUTDOWN,
106             lock, counter));
107     }
108     // Now interrupt the running Executor
109     synchronized (lock) {
110       lock.set(false);
111       lock.notifyAll();
112     }
113 
114     // Make sure threads are still around even after their timetolive expires.
115     Thread.sleep(executor.keepAliveTimeInMillis * 2);
116     assertEquals(maxThreads, pool.getPoolSize());
117   }
118 
119   public static class TestEventHandler extends EventHandler {
120     private AtomicBoolean lock;
121     private AtomicInteger counter;
122 
123     public TestEventHandler(Server server, EventType eventType,
124                             AtomicBoolean lock, AtomicInteger counter) {
125       super(server, eventType);
126       this.lock = lock;
127       this.counter = counter;
128     }
129 
130     @Override
131     public void process() throws IOException {
132       int num = counter.incrementAndGet();
133       LOG.info("Running process #" + num + ", threadName=" +
134         Thread.currentThread().getName());
135       synchronized (lock) {
136         while (lock.get()) {
137           try {
138             lock.wait();
139           } catch (InterruptedException e) {
140             // do nothing
141           }
142         }
143       }
144       counter.incrementAndGet();
145     }
146   }
147 }