1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import static org.junit.Assert.assertNotNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.lang.ref.SoftReference;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FSDataInputStream;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.FilterFileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.fs.PositionedReadable;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HColumnDescriptor;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HTableDescriptor;
45 import org.apache.hadoop.hbase.KeyValue;
46 import org.apache.hadoop.hbase.client.HBaseAdmin;
47 import org.apache.hadoop.hbase.client.HTable;
48 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
49 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.junit.Test;
52
53
54
55
56
57
58 public class TestFSErrorsExposed {
59 private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);
60
61 HBaseTestingUtility util = new HBaseTestingUtility();
62
63
64
65
66
67 @Test
68 public void testHFileScannerThrowsErrors() throws IOException {
69 Path hfilePath = new Path(new Path(
70 HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
71 "regionname"), "familyname");
72 FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
73 StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2*1024);
74 TestStoreFile.writeStoreFile(
75 writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
76
77 StoreFile sf = new StoreFile(fs, writer.getPath(), false,
78 util.getConfiguration(), StoreFile.BloomType.NONE, false);
79 StoreFile.Reader reader = sf.createReader();
80 HFileScanner scanner = reader.getScanner(false, true);
81
82 FaultyInputStream inStream = fs.inStreams.get(0).get();
83 assertNotNull(inStream);
84
85 scanner.seekTo();
86
87 assertTrue(scanner.next());
88
89 inStream.startFaults();
90
91 try {
92 int scanned=0;
93 while (scanner.next()) {
94 scanned++;
95 }
96 fail("Scanner didn't throw after faults injected");
97 } catch (IOException ioe) {
98 LOG.info("Got expected exception", ioe);
99 assertTrue(ioe.getMessage().contains("Fault"));
100 }
101 reader.close();
102 }
103
104
105
106
107
108 @Test
109 public void testStoreFileScannerThrowsErrors() throws IOException {
110 Path hfilePath = new Path(new Path(
111 HBaseTestingUtility.getTestDir("internalScannerExposesErrors"),
112 "regionname"), "familyname");
113 FaultyFileSystem fs = new FaultyFileSystem(util.getTestFileSystem());
114 StoreFile.Writer writer = StoreFile.createWriter(fs, hfilePath, 2 * 1024);
115 TestStoreFile.writeStoreFile(
116 writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
117
118 StoreFile sf = new StoreFile(fs, writer.getPath(), false,
119 util.getConfiguration(), BloomType.NONE, false);
120 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
121 Collections.singletonList(sf), false, true);
122 KeyValueScanner scanner = scanners.get(0);
123
124 FaultyInputStream inStream = fs.inStreams.get(0).get();
125 assertNotNull(inStream);
126
127 scanner.seek(KeyValue.LOWESTKEY);
128
129 assertNotNull(scanner.next());
130
131 inStream.startFaults();
132
133 try {
134 int scanned=0;
135 while (scanner.next() != null) {
136 scanned++;
137 }
138 fail("Scanner didn't throw after faults injected");
139 } catch (IOException ioe) {
140 LOG.info("Got expected exception", ioe);
141 assertTrue(ioe.getMessage().contains("Could not iterate"));
142 }
143 scanner.close();
144 }
145
146
147
148
149
150
151 @Test
152 public void testFullSystemBubblesFSErrors() throws Exception {
153 try {
154
155
156 util.getConfiguration().setInt(
157 "hbase.regionserver.optionallogflushinterval", Integer.MAX_VALUE);
158 util.startMiniCluster(1);
159 byte[] tableName = Bytes.toBytes("table");
160 byte[] fam = Bytes.toBytes("fam");
161
162 HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
163 HTableDescriptor desc = new HTableDescriptor(tableName);
164 desc.addFamily(new HColumnDescriptor(
165 fam, 1, HColumnDescriptor.DEFAULT_COMPRESSION,
166 false, false, HConstants.FOREVER, "NONE"));
167 admin.createTable(desc);
168
169 util.getConfiguration().setInt("hbase.client.retries.number", 1);
170
171
172 HTable table = new HTable(new Configuration(util.getConfiguration()), tableName);
173
174
175 util.loadTable(table, fam);
176 table.flushCommits();
177 util.flush();
178 util.countRows(table);
179
180
181 util.getDFSCluster().shutdownDataNodes();
182
183 try {
184 util.countRows(table);
185 fail("Did not fail to count after removing data");
186 } catch (Exception e) {
187 LOG.info("Got expected error", e);
188 assertTrue(e.getMessage().contains("Could not seek"));
189 }
190
191 } finally {
192 util.shutdownMiniCluster();
193 }
194 }
195
196 static class FaultyFileSystem extends FilterFileSystem {
197 List<SoftReference<FaultyInputStream>> inStreams =
198 new ArrayList<SoftReference<FaultyInputStream>>();
199
200 public FaultyFileSystem(FileSystem testFileSystem) {
201 super(testFileSystem);
202 }
203
204 @Override
205 public FSDataInputStream open(Path p, int bufferSize) throws IOException {
206 FSDataInputStream orig = fs.open(p, bufferSize);
207 FaultyInputStream faulty = new FaultyInputStream(orig);
208 inStreams.add(new SoftReference<FaultyInputStream>(faulty));
209 return faulty;
210 }
211 }
212
213 static class FaultyInputStream extends FSDataInputStream {
214 boolean faultsStarted = false;
215
216 public FaultyInputStream(InputStream in) throws IOException {
217 super(in);
218 }
219
220 public void startFaults() {
221 faultsStarted = true;
222 }
223
224 public int read(long position, byte[] buffer, int offset, int length)
225 throws IOException {
226 injectFault();
227 return ((PositionedReadable)in).read(position, buffer, offset, length);
228 }
229
230 private void injectFault() throws IOException {
231 if (faultsStarted) {
232 throw new IOException("Fault injected");
233 }
234 }
235 }
236
237
238 }