1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import java.io.IOException;
23 import java.util.NavigableMap;
24 import java.util.TreeMap;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.Server;
33 import org.apache.hadoop.hbase.regionserver.wal.HLog;
34 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
35 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
37 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
38 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.zookeeper.KeeperException;
41
42 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
43 import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
44 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
45
46
47
48
49 public class Replication implements WALObserver {
50 private final boolean replication;
51 private final ReplicationSourceManager replicationManager;
52 private final AtomicBoolean replicating = new AtomicBoolean(true);
53 private final ReplicationZookeeper zkHelper;
54 private final Configuration conf;
55 private ReplicationSink replicationSink;
56
57 private final Server server;
58
59
60
61
62
63
64
65
66
67
68 public Replication(final Server server, final FileSystem fs,
69 final Path logDir, final Path oldLogDir)
70 throws IOException, KeeperException {
71 this.server = server;
72 this.conf = this.server.getConfiguration();
73 this.replication = isReplication(this.conf);
74 if (replication) {
75 this.zkHelper = new ReplicationZookeeper(server, this.replicating);
76 this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
77 this.server, fs, this.replicating, logDir, oldLogDir) ;
78 } else {
79 this.replicationManager = null;
80 this.zkHelper = null;
81 }
82 }
83
84
85
86
87
88 public static boolean isReplication(final Configuration c) {
89 return c.getBoolean(REPLICATION_ENABLE_KEY, false);
90 }
91
92
93
94
95 public void join() {
96 if (this.replication) {
97 this.replicationManager.join();
98 }
99 }
100
101
102
103
104
105
106 public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
107 if (this.replication) {
108 this.replicationSink.replicateEntries(entries);
109 }
110 }
111
112
113
114
115
116
117 public void startReplicationServices() throws IOException {
118 if (this.replication) {
119 this.replicationManager.init();
120 this.replicationSink = new ReplicationSink(this.conf, this.server);
121 }
122 }
123
124
125
126
127
128 public ReplicationSourceManager getReplicationManager() {
129 return this.replicationManager;
130 }
131
132 @Override
133 public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
134 WALEdit logEdit) {
135 NavigableMap<byte[], Integer> scopes =
136 new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
137 byte[] family;
138 for (KeyValue kv : logEdit.getKeyValues()) {
139 family = kv.getFamily();
140 int scope = info.getTableDesc().getFamily(family).getScope();
141 if (scope != REPLICATION_SCOPE_LOCAL &&
142 !scopes.containsKey(family)) {
143 scopes.put(family, scope);
144 }
145 }
146 if (!scopes.isEmpty()) {
147 logEdit.setScopes(scopes);
148 }
149 }
150
151 @Override
152 public void logRolled(Path p) {
153 getReplicationManager().logRolled(p);
154 }
155
156
157
158
159
160
161 public static void decorateMasterConfiguration(Configuration conf) {
162 if (!isReplication(conf)) {
163 return;
164 }
165 String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
166 if (!plugins.contains(ReplicationLogCleaner.class.toString())) {
167 conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS,
168 plugins + "," + ReplicationLogCleaner.class.getCanonicalName());
169 }
170 }
171
172 @Override
173 public void logRollRequested() {
174
175 }
176
177 @Override
178 public void logCloseRequested() {
179
180 }
181 }