1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.ReentrantLock;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.RemoteExceptionHandler;
30 import org.apache.hadoop.util.StringUtils;
31
32
33
34
35 public class CompactSplitThread extends Thread implements CompactionRequestor {
36 static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
37 private final long frequency;
38 private final ReentrantLock lock = new ReentrantLock();
39
40 private final HRegionServer server;
41 private final Configuration conf;
42
43 private final PriorityCompactionQueue compactionQueue =
44 new PriorityCompactionQueue();
45
46
47
48
49 public static final int PRIORITY_USER = 1;
50
51
52
53
54
55
56 private int regionSplitLimit;
57
58
59 public CompactSplitThread(HRegionServer server) {
60 super();
61 this.server = server;
62 this.conf = server.getConfiguration();
63 this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
64 Integer.MAX_VALUE);
65 this.frequency =
66 conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
67 20 * 1000);
68 }
69
70 @Override
71 public void run() {
72 while (!this.server.isStopped()) {
73 HRegion r = null;
74 try {
75 r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
76 if (r != null) {
77 lock.lock();
78 try {
79 if(!this.server.isStopped()) {
80
81 byte [] midKey = r.compactStores();
82 if (r.getLastCompactInfo() != null) {
83 this.server.getMetrics().addCompaction(r.getLastCompactInfo());
84 }
85 if (shouldSplitRegion() && midKey != null &&
86 !this.server.isStopped()) {
87 split(r, midKey);
88 }
89 }
90 } finally {
91 lock.unlock();
92 }
93 }
94 } catch (InterruptedException ex) {
95 continue;
96 } catch (IOException ex) {
97 LOG.error("Compaction/Split failed for region " +
98 r.getRegionNameAsString(),
99 RemoteExceptionHandler.checkIOException(ex));
100 if (!server.checkFileSystem()) {
101 break;
102 }
103 } catch (Exception ex) {
104 LOG.error("Compaction failed" +
105 (r != null ? (" for region " + r.getRegionNameAsString()) : ""),
106 ex);
107 if (!server.checkFileSystem()) {
108 break;
109 }
110 }
111 }
112 compactionQueue.clear();
113 LOG.info(getName() + " exiting");
114 }
115
116 public synchronized void requestCompaction(final HRegion r,
117 final String why) {
118 requestCompaction(r, false, why, r.getCompactPriority());
119 }
120
121 public synchronized void requestCompaction(final HRegion r,
122 final String why, int p) {
123 requestCompaction(r, false, why, p);
124 }
125
126
127
128
129
130
131 public synchronized void requestCompaction(final HRegion r,
132 final boolean force, final String why, int priority) {
133 if (this.server.isStopped()) {
134 return;
135 }
136
137 if (force) {
138 r.setForceMajorCompaction(force);
139 }
140 if (compactionQueue.add(r, priority) && LOG.isDebugEnabled()) {
141 LOG.debug("Compaction " + (force? "(major) ": "") +
142 "requested for " + r.getRegionNameAsString() +
143 (why != null && !why.isEmpty()? " because " + why: "") +
144 "; priority=" + priority + ", compaction queue size=" + compactionQueue.size());
145 }
146 }
147
148 private void split(final HRegion parent, final byte [] midKey)
149 throws IOException {
150 final long startTime = System.currentTimeMillis();
151 SplitTransaction st = new SplitTransaction(parent, midKey);
152
153
154 if (!st.prepare()) return;
155 try {
156 st.execute(this.server, this.server);
157 } catch (IOException ioe) {
158 try {
159 LOG.info("Running rollback of failed split of " +
160 parent.getRegionNameAsString() + "; " + ioe.getMessage());
161 st.rollback(this.server, this.server);
162 LOG.info("Successful rollback of failed split of " +
163 parent.getRegionNameAsString());
164 } catch (Exception ee) {
165
166 LOG.info("Failed rollback of failed split of " +
167 parent.getRegionNameAsString() + " -- aborting server", ee);
168 this.server.abort("Failed split");
169 }
170 return;
171 }
172
173
174
175
176 this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(),
177 st.getSecondDaughter());
178 LOG.info("Region split, META updated, and report to master. Parent=" +
179 parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
180 st.getFirstDaughter().getRegionNameAsString() + ", " +
181 st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
182 StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
183 }
184
185
186
187
188 void interruptIfNecessary() {
189 if (lock.tryLock()) {
190 try {
191 this.interrupt();
192 } finally {
193 lock.unlock();
194 }
195 }
196 }
197
198
199
200
201
202
203
204 public int getCompactionQueueSize() {
205 return compactionQueue.size();
206 }
207
208 private boolean shouldSplitRegion() {
209 return (regionSplitLimit > server.getNumberOfOnlineRegions());
210 }
211
212
213
214
215 public int getRegionSplitLimit() {
216 return this.regionSplitLimit;
217 }
218 }