1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.ArrayList;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.Put;
31
32 import org.apache.hadoop.hbase.regionserver.HRegionServer;
33 import org.apache.hadoop.hbase.regionserver.HRegion;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.JVMClusterUtil;
36 import org.apache.hadoop.hbase.util.Threads;
37
38
39
40
41 public class TestRegionRebalancing extends HBaseClusterTestCase {
42 final Log LOG = LogFactory.getLog(this.getClass().getName());
43 HTable table;
44
45 HTableDescriptor desc;
46
47 final byte[] FIVE_HUNDRED_KBYTES;
48
49 final byte [] FAMILY_NAME = Bytes.toBytes("col");
50
51
52 public TestRegionRebalancing() {
53 super(1);
54 FIVE_HUNDRED_KBYTES = new byte[500 * 1024];
55 for (int i = 0; i < 500 * 1024; i++) {
56 FIVE_HUNDRED_KBYTES[i] = 'x';
57 }
58
59 desc = new HTableDescriptor("test");
60 desc.addFamily(new HColumnDescriptor(FAMILY_NAME));
61 }
62
63
64
65
66 @Override
67 public void preHBaseClusterSetup() throws IOException {
68
69 List<byte []> startKeys = new ArrayList<byte []>();
70 startKeys.add(null);
71 for (int i = 10; i < 29; i++) {
72 startKeys.add(Bytes.toBytes("row_" + i));
73 }
74 startKeys.add(null);
75 LOG.info(startKeys.size() + " start keys generated");
76
77 List<HRegion> regions = new ArrayList<HRegion>();
78 for (int i = 0; i < 20; i++) {
79 regions.add(createAregion(startKeys.get(i), startKeys.get(i+1)));
80 }
81
82
83
84
85 createRootAndMetaRegions();
86 for (HRegion region : regions) {
87 HRegion.addRegionToMETA(meta, region);
88 }
89 closeRootAndMeta();
90 }
91
92
93
94
95
96
97 public void testRebalancing() throws IOException {
98 table = new HTable(conf, "test");
99 assertEquals("Test table should have 20 regions",
100 20, table.getStartKeys().length);
101
102
103 assertRegionsAreBalanced();
104
105 LOG.debug("Adding 2nd region server.");
106
107 LOG.info("Started=" +
108 cluster.startRegionServer().getRegionServer().getServerName());
109 cluster.getMaster().balance();
110 assertRegionsAreBalanced();
111
112
113 LOG.debug("Adding 3rd region server.");
114 LOG.info("Started=" +
115 cluster.startRegionServer().getRegionServer().getServerName());
116 cluster.getMaster().balance();
117 assertRegionsAreBalanced();
118
119
120 LOG.debug("Killing the 3rd region server.");
121 LOG.info("Stopped=" + cluster.stopRegionServer(2, false));
122 cluster.waitOnRegionServer(2);
123 cluster.getMaster().balance();
124 assertRegionsAreBalanced();
125
126
127 LOG.debug("Adding 3rd region server");
128 LOG.info("Started=" +
129 cluster.startRegionServer().getRegionServer().getServerName());
130 LOG.debug("Adding 4th region server");
131 LOG.info("Started=" +
132 cluster.startRegionServer().getRegionServer().getServerName());
133 cluster.getMaster().balance();
134 assertRegionsAreBalanced();
135
136 for (int i = 0; i < 6; i++){
137 LOG.debug("Adding " + (i + 5) + "th region server");
138 cluster.startRegionServer();
139 }
140 cluster.getMaster().balance();
141 assertRegionsAreBalanced();
142 }
143
144
145 private int getRegionCount() {
146 int total = 0;
147 for (HRegionServer server : getOnlineRegionServers()) {
148 total += server.getOnlineRegions().size();
149 }
150 return total;
151 }
152
153
154
155
156
157
158 private void assertRegionsAreBalanced() {
159
160
161 boolean success = false;
162 float slop = (float)0.1;
163 if (slop <= 0) slop = 1;
164
165 for (int i = 0; i < 5; i++) {
166 success = true;
167
168 waitForAllRegionsAssigned();
169
170 int regionCount = getRegionCount();
171 List<HRegionServer> servers = getOnlineRegionServers();
172 double avg = cluster.getMaster().getServerManager().getAverageLoad();
173 int avgLoadPlusSlop = (int)Math.ceil(avg * (1 + slop));
174 int avgLoadMinusSlop = (int)Math.floor(avg * (1 - slop)) - 1;
175 LOG.debug("There are " + servers.size() + " servers and " + regionCount
176 + " regions. Load Average: " + avg + " low border: " + avgLoadMinusSlop
177 + ", up border: " + avgLoadPlusSlop + "; attempt: " + i);
178
179 for (HRegionServer server : servers) {
180 int serverLoad = server.getOnlineRegions().size();
181 LOG.debug(server.getServerName() + " Avg: " + avg + " actual: " + serverLoad);
182 if (!(avg > 2.0 && serverLoad <= avgLoadPlusSlop
183 && serverLoad >= avgLoadMinusSlop)) {
184 LOG.debug(server.getServerName() + " Isn't balanced!!! Avg: " + avg +
185 " actual: " + serverLoad + " slop: " + slop);
186 success = false;
187 }
188 }
189
190 if (!success) {
191
192
193 try {
194 Thread.sleep(10000);
195 } catch (InterruptedException e) {}
196
197 cluster.getMaster().balance();
198 continue;
199 }
200
201
202 return;
203 }
204
205
206 fail("After 5 attempts, region assignments were not balanced.");
207 }
208
209 private List<HRegionServer> getOnlineRegionServers() {
210 List<HRegionServer> list = new ArrayList<HRegionServer>();
211 for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
212 if (rst.getRegionServer().isOnline()) {
213 list.add(rst.getRegionServer());
214 }
215 }
216 return list;
217 }
218
219
220
221
222 private void waitForAllRegionsAssigned() {
223 while (getRegionCount() < 22) {
224
225 LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now.");
226 try {
227 Thread.sleep(1000);
228 } catch (InterruptedException e) {}
229 }
230 }
231
232
233
234
235
236 private HRegion createAregion(byte [] startKey, byte [] endKey)
237 throws IOException {
238 HRegion region = createNewHRegion(desc, startKey, endKey);
239 byte [] keyToWrite = startKey == null ? Bytes.toBytes("row_000") : startKey;
240 Put put = new Put(keyToWrite);
241 put.add(FAMILY_NAME, null, Bytes.toBytes("test"));
242 region.put(put);
243 region.close();
244 region.getLog().closeAndDelete();
245 return region;
246 }
247 }