/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

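/**
 * Executes a region split as a multi-step "transaction".  Call {@link #prepare()}
 * to set the transaction up, {@link #execute(Server, RegionServerServices)} to run
 * it, and {@link #rollback(Server, RegionServerServices)} to clean up if execute
 * fails.
 *
 * <p>A caller would typically drive the transaction along these lines (a sketch
 * only; <code>parent</code>, <code>midKey</code>, <code>server</code> and
 * <code>services</code> stand in for the caller's own objects, and the abort
 * handling depends on the hosting server):
 * <pre>
 *  SplitTransaction st = new SplitTransaction(parent, midKey);
 *  if (!st.prepare()) return;
 *  try {
 *    st.execute(server, services);
 *  } catch (IOException ioe) {
 *    try {
 *      st.rollback(server, services);
 *    } catch (RuntimeException e) {
 *      // Rollback failed or we are past the point of no return: abort the server.
 *    }
 *  }
 * </pre>
 *
 * <p>Progress is recorded in an internal journal so a failed transaction can be
 * rolled back step by step.  This class is not thread safe; run a split with one
 * thread only.
 */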
class SplitTransaction {
  private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
  private static final String SPLITDIR = "splits";

  /*
   * Region to split.
   */
  private final HRegion parent;
  private HRegionInfo hri_a;
  private HRegionInfo hri_b;
  private Path splitdir;
  private long fileSplitTimeout = 30000;

  /*
   * Row to split around.
   */
  private final byte [] splitrow;

  /**
   * Types of split transaction journal entries.  Each entry marks a completed
   * step so {@link #rollback(Server, RegionServerServices)} knows how far a
   * failed split must be unwound.
   */
  enum JournalEntry {
    /**
     * Created the temporary split directory under the parent region.
     */
    CREATE_SPLIT_DIR,
    /**
     * Closed the parent region.
     */
    CLOSED_PARENT_REGION,
    /**
     * Removed the parent region from the server's list of online regions.
     */
    OFFLINED_PARENT,
    /**
     * Started creation of daughter region A.
     */
    STARTED_REGION_A_CREATION,
    /**
     * Started creation of daughter region B.
     */
    STARTED_REGION_B_CREATION,
    /**
     * Point of no return.  Once the parent has been offlined in .META.,
     * the split can no longer be rolled back by this transaction.
     */
    PONR
  }

  /*
   * Journal of how far the split transaction has progressed.
   */
  private final List<JournalEntry> journal = new ArrayList<JournalEntry>();

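  /**
   * Constructor.
   * @param r Region to split
   * @param splitrow Row to split around
   */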
  SplitTransaction(final HRegion r, final byte [] splitrow) {
    this.parent = r;
    this.splitrow = splitrow;
    this.splitdir = getSplitDir(this.parent);
  }

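  /**
   * Does checks on the split inputs.
   * @return <code>true</code> if the region is splittable, else
   * <code>false</code> if it is not (e.g. it is closed/closing, or the split
   * row is not inside the region's key range).
   */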
  public boolean prepare() {
    if (this.parent.isClosed() || this.parent.isClosing()) return false;
    if (this.splitrow == null) return false;
    HRegionInfo hri = this.parent.getRegionInfo();
    parent.prepareToSplit();
    byte [] startKey = hri.getStartKey();
    byte [] endKey = hri.getEndKey();
    if (Bytes.equals(startKey, splitrow) ||
        !this.parent.getRegionInfo().containsRow(splitrow)) {
      LOG.info("Split row is not inside region key range or is equal to " +
          "startkey: " + Bytes.toStringBinary(this.splitrow));
      return false;
    }
    long rid = getDaughterRegionIdTimestamp(hri);
    this.hri_a = new HRegionInfo(hri.getTableDesc(), startKey, this.splitrow,
      false, rid);
    this.hri_b = new HRegionInfo(hri.getTableDesc(), this.splitrow, endKey,
      false, rid);
    return true;
  }

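  /**
   * Calculate the daughter region id to use.
   * @param hri Parent {@link HRegionInfo}
   * @return Daughter region id (timestamp) to use.
   */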
  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
    long rid = EnvironmentEdgeManager.currentTimeMillis();
    // A region id is its creation timestamp.  The daughters' id can't be less
    // than the parent's, else they would sort into the wrong place in .META.
    if (rid < hri.getRegionId()) {
      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
        " but current time here is " + rid);
      rid = hri.getRegionId() + 1;
    }
    return rid;
  }

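  /**
   * Run the transaction.
   * @param server Hosting server instance.
   * @param services Used to online/offline regions.
   * @return Regions created
   * @throws IOException If thrown, the transaction failed.  Call
   * {@link #rollback(Server, RegionServerServices)}.
   * @see #rollback(Server, RegionServerServices)
   */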
  PairOfSameType<HRegion> execute(final Server server,
      final RegionServerServices services)
  throws IOException {
    LOG.info("Starting split of region " + this.parent);
    if ((server != null && server.isStopped()) ||
        (services != null && services.isStopping())) {
      throw new IOException("Server is stopped or stopping");
    }
    assert !this.parent.lock.writeLock().isHeldByCurrentThread() :
      "Unsafe to hold write lock while performing RPCs";

    // If there is no server (as in some unit tests), skip meta edits and the
    // online-regions bookkeeping below.
    boolean testing = server == null ||
      server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
      server.getConfiguration().getLong(
        "hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);

    createSplitDir(this.parent.getFilesystem(), this.splitdir);
    this.journal.add(JournalEntry.CREATE_SPLIT_DIR);

    List<StoreFile> hstoreFilesToSplit = this.parent.close(false);
    if (hstoreFilesToSplit == null) {
      // The region was closed by a concurrent thread.  We can't continue
      // with the split; abandon it here.
      throw new IOException("Failed to close region: already closed by " +
        "another thread");
    }
    this.journal.add(JournalEntry.CLOSED_PARENT_REGION);

    if (!testing) {
      services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
    }
    this.journal.add(JournalEntry.OFFLINED_PARENT);

    // Write the split files (reference files) for each store file under the
    // temporary split directory.
    splitStoreFiles(this.splitdir, hstoreFilesToSplit);

    // Log to the journal before starting each daughter region creation.  We
    // could fail partway through; if we do, the journal tells rollback what
    // dregs need cleaning up.
    this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
    HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);

    // Ditto for daughter region B.
    this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
    HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);

    // Edit the parent in meta: mark it offline and record the daughters.
    if (!testing) {
      MetaEditor.offlineParentInMeta(server.getCatalogTracker(),
        this.parent.getRegionInfo(), a.getRegionInfo(), b.getRegionInfo());
    }

    // This is the point of no return.  The parent is now offline in .META.;
    // from here on out a failure cannot be rolled back by this transaction,
    // so the caller must abort the server instead.
    this.journal.add(JournalEntry.PONR);

    DaughterOpener aOpener = new DaughterOpener(server, services, a);
    DaughterOpener bOpener = new DaughterOpener(server, services, b);
    aOpener.start();
    bOpener.start();
    try {
      aOpener.join();
      bOpener.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted " + e.getMessage());
    }
    if (aOpener.getException() != null) {
      throw new IOException("Failed " +
        aOpener.getName(), aOpener.getException());
    }
    if (bOpener.getException() != null) {
      throw new IOException("Failed " +
        bOpener.getName(), bOpener.getException());
    }

    // Leaving here, the splitdir and its dross are still in place; since the
    // split succeeded, just leave it to be cleaned up when the parent region
    // is eventually deleted.
    return new PairOfSameType<HRegion>(a, b);
  }

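  /**
   * Thread that opens one daughter region, used so both daughters can be
   * opened in parallel.  Any failure is captured and exposed via
   * {@link #getException()}.
   */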
  class DaughterOpener extends Thread {
    private final RegionServerServices services;
    private final Server server;
    private final HRegion r;
    private Throwable t = null;

    DaughterOpener(final Server s, final RegionServerServices services,
        final HRegion r) {
      super((s == null ? "null" : s.getServerName()) + "-daughterOpener=" +
        r.getRegionInfo().getEncodedName());
      setDaemon(true);
      this.services = services;
      this.server = s;
      this.r = r;
    }

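    /**
     * @return Null if the open succeeded, else the exception that caused it
     * to fail.
     */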
    Throwable getException() {
      return this.t;
    }

    @Override
    public void run() {
      try {
        openDaughterRegion(this.server, this.services, r);
      } catch (Throwable t) {
        this.t = t;
      }
    }
  }

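  /**
   * Open a daughter region, add it to the online list, and update meta.
   * @param server Hosting server instance
   * @param services Used to online the daughter
   * @param daughter The daughter region to open
   * @throws IOException
   * @throws KeeperException
   */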
  void openDaughterRegion(final Server server,
      final RegionServerServices services, final HRegion daughter)
  throws IOException, KeeperException {
    boolean stopping = services != null && services.isStopping();
    boolean stopped = server != null && server.isStopped();
    if (stopped || stopping) {
      MetaEditor.addDaughter(server.getCatalogTracker(),
        daughter.getRegionInfo(), null);
      LOG.info("Not opening daughter " +
        daughter.getRegionInfo().getRegionNameAsString() +
        " because stopping=" + stopping + ", stopped=" + stopped);
      return;
    }
    HRegionInfo hri = daughter.getRegionInfo();
    LoggingProgressable reporter = (server == null) ? null :
      new LoggingProgressable(hri, server.getConfiguration());
    HRegion r = daughter.openHRegion(reporter);
    if (services != null) {
      services.postOpenDeployTasks(r, server.getCatalogTracker(), true);
    }
  }

  /**
   * Progress reporter that logs at a configurable interval while a daughter
   * region is opening.
   */
  static class LoggingProgressable implements CancelableProgressable {
    private final HRegionInfo hri;
    private long lastLog = -1;
    private final long interval;

    LoggingProgressable(final HRegionInfo hri, final Configuration c) {
      this.hri = hri;
      this.interval = c.getLong("hbase.regionserver.split.daughter.open.log.interval",
        10000);
    }

    @Override
    public boolean progress() {
      long now = System.currentTimeMillis();
      if (now - lastLog > this.interval) {
        LOG.info("Opening " + this.hri.getRegionNameAsString());
        this.lastLog = now;
      }
      return true;
    }
  }

  private static Path getSplitDir(final HRegion r) {
    return new Path(r.getRegionDir(), SPLITDIR);
  }

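  /**
   * @param fs Filesystem to use
   * @param splitdir Directory to store temporary split data in
   * @throws IOException If <code>splitdir</code> already exists or we fail
   * to create it.
   * @see #cleanupSplitDir(FileSystem, Path)
   */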
  private static void createSplitDir(final FileSystem fs, final Path splitdir)
  throws IOException {
    if (fs.exists(splitdir)) throw new IOException("Splitdir already exists? " + splitdir);
    if (!fs.mkdirs(splitdir)) throw new IOException("Failed create of " + splitdir);
  }

  private static void cleanupSplitDir(final FileSystem fs, final Path splitdir)
  throws IOException {
    // The split dir may already be gone, so do not insist that it exist.
    deleteDir(fs, splitdir, false);
  }

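  /**
   * @param fs Filesystem to use
   * @param dir Directory to delete
   * @param mustPreExist If true, the directory must exist, else an
   * IOException is thrown
   * @throws IOException Thrown if we fail to delete the passed <code>dir</code>
   */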
  private static void deleteDir(final FileSystem fs, final Path dir,
      final boolean mustPreExist)
  throws IOException {
    if (!fs.exists(dir)) {
      if (mustPreExist) throw new IOException(dir.toString() + " does not exist!");
    } else if (!fs.delete(dir, true)) {
      throw new IOException("Failed delete of " + dir);
    }
  }

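  /**
   * Split each of the parent's store files into references, one per daughter
   * region, using a pool of one thread per store file.
   * @param splitdir Directory to write the split data into
   * @param hstoreFilesToSplit Store files returned by the parent's close
   * @throws IOException If the splitting fails or times out
   */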
  private void splitStoreFiles(final Path splitdir,
      final List<StoreFile> hstoreFilesToSplit)
  throws IOException {
    if (hstoreFilesToSplit == null) {
      // Could be null if the close did not succeed; treat it as fatal here.
      throw new IOException("Close returned empty list of StoreFiles");
    }

    // Use one thread per store file so the files are split in parallel.
    int nbFiles = hstoreFilesToSplit.size();
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("StoreFileSplitter-%1$d");
    ThreadFactory factory = builder.build();
    ThreadPoolExecutor threadPool =
      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
    List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);

    // Submit one splitter per store file.
    for (StoreFile sf : hstoreFilesToSplit) {
      StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
      futures.add(threadPool.submit(sfs));
    }
    threadPool.shutdown();

    // Wait for all the tasks to finish, up to the configured timeout.
    try {
      boolean stillRunning = !threadPool.awaitTermination(
        this.fileSplitTimeout, TimeUnit.MILLISECONDS);
      if (stillRunning) {
        threadPool.shutdownNow();
        throw new IOException("Took too long to split the" +
          " files and create the references, aborting split");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for file splitters", e);
    }

    // Look for any exception thrown by an individual splitter.
    for (Future<Void> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException(
          "Interrupted while trying to get the results of file splitters", e);
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
    }
  }

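  /**
   * Write the top and bottom references for one store file, one under each
   * daughter's directory inside <code>splitdir</code>.
   * @param sf Store file to split
   * @param splitdir Directory to write the references into
   * @throws IOException
   */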
  private void splitStoreFile(final StoreFile sf, final Path splitdir)
  throws IOException {
    FileSystem fs = this.parent.getFilesystem();
    byte [] family = sf.getFamily();
    String encoded = this.hri_a.getEncodedName();
    Path storedir = Store.getStoreHomedir(splitdir, encoded, family);
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.bottom);
    encoded = this.hri_b.getEncodedName();
    storedir = Store.getStoreHomedir(splitdir, encoded, family);
    StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
  }

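  /**
   * Utility class used to do the file splitting / reference writing in
   * parallel instead of sequentially.
   */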
  class StoreFileSplitter implements Callable<Void> {
    private final StoreFile sf;
    private final Path splitdir;

    /**
     * Constructor that takes what it needs to split.
     * @param sf which file to split
     * @param splitdir where the splitting is done
     */
    public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
      this.sf = sf;
      this.splitdir = splitdir;
    }

    public Void call() throws IOException {
      splitStoreFile(sf, splitdir);
      return null;
    }
  }

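  /**
   * Create a daughter region from the split data staged under the split
   * directory, moving its initial files into place under the table dir.
   * @param hri Daughter region info
   * @param flusher Flush requester to pass to the new region
   * @return Created daughter HRegion.
   * @throws IOException
   * @see #cleanupDaughterRegion(FileSystem, Path, String)
   */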
  HRegion createDaughterRegion(final HRegionInfo hri,
      final FlushRequester flusher)
  throws IOException {
    // Package private so unit tests have access.
    FileSystem fs = this.parent.getFilesystem();
    Path regionDir = getSplitDirForDaughter(this.parent.getFilesystem(),
      this.splitdir, hri);
    HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
      this.parent.getLog(), fs, this.parent.getConf(),
      hri, flusher);
    HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
    return r;
  }

  private static void cleanupDaughterRegion(final FileSystem fs,
      final Path tabledir, final String encodedName)
  throws IOException {
    Path regiondir = HRegion.getRegionDir(tabledir, encodedName);
    // The daughter region dir may not have been created yet, so do not
    // require that it exist.
    deleteDir(fs, regiondir, false);
  }

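  /**
   * Get the directory under the split dir where a daughter's data is staged.
   * @param fs Filesystem
   * @param splitdir Parent region split directory
   * @param hri Daughter region info
   * @return Path to the daughter's staging directory under <code>splitdir</code>
   * @throws IOException
   */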
  private static Path getSplitDirForDaughter(final FileSystem fs,
      final Path splitdir, final HRegionInfo hri)
  throws IOException {
    return new Path(splitdir, hri.getEncodedName());
  }

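  /**
   * Roll back a failed transaction by replaying the journal in reverse.
   * @param server Hosting server instance
   * @param services Used to online the parent region again
   * @return True if we successfully rolled back; false if we reached the
   * point of no return and the caller now needs to abort the server to
   * minimize damage.
   * @throws IOException If the rollback itself fails; take drastic action.
   */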
  public boolean rollback(final Server server, final RegionServerServices services)
  throws IOException {
    boolean result = true;
    FileSystem fs = this.parent.getFilesystem();
    ListIterator<JournalEntry> iterator =
      this.journal.listIterator(this.journal.size());
    // Walk the journal in reverse, undoing each completed step.
    while (iterator.hasPrevious()) {
      JournalEntry je = iterator.previous();
      switch (je) {
      case CREATE_SPLIT_DIR:
        this.parent.writestate.writesEnabled = true;
        cleanupSplitDir(fs, this.splitdir);
        break;

      case CLOSED_PARENT_REGION:
        // The close flushed the parent's memstore, so re-initializing it from
        // its store files is enough to bring the region back online here.
        this.parent.initialize();
        break;

      case STARTED_REGION_A_CREATION:
        cleanupDaughterRegion(fs, this.parent.getTableDir(),
          this.hri_a.getEncodedName());
        break;

      case STARTED_REGION_B_CREATION:
        cleanupDaughterRegion(fs, this.parent.getTableDir(),
          this.hri_b.getEncodedName());
        break;

      case OFFLINED_PARENT:
        if (services != null) services.addToOnlineRegions(this.parent);
        break;

      case PONR:
        // We got to the point of no return: the parent is offline in .META.
        // and the split can no longer be undone here.  Return false so the
        // caller aborts the server.
        return false;

      default:
        throw new RuntimeException("Unhandled journal entry: " + je);
      }
    }
    return result;
  }

  /**
   * @return The first daughter region info (lower half of the parent's keyspace).
   */
  HRegionInfo getFirstDaughter() {
    return hri_a;
  }

  /**
   * @return The second daughter region info (upper half of the parent's keyspace).
   */
  HRegionInfo getSecondDaughter() {
    return hri_b;
  }

  /**
   * @return Path to this transaction's temporary split directory.
   */
  Path getSplitDir() {
    return this.splitdir;
  }

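  /**
   * Clean up any split detritus that may have been left around from a
   * previous failed split attempt.  Call this on initial deploy of the
   * passed region.
   * @param r Region to check for detritus
   * @throws IOException
   */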
  static void cleanupAnySplitDetritus(final HRegion r) throws IOException {
    Path splitdir = getSplitDir(r);
    FileSystem fs = r.getFilesystem();
    if (!fs.exists(splitdir)) return;
    // The splitdir may hold directories named for the encoded names of the
    // daughter regions a previous attempt tried to make.  If so, clean up
    // those daughter region dirs under the table dir as well, then remove
    // the splitdir itself.
    FileStatus [] daughters = fs.listStatus(splitdir, new FSUtils.DirFilter(fs));
    for (int i = 0; i < daughters.length; i++) {
      cleanupDaughterRegion(fs, r.getTableDir(),
        daughters[i].getPath().getName());
    }
    cleanupSplitDir(r.getFilesystem(), splitdir);
    LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
  }
}