/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * Test cases that ensure that file system level errors are bubbled up
 * appropriately to clients, rather than swallowed.
 */
public class TestFSErrorsExposed {
  private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);

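  /** Shared test utility: provides the test filesystem, configuration, and mini cluster. */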
  HBaseTestingUtility util = new HBaseTestingUtility();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
    StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(), false,
        util.getConfiguration(), BloomType.NONE, false);
    StoreFile.Reader reader = sf.createReader();
    HFileScanner scanner = reader.getScanner(false, true);

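    // The reader opened exactly one stream through the faulty filesystem;
    // grab it so faults can be injected once the scanner is mid-scan.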
    FaultyInputStream inStream = fs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    inStream.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close();
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
    StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
    TestStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFile sf = new StoreFile(fs, writer.getPath(), false,
        util.getConfiguration(), BloomType.NONE, false);
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        Collections.singletonList(sf), false, true);
    KeyValueScanner scanner = scanners.get(0);

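    // As above, the single stream opened through the faulty filesystem is
    // the one backing the store file reader.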
    FaultyInputStream inStream = fs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());

    inStream.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then
   * removes the data from HDFS underneath it, and ensures that
   * errors are bubbled up to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    try {
      // Disable the periodic log flush; otherwise it would trigger a server
      // shutdown while sync'ing, because all the datanodes are bad.
      util.getConfiguration().setInt(
          "hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
      util.startMiniCluster(1);
      byte[] tableName = Bytes.toBytes("table");
      byte[] fam = Bytes.toBytes("fam");

      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
      HTableDescriptor desc = new HTableDescriptor(tableName);
      desc.addFamily(new HColumnDescriptor(
          fam, 1, HColumnDescriptor.DEFAULT_COMPRESSION,
          false, false, HConstants.FOREVER, "NONE"));
      admin.createTable(desc);
      // Make it fail faster.
      util.getConfiguration().setInt("hbase.client.retries.number", 1);
      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one w/ 10 as default.
      HTable table = new HTable(new Configuration(util.getConfiguration()), tableName);

      // Load some data
      util.loadTable(table, fam);
      table.flushCommits();
      util.flush();
      util.countRows(table);

      // Kill the DFS cluster
      util.getDFSCluster().shutdownDataNodes();

      try {
        util.countRows(table);
        fail("Count should have failed after the underlying data was removed");
      } catch (Exception e) {
        LOG.info("Got expected error", e);
        assertTrue(e.getMessage().contains("Could not seek"));
      }

    } finally {
      util.shutdownMiniCluster();
    }
  }

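  /**
   * FilterFileSystem that wraps every stream it opens in a FaultyInputStream,
   * keeping soft references to them so tests can later inject read faults.
   */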
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams =
      new ArrayList<SoftReference<FaultyInputStream>>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

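    // Delegate to the wrapped filesystem, but hand back a fault-injecting
    // wrapper and remember it for the test to manipulate.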
    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<FaultyInputStream>(faulty));
      return faulty;
    }
  }

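  /**
   * FSDataInputStream that passes reads through untouched until
   * startFaults() is called, after which every pread throws an IOException.
   */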
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

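    // Only the positioned read (pread) path is intercepted; the scanners in
    // these tests open their readers with pread enabled.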
    @Override
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      injectFault();
      return ((PositionedReadable)in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}