View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.RemoteExceptionHandler;
28  import org.apache.hadoop.hbase.Server;
29  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
30  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
31  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
32  import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
33  import org.apache.hadoop.hbase.util.Bytes;
34  
35  import java.io.IOException;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.locks.ReentrantLock;
38  
39  /**
40   * Runs periodically to determine if the HLog should be rolled.
41   *
42   * NOTE: This class extends Thread rather than Chore because the sleep time
43   * can be interrupted when there is something to do, rather than the Chore
44   * sleep time which is invariant.
45   */
46  class LogRoller extends Thread implements WALObserver {
47    static final Log LOG = LogFactory.getLog(LogRoller.class);
48    private final ReentrantLock rollLock = new ReentrantLock();
49    private final AtomicBoolean rollLog = new AtomicBoolean(false);
50    private final Server server;
51    private final RegionServerServices services;
52    private volatile long lastrolltime = System.currentTimeMillis();
53    // Period to roll log.
54    private final long rollperiod;
55    private final int threadWakeFrequency;
56  
57    /** @param server */
58    public LogRoller(final Server server, final RegionServerServices services) {
59      super();
60      this.server = server;
61      this.services = services;
62      this.rollperiod = this.server.getConfiguration().
63        getLong("hbase.regionserver.logroll.period", 3600000);
64      this.threadWakeFrequency = this.server.getConfiguration().
65        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
66    }
67  
68    @Override
69    public void run() {
70      while (!server.isStopped()) {
71        long now = System.currentTimeMillis();
72        boolean periodic = false;
73        if (!rollLog.get()) {
74          periodic = (now - this.lastrolltime) > this.rollperiod;
75          if (!periodic) {
76            synchronized (rollLog) {
77              try {
78                rollLog.wait(this.threadWakeFrequency);
79              } catch (InterruptedException e) {
80                // Fall through
81              }
82            }
83            continue;
84          }
85          // Time for periodic roll
86          if (LOG.isDebugEnabled()) {
87            LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
88          }
89        }
90        rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
91        try {
92          this.lastrolltime = now;
93          // This is array of actual region names.
94          byte [][] regionsToFlush = this.services.getWAL().rollWriter();
95          if (regionsToFlush != null) {
96            for (byte [] r: regionsToFlush) scheduleFlush(r);
97          }
98        } catch (FailedLogCloseException e) {
99          server.abort("Failed log close in log roller", e);
100       } catch (java.net.ConnectException e) {
101         server.abort("Failed log close in log roller", e);
102       } catch (IOException ex) {
103         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
104         server.abort("IOE in log roller",
105           RemoteExceptionHandler.checkIOException(ex));
106       } catch (Exception ex) {
107         LOG.error("Log rolling failed", ex);
108         server.abort("Log rolling failed", ex);
109       } finally {
110         rollLog.set(false);
111         rollLock.unlock();
112       }
113     }
114     LOG.info("LogRoller exiting.");
115   }
116 
117   /**
118    * @param region Encoded name of region to flush.
119    */
120   private void scheduleFlush(final byte [] region) {
121     boolean scheduled = false;
122     HRegion r = this.services.getFromOnlineRegions(Bytes.toString(region));
123     FlushRequester requester = null;
124     if (r != null) {
125       requester = this.services.getFlushRequester();
126       if (requester != null) {
127         requester.requestFlush(r);
128         scheduled = true;
129       }
130     }
131     if (!scheduled) {
132     LOG.warn("Failed to schedule flush of " +
133       Bytes.toString(region) + "r=" + r + ", requester=" + requester);
134     }
135   }
136 
137   public void logRollRequested() {
138     synchronized (rollLog) {
139       rollLog.set(true);
140       rollLog.notifyAll();
141     }
142   }
143 
144   /**
145    * Called by region server to wake up this thread if it sleeping.
146    * It is sleeping if rollLock is not held.
147    */
148   public void interruptIfNecessary() {
149     try {
150       rollLock.lock();
151       this.interrupt();
152     } finally {
153       rollLock.unlock();
154     }
155   }
156 
157   @Override
158   public void logRolled(Path newFile) {
159     // Not interested
160   }
161 
162   @Override
163   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
164       WALEdit logEdit) {
165     // Not interested.
166   }
167 
168   @Override
169   public void logCloseRequested() {
170     // not interested
171   }
172 }