View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.executor;
21  
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Map.Entry;
25  import java.util.concurrent.BlockingQueue;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.LinkedBlockingQueue;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
34  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
35  
36  import com.google.common.util.concurrent.ThreadFactoryBuilder;
37  
38  /**
39   * This is a generic executor service. This component abstracts a
40   * threadpool, a queue to which {@link EventHandler.EventType}s can be submitted,
41   * and a <code>Runnable</code> that handles the object that is added to the queue.
42   *
43   * <p>In order to create a new service, create an instance of this class and
44   * then do: <code>instance.startExecutorService("myService");</code>.  When done
45   * call {@link #shutdown()}.
46   *
47   * <p>In order to use the service created above, call
48   * {@link #submit(EventHandler)}. Register pre- and post- processing listeners
49   * by registering your implementation of {@link EventHandler.EventHandlerListener}
50   * with {@link #registerListener(EventHandler.EventType, EventHandler.EventHandlerListener)}.  Be sure
51   * to deregister your listener when done via {@link #unregisterListener(EventHandler.EventType)}.
52   */
53  public class ExecutorService {
54    private static final Log LOG = LogFactory.getLog(ExecutorService.class);
55  
56    // hold the all the executors created in a map addressable by their names
57    private final ConcurrentHashMap<String, Executor> executorMap =
58      new ConcurrentHashMap<String, Executor>();
59  
60    // listeners that are called before and after an event is processed
61    private ConcurrentHashMap<EventHandler.EventType, EventHandlerListener> eventHandlerListeners =
62      new ConcurrentHashMap<EventHandler.EventType, EventHandlerListener>();
63  
64    // Name of the server hosting this executor service.
65    private final String servername;
66  
67    /**
68     * The following is a list of all executor types, both those that run in the
69     * master and those that run in the regionserver.
70     */
71    public enum ExecutorType {
72  
73      // Master executor services
74      MASTER_CLOSE_REGION        (1),
75      MASTER_OPEN_REGION         (2),
76      MASTER_SERVER_OPERATIONS   (3),
77      MASTER_TABLE_OPERATIONS    (4),
78      MASTER_RS_SHUTDOWN         (5),
79      MASTER_META_SERVER_OPERATIONS (6),
80  
81      // RegionServer executor services
82      RS_OPEN_REGION             (20),
83      RS_OPEN_ROOT               (21),
84      RS_OPEN_META               (22),
85      RS_CLOSE_REGION            (23),
86      RS_CLOSE_ROOT              (24),
87      RS_CLOSE_META              (25);
88  
89      ExecutorType(int value) {}
90  
91      /**
92       * @param serverName
93       * @return Conflation of the executor type and the passed servername.
94       */
95      String getExecutorName(String serverName) {
96        return this.toString() + "-" + serverName;
97      }
98    }
99  
100   /**
101    * Returns the executor service type (the thread pool instance) for the
102    * passed event handler type.
103    * @param type EventHandler type.
104    */
105   public ExecutorType getExecutorServiceType(final EventHandler.EventType type) {
106     switch(type) {
107       // Master executor services
108 
109       case RS_ZK_REGION_CLOSED:
110         return ExecutorType.MASTER_CLOSE_REGION;
111 
112       case RS_ZK_REGION_OPENED:
113         return ExecutorType.MASTER_OPEN_REGION;
114 
115       case M_SERVER_SHUTDOWN:
116         return ExecutorType.MASTER_SERVER_OPERATIONS;
117 
118       case M_META_SERVER_SHUTDOWN:
119         return ExecutorType.MASTER_META_SERVER_OPERATIONS;
120 
121       case C_M_DELETE_TABLE:
122       case C_M_DISABLE_TABLE:
123       case C_M_ENABLE_TABLE:
124       case C_M_MODIFY_TABLE:
125         return ExecutorType.MASTER_TABLE_OPERATIONS;
126 
127       // RegionServer executor services
128 
129       case M_RS_OPEN_REGION:
130         return ExecutorType.RS_OPEN_REGION;
131 
132       case M_RS_OPEN_ROOT:
133         return ExecutorType.RS_OPEN_ROOT;
134 
135       case M_RS_OPEN_META:
136         return ExecutorType.RS_OPEN_META;
137 
138       case M_RS_CLOSE_REGION:
139         return ExecutorType.RS_CLOSE_REGION;
140 
141       case M_RS_CLOSE_ROOT:
142         return ExecutorType.RS_CLOSE_ROOT;
143 
144       case M_RS_CLOSE_META:
145         return ExecutorType.RS_CLOSE_META;
146 
147       default:
148         throw new RuntimeException("Unhandled event type " + type);
149     }
150   }
151 
152   /**
153    * Default constructor.
154    * @param servername Name of the hosting server.
155    */
156   public ExecutorService(final String servername) {
157     super();
158     this.servername = servername;
159   }
160 
161   /**
162    * Start an executor service with a given name. If there was a service already
163    * started with the same name, this throws a RuntimeException.
164    * @param name Name of the service to start.
165    */
166   void startExecutorService(String name, int maxThreads) {
167     if (this.executorMap.get(name) != null) {
168       throw new RuntimeException("An executor service with the name " + name +
169         " is already running!");
170     }
171     Executor hbes = new Executor(name, maxThreads, this.eventHandlerListeners);
172     if (this.executorMap.putIfAbsent(name, hbes) != null) {
173       throw new RuntimeException("An executor service with the name " + name +
174       " is already running (2)!");
175     }
176     LOG.debug("Starting executor service name=" + name +
177       ", corePoolSize=" + hbes.threadPoolExecutor.getCorePoolSize() +
178       ", maxPoolSize=" + hbes.threadPoolExecutor.getMaximumPoolSize());
179   }
180 
181   boolean isExecutorServiceRunning(String name) {
182     return this.executorMap.containsKey(name);
183   }
184 
185   public void shutdown() {
186     for(Entry<String, Executor> entry: this.executorMap.entrySet()) {
187       List<Runnable> wasRunning =
188         entry.getValue().threadPoolExecutor.shutdownNow();
189       if (!wasRunning.isEmpty()) {
190         LOG.info(entry.getKey() + " had " + wasRunning + " on shutdown");
191       }
192     }
193     this.executorMap.clear();
194   }
195 
196   Executor getExecutor(final ExecutorType type) {
197     return getExecutor(type.getExecutorName(this.servername));
198   }
199 
200   Executor getExecutor(String name) {
201     Executor executor = this.executorMap.get(name);
202     if (executor == null) {
203       LOG.debug("Executor service [" + name + "] not found in " + this.executorMap);
204     }
205     return executor;
206   }
207 
208 
209   public void startExecutorService(final ExecutorType type, final int maxThreads) {
210     String name = type.getExecutorName(this.servername);
211     if (isExecutorServiceRunning(name)) {
212       LOG.debug("Executor service " + toString() + " already running on " +
213         this.servername);
214       return;
215     }
216     startExecutorService(name, maxThreads);
217   }
218 
219   public void submit(final EventHandler eh) {
220     getExecutor(getExecutorServiceType(eh.getEventType())).submit(eh);
221   }
222 
223   /**
224    * Subscribe to updates before and after processing instances of
225    * {@link EventHandler.EventType}.  Currently only one listener per
226    * event type.
227    * @param type Type of event we're registering listener for
228    * @param listener The listener to run.
229    */
230   public void registerListener(final EventHandler.EventType type,
231       final EventHandlerListener listener) {
232     this.eventHandlerListeners.put(type, listener);
233   }
234 
235   /**
236    * Stop receiving updates before and after processing instances of
237    * {@link EventHandler.EventType}
238    * @param type Type of event we're registering listener for
239    * @return The listener we removed or null if we did not remove it.
240    */
241   public EventHandlerListener unregisterListener(final EventHandler.EventType type) {
242     return this.eventHandlerListeners.remove(type);
243   }
244 
245   /**
246    * Executor instance.
247    */
248   static class Executor {
249     // how long to retain excess threads
250     final long keepAliveTimeInMillis = 1000;
251     // the thread pool executor that services the requests
252     final ThreadPoolExecutor threadPoolExecutor;
253     // work queue to use - unbounded queue
254     final BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
255     private final String name;
256     private final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners;
257 
258     protected Executor(String name, int maxThreads,
259         final Map<EventHandler.EventType, EventHandlerListener> eventHandlerListeners) {
260       this.name = name;
261       this.eventHandlerListeners = eventHandlerListeners;
262       // create the thread pool executor
263       this.threadPoolExecutor = new ThreadPoolExecutor(maxThreads, maxThreads,
264           keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
265       // name the threads for this threadpool
266       ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
267       tfb.setNameFormat(this.name + "-%d");
268       this.threadPoolExecutor.setThreadFactory(tfb.build());
269     }
270 
271     /**
272      * Submit the event to the queue for handling.
273      * @param event
274      */
275     void submit(final EventHandler event) {
276       // If there is a listener for this type, make sure we call the before
277       // and after process methods.
278       EventHandlerListener listener =
279         this.eventHandlerListeners.get(event.getEventType());
280       if (listener != null) {
281         event.setListener(listener);
282       }
283       this.threadPoolExecutor.execute(event);
284     }
285   }
286 }