1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.master;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.client.HConnectionManager;
29 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
30 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import java.io.IOException;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Set;
38
39
40
41
42
43 public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
44 private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
45 private Configuration conf;
46 private ReplicationZookeeper zkHelper;
47 private Set<String> hlogs = new HashSet<String>();
48 private boolean stopped = false;
49
50
51
52
53 public ReplicationLogCleaner() {}
54
55 @Override
56 public boolean isLogDeletable(Path filePath) {
57
58 try {
59 if (!zkHelper.getReplication()) {
60 return false;
61 }
62 } catch (KeeperException e) {
63 abort("Cannot get the state of replication", e);
64 return false;
65 }
66
67
68
69 if (this.conf == null) {
70 return true;
71 }
72 String log = filePath.getName();
73
74
75 if (this.hlogs.contains(log)) {
76 return false;
77 }
78
79
80
81
82 return !refreshHLogsAndSearch(log);
83 }
84
85
86
87
88
89
90
91
92 private boolean refreshHLogsAndSearch(String searchedLog) {
93 this.hlogs.clear();
94 final boolean lookForLog = searchedLog != null;
95 List<String> rss = zkHelper.getListOfReplicators();
96 if (rss == null) {
97 LOG.debug("Didn't find any region server that replicates, deleting: " +
98 searchedLog);
99 return false;
100 }
101 for (String rs: rss) {
102 List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
103
104 if (listOfPeers == null) {
105 continue;
106 }
107 for (String id : listOfPeers) {
108 List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
109 if (peersHlogs != null) {
110 this.hlogs.addAll(peersHlogs);
111 }
112
113 if(lookForLog && this.hlogs.contains(searchedLog)) {
114 LOG.debug("Found log in ZK, keeping: " + searchedLog);
115 return true;
116 }
117 }
118 }
119 LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
120 return false;
121 }
122
123 @Override
124 public void setConf(Configuration conf) {
125
126 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
127 return;
128 }
129
130
131 this.conf = new Configuration(conf);
132 try {
133 ZooKeeperWatcher zkw =
134 new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
135 this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw);
136 } catch (KeeperException e) {
137 LOG.error("Error while configuring " + this.getClass().getName(), e);
138 } catch (IOException e) {
139 LOG.error("Error while configuring " + this.getClass().getName(), e);
140 }
141 refreshHLogsAndSearch(null);
142 }
143
144 @Override
145 public Configuration getConf() {
146 return conf;
147 }
148
149 @Override
150 public void stop(String why) {
151 if (this.stopped) return;
152 this.stopped = true;
153 if (this.zkHelper != null) {
154 LOG.info("Stopping " + this.zkHelper.getZookeeperWatcher());
155 this.zkHelper.getZookeeperWatcher().close();
156 }
157 HConnectionManager.deleteConnection(this.conf, true);
158 }
159
160 @Override
161 public boolean isStopped() {
162 return this.stopped;
163 }
164
165 @Override
166 public void abort(String why, Throwable e) {
167 LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
168 stop(why);
169 }
170 }