1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.FilterInputStream;
24 import java.io.IOException;
25 import java.lang.Class;
26 import java.lang.reflect.Constructor;
27 import java.lang.reflect.Field;
28 import java.lang.reflect.Method;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FSDataInputStream;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.io.SequenceFile;
37
38 public class SequenceFileLogReader implements HLog.Reader {
/** Class-wide commons-logging logger, obtained for {@code SequenceFileLogReader.class}. */
39 private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
40
41
42
43
44
45
46
47
48
49
50
51
52
53 static class WALReader extends SequenceFile.Reader {
54
55 WALReader(final FileSystem fs, final Path p, final Configuration c)
56 throws IOException {
57 super(fs, p, c);
58
59 }
60
61 @Override
62 protected FSDataInputStream openFile(FileSystem fs, Path file,
63 int bufferSize, long length)
64 throws IOException {
65 return new WALReaderFSDataInputStream(super.openFile(fs, file,
66 bufferSize, length), length);
67 }
68
69
70
71
72 static class WALReaderFSDataInputStream extends FSDataInputStream {
73 private boolean firstGetPosInvocation = true;
74 private long length;
75
76 WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
77 throws IOException {
78 super(is);
79 this.length = l;
80 }
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @Override
98 public long getPos() throws IOException {
99 if (this.firstGetPosInvocation) {
100 this.firstGetPosInvocation = false;
101 long adjust = 0;
102
103 try {
104 Field fIn = FilterInputStream.class.getDeclaredField("in");
105 fIn.setAccessible(true);
106 Object realIn = fIn.get(this.in);
107 Method getFileLength = realIn.getClass().
108 getMethod("getFileLength", new Class<?> []{});
109 getFileLength.setAccessible(true);
110 long realLength = ((Long)getFileLength.
111 invoke(realIn, new Object []{})).longValue();
112 assert(realLength >= this.length);
113 adjust = realLength - this.length;
114 } catch(Exception e) {
115 SequenceFileLogReader.LOG.warn(
116 "Error while trying to get accurate file length. " +
117 "Truncation / data loss may occur if RegionServers die.", e);
118 }
119
120 return adjust + super.getPos();
121 }
122 return super.getPos();
123 }
124 }
125 }
126
127 Configuration conf;
128 WALReader reader;
129
130 Path path;
131 int edit = 0;
132 long entryStart = 0;
133
134 protected Class<? extends HLogKey> keyClass;
135
136
137
138
/**
 * Creates a reader with no explicit key class; {@code keyClass} stays null, so
 * {@code next} obtains keys from {@code HLog.newKey(conf)}.
 */
public SequenceFileLogReader() {
  this(null);
}

/**
 * Creates a reader that instantiates the given key class for every entry it
 * allocates itself.
 *
 * @param keyClass key type to instantiate via its no-arg constructor; may be
 *        null, in which case {@code HLog.newKey(conf)} is used instead
 */
public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
  this.keyClass = keyClass;
}
151
152
153 @Override
154 public void init(FileSystem fs, Path path, Configuration conf)
155 throws IOException {
156 this.conf = conf;
157 this.path = path;
158 reader = new WALReader(fs, path, conf);
159 }
160
161 @Override
162 public void close() throws IOException {
163 try {
164 reader.close();
165 } catch (IOException ioe) {
166 throw addFileInfoToException(ioe);
167 }
168 }
169
170 @Override
171 public HLog.Entry next() throws IOException {
172 return next(null);
173 }
174
175 @Override
176 public HLog.Entry next(HLog.Entry reuse) throws IOException {
177 this.entryStart = this.reader.getPosition();
178 HLog.Entry e = reuse;
179 if (e == null) {
180 HLogKey key;
181 if (keyClass == null) {
182 key = HLog.newKey(conf);
183 } else {
184 try {
185 key = keyClass.newInstance();
186 } catch (InstantiationException ie) {
187 throw new IOException(ie);
188 } catch (IllegalAccessException iae) {
189 throw new IOException(iae);
190 }
191 }
192
193 WALEdit val = new WALEdit();
194 e = new HLog.Entry(key, val);
195 }
196 boolean b = false;
197 try {
198 b = this.reader.next(e.getKey(), e.getEdit());
199 } catch (IOException ioe) {
200 throw addFileInfoToException(ioe);
201 }
202 edit++;
203 return b? e: null;
204 }
205
206 @Override
207 public void seek(long pos) throws IOException {
208 try {
209 reader.seek(pos);
210 } catch (IOException ioe) {
211 throw addFileInfoToException(ioe);
212 }
213 }
214
215 @Override
216 public long getPosition() throws IOException {
217 return reader.getPosition();
218 }
219
220 protected IOException addFileInfoToException(final IOException ioe)
221 throws IOException {
222 long pos = -1;
223 try {
224 pos = getPosition();
225 } catch (IOException e) {
226 LOG.warn("Failed getting position to add to throw", e);
227 }
228
229
230 long end = Long.MAX_VALUE;
231 try {
232 Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
233 fEnd.setAccessible(true);
234 end = fEnd.getLong(this.reader);
235 } catch(Exception e) {
236
237 String msg = (this.path == null? "": this.path.toString()) +
238 ", entryStart=" + entryStart + ", pos=" + pos +
239 ((end == Long.MAX_VALUE) ? "" : ", end=" + end) +
240 ", edit=" + this.edit;
241
242
243 try {
244 return (IOException) ioe.getClass()
245 .getConstructor(String.class)
246 .newInstance(msg)
247 .initCause(ioe);
248 } catch(Exception e) {
249
250 return ioe;
251 }
252 }