1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableSet;
29 import java.util.Random;
30 import java.util.TreeMap;
31 import java.util.TreeSet;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.BlockLocation;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HServerAddress;
42 import org.apache.hadoop.hbase.HServerInfo;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class LoadBalancer {
// Commons-logging logger used for all balancer diagnostics in this class.
61 private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
// Random source shared by the shuffle and assignment helpers in this class;
// seeded from the wall-clock time when this static field is initialized.
62 private static final Random RANDOM = new Random(System.currentTimeMillis());
63
// Balancing tolerance read from "hbase.regions.slop" by the constructor and
// clamped there to the range [0, 1]; balanceCluster uses it to widen the
// floor/ceiling band inside which the cluster is considered balanced.
64 private float slop;
65
/**
 * Creates a balancer whose tolerance ("slop") is taken from the
 * <code>hbase.regions.slop</code> configuration key.
 *
 * @param conf configuration to read the slop from; the value defaults to 0
 *   and is clamped so that it never falls below 0 or rises above 1
 */
LoadBalancer(Configuration conf) {
  float configuredSlop = conf.getFloat("hbase.regions.slop", (float) 0.0);
  // Clamp on a local first, then publish the final value to the field.
  if (configuredSlop < 0) {
    configuredSlop = 0;
  } else if (configuredSlop > 1) {
    configuredSlop = 1;
  }
  this.slop = configuredSlop;
}
71
72 static class RegionPlanComparator implements Comparator<RegionPlan> {
73 @Override
74 public int compare(RegionPlan l, RegionPlan r) {
75 long diff = r.getRegionInfo().getRegionId() - l.getRegionInfo().getRegionId();
76 if (diff < 0) return -1;
77 if (diff > 0) return 1;
78 return 0;
79 }
80 }
81 static RegionPlanComparator rpComparator = new RegionPlanComparator();
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 public List<RegionPlan> balanceCluster(
154 Map<HServerInfo,List<HRegionInfo>> clusterState) {
155 long startTime = System.currentTimeMillis();
156
157
158 TreeMap<HServerInfo,List<HRegionInfo>> serversByLoad =
159 new TreeMap<HServerInfo,List<HRegionInfo>>(
160 new HServerInfo.LoadComparator());
161 int numServers = clusterState.size();
162 if (numServers == 0) {
163 LOG.debug("numServers=0 so skipping load balancing");
164 return null;
165 }
166 int numRegions = 0;
167 StringBuilder strBalanceParam = new StringBuilder("Server information: ");
168
169
170 for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
171 clusterState.entrySet()) {
172 server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
173 numRegions += server.getKey().getLoad().getNumberOfRegions();
174 serversByLoad.put(server.getKey(), server.getValue());
175
176 strBalanceParam.append(server.getKey().getServerName()).append("=")
177 .append(server.getValue().size()).append(", ");
178 }
179 strBalanceParam.delete(strBalanceParam.length() - 2,
180 strBalanceParam.length());
181 LOG.debug(strBalanceParam.toString());
182
183
184 float average = (float)numRegions / numServers;
185
186 int floor = (int) Math.floor(average * (1 - slop));
187 int ceiling = (int) Math.ceil(average * (1 + slop));
188 if(serversByLoad.lastKey().getLoad().getNumberOfRegions() <= ceiling &&
189 serversByLoad.firstKey().getLoad().getNumberOfRegions() >= floor) {
190
191 LOG.info("Skipping load balancing. servers=" + numServers + " " +
192 "regions=" + numRegions + " average=" + average + " " +
193 "mostloaded=" + serversByLoad.lastKey().getLoad().getNumberOfRegions() +
194 " leastloaded=" + serversByLoad.firstKey().getLoad().getNumberOfRegions());
195 return null;
196 }
197 int min = numRegions / numServers;
198 int max = numRegions % numServers == 0 ? min : min + 1;
199
200
201 strBalanceParam.delete(0, strBalanceParam.length());
202 strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
203 .append(", numServers=").append(numServers).append(", max=").append(max)
204 .append(", min=").append(min);
205 LOG.debug(strBalanceParam.toString());
206
207
208
209 List<RegionPlan> regionsToMove = new ArrayList<RegionPlan>();
210 int regionidx = 0;
211
212
213 int serversOverloaded = 0;
214 Map<HServerInfo,BalanceInfo> serverBalanceInfo =
215 new TreeMap<HServerInfo,BalanceInfo>();
216 for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
217 serversByLoad.descendingMap().entrySet()) {
218 HServerInfo serverInfo = server.getKey();
219 int regionCount = serverInfo.getLoad().getNumberOfRegions();
220 if(regionCount <= max) {
221 serverBalanceInfo.put(serverInfo, new BalanceInfo(0, 0));
222 break;
223 }
224 serversOverloaded++;
225 List<HRegionInfo> regions = randomize(server.getValue());
226 int numToOffload = Math.min(regionCount - max, regions.size());
227 int numTaken = 0;
228
229 for (HRegionInfo hri: regions) {
230
231 if (hri.isMetaRegion()) continue;
232 regionsToMove.add(new RegionPlan(hri, serverInfo, null));
233 numTaken++;
234 if (numTaken >= numToOffload) break;
235 }
236 serverBalanceInfo.put(serverInfo,
237 new BalanceInfo(numToOffload, (-1)*numTaken));
238 }
239
240
241 int serversUnderloaded = 0;
242 int neededRegions = 0;
243 for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
244 serversByLoad.entrySet()) {
245 int regionCount = server.getKey().getLoad().getNumberOfRegions();
246 if(regionCount >= min) {
247 break;
248 }
249 serversUnderloaded++;
250 int numToTake = min - regionCount;
251 int numTaken = 0;
252 while(numTaken < numToTake && regionidx < regionsToMove.size()) {
253 regionsToMove.get(regionidx).setDestination(server.getKey());
254 numTaken++;
255 regionidx++;
256 }
257 serverBalanceInfo.put(server.getKey(), new BalanceInfo(0, numTaken));
258
259 if(numTaken < numToTake) {
260 neededRegions += (numToTake - numTaken);
261 }
262 }
263
264
265
266 if(neededRegions == 0 && regionidx == regionsToMove.size()) {
267 long endTime = System.currentTimeMillis();
268 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
269 "Moving " + regionsToMove.size() + " regions off of " +
270 serversOverloaded + " overloaded servers onto " +
271 serversUnderloaded + " less loaded servers");
272 return regionsToMove;
273 }
274
275
276
277
278
279 if (neededRegions != 0) {
280
281 for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
282 serversByLoad.descendingMap().entrySet()) {
283 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
284 int idx =
285 balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
286 if (idx >= server.getValue().size()) break;
287 HRegionInfo region = server.getValue().get(idx);
288 if (region.isMetaRegion()) continue;
289 regionsToMove.add(new RegionPlan(region, server.getKey(), null));
290 if(--neededRegions == 0) {
291
292 break;
293 }
294 }
295 }
296
297
298
299
300
301 for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
302 serversByLoad.entrySet()) {
303 int regionCount = server.getKey().getLoad().getNumberOfRegions();
304 if (regionCount >= min) break;
305 BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey());
306 if(balanceInfo != null) {
307 regionCount += balanceInfo.getNumRegionsAdded();
308 }
309 if(regionCount >= min) {
310 continue;
311 }
312 int numToTake = min - regionCount;
313 int numTaken = 0;
314 while(numTaken < numToTake && regionidx < regionsToMove.size()) {
315 regionsToMove.get(regionidx).setDestination(server.getKey());
316 numTaken++;
317 regionidx++;
318 }
319 }
320
321
322 if(regionidx != regionsToMove.size()) {
323 for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
324 serversByLoad.entrySet()) {
325 int regionCount = server.getKey().getLoad().getNumberOfRegions();
326 if(regionCount >= max) {
327 break;
328 }
329 regionsToMove.get(regionidx).setDestination(server.getKey());
330 regionidx++;
331 if(regionidx == regionsToMove.size()) {
332 break;
333 }
334 }
335 }
336
337 long endTime = System.currentTimeMillis();
338
339 if (regionidx != regionsToMove.size() || neededRegions != 0) {
340
341 LOG.warn("regionidx=" + regionidx + ", regionsToMove=" + regionsToMove.size() +
342 ", numServers=" + numServers + ", serversOverloaded=" + serversOverloaded +
343 ", serversUnderloaded=" + serversUnderloaded);
344 StringBuilder sb = new StringBuilder();
345 for (Map.Entry<HServerInfo, List<HRegionInfo>> e: clusterState.entrySet()) {
346 if (sb.length() > 0) sb.append(", ");
347 sb.append(e.getKey().getServerName());
348 sb.append(" ");
349 sb.append(e.getValue().size());
350 }
351 LOG.warn("Input " + sb.toString());
352 }
353
354
355 LOG.info("Calculated a load balance in " + (endTime-startTime) + "ms. " +
356 "Moving " + regionsToMove.size() + " regions off of " +
357 serversOverloaded + " overloaded servers onto " +
358 serversUnderloaded + " less loaded servers");
359
360 return regionsToMove;
361 }
362
363
364
365
366
367 static List<HRegionInfo> randomize(final List<HRegionInfo> regions) {
368 Collections.shuffle(regions, RANDOM);
369 return regions;
370 }
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385 private static class BalanceInfo {
386
387 private final int nextRegionForUnload;
388 private final int numRegionsAdded;
389
390 public BalanceInfo(int nextRegionForUnload, int numRegionsAdded) {
391 this.nextRegionForUnload = nextRegionForUnload;
392 this.numRegionsAdded = numRegionsAdded;
393 }
394
395 public int getNextRegionForUnload() {
396 return nextRegionForUnload;
397 }
398
399 public int getNumRegionsAdded() {
400 return numRegionsAdded;
401 }
402 }
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421 public static Map<HServerInfo, List<HRegionInfo>> roundRobinAssignment(
422 List<HRegionInfo> regions, List<HServerInfo> servers) {
423 if(regions.size() == 0 || servers.size() == 0) {
424 return null;
425 }
426 Map<HServerInfo,List<HRegionInfo>> assignments =
427 new TreeMap<HServerInfo,List<HRegionInfo>>();
428 int numRegions = regions.size();
429 int numServers = servers.size();
430 int max = (int)Math.ceil((float)numRegions/numServers);
431 int serverIdx = 0;
432 if (numServers > 1) {
433 serverIdx = RANDOM.nextInt(numServers);
434 }
435 int regionIdx = 0;
436 for (int j = 0; j < numServers; j++) {
437 HServerInfo server = servers.get((j+serverIdx) % numServers);
438 List<HRegionInfo> serverRegions = new ArrayList<HRegionInfo>(max);
439 for (int i=regionIdx; i<numRegions; i += numServers) {
440 serverRegions.add(regions.get(i % numRegions));
441 }
442 assignments.put(server, serverRegions);
443 regionIdx++;
444 }
445 return assignments;
446 }
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464 public static Map<HServerInfo, List<HRegionInfo>> retainAssignment(
465 Map<HRegionInfo, HServerAddress> regions, List<HServerInfo> servers) {
466 Map<HServerInfo, List<HRegionInfo>> assignments =
467 new TreeMap<HServerInfo, List<HRegionInfo>>();
468
469 Map<HServerAddress, HServerInfo> serverMap =
470 new TreeMap<HServerAddress, HServerInfo>();
471 for (HServerInfo server : servers) {
472 serverMap.put(server.getServerAddress(), server);
473 assignments.put(server, new ArrayList<HRegionInfo>());
474 }
475 for (Map.Entry<HRegionInfo, HServerAddress> region : regions.entrySet()) {
476 HServerAddress hsa = region.getValue();
477 HServerInfo server = hsa == null? null: serverMap.get(hsa);
478 if (server != null) {
479 assignments.get(server).add(region.getKey());
480 } else {
481 assignments.get(servers.get(RANDOM.nextInt(assignments.size()))).add(
482 region.getKey());
483 }
484 }
485 return assignments;
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506 @SuppressWarnings("unused")
507 private List<String> getTopBlockLocations(FileSystem fs, HRegionInfo region)
508 throws IOException {
509 String encodedName = region.getEncodedName();
510 Path path = new Path("/hbase/table/" + encodedName);
511 FileStatus status = fs.getFileStatus(path);
512 BlockLocation [] blockLocations =
513 fs.getFileBlockLocations(status, 0, status.getLen());
514 Map<HostAndWeight,HostAndWeight> hostWeights =
515 new TreeMap<HostAndWeight,HostAndWeight>(new HostAndWeight.HostComparator());
516 for(BlockLocation bl : blockLocations) {
517 String [] hosts = bl.getHosts();
518 long len = bl.getLength();
519 for(String host : hosts) {
520 HostAndWeight haw = hostWeights.get(host);
521 if(haw == null) {
522 haw = new HostAndWeight(host, len);
523 hostWeights.put(haw, haw);
524 } else {
525 haw.addWeight(len);
526 }
527 }
528 }
529 NavigableSet<HostAndWeight> orderedHosts = new TreeSet<HostAndWeight>(
530 new HostAndWeight.WeightComparator());
531 orderedHosts.addAll(hostWeights.values());
532 List<String> topHosts = new ArrayList<String>(orderedHosts.size());
533 for(HostAndWeight haw : orderedHosts.descendingSet()) {
534 topHosts.add(haw.getHost());
535 }
536 return topHosts;
537 }
538
539
540
541
542
543
544
545
546
547
548
549 private static class HostAndWeight {
550
551 private final String host;
552 private long weight;
553
554 public HostAndWeight(String host, long weight) {
555 this.host = host;
556 this.weight = weight;
557 }
558
559 public void addWeight(long weight) {
560 this.weight += weight;
561 }
562
563 public String getHost() {
564 return host;
565 }
566
567 public long getWeight() {
568 return weight;
569 }
570
571 private static class HostComparator implements Comparator<HostAndWeight> {
572 @Override
573 public int compare(HostAndWeight l, HostAndWeight r) {
574 return l.getHost().compareTo(r.getHost());
575 }
576 }
577
578 private static class WeightComparator implements Comparator<HostAndWeight> {
579 @Override
580 public int compare(HostAndWeight l, HostAndWeight r) {
581 if(l.getWeight() == r.getWeight()) {
582 return l.getHost().compareTo(r.getHost());
583 }
584 return l.getWeight() < r.getWeight() ? -1 : 1;
585 }
586 }
587 }
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607 public static Map<HRegionInfo,HServerInfo> immediateAssignment(
608 List<HRegionInfo> regions, List<HServerInfo> servers) {
609 Map<HRegionInfo,HServerInfo> assignments =
610 new TreeMap<HRegionInfo,HServerInfo>();
611 for(HRegionInfo region : regions) {
612 assignments.put(region, servers.get(RANDOM.nextInt(servers.size())));
613 }
614 return assignments;
615 }
616
617 public static HServerInfo randomAssignment(List<HServerInfo> servers) {
618 if (servers == null || servers.isEmpty()) {
619 LOG.warn("Wanted to do random assignment but no servers to assign to");
620 return null;
621 }
622 return servers.get(RANDOM.nextInt(servers.size()));
623 }
624
625
626
627
628
629
630
631
632
633
634
635 public static class RegionPlan implements Comparable<RegionPlan> {
636 private final HRegionInfo hri;
637 private final HServerInfo source;
638 private HServerInfo dest;
639
640
641
642
643
644
645
646
647
648
649
650
651 public RegionPlan(final HRegionInfo hri, HServerInfo source, HServerInfo dest) {
652 this.hri = hri;
653 this.source = source;
654 this.dest = dest;
655 }
656
657
658
659
660 public void setDestination(HServerInfo dest) {
661 this.dest = dest;
662 }
663
664
665
666
667
668 public HServerInfo getSource() {
669 return source;
670 }
671
672
673
674
675
676 public HServerInfo getDestination() {
677 return dest;
678 }
679
680
681
682
683
684 public String getRegionName() {
685 return this.hri.getEncodedName();
686 }
687
688 public HRegionInfo getRegionInfo() {
689 return this.hri;
690 }
691
692
693
694
695
696 @Override
697 public int compareTo(RegionPlan o) {
698 return getRegionName().compareTo(o.getRegionName());
699 }
700
701 @Override
702 public String toString() {
703 return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
704 (this.source == null? "": this.source.getServerName()) +
705 ", dest=" + (this.dest == null? "": this.dest.getServerName());
706 }
707 }
708 }