View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import java.util.LinkedList;
23  import java.util.Queue;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.util.Bytes;
30  
31  /**
32   * A simple pool of HTable instances.<p>
33   *
34   * Each HTablePool acts as a pool for all tables.  To use, instantiate an
35   * HTablePool and use {@link #getTable(String)} to get an HTable from the pool.
36   * Once you are done with it, return it to the pool with {@link #putTable(HTableInterface)}.
37   * 
38   * <p>A pool can be created with a <i>maxSize</i> which defines the most HTable
39   * references that will ever be retained for each table.  Otherwise the default
40   * is {@link Integer#MAX_VALUE}.
41   *
42   * <p>Pool will manage its own connection to the cluster. See {@link HConnectionManager}.
43   */
44  public class HTablePool {
45    private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
46      new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
47    private final Configuration config;
48    private final int maxSize;
49    private final HTableInterfaceFactory tableFactory;
50  
51    /**
52     * Default Constructor.  Default HBaseConfiguration and no limit on pool size.
53     */
54    public HTablePool() {
55      this(HBaseConfiguration.create(), Integer.MAX_VALUE);
56    }
57  
58    /**
59     * Constructor to set maximum versions and use the specified configuration.
60     * @param config configuration
61     * @param maxSize maximum number of references to keep for each table
62     */
63    public HTablePool(final Configuration config, final int maxSize) {
64      this(config, maxSize, null);
65    }
66  
67    public HTablePool(final Configuration config, final int maxSize,
68        final HTableInterfaceFactory tableFactory) {
69      // Make a new configuration instance so I can safely cleanup when
70      // done with the pool.
71      this.config = config == null? new Configuration(): new Configuration(config);
72      this.maxSize = maxSize;
73      this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
74    }
75  
76    /**
77     * Get a reference to the specified table from the pool.<p>
78     *
79     * Create a new one if one is not available.
80     * @param tableName table name
81     * @return a reference to the specified table
82     * @throws RuntimeException if there is a problem instantiating the HTable
83     */
84    public HTableInterface getTable(String tableName) {
85      LinkedList<HTableInterface> queue = tables.get(tableName);
86      if(queue == null) {
87        queue = new LinkedList<HTableInterface>();
88        tables.putIfAbsent(tableName, queue);
89        return createHTable(tableName);
90      }
91      HTableInterface table;
92      synchronized(queue) {
93        table = queue.poll();
94      }
95      if(table == null) {
96        return createHTable(tableName);
97      }
98      return table;
99    }
100 
101   /**
102    * Get a reference to the specified table from the pool.<p>
103    *
104    * Create a new one if one is not available.
105    * @param tableName table name
106    * @return a reference to the specified table
107    * @throws RuntimeException if there is a problem instantiating the HTable
108    */
109   public HTableInterface getTable(byte [] tableName) {
110     return getTable(Bytes.toString(tableName));
111   }
112 
113   /**
114    * Puts the specified HTable back into the pool.<p>
115    *
116    * If the pool already contains <i>maxSize</i> references to the table,
117    * then the table instance gets closed after flushing buffered edits.
118    * @param table table
119    */
120   public void putTable(HTableInterface table) {
121     LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
122     synchronized(queue) {
123       if(queue.size() >= maxSize) {
124         // release table instance since we're not reusing it
125         this.tableFactory.releaseHTableInterface(table);
126         return;
127       }
128       queue.add(table);
129     }
130   }
131 
132   protected HTableInterface createHTable(String tableName) {
133     return this.tableFactory.createHTableInterface(config, Bytes.toBytes(tableName));
134   }
135 
136   /**
137    * Closes all the HTable instances , belonging to the given table, in the table pool.
138    * <p>
139    * Note: this is a 'shutdown' of the given table pool and different from
140    * {@link #putTable(HTableInterface)}, that is used to return the table
141    * instance to the pool for future re-use.
142    *
143    * @param tableName
144    */
145   public void closeTablePool(final String tableName)  {
146     Queue<HTableInterface> queue = tables.get(tableName);
147     synchronized (queue) {
148       HTableInterface table = queue.poll();
149       while (table != null) {
150         this.tableFactory.releaseHTableInterface(table);
151         table = queue.poll();
152       }
153     }
154     HConnectionManager.deleteConnection(this.config, true);
155   }
156 
157   /**
158    * See {@link #closeTablePool(String)}.
159    *
160    * @param tableName
161    */
162   public void closeTablePool(final byte[] tableName)  {
163     closeTablePool(Bytes.toString(tableName));
164   }
165 
166   int getCurrentPoolSize(String tableName) {
167     Queue<HTableInterface> queue = tables.get(tableName);
168     synchronized(queue) {
169       return queue.size();
170     }
171   }
172 }