1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.DroppedSnapshotException;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.RemoteExceptionHandler;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.apache.hadoop.util.StringUtils;
30
31 import com.google.common.base.Preconditions;
32
33 import java.io.IOException;
34 import java.lang.management.ManagementFactory;
35 import java.util.ConcurrentModificationException;
36 import java.util.EnumSet;
37 import java.util.HashMap;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.SortedMap;
41 import java.util.TreeSet;
42 import java.util.concurrent.BlockingQueue;
43 import java.util.concurrent.DelayQueue;
44 import java.util.concurrent.Delayed;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.ReentrantLock;
49
50
51
52
53
54
55
56
57
58
59 class MemStoreFlusher extends Thread implements FlushRequester {
60 static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
61
62
63 private final BlockingQueue<FlushQueueEntry> flushQueue =
64 new DelayQueue<FlushQueueEntry>();
65 private final Map<HRegion, FlushRegionEntry> regionsInQueue =
66 new HashMap<HRegion, FlushRegionEntry>();
67 private AtomicBoolean wakeupPending = new AtomicBoolean();
68
69 private final long threadWakeFrequency;
70 private final HRegionServer server;
71 private final ReentrantLock lock = new ReentrantLock();
72 private final Condition flushOccurred = lock.newCondition();
73
74 protected final long globalMemStoreLimit;
75 protected final long globalMemStoreLimitLowMark;
76
77 private static final float DEFAULT_UPPER = 0.4f;
78 private static final float DEFAULT_LOWER = 0.35f;
79 private static final String UPPER_KEY =
80 "hbase.regionserver.global.memstore.upperLimit";
81 private static final String LOWER_KEY =
82 "hbase.regionserver.global.memstore.lowerLimit";
83 private long blockingStoreFilesNumber;
84 private long blockingWaitTime;
85
86
87
88
89
90 public MemStoreFlusher(final Configuration conf,
91 final HRegionServer server) {
92 super();
93 this.server = server;
94 this.threadWakeFrequency =
95 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
96 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
97 this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER,
98 UPPER_KEY, conf);
99 long lower = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, conf);
100 if (lower > this.globalMemStoreLimit) {
101 lower = this.globalMemStoreLimit;
102 LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit " +
103 "because supplied " + LOWER_KEY + " was > " + UPPER_KEY);
104 }
105 this.globalMemStoreLimitLowMark = lower;
106 this.blockingStoreFilesNumber =
107 conf.getInt("hbase.hstore.blockingStoreFiles", 7);
108 if (this.blockingStoreFilesNumber == -1) {
109 this.blockingStoreFilesNumber = 1 +
110 conf.getInt("hbase.hstore.compactionThreshold", 3);
111 }
112 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
113 90000);
114 LOG.info("globalMemStoreLimit=" +
115 StringUtils.humanReadableInt(this.globalMemStoreLimit) +
116 ", globalMemStoreLimitLowMark=" +
117 StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
118 ", maxHeap=" + StringUtils.humanReadableInt(max));
119 }
120
121
122
123
124
125
126
127
128
129
130 static long globalMemStoreLimit(final long max,
131 final float defaultLimit, final String key, final Configuration c) {
132 float limit = c.getFloat(key, defaultLimit);
133 return getMemStoreLimit(max, limit, defaultLimit);
134 }
135
136 static long getMemStoreLimit(final long max, final float limit,
137 final float defaultLimit) {
138 float effectiveLimit = limit;
139 if (limit >= 0.9f || limit < 0.1f) {
140 LOG.warn("Setting global memstore limit to default of " + defaultLimit +
141 " because supplied value outside allowed range of 0.1 -> 0.9");
142 effectiveLimit = defaultLimit;
143 }
144 return (long)(max * effectiveLimit);
145 }
146
147
148
149
150
151
152
153 private boolean flushOneForGlobalPressure() {
154 SortedMap<Long, HRegion> regionsBySize =
155 server.getCopyOfOnlineRegionsSortedBySize();
156
157
158
159 Set<byte[]> excludedRegionNames = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
160
161 boolean flushedOne = false;
162 while (!flushedOne) {
163
164
165 HRegion bestFlushableRegion = getBiggestMemstoreRegion(
166 regionsBySize, excludedRegionNames, true);
167
168 HRegion bestAnyRegion = getBiggestMemstoreRegion(
169 regionsBySize, excludedRegionNames, false);
170
171 if (bestAnyRegion == null) {
172 LOG.error("Above memory mark but there are no flushable regions!");
173 return false;
174 }
175
176 HRegion regionToFlush;
177 if (bestFlushableRegion != null &&
178 bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
179
180
181
182
183 LOG.info("Under global heap pressure: " +
184 "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
185 "store files, but is " +
186 StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
187 " vs best flushable region's " +
188 StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
189 ". Choosing the bigger.");
190 regionToFlush = bestAnyRegion;
191 } else {
192 if (bestFlushableRegion == null) {
193 regionToFlush = bestAnyRegion;
194 } else {
195 regionToFlush = bestFlushableRegion;
196 }
197 }
198
199 Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
200
201 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
202 flushedOne = flushRegion(regionToFlush, true);
203 if (!flushedOne) {
204 LOG.info("Excluding unflushable region " + regionToFlush +
205 " - trying to find a different region to flush.");
206 excludedRegionNames.add(regionToFlush.getRegionName());
207 }
208 }
209 return true;
210 }
211
212 @Override
213 public void run() {
214 while (!this.server.isStopped()) {
215 FlushQueueEntry fqe = null;
216 try {
217 wakeupPending.set(false);
218 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
219 if (fqe == null || fqe instanceof WakeupFlushThread) {
220 if (isAboveLowWaterMark()) {
221 LOG.info("Flush thread woke up with memory above low water.");
222 if (!flushOneForGlobalPressure()) {
223
224
225
226
227
228 lock.lock();
229 try {
230 Thread.sleep(1000);
231 flushOccurred.signalAll();
232 } finally {
233 lock.unlock();
234 }
235 }
236
237 wakeupFlushThread();
238 }
239 continue;
240 }
241 FlushRegionEntry fre = (FlushRegionEntry)fqe;
242 if (!flushRegion(fre)) {
243 break;
244 }
245 } catch (InterruptedException ex) {
246 continue;
247 } catch (ConcurrentModificationException ex) {
248 continue;
249 } catch (Exception ex) {
250 LOG.error("Cache flusher failed for entry " + fqe, ex);
251 if (!server.checkFileSystem()) {
252 break;
253 }
254 }
255 }
256 this.regionsInQueue.clear();
257 this.flushQueue.clear();
258
259
260 lock.lock();
261 try {
262 flushOccurred.signalAll();
263 } finally {
264 lock.unlock();
265 }
266 LOG.info(getName() + " exiting");
267 }
268
269 private void wakeupFlushThread() {
270 if (wakeupPending.compareAndSet(false, true)) {
271 flushQueue.add(new WakeupFlushThread());
272 }
273 }
274
275 private HRegion getBiggestMemstoreRegion(
276 SortedMap<Long, HRegion> regionsBySize,
277 Set<byte[]> excludedRegionNames,
278 boolean checkStoreFileCount) {
279 synchronized (regionsInQueue) {
280 for (HRegion region : regionsBySize.values()) {
281 if (excludedRegionNames.contains(region.getRegionName())) {
282 continue;
283 }
284
285 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
286 continue;
287 }
288 return region;
289 }
290 }
291 return null;
292 }
293
294
295
296
297 private boolean isAboveHighWaterMark() {
298 return server.getGlobalMemStoreSize() >= globalMemStoreLimit;
299 }
300
301
302
303
304 private boolean isAboveLowWaterMark() {
305 return server.getGlobalMemStoreSize() >= globalMemStoreLimitLowMark;
306 }
307
308 public void requestFlush(HRegion r) {
309 synchronized (regionsInQueue) {
310 if (!regionsInQueue.containsKey(r)) {
311
312
313 FlushRegionEntry fqe = new FlushRegionEntry(r);
314 this.regionsInQueue.put(r, fqe);
315 this.flushQueue.add(fqe);
316 }
317 }
318 }
319
320 public int getFlushQueueSize() {
321 return flushQueue.size();
322 }
323
324
325
326
327 void interruptIfNecessary() {
328 lock.lock();
329 try {
330 this.interrupt();
331 } finally {
332 lock.unlock();
333 }
334 }
335
336
337
338
339
340
341
342
343
344 private boolean flushRegion(final FlushRegionEntry fqe) {
345 HRegion region = fqe.region;
346 if (!fqe.region.getRegionInfo().isMetaRegion() &&
347 isTooManyStoreFiles(region)) {
348 if (fqe.isMaximumWait(this.blockingWaitTime)) {
349 LOG.info("Waited " + (System.currentTimeMillis() - fqe.createTime) +
350 "ms on a compaction to clean up 'too many store files'; waited " +
351 "long enough... proceeding with flush of " +
352 region.getRegionNameAsString());
353 } else {
354
355 if (fqe.getRequeueCount() <= 0) {
356
357 LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
358 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
359 }
360 this.server.compactSplitThread.requestCompaction(region, getName());
361
362
363 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
364
365 return true;
366 }
367 }
368 return flushRegion(region, false);
369 }
370
371
372
373
374
375
376
377
378
379
380
381
382
383 private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
384 synchronized (this.regionsInQueue) {
385 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
386 if (fqe != null && emergencyFlush) {
387
388
389 flushQueue.remove(fqe);
390 }
391 lock.lock();
392 }
393 try {
394 if (region.flushcache()) {
395 server.compactSplitThread.requestCompaction(region, getName());
396 }
397 server.getMetrics().addFlush(region.getRecentFlushInfo());
398 } catch (DroppedSnapshotException ex) {
399
400
401
402
403
404 server.abort("Replay of HLog required. Forcing server shutdown", ex);
405 return false;
406 } catch (IOException ex) {
407 LOG.error("Cache flush failed" +
408 (region != null ? (" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
409 RemoteExceptionHandler.checkIOException(ex));
410 if (!server.checkFileSystem()) {
411 return false;
412 }
413 } finally {
414 flushOccurred.signalAll();
415 lock.unlock();
416 }
417 return true;
418 }
419
420 private boolean isTooManyStoreFiles(HRegion region) {
421 for (Store hstore: region.stores.values()) {
422 if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
423 return true;
424 }
425 }
426 return false;
427 }
428
429
430
431
432
433
434
435 public synchronized void reclaimMemStoreMemory() {
436 if (isAboveHighWaterMark()) {
437 lock.lock();
438 try {
439 while (isAboveHighWaterMark() && !server.isStopped()) {
440 wakeupFlushThread();
441 try {
442
443
444 flushOccurred.await(5, TimeUnit.SECONDS);
445 } catch (InterruptedException ie) {
446 Thread.currentThread().interrupt();
447 }
448 }
449 } finally {
450 lock.unlock();
451 }
452 } else if (isAboveLowWaterMark()) {
453 wakeupFlushThread();
454 }
455 }
456
/**
 * Marker type for everything that can be placed on the flush queue. It adds
 * nothing to {@link Delayed}.
 */
