/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to a
 * {@link SequenceFile.Writer}.
 */
public class SequenceFileLogWriter implements HLog.Writer {
  private final Log LOG = LogFactory.getLog(this.getClass());

  // The sequence file writer we delegate to.
  private SequenceFile.Writer writer;

  // The dfsclient output stream obtained via reflection, or null if not
  // available.
  private OutputStream dfsClient_out;

  // The reflected syncFs method from HDFS-200, or null if not available.
  private Method syncFs;

  private Class<? extends HLogKey> keyClass;

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * This constructor allows a specific HLogKey implementation to override
   * the one that would otherwise be chosen via configuration.
   *
   * @param keyClass
   */
  public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
    this.keyClass = keyClass;
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    if (null == keyClass) {
      keyClass = HLog.getKeyClass(conf);
    }

    // Create the underlying SequenceFile.Writer that the WAL entries are
    // written into.
    this.writer = SequenceFile.createWriter(fs, conf, path,
      keyClass, WALEdit.class,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      (short) conf.getInt("hbase.regionserver.hlog.replication",
        fs.getDefaultReplication()),
      conf.getLong("hbase.regionserver.hlog.blocksize",
        fs.getDefaultBlockSize()),
      SequenceFile.CompressionType.NONE,
      new DefaultCodec(),
      null,
      new Metadata());

    // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
    // can reach the wrapped dfsclient stream. Stash it aside for use in
    // getDFSCOutputStream().
    final Field[] fields = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the 'out' field up in SequenceFile.Writer accessible.
          fields[i].setAccessible(true);
          FSDataOutputStream out =
            (FSDataOutputStream) fields[i].get(this.writer);
          this.dfsClient_out = out.getWrappedStream();
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        }
      }
    }

    // Now see whether syncFs (HDFS-200) is available on the writer.
    Method m = null;
    boolean append = conf.getBoolean("dfs.support.append", false);
    if (append) {
      try {
        // Reflected handle on writer.syncFs().
        m = this.writer.getClass().getMethod("syncFs", new Class<?>[] {});
      } catch (SecurityException e) {
        throw new IOException("Failed test for syncfs", e);
      } catch (NoSuchMethodException e) {
        // Not available -- leave m null.
      }
    }
    this.syncFs = m;
    LOG.info((this.syncFs != null) ?
      "Using syncFs -- HDFS-200" :
      ("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append));
  }

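  /**
   * Appends the key/edit pair of the passed WAL entry to the underlying
   * SequenceFile.
   */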
  @Override
  public void append(HLog.Entry entry) throws IOException {
    this.writer.append(entry.getKey(), entry.getEdit());
  }

  @Override
  public void close() throws IOException {
    this.writer.close();
  }

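  /**
   * Syncs the underlying filesystem. If the reflected syncFs method
   * (HDFS-200) was found at init time, it is invoked on the wrapped
   * SequenceFile.Writer; otherwise this call is a no-op.
   */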
  @Override
  public void sync() throws IOException {
    if (this.syncFs != null) {
      try {
        this.syncFs.invoke(this.writer, HLog.NO_ARGS);
      } catch (Exception e) {
        throw new IOException("Reflection", e);
      }
    }
  }

  @Override
  public long getLength() throws IOException {
    return this.writer.getLength();
  }

  /**
   * @return The dfsclient output stream up inside SequenceFile.Writer made
   * accessible via reflection, or null if it could not be obtained.
   */
  public OutputStream getDFSCOutputStream() {
    return this.dfsClient_out;
  }
}