1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FSDataInputStream;
26 import org.apache.hadoop.fs.FSDataOutputStream;
27 import org.apache.hadoop.fs.FileStatus;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.fs.PathFilter;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.RemoteExceptionHandler;
34 import org.apache.hadoop.hbase.master.HMaster;
35 import org.apache.hadoop.hbase.regionserver.HRegion;
36 import org.apache.hadoop.hdfs.DistributedFileSystem;
37 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
38 import org.apache.hadoop.hdfs.protocol.FSConstants;
39 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
40 import org.apache.hadoop.io.SequenceFile;
41
42 import java.io.DataInputStream;
43 import java.io.EOFException;
44 import java.io.FileNotFoundException;
45 import java.io.IOException;
46 import java.io.InterruptedIOException;
47 import java.lang.reflect.InvocationTargetException;
48 import java.net.URI;
49 import java.net.URISyntaxException;
50 import java.util.HashMap;
51 import java.util.Map;
52
53
54
55
56 public class FSUtils {
57 private static final Log LOG = LogFactory.getLog(FSUtils.class);
58
59
60
61
62 private FSUtils() {
63 super();
64 }
65
66
67
68
69
70
71
72
73 public static boolean deleteDirectory(final FileSystem fs, final Path dir)
74 throws IOException {
75 return fs.exists(dir) && fs.delete(dir, true);
76 }
77
78
79
80
81
82
83
84
85 public Path checkdir(final FileSystem fs, final Path dir) throws IOException {
86 if (!fs.exists(dir)) {
87 fs.mkdirs(dir);
88 }
89 return dir;
90 }
91
92
93
94
95
96
97
98
99 public static Path create(final FileSystem fs, final Path p)
100 throws IOException {
101 if (fs.exists(p)) {
102 throw new IOException("File already exists " + p.toString());
103 }
104 if (!fs.createNewFile(p)) {
105 throw new IOException("Failed create of " + p);
106 }
107 return p;
108 }
109
110
111
112
113
114
115
116 public static void checkFileSystemAvailable(final FileSystem fs)
117 throws IOException {
118 if (!(fs instanceof DistributedFileSystem)) {
119 return;
120 }
121 IOException exception = null;
122 DistributedFileSystem dfs = (DistributedFileSystem) fs;
123 try {
124 if (dfs.exists(new Path("/"))) {
125 return;
126 }
127 } catch (IOException e) {
128 exception = RemoteExceptionHandler.checkIOException(e);
129 }
130 try {
131 fs.close();
132 } catch (Exception e) {
133 LOG.error("file system close failed: ", e);
134 }
135 IOException io = new IOException("File system is not available");
136 io.initCause(exception);
137 throw io;
138 }
139
140
141
142
143
144
145
146 public static void checkDfsSafeMode(final Configuration conf)
147 throws IOException {
148 boolean isInSafeMode = false;
149 FileSystem fs = FileSystem.get(conf);
150 if (fs instanceof DistributedFileSystem) {
151 DistributedFileSystem dfs = (DistributedFileSystem)fs;
152
153 isInSafeMode = dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET);
154 }
155 if (isInSafeMode) {
156 throw new IOException("File system is in safemode, it can't be written now");
157 }
158 }
159
160
161
162
163
164
165
166
167
168 public static String getVersion(FileSystem fs, Path rootdir)
169 throws IOException {
170 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
171 String version = null;
172 if (fs.exists(versionFile)) {
173 FSDataInputStream s =
174 fs.open(versionFile);
175 try {
176 version = DataInputStream.readUTF(s);
177 } catch (EOFException eof) {
178 LOG.warn("Version file was empty, odd, will try to set it.");
179 } finally {
180 s.close();
181 }
182 }
183 return version;
184 }
185
186
187
188
189
190
191
192
193
194
195 public static void checkVersion(FileSystem fs, Path rootdir,
196 boolean message) throws IOException {
197 checkVersion(fs, rootdir, message, 0);
198 }
199
200
201
202
203
204
205
206
207
208
209
210 public static void checkVersion(FileSystem fs, Path rootdir,
211 boolean message, int wait) throws IOException {
212 String version = getVersion(fs, rootdir);
213
214 if (version == null) {
215 if (!rootRegionExists(fs, rootdir)) {
216
217
218 FSUtils.setVersion(fs, rootdir, wait);
219 return;
220 }
221 } else if (version.compareTo(HConstants.FILE_SYSTEM_VERSION) == 0)
222 return;
223
224
225
226 String msg = "File system needs to be upgraded."
227 + " You have version " + version
228 + " and I want version " + HConstants.FILE_SYSTEM_VERSION
229 + ". Run the '${HBASE_HOME}/bin/hbase migrate' script.";
230 if (message) {
231 System.out.println("WARNING! " + msg);
232 }
233 throw new FileSystemVersionException(msg);
234 }
235
236
237
238
239
240
241
242
243 public static void setVersion(FileSystem fs, Path rootdir)
244 throws IOException {
245 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, 0);
246 }
247
248
249
250
251
252
253
254
255
256 public static void setVersion(FileSystem fs, Path rootdir, int wait)
257 throws IOException {
258 setVersion(fs, rootdir, HConstants.FILE_SYSTEM_VERSION, wait);
259 }
260
261
262
263
264
265
266
267
268
269
270 public static void setVersion(FileSystem fs, Path rootdir, String version,
271 int wait) throws IOException {
272 Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
273 while (true) {
274 try {
275 FSDataOutputStream s = fs.create(versionFile);
276 s.writeUTF(version);
277 LOG.debug("Created version file at " + rootdir.toString() +
278 " set its version at:" + version);
279 s.close();
280 return;
281 } catch (IOException e) {
282 if (wait > 0) {
283 LOG.warn("Unable to create version file at " + rootdir.toString() +
284 ", retrying: " + e.getMessage());
285 fs.delete(versionFile, false);
286 try {
287 Thread.sleep(wait);
288 } catch (InterruptedException ex) {
289
290 }
291 }
292 }
293 }
294 }
295
296
297
298
299
300
301
302
303 public static Path validateRootPath(Path root) throws IOException {
304 try {
305 URI rootURI = new URI(root.toString());
306 String scheme = rootURI.getScheme();
307 if (scheme == null) {
308 throw new IOException("Root directory does not have a scheme");
309 }
310 return root;
311 } catch (URISyntaxException e) {
312 IOException io = new IOException("Root directory path is not a valid " +
313 "URI -- check your " + HConstants.HBASE_DIR + " configuration");
314 io.initCause(e);
315 throw io;
316 }
317 }
318
319
320
321
322
323
324
325 public static void waitOnSafeMode(final Configuration conf,
326 final long wait)
327 throws IOException {
328 FileSystem fs = FileSystem.get(conf);
329 if (!(fs instanceof DistributedFileSystem)) return;
330 DistributedFileSystem dfs = (DistributedFileSystem)fs;
331
332 while (dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET)) {
333 LOG.info("Waiting for dfs to exit safe mode...");
334 try {
335 Thread.sleep(wait);
336 } catch (InterruptedException e) {
337
338 }
339 }
340 }
341
342
343
344
345
346
347
348
349
350
351
352 public static String getPath(Path p) {
353 return p.toUri().getPath();
354 }
355
356
357
358
359
360
361
362 public static Path getRootDir(final Configuration c) throws IOException {
363 Path p = new Path(c.get(HConstants.HBASE_DIR));
364 FileSystem fs = p.getFileSystem(c);
365 return p.makeQualified(fs);
366 }
367
368
369
370
371
372
373
374
375
376 public static boolean rootRegionExists(FileSystem fs, Path rootdir)
377 throws IOException {
378 Path rootRegionDir =
379 HRegion.getRegionDir(rootdir, HRegionInfo.ROOT_REGIONINFO);
380 return fs.exists(rootRegionDir);
381 }
382
383
384
385
386
387
388
389
390
391
392 public static boolean isMajorCompacted(final FileSystem fs,
393 final Path hbaseRootDir)
394 throws IOException {
395
396 FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
397 for (FileStatus tableDir : tableDirs) {
398
399
400
401
402 Path d = tableDir.getPath();
403 if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
404 continue;
405 }
406 FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
407 for (FileStatus regionDir : regionDirs) {
408 Path dd = regionDir.getPath();
409 if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
410 continue;
411 }
412
413 FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
414 for (FileStatus familyDir : familyDirs) {
415 Path family = familyDir.getPath();
416
417 FileStatus[] familyStatus = fs.listStatus(family);
418 if (familyStatus.length > 1) {
419 LOG.debug(family.toString() + " has " + familyStatus.length +
420 " files.");
421 return false;
422 }
423 }
424 }
425 }
426 return true;
427 }
428
429
430
431
432
433
434
435
436
437
438 public static int getTotalTableFragmentation(final HMaster master)
439 throws IOException {
440 Map<String, Integer> map = getTableFragmentation(master);
441 return map != null && map.size() > 0 ? map.get("-TOTAL-") : -1;
442 }
443
444
445
446
447
448
449
450
451
452
453 public static Map<String, Integer> getTableFragmentation(
454 final HMaster master)
455 throws IOException {
456 Path path = getRootDir(master.getConfiguration());
457
458 FileSystem fs = path.getFileSystem(master.getConfiguration());
459 return getTableFragmentation(fs, path);
460 }
461
462
463
464
465
466
467
468
469
470
471
472 public static Map<String, Integer> getTableFragmentation(
473 final FileSystem fs, final Path hbaseRootDir)
474 throws IOException {
475 Map<String, Integer> frags = new HashMap<String, Integer>();
476 int cfCountTotal = 0;
477 int cfFragTotal = 0;
478 DirFilter df = new DirFilter(fs);
479
480 FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, df);
481 for (FileStatus tableDir : tableDirs) {
482
483
484
485
486 Path d = tableDir.getPath();
487 if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
488 continue;
489 }
490 int cfCount = 0;
491 int cfFrag = 0;
492 FileStatus[] regionDirs = fs.listStatus(d, df);
493 for (FileStatus regionDir : regionDirs) {
494 Path dd = regionDir.getPath();
495 if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
496 continue;
497 }
498
499 FileStatus[] familyDirs = fs.listStatus(dd, df);
500 for (FileStatus familyDir : familyDirs) {
501 cfCount++;
502 cfCountTotal++;
503 Path family = familyDir.getPath();
504
505 FileStatus[] familyStatus = fs.listStatus(family);
506 if (familyStatus.length > 1) {
507 cfFrag++;
508 cfFragTotal++;
509 }
510 }
511 }
512
513 frags.put(d.getName(), Math.round((float) cfFrag / cfCount * 100));
514 }
515
516 frags.put("-TOTAL-", Math.round((float) cfFragTotal / cfCountTotal * 100));
517 return frags;
518 }
519
520
521
522
523
524
525
526
527 public static boolean isPre020FileLayout(final FileSystem fs,
528 final Path hbaseRootDir)
529 throws IOException {
530 Path mapfiles = new Path(new Path(new Path(new Path(hbaseRootDir, "-ROOT-"),
531 "70236052"), "info"), "mapfiles");
532 return fs.exists(mapfiles);
533 }
534
535
536
537
538
539
540
541
542
543
544
545
546 public static boolean isMajorCompactedPre020(final FileSystem fs,
547 final Path hbaseRootDir)
548 throws IOException {
549
550 FileStatus [] tableDirs = fs.listStatus(hbaseRootDir, new DirFilter(fs));
551 for (FileStatus tableDir : tableDirs) {
552
553
554
555
556 Path d = tableDir.getPath();
557 if (d.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
558 continue;
559 }
560 FileStatus[] regionDirs = fs.listStatus(d, new DirFilter(fs));
561 for (FileStatus regionDir : regionDirs) {
562 Path dd = regionDir.getPath();
563 if (dd.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME)) {
564 continue;
565 }
566
567 FileStatus[] familyDirs = fs.listStatus(dd, new DirFilter(fs));
568 for (FileStatus familyDir : familyDirs) {
569 Path family = familyDir.getPath();
570 FileStatus[] infoAndMapfile = fs.listStatus(family);
571
572 if (infoAndMapfile.length != 0 && infoAndMapfile.length != 2) {
573 LOG.debug(family.toString() +
574 " has more than just info and mapfile: " + infoAndMapfile.length);
575 return false;
576 }
577
578 for (int ll = 0; ll < 2; ll++) {
579 if (infoAndMapfile[ll].getPath().getName().equals("info") ||
580 infoAndMapfile[ll].getPath().getName().equals("mapfiles"))
581 continue;
582 LOG.debug("Unexpected directory name: " +
583 infoAndMapfile[ll].getPath());
584 return false;
585 }
586
587
588 FileStatus[] familyStatus =
589 fs.listStatus(new Path(family, "mapfiles"));
590 if (familyStatus.length > 1) {
591 LOG.debug(family.toString() + " has " + familyStatus.length +
592 " files.");
593 return false;
594 }
595 }
596 }
597 }
598 return true;
599 }
600
601
602
603
604 public static class DirFilter implements PathFilter {
605 private final FileSystem fs;
606
607 public DirFilter(final FileSystem fs) {
608 this.fs = fs;
609 }
610
611 public boolean accept(Path p) {
612 boolean isdir = false;
613 try {
614 isdir = this.fs.getFileStatus(p).isDir();
615 } catch (IOException e) {
616 e.printStackTrace();
617 }
618 return isdir;
619 }
620 }
621
622
623
624
625
626
627
628
629 public static boolean isAppendSupported(final Configuration conf) {
630 boolean append = conf.getBoolean("dfs.support.append", false);
631 if (append) {
632 try {
633
634
635
636 SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
637 append = true;
638 } catch (SecurityException e) {
639 } catch (NoSuchMethodException e) {
640 append = false;
641 }
642 } else {
643 try {
644 FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
645 } catch (NoSuchMethodException e) {
646 append = false;
647 }
648 }
649 return append;
650 }
651
652
653
654
655
656
657 public static boolean isHDFS(final Configuration conf) throws IOException {
658 FileSystem fs = FileSystem.get(conf);
659 String scheme = fs.getUri().getScheme();
660 return scheme.equalsIgnoreCase("hdfs");
661 }
662
663
664
665
666
667
668
669
670 public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
671 throws IOException{
672 if (!isAppendSupported(conf)) {
673 LOG.warn("Running on HDFS without append enabled may result in data loss");
674 return;
675 }
676
677
678 if (!(fs instanceof DistributedFileSystem)) {
679 return;
680 }
681 LOG.info("Recovering file " + p);
682 long startWaiting = System.currentTimeMillis();
683
684
685 boolean recovered = false;
686 while (!recovered) {
687 try {
688 try {
689 if (fs instanceof DistributedFileSystem) {
690 DistributedFileSystem dfs = (DistributedFileSystem)fs;
691 DistributedFileSystem.class.getMethod("recoverLease",
692 new Class[] {Path.class}).invoke(dfs, p);
693 } else {
694 throw new Exception("Not a DistributedFileSystem");
695 }
696 } catch (InvocationTargetException ite) {
697
698 throw (IOException) ite.getCause();
699 } catch (Exception e) {
700 LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
701 ", trying fs.append instead");
702 FSDataOutputStream out = fs.append(p);
703 out.close();
704 }
705 recovered = true;
706 } catch (IOException e) {
707 e = RemoteExceptionHandler.checkIOException(e);
708 if (e instanceof AlreadyBeingCreatedException) {
709
710
711
712
713 long waitedFor = System.currentTimeMillis() - startWaiting;
714 if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
715 LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
716 ":" + e.getMessage());
717 }
718 } else if (e instanceof LeaseExpiredException &&
719 e.getMessage().contains("File does not exist")) {
720
721 throw new FileNotFoundException(
722 "The given HLog wasn't found at " + p.toString());
723 } else {
724 throw new IOException("Failed to open " + p + " for append", e);
725 }
726 }
727 try {
728 Thread.sleep(1000);
729 } catch (InterruptedException ex) {
730 new InterruptedIOException().initCause(ex);
731 }
732 }
733 LOG.info("Finished lease recover attempt for " + p);
734 }
735 }