1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.Stoppable;
34 import org.apache.hadoop.hbase.client.Get;
35 import org.apache.hadoop.hbase.client.HTable;
36 import org.apache.hadoop.hbase.client.Result;
37 import org.apache.hadoop.hbase.client.ResultScanner;
38 import org.apache.hadoop.hbase.client.Scan;
39 import org.apache.hadoop.hbase.regionserver.wal.HLog;
40 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
41 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47
48 public class TestReplicationSink {
49 private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
50 private static final int BATCH_SIZE = 10;
51 private static final long SLEEP_TIME = 500;
52
53 private final static HBaseTestingUtility TEST_UTIL =
54 new HBaseTestingUtility();
55
56 private static ReplicationSink SINK;
57
58 private static final byte[] TABLE_NAME1 =
59 Bytes.toBytes("table1");
60 private static final byte[] TABLE_NAME2 =
61 Bytes.toBytes("table2");
62
63 private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
64 private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
65
66 private static HTable table1;
67 private static Stoppable STOPPABLE = new Stoppable() {
68 final AtomicBoolean stop = new AtomicBoolean(false);
69
70 @Override
71 public boolean isStopped() {
72 return this.stop.get();
73 }
74
75 @Override
76 public void stop(String why) {
77 LOG.info("STOPPING BECAUSE: " + why);
78 this.stop.set(true);
79 }
80
81 };
82
83 private static HTable table2;
84
85
86
87
88 @BeforeClass
89 public static void setUpBeforeClass() throws Exception {
90 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
91 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
92 TEST_UTIL.startMiniCluster(3);
93 SINK =
94 new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
95 table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
96 table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
97 }
98
99
100
101
102 @AfterClass
103 public static void tearDownAfterClass() throws Exception {
104 STOPPABLE.stop("Shutting down");
105 TEST_UTIL.shutdownMiniCluster();
106 }
107
108
109
110
111 @Before
112 public void setUp() throws Exception {
113 table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
114 table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
115 Thread.sleep(SLEEP_TIME);
116 }
117
118
119
120
121
122 @Test
123 public void testBatchSink() throws Exception {
124 HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
125 for(int i = 0; i < BATCH_SIZE; i++) {
126 entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
127 }
128 SINK.replicateEntries(entries);
129 Scan scan = new Scan();
130 ResultScanner scanRes = table1.getScanner(scan);
131 assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
132 }
133
134
135
136
137
138 @Test
139 public void testMixedPutDelete() throws Exception {
140 HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
141 for(int i = 0; i < BATCH_SIZE/2; i++) {
142 entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
143 }
144 SINK.replicateEntries(entries);
145
146 entries = new HLog.Entry[BATCH_SIZE];
147 for(int i = 0; i < BATCH_SIZE; i++) {
148 entries[i] = createEntry(TABLE_NAME1, i,
149 i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn);
150 }
151
152 SINK.replicateEntries(entries);
153 Scan scan = new Scan();
154 ResultScanner scanRes = table1.getScanner(scan);
155 assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
156 }
157
158
159
160
161
162 @Test
163 public void testMixedPutTables() throws Exception {
164 HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
165 for(int i = 0; i < BATCH_SIZE; i++) {
166 entries[i] =
167 createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
168 i, KeyValue.Type.Put);
169 }
170
171 SINK.replicateEntries(entries);
172 Scan scan = new Scan();
173 ResultScanner scanRes = table2.getScanner(scan);
174 for(Result res : scanRes) {
175 assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
176 }
177 }
178
179
180
181
182
183 @Test
184 public void testMixedDeletes() throws Exception {
185 HLog.Entry[] entries = new HLog.Entry[3];
186 for(int i = 0; i < 3; i++) {
187 entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
188 }
189 SINK.replicateEntries(entries);
190 entries = new HLog.Entry[3];
191
192 entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
193 entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
194 entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);
195
196 SINK.replicateEntries(entries);
197
198 Scan scan = new Scan();
199 ResultScanner scanRes = table1.getScanner(scan);
200 assertEquals(0, scanRes.next(3).length);
201 }
202
203
204
205
206
207
208 @Test
209 public void testApplyDeleteBeforePut() throws Exception {
210 HLog.Entry[] entries = new HLog.Entry[5];
211 for(int i = 0; i < 2; i++) {
212 entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
213 }
214 entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
215 for(int i = 3; i < 5; i++) {
216 entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
217 }
218 SINK.replicateEntries(entries);
219 Get get = new Get(Bytes.toBytes(1));
220 Result res = table1.get(get);
221 assertEquals(0, res.size());
222 }
223
224 private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
225 byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
226 byte[] rowBytes = Bytes.toBytes(row);
227
228
229 try {
230 Thread.sleep(1);
231 } catch (InterruptedException e) {
232 LOG.info("Was interrupted while sleep, meh", e);
233 }
234 final long now = System.currentTimeMillis();
235 KeyValue kv = null;
236 if(type.getCode() == KeyValue.Type.Put.getCode()) {
237 kv = new KeyValue(rowBytes, fam, fam, now,
238 KeyValue.Type.Put, Bytes.toBytes(row));
239 } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
240 kv = new KeyValue(rowBytes, fam, fam,
241 now, KeyValue.Type.DeleteColumn);
242 } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
243 kv = new KeyValue(rowBytes, fam, null,
244 now, KeyValue.Type.DeleteFamily);
245 }
246
247 HLogKey key = new HLogKey(table, table, now, now);
248
249 WALEdit edit = new WALEdit();
250 edit.add(kv);
251
252 return new HLog.Entry(key, edit);
253 }
254 }