1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.TableNotFoundException;
27 import org.apache.hadoop.hbase.client.Delete;
28 import org.apache.hadoop.hbase.client.HTableInterface;
29 import org.apache.hadoop.hbase.client.HTablePool;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.regionserver.wal.HLog;
32 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.Stoppable;
35
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.TreeMap;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public class ReplicationSink {
57
58 private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
59
60 public static final String REPLICATION_LOG_DIR = ".replogs";
61 private final Configuration conf;
62
63 private final HTablePool pool;
64
65 private final Stoppable stopper;
66 private final ReplicationSinkMetrics metrics;
67
68
69
70
71
72
73
74
75 public ReplicationSink(Configuration conf, Stoppable stopper)
76 throws IOException {
77 this.conf = conf;
78 this.pool = new HTablePool(this.conf,
79 conf.getInt("replication.sink.htablepool.capacity", 10));
80 this.stopper = stopper;
81 this.metrics = new ReplicationSinkMetrics();
82 }
83
84
85
86
87
88
89
90
91 public void replicateEntries(HLog.Entry[] entries)
92 throws IOException {
93 if (entries.length == 0) {
94 return;
95 }
96
97
98 try {
99 long totalReplicated = 0;
100
101
102 Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
103 for (HLog.Entry entry : entries) {
104 WALEdit edit = entry.getEdit();
105 List<KeyValue> kvs = edit.getKeyValues();
106 if (kvs.get(0).isDelete()) {
107 Delete delete = new Delete(kvs.get(0).getRow(),
108 kvs.get(0).getTimestamp(), null);
109 for (KeyValue kv : kvs) {
110 if (kv.isDeleteFamily()) {
111 delete.deleteFamily(kv.getFamily());
112 } else if (!kv.isEmptyColumn()) {
113 delete.deleteColumn(kv.getFamily(),
114 kv.getQualifier());
115 }
116 }
117 delete(entry.getKey().getTablename(), delete);
118 } else {
119 byte[] table = entry.getKey().getTablename();
120 List<Put> tableList = puts.get(table);
121 if (tableList == null) {
122 tableList = new ArrayList<Put>();
123 puts.put(table, tableList);
124 }
125
126 byte[] lastKey = kvs.get(0).getRow();
127 Put put = new Put(kvs.get(0).getRow(),
128 kvs.get(0).getTimestamp());
129 for (KeyValue kv : kvs) {
130 if (!Bytes.equals(lastKey, kv.getRow())) {
131 tableList.add(put);
132 put = new Put(kv.getRow(), kv.getTimestamp());
133 }
134 put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
135 lastKey = kv.getRow();
136 }
137 tableList.add(put);
138 }
139 totalReplicated++;
140 }
141 for(byte [] table : puts.keySet()) {
142 put(table, puts.get(table));
143 }
144 this.metrics.setAgeOfLastAppliedOp(
145 entries[entries.length-1].getKey().getWriteTime());
146 this.metrics.appliedBatchesRate.inc(1);
147 LOG.info("Total replicated: " + totalReplicated);
148 } catch (IOException ex) {
149 LOG.error("Unable to accept edit because:", ex);
150 throw ex;
151 }
152 }
153
154
155
156
157
158
159
160 private void put(byte[] tableName, List<Put> puts) throws IOException {
161 if (puts.isEmpty()) {
162 return;
163 }
164 HTableInterface table = null;
165 try {
166 table = this.pool.getTable(tableName);
167 table.put(puts);
168 this.metrics.appliedOpsRate.inc(puts.size());
169 } finally {
170 if (table != null) {
171 this.pool.putTable(table);
172 }
173 }
174 }
175
176
177
178
179
180
181
182 private void delete(byte[] tableName, Delete delete) throws IOException {
183 HTableInterface table = null;
184 try {
185 table = this.pool.getTable(tableName);
186 table.delete(delete);
187 this.metrics.appliedOpsRate.inc(1);
188 } finally {
189 if (table != null) {
190 this.pool.putTable(table);
191 }
192 }
193 }
194 }