View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver.wal;
22  
23  import java.io.FilterInputStream;
24  import java.io.IOException;
25  import java.lang.Class;
26  import java.lang.reflect.Constructor;
27  import java.lang.reflect.Field;
28  import java.lang.reflect.Method;
29   
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.FSDataInputStream;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.io.SequenceFile;
37  
38  public class SequenceFileLogReader implements HLog.Reader {
39    private static final Log LOG = LogFactory.getLog(SequenceFileLogReader.class);
40  
41    /**
42     * Hack just to set the correct file length up in SequenceFile.Reader.
43     * See HADOOP-6307.  The below is all about setting the right length on the
44     * file we are reading.  fs.getFileStatus(file).getLen() is passed down to
45     * a private SequenceFile.Reader constructor.  This won't work.  Need to do
46     * the available on the stream.  The below is ugly.  It makes getPos, the
47     * first time its called, return length of the file -- i.e. tell a lie -- just
48     * so this line up in SF.Reader's constructor ends up with right answer:
49     *
50     *         this.end = in.getPos() + length;
51     *
52     */
53    static class WALReader extends SequenceFile.Reader {
54  
55      WALReader(final FileSystem fs, final Path p, final Configuration c)
56      throws IOException {
57        super(fs, p, c);
58  
59      }
60  
61      @Override
62      protected FSDataInputStream openFile(FileSystem fs, Path file,
63        int bufferSize, long length)
64      throws IOException {
65        return new WALReaderFSDataInputStream(super.openFile(fs, file,
66          bufferSize, length), length);
67      }
68  
69      /**
70       * Override just so can intercept first call to getPos.
71       */
72      static class WALReaderFSDataInputStream extends FSDataInputStream {
73        private boolean firstGetPosInvocation = true;
74        private long length;
75  
76        WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
77        throws IOException {
78          super(is);
79          this.length = l;
80        }
81  
82        // This section can be confusing.  It is specific to how HDFS works.
83        // Let me try to break it down.  This is the problem:
84        //
85        //  1. HDFS DataNodes update the NameNode about a filename's length 
86        //     on block boundaries or when a file is closed. Therefore, 
87        //     if an RS dies, then the NN's fs.getLength() can be out of date
88        //  2. this.in.available() would work, but it returns int &
89        //     therefore breaks for files > 2GB (happens on big clusters)
90        //  3. DFSInputStream.getFileLength() gets the actual length from the DNs
91        //  4. DFSInputStream is wrapped 2 levels deep : this.in.in
92        //
93        // So, here we adjust getPos() using getFileLength() so the
94        // SequenceFile.Reader constructor (aka: first invocation) comes out 
95        // with the correct end of the file:
96        //         this.end = in.getPos() + length;
97        @Override
98        public long getPos() throws IOException {
99          if (this.firstGetPosInvocation) {
100           this.firstGetPosInvocation = false;
101           long adjust = 0;
102 
103           try {
104             Field fIn = FilterInputStream.class.getDeclaredField("in");
105             fIn.setAccessible(true);
106             Object realIn = fIn.get(this.in);
107             Method getFileLength = realIn.getClass().
108               getMethod("getFileLength", new Class<?> []{});
109             getFileLength.setAccessible(true);
110             long realLength = ((Long)getFileLength.
111               invoke(realIn, new Object []{})).longValue();
112             assert(realLength >= this.length);
113             adjust = realLength - this.length;
114           } catch(Exception e) {
115             SequenceFileLogReader.LOG.warn(
116               "Error while trying to get accurate file length.  " +
117               "Truncation / data loss may occur if RegionServers die.", e);
118           }
119 
120           return adjust + super.getPos();
121         }
122         return super.getPos();
123       }
124     }
125   }
126 
127   Configuration conf;
128   WALReader reader;
129   // Needed logging exceptions
130   Path path;
131   int edit = 0;
132   long entryStart = 0;
133 
134   protected Class<? extends HLogKey> keyClass;
135 
136   /**
137    * Default constructor.
138    */
139   public SequenceFileLogReader() {
140   }
141 
142   /**
143    * This constructor allows a specific HLogKey implementation to override that
144    * which would otherwise be chosen via configuration property.
145    * 
146    * @param keyClass
147    */
148   public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
149     this.keyClass = keyClass;
150   }
151 
152 
153   @Override
154   public void init(FileSystem fs, Path path, Configuration conf)
155       throws IOException {
156     this.conf = conf;
157     this.path = path;
158     reader = new WALReader(fs, path, conf);
159   }
160 
161   @Override
162   public void close() throws IOException {
163     try {
164       reader.close();
165     } catch (IOException ioe) {
166       throw addFileInfoToException(ioe);
167     }
168   }
169 
170   @Override
171   public HLog.Entry next() throws IOException {
172     return next(null);
173   }
174 
175   @Override
176   public HLog.Entry next(HLog.Entry reuse) throws IOException {
177     this.entryStart = this.reader.getPosition();
178     HLog.Entry e = reuse;
179     if (e == null) {
180       HLogKey key;
181       if (keyClass == null) {
182         key = HLog.newKey(conf);
183       } else {
184         try {
185           key = keyClass.newInstance();
186         } catch (InstantiationException ie) {
187           throw new IOException(ie);
188         } catch (IllegalAccessException iae) {
189           throw new IOException(iae);
190         }
191       }
192       
193       WALEdit val = new WALEdit();
194       e = new HLog.Entry(key, val);
195     }
196     boolean b = false;
197     try {
198       b = this.reader.next(e.getKey(), e.getEdit());
199     } catch (IOException ioe) {
200       throw addFileInfoToException(ioe);
201     }
202     edit++;
203     return b? e: null;
204   }
205 
206   @Override
207   public void seek(long pos) throws IOException {
208     try {
209       reader.seek(pos);
210     } catch (IOException ioe) {
211       throw addFileInfoToException(ioe);
212     }
213   }
214 
215   @Override
216   public long getPosition() throws IOException {
217     return reader.getPosition();
218   }
219 
220   protected IOException addFileInfoToException(final IOException ioe)
221   throws IOException {
222     long pos = -1;
223     try {
224       pos = getPosition();
225     } catch (IOException e) {
226       LOG.warn("Failed getting position to add to throw", e);
227     }
228 
229     // See what SequenceFile.Reader thinks is the end of the file
230     long end = Long.MAX_VALUE;
231     try {
232       Field fEnd = SequenceFile.Reader.class.getDeclaredField("end");
233       fEnd.setAccessible(true);
234       end = fEnd.getLong(this.reader);
235     } catch(Exception e) { /* reflection fail. keep going */ }
236 
237     String msg = (this.path == null? "": this.path.toString()) +
238       ", entryStart=" + entryStart + ", pos=" + pos + 
239       ((end == Long.MAX_VALUE) ? "" : ", end=" + end) + 
240       ", edit=" + this.edit;
241 
242     // Enhance via reflection so we don't change the original class type
243     try {
244       return (IOException) ioe.getClass()
245         .getConstructor(String.class)
246         .newInstance(msg)
247         .initCause(ioe);
248     } catch(Exception e) { /* reflection fail. keep going */ }
249     
250     return ioe;
251   }
252 }