View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.util.LinkedList;
23  
24  import org.apache.hadoop.hbase.util.Bytes;
25  import org.apache.hadoop.hbase.util.ClassSize;
26  
27  /**
28   * Manages the read/write consistency within memstore. This provides
29   * an interface for readers to determine what entries to ignore, and
30   * a mechanism for writers to obtain new write numbers, then "commit"
31   * the new writes for readers to read (thus forming atomic transactions).
32   */
33  public class ReadWriteConsistencyControl {
34    private volatile long memstoreRead = 0;
35    private volatile long memstoreWrite = 0;
36  
37    private final Object readWaiters = new Object();
38  
39    // This is the pending queue of writes.
40    private final LinkedList<WriteEntry> writeQueue =
41        new LinkedList<WriteEntry>();
42  
43    private static final ThreadLocal<Long> perThreadReadPoint =
44        new ThreadLocal<Long>();
45  
46    /**
47     * Get this thread's read point. Used primarily by the memstore scanner to
48     * know which values to skip (ie: have not been completed/committed to 
49     * memstore).
50     */
51    public static long getThreadReadPoint() {
52      return perThreadReadPoint.get();
53    }
54  
55    /** 
56     * Set the thread read point to the given value. The thread RWCC
57     * is used by the Memstore scanner so it knows which values to skip. 
58     * Give it a value of 0 if you want everything.
59     */
60    public static void setThreadReadPoint(long readPoint) {
61      perThreadReadPoint.set(readPoint);
62    }
63  
64    /**
65     * Set the thread RWCC read point to whatever the current read point is in
66     * this particular instance of RWCC.  Returns the new thread read point value.
67     */
68    public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
69      perThreadReadPoint.set(rwcc.memstoreReadPoint());
70      return getThreadReadPoint();
71    }
72    
73    /**
74     * Set the thread RWCC read point to 0 (include everything).
75     */
76    public static void resetThreadReadPoint() {
77      perThreadReadPoint.set(0L);
78    }
79  
80    public WriteEntry beginMemstoreInsert() {
81      synchronized (writeQueue) {
82        long nextWriteNumber = ++memstoreWrite;
83        WriteEntry e = new WriteEntry(nextWriteNumber);
84        writeQueue.add(e);
85        return e;
86      }
87    }
88  
89    public void completeMemstoreInsert(WriteEntry e) {
90      synchronized (writeQueue) {
91        e.markCompleted();
92  
93        long nextReadValue = -1;
94        boolean ranOnce=false;
95        while (!writeQueue.isEmpty()) {
96          ranOnce=true;
97          WriteEntry queueFirst = writeQueue.getFirst();
98  
99          if (nextReadValue > 0) {
100           if (nextReadValue+1 != queueFirst.getWriteNumber()) {
101             throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
102                 + nextReadValue + " next: " + queueFirst.getWriteNumber());
103           }
104         }
105 
106         if (queueFirst.isCompleted()) {
107           nextReadValue = queueFirst.getWriteNumber();
108           writeQueue.removeFirst();
109         } else {
110           break;
111         }
112       }
113 
114       if (!ranOnce) {
115         throw new RuntimeException("never was a first");
116       }
117 
118       if (nextReadValue > 0) {
119         synchronized (readWaiters) {
120           memstoreRead = nextReadValue;
121           readWaiters.notifyAll();
122         }
123 
124       }
125     }
126 
127     boolean interrupted = false;
128     synchronized (readWaiters) {
129       while (memstoreRead < e.getWriteNumber()) {
130         try {
131           readWaiters.wait(0);
132         } catch (InterruptedException ie) {
133           // We were interrupted... finish the loop -- i.e. cleanup --and then
134           // on our way out, reset the interrupt flag.
135           interrupted = true;
136         }
137       }
138     }
139     if (interrupted) Thread.currentThread().interrupt();
140   }
141 
142   public long memstoreReadPoint() {
143     return memstoreRead;
144   }
145 
146 
147   public static class WriteEntry {
148     private long writeNumber;
149     private boolean completed = false;
150     WriteEntry(long writeNumber) {
151       this.writeNumber = writeNumber;
152     }
153     void markCompleted() {
154       this.completed = true;
155     }
156     boolean isCompleted() {
157       return this.completed;
158     }
159     long getWriteNumber() {
160       return this.writeNumber;
161     }
162   }
163   
164   public static final long FIXED_SIZE = ClassSize.align(
165       ClassSize.OBJECT + 
166       2 * Bytes.SIZEOF_LONG + 
167       2 * ClassSize.REFERENCE);
168   
169 }