interface FlushQueueEntry extends Delayed {
}
458
459
460
461
462 static class WakeupFlushThread implements FlushQueueEntry {
463 @Override
464 public long getDelay(TimeUnit unit) {
465 return 0;
466 }
467
468 @Override
469 public int compareTo(Delayed o) {
470 return -1;
471 }
472 }
473
474
475
476
477
478
479
480
481
482 static class FlushRegionEntry implements FlushQueueEntry {
483 private final HRegion region;
484
485 private final long createTime;
486 private long whenToExpire;
487 private int requeueCount = 0;
488
489 FlushRegionEntry(final HRegion r) {
490 this.region = r;
491 this.createTime = System.currentTimeMillis();
492 this.whenToExpire = this.createTime;
493 }
494
495
496
497
498
499 public boolean isMaximumWait(final long maximumWait) {
500 return (System.currentTimeMillis() - this.createTime) > maximumWait;
501 }
502
503
504
505
506
507 public int getRequeueCount() {
508 return this.requeueCount;
509 }
510
511
512
513
514
515
516
517 public FlushRegionEntry requeue(final long when) {
518 this.whenToExpire = System.currentTimeMillis() + when;
519 this.requeueCount++;
520 return this;
521 }
522
523 @Override
524 public long getDelay(TimeUnit unit) {
525 return unit.convert(this.whenToExpire - System.currentTimeMillis(),
526 TimeUnit.MILLISECONDS);
527 }
528
529 @Override
530 public int compareTo(Delayed other) {
531 return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
532 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
533 }
534
535 @Override
536 public String toString() {
537 return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
538 }
539 }
540 }