1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import static org.junit.Assert.*;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.SortedSet;
35  import java.util.TreeMap;
36  import java.util.TreeSet;
37  
38  import junit.framework.Assert;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HServerAddress;
46  import org.apache.hadoop.hbase.HServerInfo;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.junit.BeforeClass;
51  import org.junit.Test;
52  
53  public class TestLoadBalancer {
54    private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class);
55  
56    private static LoadBalancer loadBalancer;
57  
58    private static Random rand;
59  
60    @BeforeClass
61    public static void beforeAllTests() throws Exception {
62      Configuration conf = HBaseConfiguration.create();
63      conf.set("hbase.regions.slop", "0");
64      loadBalancer = new LoadBalancer(conf);
65      rand = new Random();
66    }
67  
68    // int[testnum][servernumber] -> numregions
69    int [][] clusterStateMocks = new int [][] {
70        // 1 node
71        new int [] { 0 },
72        new int [] { 1 },
73        new int [] { 10 },
74        // 2 node
75        new int [] { 0, 0 },
76        new int [] { 2, 0 },
77        new int [] { 2, 1 },
78        new int [] { 2, 2 },
79        new int [] { 2, 3 },
80        new int [] { 2, 4 },
81        new int [] { 1, 1 },
82        new int [] { 0, 1 },
83        new int [] { 10, 1 },
84        new int [] { 14, 1432 },
85        new int [] { 47, 53 },
86        // 3 node
87        new int [] { 0, 1, 2 },
88        new int [] { 1, 2, 3 },
89        new int [] { 0, 2, 2 },
90        new int [] { 0, 3, 0 },
91        new int [] { 0, 4, 0 },
92        new int [] { 20, 20, 0 },
93        // 4 node
94        new int [] { 0, 1, 2, 3 },
95        new int [] { 4, 0, 0, 0 },
96        new int [] { 5, 0, 0, 0 },
97        new int [] { 6, 6, 0, 0 },
98        new int [] { 6, 2, 0, 0 },
99        new int [] { 6, 1, 0, 0 },
100       new int [] { 6, 0, 0, 0 },
101       new int [] { 4, 4, 4, 7 },
102       new int [] { 4, 4, 4, 8 },
103       new int [] { 0, 0, 0, 7 },
104       // 5 node
105       new int [] { 1, 1, 1, 1, 4 },
106       // more nodes
107       new int [] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 },
108       new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 },
109       new int [] { 6, 6, 5, 6, 6, 6, 6, 6, 6, 1 },
110       new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 54 },
111       new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 55 },
112       new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 56 },
113       new int [] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 16 },
114       new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 8 },
115       new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 9 },
116       new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 10 },
117       new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 123 },
118       new int [] { 1, 1, 1, 1, 1, 1, 1, 1, 1, 155 },
119       new int [] { 0, 0, 144, 1, 1, 1, 1, 1123, 133, 138, 12, 1444 },
120       new int [] { 0, 0, 144, 1, 0, 4, 1, 1123, 133, 138, 12, 1444 },
121       new int [] { 1538, 1392, 1561, 1557, 1535, 1553, 1385, 1542, 1619 }
122   };
123 
124   int [][] regionsAndServersMocks = new int [][] {
125       // { num regions, num servers }
126       new int [] { 0, 0 },
127       new int [] { 0, 1 },
128       new int [] { 1, 1 },
129       new int [] { 2, 1 },
130       new int [] { 10, 1 },
131       new int [] { 1, 2 },
132       new int [] { 2, 2 },
133       new int [] { 3, 2 },
134       new int [] { 1, 3 },
135       new int [] { 2, 3 },
136       new int [] { 3, 3 },
137       new int [] { 25, 3 },
138       new int [] { 2, 10 },
139       new int [] { 2, 100 },
140       new int [] { 12, 10 },
141       new int [] { 12, 100 },
142   };
143 
144   @Test
145   public void testRandomizer() {
146     for(int [] mockCluster : clusterStateMocks) {
147       if (mockCluster.length < 5) continue;
148       Map<HServerInfo, List<HRegionInfo>> servers =
149         mockClusterServers(mockCluster);
150       for (Map.Entry<HServerInfo, List<HRegionInfo>> e: servers.entrySet()) {
151         List<HRegionInfo> original = e.getValue();
152         if (original.size() < 5) continue;
153         // Try ten times in case random chances upon original order more than
154         // one or two times in a row.
155         boolean same = true;
156         for (int i = 0; i < 10 && same; i++) {
157           List<HRegionInfo> copy = new ArrayList<HRegionInfo>(original);
158           System.out.println("Randomizing before " + copy.size());
159           for (HRegionInfo hri: copy) {
160             System.out.println(hri.getEncodedName());
161           }
162           List<HRegionInfo> randomized = LoadBalancer.randomize(copy);
163           System.out.println("Randomizing after " + randomized.size());
164           for (HRegionInfo hri: randomized) {
165             System.out.println(hri.getEncodedName());
166           }
167           if (original.equals(randomized)) continue;
168           same = false;
169           break;
170         }
171         assertFalse(same);
172       }
173     }
174   }
175 
176   /**
177    * Test the load balancing algorithm.
178    *
179    * Invariant is that all servers should be hosting either
180    * floor(average) or ceiling(average)
181    *
182    * @throws Exception
183    */
184   @Test
185   public void testBalanceCluster() throws Exception {
186 
187     for(int [] mockCluster : clusterStateMocks) {
188       Map<HServerInfo,List<HRegionInfo>> servers = mockClusterServers(mockCluster);
189       LOG.info("Mock Cluster : " + printMock(servers) + " " + printStats(servers));
190       List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
191       List<HServerInfo> balancedCluster = reconcile(servers, plans);
192       LOG.info("Mock Balance : " + printMock(balancedCluster));
193       assertClusterAsBalanced(balancedCluster);
194       for(Map.Entry<HServerInfo, List<HRegionInfo>> entry : servers.entrySet()) {
195         returnRegions(entry.getValue());
196         returnServer(entry.getKey());
197       }
198     }
199 
200   }
201 
202   /**
203    * Invariant is that all servers have between floor(avg) and ceiling(avg)
204    * number of regions.
205    */
206   public void assertClusterAsBalanced(List<HServerInfo> servers) {
207     int numServers = servers.size();
208     int numRegions = 0;
209     int maxRegions = 0;
210     int minRegions = Integer.MAX_VALUE;
211     for(HServerInfo server : servers) {
212       int nr = server.getLoad().getNumberOfRegions();
213       if(nr > maxRegions) {
214         maxRegions = nr;
215       }
216       if(nr < minRegions) {
217         minRegions = nr;
218       }
219       numRegions += nr;
220     }
221     if(maxRegions - minRegions < 2) {
222       // less than 2 between max and min, can't balance
223       return;
224     }
225     int min = numRegions / numServers;
226     int max = numRegions % numServers == 0 ? min : min + 1;
227 
228     for(HServerInfo server : servers) {
229       assertTrue(server.getLoad().getNumberOfRegions() <= max);
230       assertTrue(server.getLoad().getNumberOfRegions() >= min);
231     }
232   }
233 
234   /**
235    * Tests immediate assignment.
236    *
237    * Invariant is that all regions have an assignment.
238    *
239    * @throws Exception
240    */
241   @Test
242   public void testImmediateAssignment() throws Exception {
243     for(int [] mock : regionsAndServersMocks) {
244       LOG.debug("testImmediateAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
245       List<HRegionInfo> regions = randomRegions(mock[0]);
246       List<HServerInfo> servers = randomServers(mock[1], 0);
247       Map<HRegionInfo,HServerInfo> assignments =
248         LoadBalancer.immediateAssignment(regions, servers);
249       assertImmediateAssignment(regions, servers, assignments);
250       returnRegions(regions);
251       returnServers(servers);
252     }
253   }
254 
255   /**
256    * All regions have an assignment.
257    * @param regions
258    * @param servers
259    * @param assignments
260    */
261   private void assertImmediateAssignment(List<HRegionInfo> regions,
262       List<HServerInfo> servers, Map<HRegionInfo,HServerInfo> assignments) {
263     for(HRegionInfo region : regions) {
264       assertTrue(assignments.containsKey(region));
265     }
266   }
267 
268   /**
269    * Tests the bulk assignment used during cluster startup.
270    *
271    * Round-robin.  Should yield a balanced cluster so same invariant as the load
272    * balancer holds, all servers holding either floor(avg) or ceiling(avg).
273    *
274    * @throws Exception
275    */
276   @Test
277   public void testBulkAssignment() throws Exception {
278     for(int [] mock : regionsAndServersMocks) {
279       LOG.debug("testBulkAssignment with " + mock[0] + " regions and " + mock[1] + " servers");
280       List<HRegionInfo> regions = randomRegions(mock[0]);
281       List<HServerInfo> servers = randomServers(mock[1], 0);
282       Map<HServerInfo,List<HRegionInfo>> assignments =
283         LoadBalancer.roundRobinAssignment(regions, servers);
284       float average = (float)regions.size()/servers.size();
285       int min = (int)Math.floor(average);
286       int max = (int)Math.ceil(average);
287       if(assignments != null && !assignments.isEmpty()) {
288         for(List<HRegionInfo> regionList : assignments.values()) {
289           assertTrue(regionList.size() == min || regionList.size() == max);
290         }
291       }
292       returnRegions(regions);
293       returnServers(servers);
294     }
295   }
296 
297   /**
298    * Test the cluster startup bulk assignment which attempts to retain
299    * assignment info.
300    * @throws Exception
301    */
302   @Test
303   public void testRetainAssignment() throws Exception {
304     // Test simple case where all same servers are there
305     List<HServerInfo> servers = randomServers(10, 10);
306     List<HRegionInfo> regions = randomRegions(100);
307     Map<HRegionInfo, HServerAddress> existing =
308       new TreeMap<HRegionInfo, HServerAddress>();
309     for (int i=0;i<regions.size();i++) {
310       existing.put(regions.get(i),
311           servers.get(i % servers.size()).getServerAddress());
312     }
313     Map<HServerInfo, List<HRegionInfo>> assignment =
314       LoadBalancer.retainAssignment(existing, servers);
315     assertRetainedAssignment(existing, servers, assignment);
316 
317     // Include two new servers that were not there before
318     List<HServerInfo> servers2 = new ArrayList<HServerInfo>(servers);
319     servers2.add(randomServer(10));
320     servers2.add(randomServer(10));
321     assignment = LoadBalancer.retainAssignment(existing, servers2);
322     assertRetainedAssignment(existing, servers2, assignment);
323 
324     // Remove two of the servers that were previously there
325     List<HServerInfo> servers3 = new ArrayList<HServerInfo>(servers);
326     servers3.remove(servers3.size()-1);
327     servers3.remove(servers3.size()-2);
328     assignment = LoadBalancer.retainAssignment(existing, servers3);
329     assertRetainedAssignment(existing, servers3, assignment);
330   }
331 
332   /**
333    * Asserts a valid retained assignment plan.
334    * <p>
335    * Must meet the following conditions:
336    * <ul>
337    *   <li>Every input region has an assignment, and to an online server
338    *   <li>If a region had an existing assignment to a server with the same
339    *       address a a currently online server, it will be assigned to it
340    * </ul>
341    * @param existing
342    * @param servers
343    * @param assignment
344    */
345   private void assertRetainedAssignment(
346       Map<HRegionInfo, HServerAddress> existing, List<HServerInfo> servers,
347       Map<HServerInfo, List<HRegionInfo>> assignment) {
348     // Verify condition 1, every region assigned, and to online server
349     Set<HServerInfo> onlineServerSet = new TreeSet<HServerInfo>(servers);
350     Set<HRegionInfo> assignedRegions = new TreeSet<HRegionInfo>();
351     for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
352       assertTrue("Region assigned to server that was not listed as online",
353           onlineServerSet.contains(a.getKey()));
354       for (HRegionInfo r : a.getValue()) assignedRegions.add(r);
355     }
356     assertEquals(existing.size(), assignedRegions.size());
357 
358     // Verify condition 2, if server had existing assignment, must have same
359     Set<HServerAddress> onlineAddresses = new TreeSet<HServerAddress>();
360     for (HServerInfo s : servers) onlineAddresses.add(s.getServerAddress());
361     for (Map.Entry<HServerInfo, List<HRegionInfo>> a : assignment.entrySet()) {
362       for (HRegionInfo r : a.getValue()) {
363         HServerAddress address = existing.get(r);
364         if (address != null && onlineAddresses.contains(address)) {
365           assertTrue(a.getKey().getServerAddress().equals(address));
366         }
367       }
368     }
369   }
370 
371   private String printStats(Map<HServerInfo, List<HRegionInfo>> servers) {
372     int numServers = servers.size();
373     int totalRegions = 0;
374     for(HServerInfo server : servers.keySet()) {
375       totalRegions += server.getLoad().getNumberOfRegions();
376     }
377     float average = (float)totalRegions / numServers;
378     int max = (int)Math.ceil(average);
379     int min = (int)Math.floor(average);
380     return "[srvr=" + numServers + " rgns=" + totalRegions + " avg=" + average + " max=" + max + " min=" + min + "]";
381   }
382 
383   private String printMock(Map<HServerInfo, List<HRegionInfo>> servers) {
384     return printMock(Arrays.asList(servers.keySet().toArray(new HServerInfo[servers.size()])));
385   }
386 
387   private String printMock(List<HServerInfo> balancedCluster) {
388     SortedSet<HServerInfo> sorted = new TreeSet<HServerInfo>(balancedCluster);
389     HServerInfo [] arr = sorted.toArray(new HServerInfo[sorted.size()]);
390     StringBuilder sb = new StringBuilder(sorted.size() * 4 + 4);
391     sb.append("{ ");
392     for(int i=0;i<arr.length;i++) {
393       if(i != 0) {
394         sb.append(" , ");
395       }
396       sb.append(arr[i].getLoad().getNumberOfRegions());
397     }
398     sb.append(" }");
399     return sb.toString();
400   }
401 
402   /**
403    * This assumes the RegionPlan HSI instances are the same ones in the map, so
404    * actually no need to even pass in the map, but I think it's clearer.
405    * @param servers
406    * @param plans
407    * @return
408    */
409   private List<HServerInfo> reconcile(
410       Map<HServerInfo, List<HRegionInfo>> servers, List<RegionPlan> plans) {
411     if(plans != null) {
412       for(RegionPlan plan : plans) {
413         plan.getSource().getLoad().setNumberOfRegions(
414             plan.getSource().getLoad().getNumberOfRegions() - 1);
415         plan.getDestination().getLoad().setNumberOfRegions(
416             plan.getDestination().getLoad().getNumberOfRegions() + 1);
417       }
418     }
419     return Arrays.asList(servers.keySet().toArray(new HServerInfo[servers.size()]));
420   }
421 
422   private Map<HServerInfo, List<HRegionInfo>> mockClusterServers(
423       int [] mockCluster) {
424     int numServers = mockCluster.length;
425     Map<HServerInfo,List<HRegionInfo>> servers =
426       new TreeMap<HServerInfo,List<HRegionInfo>>();
427     for(int i=0;i<numServers;i++) {
428       int numRegions = mockCluster[i];
429       HServerInfo server = randomServer(numRegions);
430       List<HRegionInfo> regions = randomRegions(numRegions);
431       servers.put(server, regions);
432     }
433     return servers;
434   }
435 
436   private Queue<HRegionInfo> regionQueue = new LinkedList<HRegionInfo>();
437 
438   private List<HRegionInfo> randomRegions(int numRegions) {
439     List<HRegionInfo> regions = new ArrayList<HRegionInfo>(numRegions);
440     byte [] start = new byte[16];
441     byte [] end = new byte[16];
442     rand.nextBytes(start);
443     rand.nextBytes(end);
444     for(int i=0;i<numRegions;i++) {
445       if(!regionQueue.isEmpty()) {
446         regions.add(regionQueue.poll());
447         continue;
448       }
449       Bytes.putInt(start, 0, numRegions << 1);
450       Bytes.putInt(end, 0, (numRegions << 1) + 1);
451       HRegionInfo hri = new HRegionInfo(
452           new HTableDescriptor(Bytes.toBytes("table" + i)), start, end);
453       regions.add(hri);
454     }
455     return regions;
456   }
457 
458   private void returnRegions(List<HRegionInfo> regions) {
459     regionQueue.addAll(regions);
460   }
461 
462   private Queue<HServerInfo> serverQueue = new LinkedList<HServerInfo>();
463 
464   private HServerInfo randomServer(int numRegions) {
465     if(!serverQueue.isEmpty()) {
466       HServerInfo server = this.serverQueue.poll();
467       server.getLoad().setNumberOfRegions(numRegions);
468       return server;
469     }
470     String host = "127.0.0.1";
471     int port = rand.nextInt(60000);
472     long startCode = rand.nextLong();
473     HServerInfo hsi =
474       new HServerInfo(new HServerAddress(host, port), startCode, port, host);
475     hsi.getLoad().setNumberOfRegions(numRegions);
476     return hsi;
477   }
478 
479   private List<HServerInfo> randomServers(int numServers, int numRegionsPerServer) {
480     List<HServerInfo> servers = new ArrayList<HServerInfo>(numServers);
481     for(int i=0;i<numServers;i++) {
482       servers.add(randomServer(numRegionsPerServer));
483     }
484     return servers;
485   }
486 
487   private void returnServer(HServerInfo server) {
488     serverQueue.add(server);
489   }
490 
491   private void returnServers(List<HServerInfo> servers) {
492     serverQueue.addAll(servers);
493   }
494 }