/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.compress.DefaultCodec;

/**
 * Implementation of {@link HLog.Writer} that delegates to
 * SequenceFile.Writer.
 *
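 * <p>A rough usage sketch ({@code fs}, {@code path}, {@code conf} and
 * {@code entry} are assumed to be supplied by the caller; in practice the
 * surrounding HLog machinery drives these calls):
 *
 * <pre>
 * HLog.Writer writer = new SequenceFileLogWriter();
 * writer.init(fs, path, conf);  // builds the underlying SequenceFile.Writer
 * writer.append(entry);         // an HLog.Entry pairing an HLogKey and a WALEdit
 * writer.sync();                // invokes syncFs when HDFS-200 support was found
 * writer.close();
 * </pre>
 */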
public class SequenceFileLogWriter implements HLog.Writer {
  private final Log LOG = LogFactory.getLog(this.getClass());
  // The sequence file we delegate to.
  private SequenceFile.Writer writer;
  // The underlying DFSClient output stream, made accessible via reflection,
  // or null if it could not be obtained.
  private OutputStream dfsClient_out;
  // The syncFs method from HDFS-200, or null if not available.
  private Method syncFs;

  private Class<? extends HLogKey> keyClass;

  /**
   * Default constructor.
   */
  public SequenceFileLogWriter() {
    super();
  }

  /**
   * This constructor allows a specific HLogKey implementation to override
   * the one that would otherwise be chosen via configuration property.
   *
   * @param keyClass the HLogKey implementation to use for this writer
   */
  public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
    this.keyClass = keyClass;
  }

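  /**
   * Sets up the underlying SequenceFile.Writer, reflects out its private
   * output stream, and probes for the HDFS-200 {@code syncFs} method.
   */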
  @Override
  public void init(FileSystem fs, Path path, Configuration conf)
      throws IOException {

    if (null == keyClass) {
      keyClass = HLog.getKeyClass(conf);
    }

    // Create the underlying SequenceFile.Writer. Buffer size, replication and
    // block size come from configuration, falling back to the filesystem
    // defaults; compression is disabled and no progress callback is passed.
    this.writer = SequenceFile.createWriter(fs, conf, path,
      keyClass, WALEdit.class,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      (short) conf.getInt("hbase.regionserver.hlog.replication",
        fs.getDefaultReplication()),
      conf.getLong("hbase.regionserver.hlog.blocksize",
        fs.getDefaultBlockSize()),
      SequenceFile.CompressionType.NONE,
      new DefaultCodec(),
      null,
      new Metadata());

    // Get at the private FSDataOutputStream inside SequenceFile.Writer so we
    // can call sync on it. Make it accessible and stash it aside for use in
    // the sync method.
    final Field[] fields = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the 'out' field up in SF.Writer accessible.
          fields[i].setAccessible(true);
          FSDataOutputStream out =
            (FSDataOutputStream)fields[i].get(this.writer);
          this.dfsClient_out = out.getWrappedStream();
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        }
      }
    }
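
    // If no 'out' field was found above, dfsClient_out stays null and
    // getDFSCOutputStream() will return null to its callers.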

    // Now do the dirty work of testing whether syncFs is available.
    Method m = null;
    boolean append = conf.getBoolean("dfs.support.append", false);
    if (append) {
      try {
        // Function pointer to writer.syncFs().
        m = this.writer.getClass().getMethod("syncFs", new Class<?>[] {});
      } catch (SecurityException e) {
        throw new IOException("Failed test for syncFs", e);
      } catch (NoSuchMethodException e) {
        // Not available.
      }
    }
    this.syncFs = m;
    LOG.info((this.syncFs != null) ?
      "Using syncFs -- HDFS-200" :
      ("syncFs -- HDFS-200 -- not available, dfs.support.append=" + append));
  }

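  /**
   * Appends the given key/edit pair to the underlying SequenceFile.
   */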
  @Override
  public void append(HLog.Entry entry) throws IOException {
    this.writer.append(entry.getKey(), entry.getEdit());
  }

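  /**
   * Closes the underlying SequenceFile.Writer.
   */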
  @Override
  public void close() throws IOException {
    this.writer.close();
  }

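  /**
   * Syncs the file system via the reflected HDFS-200 {@code syncFs} method.
   * If syncFs was not found at init time, this is a no-op.
   */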
  @Override
  public void sync() throws IOException {
    if (this.syncFs != null) {
      try {
        this.syncFs.invoke(this.writer, HLog.NO_ARGS);
      } catch (Exception e) {
        throw new IOException("Reflection error invoking syncFs", e);
      }
    }
  }

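  /**
   * @return the current length of the underlying SequenceFile
   */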
  @Override
  public long getLength() throws IOException {
    return this.writer.getLength();
  }

  /**
   * @return the DFSClient output stream inside SequenceFile.Writer, made
   * accessible via reflection, or null if not available
   */
  public OutputStream getDFSCOutputStream() {
    return this.dfsClient_out;
  }
}