View Javadoc

1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  
25  import java.util.ConcurrentModificationException;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.concurrent.Delayed;
29  import java.util.concurrent.DelayQueue;
30  import java.util.concurrent.TimeUnit;
31  
32  import java.io.IOException;
33  
34  /**
35   * Leases
36   *
37   * There are several server classes in HBase that need to track external
38   * clients that occasionally send heartbeats.
39   *
40   * <p>These external clients hold resources in the server class.
41   * Those resources need to be released if the external client fails to send a
42   * heartbeat after some interval of time passes.
43   *
44   * <p>The Leases class is a general reusable class for this kind of pattern.
45   * An instance of the Leases class will create a thread to do its dirty work.
46   * You should close() the instance if you want to clean up the thread properly.
47   *
48   * <p>
49   * NOTE: This class extends Thread rather than Chore because the sleep time
50   * can be interrupted when there is something to do, rather than the Chore
51   * sleep time which is invariant.
52   */
53  public class Leases extends Thread {
54    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
55    private final int leasePeriod;
56    private final int leaseCheckFrequency;
57    private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
58    protected final Map<String, Lease> leases = new HashMap<String, Lease>();
59    private volatile boolean stopRequested = false;
60  
61    /**
62     * Creates a lease monitor
63     *
64     * @param leasePeriod - length of time (milliseconds) that the lease is valid
65     * @param leaseCheckFrequency - how often the lease should be checked
66     * (milliseconds)
67     */
68    public Leases(final int leasePeriod, final int leaseCheckFrequency) {
69      this.leasePeriod = leasePeriod;
70      this.leaseCheckFrequency = leaseCheckFrequency;
71      setDaemon(true);
72    }
73  
74    /**
75     * @see java.lang.Thread#run()
76     */
77    @Override
78    public void run() {
79      while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
80        Lease lease = null;
81        try {
82          lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
83        } catch (InterruptedException e) {
84          continue;
85        } catch (ConcurrentModificationException e) {
86          continue;
87        } catch (Throwable e) {
88          LOG.fatal("Unexpected exception killed leases thread", e);
89          break;
90        }
91        if (lease == null) {
92          continue;
93        }
94        // A lease expired.  Run the expired code before removing from queue
95        // since its presence in queue is used to see if lease exists still.
96        if (lease.getListener() == null) {
97          LOG.error("lease listener is null for lease " + lease.getLeaseName());
98        } else {
99          lease.getListener().leaseExpired();
100       }
101       synchronized (leaseQueue) {
102         leases.remove(lease.getLeaseName());
103       }
104     }
105     close();
106   }
107 
108   /**
109    * Shuts down this lease instance when all outstanding leases expire.
110    * Like {@link #close()} but rather than violently end all leases, waits
111    * first on extant leases to finish.  Use this method if the lease holders
112    * could loose data, leak locks, etc.  Presumes client has shutdown
113    * allocation of new leases.
114    */
115   public void closeAfterLeasesExpire() {
116     this.stopRequested = true;
117   }
118 
119   /**
120    * Shut down this Leases instance.  All pending leases will be destroyed,
121    * without any cancellation calls.
122    */
123   public void close() {
124     LOG.info(Thread.currentThread().getName() + " closing leases");
125     this.stopRequested = true;
126     synchronized (leaseQueue) {
127       leaseQueue.clear();
128       leases.clear();
129       leaseQueue.notifyAll();
130     }
131     LOG.info(Thread.currentThread().getName() + " closed leases");
132   }
133 
134   /**
135    * Obtain a lease
136    *
137    * @param leaseName name of the lease
138    * @param listener listener that will process lease expirations
139    * @throws LeaseStillHeldException
140    */
141   public void createLease(String leaseName, final LeaseListener listener)
142   throws LeaseStillHeldException {
143     addLease(new Lease(leaseName, listener));
144   }
145 
146   /**
147    * Inserts lease.  Resets expiration before insertion.
148    * @param lease
149    * @throws LeaseStillHeldException
150    */
151   public void addLease(final Lease lease) throws LeaseStillHeldException {
152     if (this.stopRequested) {
153       return;
154     }
155     lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod);
156     synchronized (leaseQueue) {
157       if (leases.containsKey(lease.getLeaseName())) {
158         throw new LeaseStillHeldException(lease.getLeaseName());
159       }
160       leases.put(lease.getLeaseName(), lease);
161       leaseQueue.add(lease);
162     }
163   }
164 
165   /**
166    * Thrown if we are asked create a lease but lease on passed name already
167    * exists.
168    */
169   @SuppressWarnings("serial")
170   public static class LeaseStillHeldException extends IOException {
171     private final String leaseName;
172 
173     /**
174      * @param name
175      */
176     public LeaseStillHeldException(final String name) {
177       this.leaseName = name;
178     }
179 
180     /** @return name of lease */
181     public String getName() {
182       return this.leaseName;
183     }
184   }
185 
186   /**
187    * Renew a lease
188    *
189    * @param leaseName name of lease
190    * @throws LeaseException
191    */
192   public void renewLease(final String leaseName) throws LeaseException {
193     synchronized (leaseQueue) {
194       Lease lease = leases.get(leaseName);
195       // We need to check to see if the remove is successful as the poll in the run()
196       // method could have completed between the get and the remove which will result
197       // in a corrupt leaseQueue.
198       if (lease == null || !leaseQueue.remove(lease)) {
199         throw new LeaseException("lease '" + leaseName +
200         "' does not exist or has already expired");
201       }
202       lease.setExpirationTime(System.currentTimeMillis() + leasePeriod);
203       leaseQueue.add(lease);
204     }
205   }
206 
207   /**
208    * Client explicitly cancels a lease.
209    * @param leaseName name of lease
210    * @throws LeaseException
211    */
212   public void cancelLease(final String leaseName) throws LeaseException {
213     removeLease(leaseName);
214   }
215 
216   /**
217    * Remove named lease.
218    * Lease is removed from the list of leases and removed from the delay queue.
219    * Lease can be resinserted using {@link #addLease(Lease)}
220    *
221    * @param leaseName name of lease
222    * @throws LeaseException
223    * @return Removed lease
224    */
225   Lease removeLease(final String leaseName) throws LeaseException {
226     Lease lease =  null;
227     synchronized (leaseQueue) {
228       lease = leases.remove(leaseName);
229       if (lease == null) {
230         throw new LeaseException("lease '" + leaseName + "' does not exist");
231       }
232       leaseQueue.remove(lease);
233     }
234     return lease;
235   }
236 
237   /** This class tracks a single Lease. */
238   static class Lease implements Delayed {
239     private final String leaseName;
240     private final LeaseListener listener;
241     private long expirationTime;
242 
243     Lease(final String leaseName, LeaseListener listener) {
244       this(leaseName, listener, 0);
245     }
246 
247     Lease(final String leaseName, LeaseListener listener, long expirationTime) {
248       this.leaseName = leaseName;
249       this.listener = listener;
250       this.expirationTime = expirationTime;
251     }
252 
253     /** @return the lease name */
254     public String getLeaseName() {
255       return leaseName;
256     }
257 
258     /** @return listener */
259     public LeaseListener getListener() {
260       return this.listener;
261     }
262 
263     @Override
264     public boolean equals(Object obj) {
265       if (this == obj) {
266         return true;
267       }
268       if (obj == null) {
269         return false;
270       }
271       if (getClass() != obj.getClass()) {
272         return false;
273       }
274       return this.hashCode() == ((Lease) obj).hashCode();
275     }
276 
277     @Override
278     public int hashCode() {
279       return this.leaseName.hashCode();
280     }
281 
282     public long getDelay(TimeUnit unit) {
283       return unit.convert(this.expirationTime - System.currentTimeMillis(),
284           TimeUnit.MILLISECONDS);
285     }
286 
287     public int compareTo(Delayed o) {
288       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
289         o.getDelay(TimeUnit.MILLISECONDS);
290 
291       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
292     }
293 
294     /** @param expirationTime the expirationTime to set */
295     public void setExpirationTime(long expirationTime) {
296       this.expirationTime = expirationTime;
297     }
298   }
299 }