1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.util.LinkedList;
23 import java.util.Queue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.util.Bytes;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class HTablePool {
45 private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
46 new ConcurrentHashMap<String, LinkedList<HTableInterface>>();
47 private final Configuration config;
48 private final int maxSize;
49 private final HTableInterfaceFactory tableFactory;
50
51
52
53
54 public HTablePool() {
55 this(HBaseConfiguration.create(), Integer.MAX_VALUE);
56 }
57
58
59
60
61
62
63 public HTablePool(final Configuration config, final int maxSize) {
64 this(config, maxSize, null);
65 }
66
67 public HTablePool(final Configuration config, final int maxSize,
68 final HTableInterfaceFactory tableFactory) {
69
70
71 this.config = config == null? new Configuration(): new Configuration(config);
72 this.maxSize = maxSize;
73 this.tableFactory = tableFactory == null? new HTableFactory(): tableFactory;
74 }
75
76
77
78
79
80
81
82
83
84 public HTableInterface getTable(String tableName) {
85 LinkedList<HTableInterface> queue = tables.get(tableName);
86 if(queue == null) {
87 queue = new LinkedList<HTableInterface>();
88 tables.putIfAbsent(tableName, queue);
89 return createHTable(tableName);
90 }
91 HTableInterface table;
92 synchronized(queue) {
93 table = queue.poll();
94 }
95 if(table == null) {
96 return createHTable(tableName);
97 }
98 return table;
99 }
100
101
102
103
104
105
106
107
108
109 public HTableInterface getTable(byte [] tableName) {
110 return getTable(Bytes.toString(tableName));
111 }
112
113
114
115
116
117
118
119
120 public void putTable(HTableInterface table) {
121 LinkedList<HTableInterface> queue = tables.get(Bytes.toString(table.getTableName()));
122 synchronized(queue) {
123 if(queue.size() >= maxSize) {
124
125 this.tableFactory.releaseHTableInterface(table);
126 return;
127 }
128 queue.add(table);
129 }
130 }
131
132 protected HTableInterface createHTable(String tableName) {
133 return this.tableFactory.createHTableInterface(config, Bytes.toBytes(tableName));
134 }
135
136
137
138
139
140
141
142
143
144
145 public void closeTablePool(final String tableName) {
146 Queue<HTableInterface> queue = tables.get(tableName);
147 synchronized (queue) {
148 HTableInterface table = queue.poll();
149 while (table != null) {
150 this.tableFactory.releaseHTableInterface(table);
151 table = queue.poll();
152 }
153 }
154 HConnectionManager.deleteConnection(this.config, true);
155 }
156
157
158
159
160
161
162 public void closeTablePool(final byte[] tableName) {
163 closeTablePool(Bytes.toString(tableName));
164 }
165
166 int getCurrentPoolSize(String tableName) {
167 Queue<HTableInterface> queue = tables.get(tableName);
168 synchronized(queue) {
169 return queue.size();
170 }
171 }
172 }