1   /*
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.EOFException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.io.UnsupportedEncodingException;
26  import java.lang.reflect.Constructor;
27  import java.text.ParseException;
28  import java.util.AbstractList;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Collection;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NavigableMap;
37  import java.util.NavigableSet;
38  import java.util.Random;
39  import java.util.TreeMap;
40  import java.util.concurrent.ConcurrentHashMap;
41  import java.util.concurrent.ConcurrentSkipListMap;
42  import java.util.concurrent.CountDownLatch;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicInteger;
46  import java.util.concurrent.atomic.AtomicLong;
47  import java.util.concurrent.locks.ReentrantReadWriteLock;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.fs.FSDataOutputStream;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.hbase.DoNotRetryIOException;
57  import org.apache.hadoop.hbase.DroppedSnapshotException;
58  import org.apache.hadoop.hbase.HBaseConfiguration;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HTableDescriptor;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.NotServingRegionException;
65  import org.apache.hadoop.hbase.UnknownScannerException;
66  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
67  import org.apache.hadoop.hbase.client.Delete;
68  import org.apache.hadoop.hbase.client.Get;
69  import org.apache.hadoop.hbase.client.Increment;
70  import org.apache.hadoop.hbase.client.Put;
71  import org.apache.hadoop.hbase.client.Result;
72  import org.apache.hadoop.hbase.client.Row;
73  import org.apache.hadoop.hbase.client.RowLock;
74  import org.apache.hadoop.hbase.client.Scan;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
77  import org.apache.hadoop.hbase.io.HeapSize;
78  import org.apache.hadoop.hbase.io.TimeRange;
79  import org.apache.hadoop.hbase.io.hfile.BlockCache;
80  import org.apache.hadoop.hbase.regionserver.wal.HLog;
81  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
82  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
83  import org.apache.hadoop.hbase.util.HashedBytes;
84  import org.apache.hadoop.hbase.util.Bytes;
85  import org.apache.hadoop.hbase.util.CancelableProgressable;
86  import org.apache.hadoop.hbase.util.ClassSize;
87  import org.apache.hadoop.hbase.util.CompressionTest;
88  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89  import org.apache.hadoop.hbase.util.FSUtils;
90  import org.apache.hadoop.hbase.util.Pair;
91  import org.apache.hadoop.hbase.util.Writables;
92  import org.apache.hadoop.io.Writable;
93  import org.apache.hadoop.util.StringUtils;
94  
95  import com.google.common.collect.Lists;
96  
97  /**
98   * HRegion stores data for a certain region of a table.  It stores all columns
99   * for each row. A given table consists of one or more HRegions.
100  *
101  * <p>We maintain multiple HStores for a single HRegion.
102  *
103  * <p>A Store is a set of rows with some column data; together,
104  * they make up all the data for the rows.
105  *
106  * <p>Each HRegion has a 'startKey' and 'endKey'.
107  * <p>The first is inclusive, the second is exclusive (except for
108  * the final region). The endKey of region 0 is the same as
109  * startKey for region 1 (if it exists).  The startKey for the
110  * first region is null. The endKey for the final region is null.
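 *
 * <p>For example (a sketch; the row keys are purely illustrative), a table
 * split into three regions might look like:
 * <pre>
 *   region 0: startKey = null,  endKey = "bbb"   (first region)
 *   region 1: startKey = "bbb", endKey = "mmm"
 *   region 2: startKey = "mmm", endKey = null    (final region)
 * </pre>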
111  *
112  * <p>Locking at the HRegion level serves only one purpose: preventing the
113  * region from being closed (and consequently split) while other operations
114  * are ongoing. Each row level operation obtains both a row lock and a region
115  * read lock for the duration of the operation. While a scanner is being
116  * constructed, getScanner holds a read lock. If the scanner is successfully
117  * constructed, it holds a read lock until it is closed. A close takes out a
118  * write lock and consequently will block for ongoing operations and will block
119  * new operations from starting while the close is in progress.
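 *
 * <p>A sketch of the scanner pattern implied above ({@link Scan} and
 * InternalScanner are the types this class uses; <code>region</code> is
 * illustrative):
 * <pre>
 *   InternalScanner s = region.getScanner(new Scan());
 *   try {
 *     // next() calls proceed while the region cannot be closed
 *   } finally {
 *     s.close();  // releases the read lock taken during construction
 *   }
 * </pre>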
120  *
121  * <p>An HRegion is defined by its table and its key extent.
122  *
123  * <p>It consists of at least one Store.  The number of Stores should be
124  * configurable, so that data which is accessed together is stored in the same
125  * Store.  Right now, we approximate that by building a single Store for
126  * each column family.  (This config info will be communicated via the
127  * tabledesc.)
128  *
129  * <p>The HTableDescriptor contains metainfo about the HRegion's table.
130  * regionName is a unique identifier for this HRegion. [startKey, endKey)
131  * defines the keyspace for this HRegion.
132  */
133 public class HRegion implements HeapSize { // , Writable{
134   public static final Log LOG = LogFactory.getLog(HRegion.class);
135   static final String MERGEDIR = "merges";
136 
137   final AtomicBoolean closed = new AtomicBoolean(false);
138   /* Closing can take some time; use the closing flag if there is stuff we don't
139    * want to do while in closing state; e.g. offering this region up to the
140    * master as a region to close if the carrying regionserver is overloaded.
141    * Once set, it is never cleared.
142    */
143   final AtomicBoolean closing = new AtomicBoolean(false);
144 
145   //////////////////////////////////////////////////////////////////////////////
146   // Members
147   //////////////////////////////////////////////////////////////////////////////
148 
149   private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows = 
150     new ConcurrentHashMap<HashedBytes, CountDownLatch>();
151   private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
152     new ConcurrentHashMap<Integer, HashedBytes>();
153   private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
154   private static Random rand = new Random();
155 
156   protected final Map<byte [], Store> stores =
157     new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
158 
159   final AtomicLong memstoreSize = new AtomicLong(0);
160 
161   /**
162    * The directory for the table this region is part of.
163    * This directory contains the directory for this region.
164    */
165   final Path tableDir;
166 
167   final HLog log;
168   final FileSystem fs;
169   final Configuration conf;
170   final int rowLockWaitDuration;
171   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
172   final HRegionInfo regionInfo;
173   final Path regiondir;
174   KeyValue.KVComparator comparator;
175 
176   /*
177    * Set this when scheduling compaction if you want the next compaction to be a
178    * major compaction.  Cleared each time through compaction code.
179    */
180   private volatile boolean forceMajorCompaction = false;
181   private Pair<Long,Long> lastCompactInfo = null;
182 
183   // Used to ensure only one thread closes region at a time.
184   private final Object closeLock = new Object();
185 
186   /*
187    * Data structure of write state flags used for coordinating flushes,
188    * compactions and closes.
189    */
190   static class WriteState {
191     // Set while a memstore flush is happening.
192     volatile boolean flushing = false;
193     // Set when a flush has been requested.
194     volatile boolean flushRequested = false;
195     // Set while a compaction is running.
196     volatile boolean compacting = false;
197     // Gets set in close. If set, cannot compact or flush again.
198     volatile boolean writesEnabled = true;
199     // Set if region is read-only
200     volatile boolean readOnly = false;
201 
202     /**
203      * Set flags that make this region read-only.
204      *
205      * @param onOff flip value for region r/o setting
206      */
207     synchronized void setReadOnly(final boolean onOff) {
208       this.writesEnabled = !onOff;
209       this.readOnly = onOff;
210     }
211 
212     boolean isReadOnly() {
213       return this.readOnly;
214     }
215 
216     boolean isFlushRequested() {
217       return this.flushRequested;
218     }
219 
220     static final long HEAP_SIZE = ClassSize.align(
221         ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
222   }
223 
224   final WriteState writestate = new WriteState();
225 
226   final long memstoreFlushSize;
227   private volatile long lastFlushTime;
228   private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
229   final FlushRequester flushRequester;
230   private final long blockingMemStoreSize;
231   final long threadWakeFrequency;
232   // Used to guard closes
233   final ReentrantReadWriteLock lock =
234     new ReentrantReadWriteLock();
235 
236   // Stop updates lock
237   private final ReentrantReadWriteLock updatesLock =
238     new ReentrantReadWriteLock();
239   private boolean splitRequest;
240   private byte[] splitPoint = null;
241 
242   private final ReadWriteConsistencyControl rwcc =
243       new ReadWriteConsistencyControl();
244 
245   /**
246    * Name of the region info file that resides just under the region directory.
247    */
248   public final static String REGIONINFO_FILE = ".regioninfo";
249 
250   /**
251    * Should only be used for testing purposes
252    */
253   public HRegion(){
254     this.tableDir = null;
255     this.blockingMemStoreSize = 0L;
256     this.conf = null;
257     this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
258     this.flushRequester = null;
259     this.fs = null;
260     this.memstoreFlushSize = 0L;
261     this.log = null;
262     this.regiondir = null;
263     this.regionInfo = null;
264     this.threadWakeFrequency = 0L;
265   }
266 
267   /**
268    * HRegion constructor.  This constructor should only be used for testing and
269    * extensions.  Instances of HRegion should be instantiated with the
270    * {@link HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)} method.
271    *
272    *
273    * @param tableDir qualified path of directory where region should be located,
274    * usually the table directory.
275    * @param log The HLog is the outbound log for any updates to the HRegion
276    * (There's a single HLog for all the HRegions on a single HRegionServer.)
277    * The log file is a logfile from the previous execution that's
278    * custom-computed for this HRegion. The HRegionServer computes and sorts the
279    * appropriate log info for this HRegion. If there is a previous log file
280    * (implying that the HRegion has been written-to before), then read it from
281    * the supplied path.
282    * @param fs is the filesystem.
283    * @param conf is global configuration settings.
284    * @param regionInfo HRegionInfo that describes the region
285    *
286    * @param flushRequester an object that implements {@link FlushRequester} or null
287    *
288    * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
289    */
290   public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
291       HRegionInfo regionInfo, FlushRequester flushRequester) {
292     this.tableDir = tableDir;
293     this.comparator = regionInfo.getComparator();
294     this.log = log;
295     this.fs = fs;
296     this.conf = conf;
297     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
298                     DEFAULT_ROWLOCK_WAIT_DURATION);
299     this.regionInfo = regionInfo;
300     this.flushRequester = flushRequester;
301     this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
302         10 * 1000);
303     String encodedNameStr = this.regionInfo.getEncodedName();
304     this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
305     long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
306     if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
307       flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
308                       HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
309     }
310     this.memstoreFlushSize = flushSize;
311     this.blockingMemStoreSize = this.memstoreFlushSize *
312       conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
313     if (LOG.isDebugEnabled()) {
314       // Write out region name as string and its encoded name.
315       LOG.debug("Instantiated " + this);
316     }
317   }
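  // A configuration sketch for the sizing logic in the constructor above.
  // The property names come from the code; the values are illustrative
  // assumptions, not recommendations:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("hbase.hregion.memstore.flush.size", 64 * 1024 * 1024);
  //   conf.setLong("hbase.hregion.memstore.block.multiplier", 2);
  //   // => memstoreFlushSize = 64MB, blockingMemStoreSize = 128MB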
318 
319   /**
320    * Initialize this region.
321    * @return What the next sequence (edit) id should be.
322    * @throws IOException e
323    */
324   public long initialize() throws IOException {
325     return initialize(null);
326   }
327 
328   /**
329    * Initialize this region.
330    *
331    * @param reporter Tickle every so often if initialize is taking a while.
332    * @return What the next sequence (edit) id should be.
333    * @throws IOException e
334    */
335   public long initialize(final CancelableProgressable reporter)
336   throws IOException {
337     // A region can be reopened if failed a split; reset flags
338     this.closing.set(false);
339     this.closed.set(false);
340 
341     // Write HRI to a file in case we need to recover .META.
342     checkRegioninfoOnFilesystem();
343 
344     // Remove temporary data left over from old regions
345     cleanupTmpDir();
346 
347     // Load in all the HStores.  Get maximum seqid.
348     long maxSeqId = -1;
349     for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
350       Store store = instantiateHStore(this.tableDir, c);
351       this.stores.put(c.getName(), store);
352       long storeSeqId = store.getMaxSequenceId();
353       if (storeSeqId > maxSeqId) {
354         maxSeqId = storeSeqId;
355       }
356     }
357     // Recover any edits if available.
358     maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
359 
360     // Get rid of any splits or merges that were lost in-progress.  Clean out
361     // these directories here on open.  We may be opening a region that was
362     // being split but we crashed in the middle of it all.
363     SplitTransaction.cleanupAnySplitDetritus(this);
364     FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
365 
366     this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
367 
368     this.writestate.compacting = false;
369     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
370     // Use maximum of log sequenceid or that which was found in stores
371     // (particularly if no recovered edits, seqid will be -1).
372     long nextSeqid = maxSeqId + 1;
373     LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
374     return nextSeqid;
375   }
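  // A minimal open sketch, assuming the newHRegion factory named in the
  // constructor Javadoc; a null FlushRequester is permitted there:
  //
  //   HRegion r = HRegion.newHRegion(tableDir, log, fs, conf, info, null);
  //   long nextSeqId = r.initialize();  // loads stores, replays recovered edits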
376 
377   /*
378    * Move any passed HStore files into place (if any).  Used to pick up split
379    * files and any merges from splits and merges dirs.
380    * @param initialFiles
381    * @throws IOException
382    */
383   static void moveInitialFilesIntoPlace(final FileSystem fs,
384     final Path initialFiles, final Path regiondir)
385   throws IOException {
386     if (initialFiles != null && fs.exists(initialFiles)) {
387       if (!fs.rename(initialFiles, regiondir)) {
388         LOG.warn("Unable to rename " + initialFiles + " to " + regiondir);
389       }
390     }
391   }
392 
393   /**
394    * @return True if this region has references.
395    */
396   public boolean hasReferences() {
397     for (Store store : this.stores.values()) {
398       for (StoreFile sf : store.getStorefiles()) {
399         // Found a reference, return.
400         if (sf.isReference()) return true;
401       }
402     }
403     return false;
404   }
405 
406   /*
407    * Write out an info file under the region directory.  Useful for recovering
408    * mangled regions.
409    * @throws IOException
410    */
411   private void checkRegioninfoOnFilesystem() throws IOException {
412     Path regioninfoPath = new Path(this.regiondir, REGIONINFO_FILE);
413     if (this.fs.exists(regioninfoPath) &&
414         this.fs.getFileStatus(regioninfoPath).getLen() > 0) {
415       return;
416     }
417     // Create in tmpdir and then move into place in case we crash after
418     // create but before close.  If we don't successfully close the file,
419     // subsequent region reopens will fail the below because create is
420     // registered in NN.
421     Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
422     FSDataOutputStream out = this.fs.create(tmpPath, true);
423     try {
424       this.regionInfo.write(out);
425       out.write('\n');
426       out.write('\n');
427       out.write(Bytes.toBytes(this.regionInfo.toString()));
428     } finally {
429       out.close();
430     }
431     if (!fs.rename(tmpPath, regioninfoPath)) {
432       throw new IOException("Unable to rename " + tmpPath + " to " +
433         regioninfoPath);
434     }
435   }
436 
437   /** @return a HRegionInfo object for this region */
438   public HRegionInfo getRegionInfo() {
439     return this.regionInfo;
440   }
441 
442   /** @return true if region is closed */
443   public boolean isClosed() {
444     return this.closed.get();
445   }
446 
447   /**
448    * @return True if closing process has started.
449    */
450   public boolean isClosing() {
451     return this.closing.get();
452   }
453 
454   boolean areWritesEnabled() {
455     synchronized(this.writestate) {
456       return this.writestate.writesEnabled;
457     }
458   }
459 
460   public ReadWriteConsistencyControl getRWCC() {
461     return rwcc;
462   }
463 
464   /**
465    * Close down this HRegion.  Flush the cache, shut down each HStore, don't
466    * service any more calls.
467    *
468    * <p>This method could take some time to execute, so don't call it from a
469    * time-sensitive thread.
470    *
471    * @return List of all the storage files that the HRegion's component
472    * HStores make use of.  It's a list of all StoreFile objects.  Returns
473    * null if already closed or if judged that it should not close.
474    *
475    * @throws IOException e
476    */
477   public List<StoreFile> close() throws IOException {
478     return close(false);
479   }
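  // Shutdown sketch (hypothetical caller): a null return means the region
  // was already closed or was judged not to close at this time.
  //
  //   List<StoreFile> closedFiles = region.close();
  //   if (closedFiles == null) {
  //     // nothing was closed here
  //   }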
480 
481   /**
482    * Close down this HRegion.  Flush the cache unless abort parameter is true,
483    * Shut down each HStore, don't service any more calls.
484    *
485    * This method could take some time to execute, so don't call it from a
486    * time-sensitive thread.
487    *
488    * @param abort true if server is aborting (only during testing)
489    * @return List of all the storage files that the HRegion's component
490    * HStores make use of.  It's a list of StoreFile objects.  Can be null if
491    * we are not to close at this time or we are already closed.
492    *
493    * @throws IOException e
494    */
495   public List<StoreFile> close(final boolean abort) throws IOException {
496     // Only allow one thread to close at a time. Serialize them so dual
497     // threads attempting to close will run up against each other.
498     synchronized (closeLock) {
499       return doClose(abort);
500     }
501   }
502 
503   private List<StoreFile> doClose(final boolean abort)
504   throws IOException {
505     if (isClosed()) {
506       LOG.warn("Region " + this + " already closed");
507       return null;
508     }
509     boolean wasFlushing = false;
510     synchronized (writestate) {
511       // Disable compacting and flushing by background threads for this
512       // region.
513       writestate.writesEnabled = false;
514       wasFlushing = writestate.flushing;
515       LOG.debug("Closing " + this + ": disabling compactions & flushes");
516       while (writestate.compacting || writestate.flushing) {
517         LOG.debug("waiting for" +
518           (writestate.compacting ? " compaction" : "") +
519           (writestate.flushing ?
520             (writestate.compacting ? "," : "") + " cache flush" :
521               "") + " to complete for region " + this);
522         try {
523           writestate.wait();
524         } catch (InterruptedException iex) {
525           // continue
526         }
527       }
528     }
529     // If we were not just flushing, is it worth doing a preflush...one
530     // that will clear out the bulk of the memstore before we put up
531     // the close flag?
532     if (!abort && !wasFlushing && worthPreFlushing()) {
533       LOG.info("Running close preflush of " + this.getRegionNameAsString());
534       internalFlushcache();
535     }
536     this.closing.set(true);
537     lock.writeLock().lock();
538     try {
539       if (this.isClosed()) {
540         // SplitTransaction handles the null
541         return null;
542       }
543       LOG.debug("Updates disabled for region " + this);
544       // Don't flush the cache if we are aborting
545       if (!abort) {
546         internalFlushcache();
547       }
548 
549       List<StoreFile> result = new ArrayList<StoreFile>();
550       for (Store store : stores.values()) {
551         result.addAll(store.close());
552       }
553       this.closed.set(true);
554       LOG.info("Closed " + this);
555       return result;
556     } finally {
557       lock.writeLock().unlock();
558     }
559   }
560 
561   /**
562    * @return True if it's worth doing a flush before we put up the close flag.
563    */
564   private boolean worthPreFlushing() {
565     return this.memstoreSize.get() >
566       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
567   }
568 
569   //////////////////////////////////////////////////////////////////////////////
570   // HRegion accessors
571   //////////////////////////////////////////////////////////////////////////////
572 
573   /** @return start key for region */
574   public byte [] getStartKey() {
575     return this.regionInfo.getStartKey();
576   }
577 
578   /** @return end key for region */
579   public byte [] getEndKey() {
580     return this.regionInfo.getEndKey();
581   }
582 
583   /** @return region id */
584   public long getRegionId() {
585     return this.regionInfo.getRegionId();
586   }
587 
588   /** @return region name */
589   public byte [] getRegionName() {
590     return this.regionInfo.getRegionName();
591   }
592 
593   /** @return region name as string for logging */
594   public String getRegionNameAsString() {
595     return this.regionInfo.getRegionNameAsString();
596   }
597 
598   /** @return HTableDescriptor for this region */
599   public HTableDescriptor getTableDesc() {
600     return this.regionInfo.getTableDesc();
601   }
602 
603   /** @return HLog in use for this region */
604   public HLog getLog() {
605     return this.log;
606   }
607 
608   /** @return Configuration object */
609   public Configuration getConf() {
610     return this.conf;
611   }
612 
613   /** @return region directory Path */
614   public Path getRegionDir() {
615     return this.regiondir;
616   }
617 
618   /**
619    * Computes the Path of the HRegion
620    *
621    * @param tabledir qualified path for table
622    * @param name ENCODED region name
623    * @return Path of HRegion directory
624    */
625   public static Path getRegionDir(final Path tabledir, final String name) {
626     return new Path(tabledir, name);
627   }
628 
629   /** @return FileSystem being used by this region */
630   public FileSystem getFilesystem() {
631     return this.fs;
632   }
633 
634   /** @return info about the last compaction &lt;time, size&gt; */
635   public Pair<Long,Long> getLastCompactInfo() {
636     return this.lastCompactInfo;
637   }
638 
639   /** @return the last time the region was flushed */
640   public long getLastFlushTime() {
641     return this.lastFlushTime;
642   }
643 
644   /** @return info about the last flushes &lt;time, size&gt; */
645   public List<Pair<Long,Long>> getRecentFlushInfo() {
646     this.lock.readLock().lock();
647     List<Pair<Long,Long>> ret = this.recentFlushes;
648     this.recentFlushes = new ArrayList<Pair<Long,Long>>();
649     this.lock.readLock().unlock();
650     return ret;
651   }
652 
653   //////////////////////////////////////////////////////////////////////////////
654   // HRegion maintenance.
655   //
656   // These methods are meant to be called periodically by the HRegionServer for
657   // upkeep.
658   //////////////////////////////////////////////////////////////////////////////
659 
660   /** @return size of the largest HStore. */
661   public long getLargestHStoreSize() {
662     long size = 0;
663     for (Store h: stores.values()) {
664       long storeSize = h.getSize();
665       if (storeSize > size) {
666         size = storeSize;
667       }
668     }
669     return size;
670   }
671 
672   /*
673    * Do preparation for pending compaction.
674    * @throws IOException
675    */
676   void doRegionCompactionPrep() throws IOException {
677   }
678 
679   /*
680    * Removes the temporary directory for this Store.
681    */
682   private void cleanupTmpDir() throws IOException {
683     FSUtils.deleteDirectory(this.fs, getTmpDir());
684   }
685 
686   /**
687    * Get the temporary directory for this region. This directory
688    * will have its contents removed when the region is reopened.
689    */
690   Path getTmpDir() {
691     return new Path(getRegionDir(), ".tmp");
692   }
693 
694   void setForceMajorCompaction(final boolean b) {
695     this.forceMajorCompaction = b;
696   }
697 
698   boolean getForceMajorCompaction() {
699     return this.forceMajorCompaction;
700   }
701 
702   /**
703    * Called by compaction thread and after region is opened to compact the
704    * HStores if necessary.
705    *
706    * <p>This operation could block for a long time, so don't call it from a
707    * time-sensitive thread.
708    *
709    * Note that no locking is necessary at this level because compaction only
710    * conflicts with a region split, and that cannot happen because the region
711    * server does them sequentially and not in parallel.
712    *
713    * @return mid key if split is needed
714    * @throws IOException e
715    */
716   public byte [] compactStores() throws IOException {
717     boolean majorCompaction = this.forceMajorCompaction;
718     this.forceMajorCompaction = false;
719     return compactStores(majorCompaction);
720   }
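  // Compaction sketch (hypothetical caller). Forcing a major compaction
  // only sets a flag; the next compactStores() call consumes it:
  //
  //   region.setForceMajorCompaction(true);
  //   byte [] splitRow = region.compactStores();  // non-null if a split is advised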
721 
722   /*
723    * Called by compaction thread and after region is opened to compact the
724    * HStores if necessary.
725    *
726    * <p>This operation could block for a long time, so don't call it from a
727    * time-sensitive thread.
728    *
729    * Note that no locking is necessary at this level because compaction only
730    * conflicts with a region split, and that cannot happen because the region
731    * server does them sequentially and not in parallel.
732    *
733    * @param majorCompaction True to force a major compaction regardless of thresholds
734    * @return split row if split is needed
735    * @throws IOException e
736    */
737   byte [] compactStores(final boolean majorCompaction)
738   throws IOException {
739     if (this.closing.get()) {
740       LOG.debug("Skipping compaction on " + this + " because closing");
741       return null;
742     }
743     lock.readLock().lock();
744     this.lastCompactInfo = null;
745     try {
746       if (this.closed.get()) {
747         LOG.debug("Skipping compaction on " + this + " because closed");
748         return null;
749       }
750       byte [] splitRow = null;
751       if (this.closed.get()) {
752         return splitRow;
753       }
754       try {
755         synchronized (writestate) {
756           if (!writestate.compacting && writestate.writesEnabled) {
757             writestate.compacting = true;
758           } else {
759             LOG.info("NOT compacting region " + this +
760                 ": compacting=" + writestate.compacting + ", writesEnabled=" +
761                 writestate.writesEnabled);
762             return splitRow;
763           }
764         }
765         LOG.info("Starting" + (majorCompaction? " major " : " ") +
766             "compaction on region " + this);
767         long startTime = EnvironmentEdgeManager.currentTimeMillis();
768         doRegionCompactionPrep();
769         long lastCompactSize = 0;
770         long maxSize = -1;
771         boolean completed = false;
772         try {
773           for (Store store: stores.values()) {
774             final Store.StoreSize ss = store.compact(majorCompaction);
775             lastCompactSize += store.getLastCompactSize();
776             if (ss != null && ss.getSize() > maxSize) {
777               maxSize = ss.getSize();
778               splitRow = ss.getSplitRow();
779             }
780           }
781           completed = true;
782         } catch (InterruptedIOException iioe) {
783           LOG.info("compaction interrupted by user: ", iioe);
784         } finally {
785           long now = EnvironmentEdgeManager.currentTimeMillis();
786           LOG.info(((completed) ? "completed" : "aborted")
787               + " compaction on region " + this
788               + " after " + StringUtils.formatTimeDiff(now, startTime));
789           if (completed) {
790             this.lastCompactInfo =
791               new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
792           }
793         }
794       } finally {
795         synchronized (writestate) {
796           writestate.compacting = false;
797           writestate.notifyAll();
798         }
799       }
800       if (splitRow != null) {
801         assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
802         this.splitPoint = null; // clear the split point (if set)
803       }
804       return splitRow;
805     } finally {
806       lock.readLock().unlock();
807     }
808   }
809 
810   /**
811    * Flush the cache.
812    *
813    * When this method is called the cache will be flushed unless:
814    * <ol>
815    *   <li>the cache is empty</li>
816    *   <li>the region is closed.</li>
817    *   <li>a flush is already in progress</li>
818    *   <li>writes are disabled</li>
819    * </ol>
820    *
821    * <p>This method may block for some time, so it should not be called from a
822    * time-sensitive thread.
823    *
824    * @return true if cache was flushed
825    *
826    * @throws IOException general io exceptions
827    * @throws DroppedSnapshotException Thrown when replay of hlog is required
828    * because a Snapshot was not properly persisted.
829    */
830   public boolean flushcache() throws IOException {
831     // fail-fast instead of waiting on the lock
832     if (this.closing.get()) {
833       LOG.debug("Skipping flush on " + this + " because closing");
834       return false;
835     }
836     lock.readLock().lock();
837     try {
838       if (this.closed.get()) {
839         LOG.debug("Skipping flush on " + this + " because closed");
840         return false;
841       }
842       try {
843         synchronized (writestate) {
844           if (!writestate.flushing && writestate.writesEnabled) {
845             this.writestate.flushing = true;
846           } else {
847             if (LOG.isDebugEnabled()) {
848               LOG.debug("NOT flushing memstore for region " + this +
849                   ", flushing=" +
850                   writestate.flushing + ", writesEnabled=" +
851                   writestate.writesEnabled);
852             }
853             return false;
854           }
855         }
856         return internalFlushcache();
857       } finally {
858         synchronized (writestate) {
859           writestate.flushing = false;
860           this.writestate.flushRequested = false;
861           writestate.notifyAll();
862         }
863       }
864     } finally {
865       lock.readLock().unlock();
866     }
867   }
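  // Flush sketch (hypothetical caller). Note the return value propagates
  // from internalFlushcache(), where it signals that a compaction is now
  // worthwhile; false also covers the skip cases listed in the Javadoc above:
  //
  //   if (region.flushcache()) {
  //     // a store indicated compaction should be requested
  //   }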
868 
869   /**
870    * Flush the memstore.
871    *
872    * Flushing the memstore is a little tricky. We have a lot of updates in the
873    * memstore, all of which have also been written to the log. We need to
874    * write those updates in the memstore out to disk, while being able to
875    * process reads/writes as much as possible during the flush operation. Also,
876    * the log has to state clearly the point in time at which the memstore was
877    * flushed. (That way, during recovery, we know when we can rely on the
878    * on-disk flushed structures and when we have to recover the memstore from
879    * the log.)
880    *
881    * <p>So, we have a three-step process:
882    *
883    * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
884    * sequence ID for the log.</li>
885    *
886    * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
887    * ID that was current at the time of memstore-flush.</li>
888    *
889    * <li>C. Get rid of the memstore structures that are now redundant, as
890    * they've been flushed to the on-disk HStores.</li>
891    * </ul>
892    * <p>This method is protected, but can be accessed via several public
893    * routes.
894    *
895    * <p> This method may block for some time.
896    *
897    * @return true if the region needs compacting
898    *
899    * @throws IOException general io exceptions
900    * @throws DroppedSnapshotException Thrown when replay of hlog is required
901    * because a Snapshot was not properly persisted.
902    */
903   protected boolean internalFlushcache() throws IOException {
904     return internalFlushcache(this.log, -1);
905   }
906 
907   /**
908    * @param wal Null if we're NOT to go via hlog/wal.
909    * @param myseqid The seqid to use if <code>wal</code> is null writing out
910    * flush file.
911    * @return true if the region needs compacting
912    * @throws IOException
913    * @see #internalFlushcache()
914    */
915   protected boolean internalFlushcache(final HLog wal, final long myseqid)
916   throws IOException {
917     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
918     // Clear flush flag.
919     // Record latest flush time
920     this.lastFlushTime = startTime;
921     // If nothing to flush, return and avoid logging start/stop flush.
922     if (this.memstoreSize.get() <= 0) {
923       return false;
924     }
925     if (LOG.isDebugEnabled()) {
926       LOG.debug("Started memstore flush for " + this +
927         ", current region memstore size " +
928         StringUtils.humanReadableInt(this.memstoreSize.get()) +
929         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
930     }
931 
932     // Stop updates while we snapshot the memstore of all stores. We only have
933     // to do this for a moment.  It's quick.  The subsequent sequence id that
934     // goes into the HLog after we've flushed all these snapshots also goes
935     // into the info file that sits beside the flushed files.
936     // We also set the memstore size to zero here before we allow updates
937     // again so its value will represent the size of the updates received
938     // during the flush
939     long sequenceId = -1L;
940     long completeSequenceId = -1L;
941 
942     // We have to take a write lock during snapshot, or else a write could
943     // end up in both snapshot and memstore (makes it difficult to do atomic
944     // rows then)
945     this.updatesLock.writeLock().lock();
946     final long currentMemStoreSize = this.memstoreSize.get();
947     List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
948     try {
949       sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
950       completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
951 
952       for (Store s : stores.values()) {
953         storeFlushers.add(s.getStoreFlusher(completeSequenceId));
954       }
955 
956       // prepare flush (take a snapshot)
957       for (StoreFlusher flusher : storeFlushers) {
958         flusher.prepare();
959       }
960     } finally {
961       this.updatesLock.writeLock().unlock();
962     }
963 
964     LOG.debug("Finished snapshotting, commencing flushing stores");
965 
966     // Any failure from here on out will be catastrophic requiring server
967     // restart so hlog content can be replayed and put back into the memstore.
968     // Otherwise, the snapshot content, while backed up in the hlog, will not
969     // be part of the current running server's state.
970     boolean compactionRequested = false;
971     try {
972       // A.  Flush memstore to all the HStores.
973       // Keep running vector of all store files that includes both old and the
974       // just-made new flush store file.
975 
976       for (StoreFlusher flusher : storeFlushers) {
977         flusher.flushCache();
978       }
979       // Switch snapshot (in memstore) -> new hfile (thus causing
980       // all the store scanners to reset/reseek).
981       for (StoreFlusher flusher : storeFlushers) {
982         boolean needsCompaction = flusher.commit();
983         if (needsCompaction) {
984           compactionRequested = true;
985         }
986       }
987       storeFlushers.clear();
988 
989       // Set down the memstore size by amount of flush.
990       this.memstoreSize.addAndGet(-currentMemStoreSize);
991     } catch (Throwable t) {
992       // An exception here means that the snapshot was not persisted.
993       // The hlog needs to be replayed so its content is restored to memstore.
994       // Currently, only a server restart will do this.
995       // We used to only catch IOEs but its possible that we'd get other
996       // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch
997       // all and sundry.
998       if (wal != null) wal.abortCacheFlush();
999       DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1000           Bytes.toStringBinary(getRegionName()));
1001       dse.initCause(t);
1002       throw dse;
1003     }
1004 
1005     // If we get to here, the HStores have been written. If we get an
1006     // error in completeCacheFlush it will release the lock it is holding
1007 
1008     // B.  Write a FLUSHCACHE-COMPLETE message to the log.
1009     //     This tells future readers that the HStores were emitted correctly,
1010     //     and that all updates to the log for this regionName that have lower
1011     //     log-sequence-ids can be safely ignored.
1012     if (wal != null) {
1013       wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
1014         regionInfo.getTableDesc().getName(), completeSequenceId,
1015         this.getRegionInfo().isMetaRegion());
1016     }
1017 
1018     // C. Finally notify anyone waiting on memstore to clear:
1019     // e.g. checkResources().
1020     synchronized (this) {
1021       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1022     }
1023 
1024     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1025     if (LOG.isDebugEnabled()) {
1026       LOG.info("Finished memstore flush of ~" +
1027         StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
1028         this + " in " + time + "ms, sequenceid=" + sequenceId +
1029         ", compaction requested=" + compactionRequested +
1030         ((wal == null)? "; wal=null": ""));
1031     }
1032     this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
1033 
1034     return compactionRequested;
1035   }
1036 
1037  /**
1038   * Get the sequence number to be associated with this cache flush. Used by
1039   * TransactionalRegion to avoid completing pending transactions.
1040    *
1041    *
1042    * @param currentSequenceId
1043    * @return sequence id to complete the cache flush with
1044    */
1045   protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
1046     return currentSequenceId;
1047   }
1048 
1049   //////////////////////////////////////////////////////////////////////////////
1050   // get() methods for client use.
1051   //////////////////////////////////////////////////////////////////////////////
1052   /**
1053    * Return all the data for the row that matches <i>row</i> exactly,
1054    * or the one that immediately precedes it, at or immediately before
1055    * <i>ts</i>.
1056    *
1057    * @param row row key
1058    * @return map of values
1059    * @throws IOException
1060    */
1061   Result getClosestRowBefore(final byte [] row)
1062   throws IOException{
1063     return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1064   }
1065 
1066   /**
1067    * Return all the data for the row that matches <i>row</i> exactly,
1068    * or the one that immediately precedes it, at or immediately before
1069    * <i>ts</i>.
1070    *
1071    * @param row row key
1072    * @param family column family to find on
1073    * @return map of values
1074    * @throws IOException read exceptions
1075    */
1076   public Result getClosestRowBefore(final byte [] row, final byte [] family)
1077   throws IOException {
1078     // look across all the HStores for this region and determine what the
1079     // closest key is across all column families, since the data may be sparse
1080     KeyValue key = null;
1081     checkRow(row);
1082     startRegionOperation();
1083     try {
1084       Store store = getStore(family);
1085       KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1086       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
1087       key = store.getRowKeyAtOrBefore(kv);
1088       if (key == null) {
1089         return null;
1090       }
1091       Get get = new Get(key.getRow());
1092       get.addFamily(family);
1093       return get(get, null);
1094     } finally {
1095       closeRegionOperation();
1096     }
1097   }
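  // Lookup sketch (hypothetical caller): finds the newest row at or before
  // the given key in one family, e.g. for catalog-style location lookups:
  //
  //   Result r = region.getClosestRowBefore(rowKey, HConstants.CATALOG_FAMILY);
  //   if (r == null) {
  //     // no row at or before rowKey in this family
  //   }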
1098 
1099   /**
1100    * Return an iterator that scans over the HRegion, returning the indicated
1101    * columns and rows specified by the {@link Scan}.
1102    * <p>
1103    * This Iterator must be closed by the caller.
1104    *
1105    * @param scan configured {@link Scan}
1106    * @return InternalScanner
1107    * @throws IOException read exceptions
1108    */
1109   public InternalScanner getScanner(Scan scan)
1110   throws IOException {
1111    return getScanner(scan, null);
1112   }
1113 
1114   protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
1115     startRegionOperation();
1116     try {
1117       // Verify families are all valid
1118       if(scan.hasFamilies()) {
1119         for(byte [] family : scan.getFamilyMap().keySet()) {
1120           checkFamily(family);
1121         }
1122       } else { // Adding all families to scanner
1123         for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
1124           scan.addFamily(family);
1125         }
1126       }
1127       return instantiateInternalScanner(scan, additionalScanners);
1128 
1129     } finally {
1130       closeRegionOperation();
1131     }
1132   }
1133 
1134   protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
1135     return new RegionScanner(scan, additionalScanners);
1136   }
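  // Scan sketch (hypothetical caller), assuming the InternalScanner.next(List)
  // contract that a false return means no more rows:
  //
  //   InternalScanner scanner = region.getScanner(new Scan(startRow, stopRow));
  //   try {
  //     List<KeyValue> results = new ArrayList<KeyValue>();
  //     boolean more;
  //     do {
  //       more = scanner.next(results);
  //       // process results here, then reset for the next row
  //       results.clear();
  //     } while (more);
  //   } finally {
  //     scanner.close();
  //   }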
1137 
1138   /*
1139    * @param delete The passed delete is modified by this method. WARNING!
1140    */
1141   private void prepareDelete(Delete delete) throws IOException {
1142     // Check to see if this is a delete of an entire row
1143     if(delete.getFamilyMap().isEmpty()){
1144       for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
1145         // Don't eat the timestamp
1146         delete.deleteFamily(family, delete.getTimeStamp());
1147       }
1148     } else {
1149       for(byte [] family : delete.getFamilyMap().keySet()) {
1150         if(family == null) {
1151           throw new NoSuchColumnFamilyException("Null family is invalid");
1152         }
1153         checkFamily(family);
1154       }
1155     }
1156   }
1157 
1158   //////////////////////////////////////////////////////////////////////////////
1159   // set() methods for client use.
1160   //////////////////////////////////////////////////////////////////////////////
1161   /**
1162    * @param delete delete object
1163    * @param lockid existing lock id, or null to grab a new lock
1164    * @param writeToWAL whether to append to the write ahead log
1165    * @throws IOException read exceptions
1166    */
1167   public void delete(Delete delete, Integer lockid, boolean writeToWAL)
1168   throws IOException {
1169     checkReadOnly();
1170     checkResources();
1171     Integer lid = null;
1172     startRegionOperation();
1173     try {
1174       byte [] row = delete.getRow();
1175       // If we did not pass an existing row lock, obtain a new one
1176       lid = getLock(lockid, row, true);
1177 
1178       try {
1179         // All edits for the given row (across all column families) must happen atomically.
1180         prepareDelete(delete);
1181         delete(delete.getFamilyMap(), writeToWAL);
1182       } finally {
1183         if(lockid == null) releaseRowLock(lid);
1184       }
1185     } finally {
1186       closeRegionOperation();
1187     }
1188   }
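  // Delete sketch (hypothetical caller). An empty family map means
  // prepareDelete() expands it to delete the whole row across all families:
  //
  //   Delete d = new Delete(row);       // no families added: whole-row delete
  //   region.delete(d, null, true);     // null lockid: take/release a row lock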
1189 
1190 
1191   /**
1192    * @param familyMap map of family to edits for the given family.
1193    * @param writeToWAL
1194    * @throws IOException
1195    */
1196   public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
1197   throws IOException {
1198     long now = EnvironmentEdgeManager.currentTimeMillis();
1199     byte [] byteNow = Bytes.toBytes(now);
1200     boolean flush = false;
1201 
1202     updatesLock.readLock().lock();
1203 
1204     try {
1205 
1206       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
1207 
1208         byte[] family = e.getKey();
1209         List<KeyValue> kvs = e.getValue();
1210         Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1211 
1212         for (KeyValue kv: kvs) {
1213           //  Check if time is LATEST, change to time of most recent addition if so
1214           //  This is expensive.
1215           if (kv.isLatestTimestamp() && kv.isDeleteType()) {
1216             byte[] qual = kv.getQualifier();
1217             if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
1218 
1219             Integer count = kvCount.get(qual);
1220             if (count == null) {
1221               kvCount.put(qual, 1);
1222             } else {
1223               kvCount.put(qual, count + 1);
1224             }
1225             count = kvCount.get(qual);
1226 
1227             Get get = new Get(kv.getRow());
1228             get.setMaxVersions(count);
1229             get.addColumn(family, qual);
1230 
1231             List<KeyValue> result = get(get);
1232 
1233             if (result.size() < count) {
1234               // Nothing to delete
1235               kv.updateLatestStamp(byteNow);
1236               continue;
1237             }
1238             if (result.size() > count) {
1239               throw new RuntimeException("Unexpected size: " + result.size());
1240             }
1241             KeyValue getkv = result.get(count - 1);
1242             Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
1243                 getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
1244           } else {
1245             kv.updateLatestStamp(byteNow);
1246           }
1247         }
1248       }
1249 
1250       if (writeToWAL) {
1251         // write/sync to WAL should happen before we touch memstore.
1252         //
1253         // If order is reversed, i.e. we write to memstore first, and
1254         // for some reason fail to write/sync to commit log, the memstore
1255         // will contain uncommitted transactions.
1256         //
1257         // bunch up all edits across all column families into a
1258         // single WALEdit.
1259         WALEdit walEdit = new WALEdit();
1260         addFamilyMapToWALEdit(familyMap, walEdit);
1261         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
1262             walEdit, now);
1263       }
1264 
1265       // Now make changes to the memstore.
1266       long addedSize = applyFamilyMapToMemstore(familyMap);
1267       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
1268     } finally {
1269       this.updatesLock.readLock().unlock();
1270     }
1271 
1272     if (flush) {
1273       // Request a cache flush.  Do it outside update lock.
1274       requestFlush();
1275     }
1276   }
1277 
1278   /**
1279    * @param put
1280    * @throws IOException
1281    */
1282   public void put(Put put) throws IOException {
1283     this.put(put, null, put.getWriteToWAL());
1284   }
1285 
1286   /**
1287    * @param put
1288    * @param writeToWAL
1289    * @throws IOException
1290    */
1291   public void put(Put put, boolean writeToWAL) throws IOException {
1292     this.put(put, null, writeToWAL);
1293   }
1294 
1295   /**
1296    * @param put
1297    * @param lockid
1298    * @throws IOException
1299    */
1300   public void put(Put put, Integer lockid) throws IOException {
1301     this.put(put, lockid, put.getWriteToWAL());
1302   }
1303 
1304   /**
1305    * @param put
1306    * @param lockid
1307    * @param writeToWAL
1308    * @throws IOException
1309    */
1310   public void put(Put put, Integer lockid, boolean writeToWAL)
1311   throws IOException {
1312     checkReadOnly();
1313 
1314     // Do a rough check that we have resources to accept a write.  The check is
1315     // 'rough' in that between the resource check and the call to obtain a
1316     // read lock, resources may run out.  For now, the thought is that this
1317     // will be extremely rare; we'll deal with it when it happens.
1318     checkResources();
1319     startRegionOperation();
1320     try {
1321       // We obtain a per-row lock, so other clients will block while one client
1322       // performs an update. The read lock is released by the client calling
1323       // #commit or #abort or if the HRegionServer lease on the lock expires.
1324       // See HRegionServer#RegionListener for how the expire on HRegionServer
1325       // invokes a HRegion#abort.
1326       byte [] row = put.getRow();
1327       // If we did not pass an existing row lock, obtain a new one
1328       Integer lid = getLock(lockid, row, true);
1329 
1330       try {
1331         // All edits for the given row (across all column families) must happen atomically.
1332         put(put.getFamilyMap(), writeToWAL);
1333       } finally {
1334         if(lockid == null) releaseRowLock(lid);
1335       }
1336     } finally {
1337       closeRegionOperation();
1338     }
1339   }
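  // Put sketch (hypothetical caller), using the client Put API imported above:
  //
  //   Put p = new Put(row);
  //   p.add(family, qualifier, value);  // timestamp filled in at apply time
  //   region.put(p, null, true);        // auto row lock, write to WAL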
1340 
1341   /**
1342    * Struct-like class that tracks the progress of a batch operation,
1343    * accumulating status codes and tracking the index at which processing
1344    * is proceeding.
1345    */
1346   private static class BatchOperationInProgress<T> {
1347     T[] operations;
1348     OperationStatusCode[] retCodes;
1349     int nextIndexToProcess = 0;
1350 
1351     public BatchOperationInProgress(T[] operations) {
1352       this.operations = operations;
1353       retCodes = new OperationStatusCode[operations.length];
1354       Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
1355     }
1356 
1357     public boolean isDone() {
1358       return nextIndexToProcess == operations.length;
1359     }
1360   }
1361 
1362   /**
1363    * Perform a batch put with no pre-specified locks
1364    * @see HRegion#put(Pair[])
1365    */
1366   public OperationStatusCode[] put(Put[] puts) throws IOException {
1367     @SuppressWarnings("unchecked")
1368     Pair<Put, Integer>[] putsAndLocks = new Pair[puts.length];
1369 
1370     for (int i = 0; i < puts.length; i++) {
1371       putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
1372     }
1373     return put(putsAndLocks);
1374   }
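  // Batch sketch (hypothetical caller): per-operation codes come back in
  // order; BAD_FAMILY marks puts rejected during family validation:
  //
  //   OperationStatusCode[] codes = region.put(new Put[] { p1, p2 });
  //   for (OperationStatusCode code : codes) {
  //     // SUCCESS, BAD_FAMILY, or FAILURE
  //   }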
1375 
1376   /**
1377    * Perform a batch of puts.
1378    * @param putsAndLocks the list of puts paired with their requested lock IDs.
1379    * @throws IOException
1380    */
1381   public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
1382     BatchOperationInProgress<Pair<Put, Integer>> batchOp =
1383       new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
1384 
1385     while (!batchOp.isDone()) {
1386       checkReadOnly();
1387       checkResources();
1388 
1389       long newSize;
1390       startRegionOperation();
1391       try {
1392         long addedSize = doMiniBatchPut(batchOp);
1393         newSize = memstoreSize.addAndGet(addedSize);
1394       } finally {
1395         closeRegionOperation();
1396       }
1397       if (isFlushSize(newSize)) {
1398         requestFlush();
1399       }
1400     }
1401     return batchOp.retCodes;
1402   }
1403 
1404   private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
1405     long now = EnvironmentEdgeManager.currentTimeMillis();
1406     byte[] byteNow = Bytes.toBytes(now);
1407     boolean locked = false;
1408 
1409     /** Keep track of the locks we hold so we can release them in finally clause */
1410     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
1411     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
1412     int firstIndex = batchOp.nextIndexToProcess;
1413     int lastIndexExclusive = firstIndex;
1414     boolean success = false;
1415     try {
1416       // ------------------------------------
1417       // STEP 1. Try to acquire as many locks as we can, and ensure
1418       // we acquire at least one.
1419       // ----------------------------------
1420       int numReadyToWrite = 0;
1421       while (lastIndexExclusive < batchOp.operations.length) {
1422         Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
1423         Put put = nextPair.getFirst();
1424         Integer providedLockId = nextPair.getSecond();
1425 
1426         // Check the families in the put. If bad, skip this one.
1427         try {
1428           checkFamilies(put.getFamilyMap().keySet());
1429         } catch (NoSuchColumnFamilyException nscf) {
1430           LOG.warn("No such column family in batch put", nscf);
1431           batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
1432           lastIndexExclusive++;
1433           continue;
1434         }
1435 
1436         // If we haven't got any rows in our batch, we should block to
1437         // get the next one.
1438         boolean shouldBlock = numReadyToWrite == 0;
1439         Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
1440         if (acquiredLockId == null) {
1441           // We failed to grab another lock
1442           assert !shouldBlock : "Should never fail to get lock when blocking";
1443           break; // stop acquiring more rows for this batch
1444         }
1445         if (providedLockId == null) {
1446           acquiredLocks.add(acquiredLockId);
1447         }
1448         lastIndexExclusive++;
1449         numReadyToWrite++;
1450       }
1451       // Nothing to put -- every remaining operation failed the family check above.
1452       if (numReadyToWrite <= 0) return 0L;
1453 
1454       // We've now grabbed as many puts off the list as we can
1455 
1456       // ------------------------------------
1457       // STEP 2. Update any LATEST_TIMESTAMP timestamps
1458       // ----------------------------------
1459       for (int i = firstIndex; i < lastIndexExclusive; i++) {
1460         updateKVTimestamps(
1461             batchOp.operations[i].getFirst().getFamilyMap().values(),
1462             byteNow);
1463       }
1464 
1465 
1466       this.updatesLock.readLock().lock();
1467       locked = true;
1468 
1469       // ------------------------------------
1470       // STEP 3. Write to WAL
1471       // ----------------------------------
1472       WALEdit walEdit = new WALEdit();
1473       for (int i = firstIndex; i < lastIndexExclusive; i++) {
1474         // Skip puts that were determined to be invalid during preprocessing
1475         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
1476 
1477         Put p = batchOp.operations[i].getFirst();
1478         if (!p.getWriteToWAL()) continue;
1479         addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
1480       }
1481 
1482       // Append the edit to WAL
1483       this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
1484           walEdit, now);
1485 
1486       // ------------------------------------
1487       // STEP 4. Write back to memstore
1488       // ----------------------------------
1489       long addedSize = 0;
1490       for (int i = firstIndex; i < lastIndexExclusive; i++) {
1491         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
1492 
1493         Put p = batchOp.operations[i].getFirst();
1494         addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
1495         batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
1496       }
1497       success = true;
1498       return addedSize;
1499     } finally {
1500       if (locked)
1501         this.updatesLock.readLock().unlock();
1502 
1503       for (Integer toRelease : acquiredLocks) {
1504         releaseRowLock(toRelease);
1505       }
1506       if (!success) {
1507         for (int i = firstIndex; i < lastIndexExclusive; i++) {
1508           if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
1509             batchOp.retCodes[i] = OperationStatusCode.FAILURE;
1510           }
1511         }
1512       }
1513       batchOp.nextIndexToProcess = lastIndexExclusive;
1514     }
1515   }
1516 
1517   //TODO: gets/puts and deletes should be refactored a bit so that acquiring
1518   //the lock happens first and you would just pass it into the methods.
1519   //Then in the case of checkAndMutate you could just do lockRow, get, put,
1520   //unlockRow or something similar.
1521   /**
1522    *
1523    * @param row
1524    * @param family
1525    * @param qualifier
1526    * @param expectedValue
1527    * @param lockId
1528    * @param writeToWAL
1529    * @throws IOException
1530    * @return true if the new put was executed, false otherwise
1531    */
1532   public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
1533       byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
1534   throws IOException {
1535     checkReadOnly();
1536     //TODO, add check for value length or maybe even better move this to the
1537     //client if this becomes a global setting
1538     checkResources();
1539     boolean isPut = w instanceof Put;
1540     if (!isPut && !(w instanceof Delete))
1541       throw new DoNotRetryIOException("Action must be Put or Delete");
1542     Row r = (Row)w;
1543     if (Bytes.compareTo(row, r.getRow()) != 0) {
1544       throw new DoNotRetryIOException("Action's getRow must match the passed row");
1545     }
1546 
1547     startRegionOperation();
1548     try {
1549       RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
1550       Get get = new Get(row, lock);
1551       checkFamily(family);
1552       get.addColumn(family, qualifier);
1553 
1554       // Lock row
1555       Integer lid = getLock(lockId, get.getRow(), true);
1556       List<KeyValue> result = new ArrayList<KeyValue>();
1557       try {
1558         result = get(get);
1559 
1560         boolean matches = false;
1561         if (result.size() == 0 &&
1562             (expectedValue == null || expectedValue.length == 0)) {
1563           matches = true;
1564         } else if (result.size() == 1) {
1565           //Compare the expected value with the actual value
1566           byte [] actualValue = result.get(0).getValue();
1567           matches = Bytes.equals(expectedValue, actualValue);
1568         }
1569         //If matches put the new put or delete the new delete
1570         if (matches) {
1571           // All edits for the given row (across all column families) must happen atomically.
1572           if (isPut) {
1573             put(((Put)w).getFamilyMap(), writeToWAL);
1574           } else {
1575             Delete d = (Delete)w;
1576             prepareDelete(d);
1577             delete(d.getFamilyMap(), writeToWAL);
1578           }
1579           return true;
1580         }
1581         return false;
1582       } finally {
1583         if(lockId == null) releaseRowLock(lid);
1584       }
1585     } finally {
1586       closeRegionOperation();
1587     }
1588   }
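
  // Usage sketch (illustrative only): an atomic check-then-put against a
  // single row via checkAndMutate above.  "region" and the byte arrays are
  // hypothetical.
  //
  //   Put p = new Put(row);
  //   p.add(family, qualifier, Bytes.toBytes("new-value"));
  //   boolean applied = region.checkAndMutate(row, family, qualifier,
  //       Bytes.toBytes("old-value"), p, null /* lockId */, true /* WAL */);
  //   // applied is false if the stored value did not match the expected one.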
1589 
1590 
1591   /**
1592    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
1593    * with the provided current timestamp.
1594    */
1595   private void updateKVTimestamps(
1596       final Iterable<List<KeyValue>> keyLists, final byte[] now) {
1597     for (List<KeyValue> keys: keyLists) {
1598       if (keys == null) continue;
1599       for (KeyValue key : keys) {
1600         key.updateLatestStamp(now);
1601       }
1602     }
1603   }
1604 
1605   /*
1606    * Check whether we have the resources to accept an update.
1607    *
1608    * Here we synchronize on HRegion, a broad scoped lock.  It is appropriate
1609    * given that we are deciding in here whether this region is able to take on
1610    * writes.  At the time of writing this is one of only two synchronizations:
1611    * this one, and the synchronize on 'this' inside internalFlushcache that
1612    * sends the notify.
1613    */
1614   private void checkResources() {
1615 
1616     // If catalog region, do not impose resource constraints or block updates.
1617     if (this.getRegionInfo().isMetaRegion()) return;
1618 
1619     boolean blocked = false;
1620     while (this.memstoreSize.get() > this.blockingMemStoreSize) {
1621       requestFlush();
1622       if (!blocked) {
1623         LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
1624           "' on region " + Bytes.toStringBinary(getRegionName()) +
1625           ": memstore size " +
1626           StringUtils.humanReadableInt(this.memstoreSize.get()) +
1627           " is >= blocking memstore size " +
1628           StringUtils.humanReadableInt(this.blockingMemStoreSize));
1629       }
1630       blocked = true;
1631       synchronized(this) {
1632         try {
1633           wait(threadWakeFrequency);
1634         } catch (InterruptedException e) {
1635           // continue;
1636         }
1637       }
1638     }
1639     if (blocked) {
1640       LOG.info("Unblocking updates for region " + this + " '"
1641           + Thread.currentThread().getName() + "'");
1642     }
1643   }
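
  // Illustrative note: blockingMemStoreSize is derived from the flush size
  // when the region is constructed.  A sketch of the usual relationship; the
  // property names and defaults here are assumptions from contemporary HBase
  // configurations, not confirmed by this file:
  //
  //   long flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
  //       1024 * 1024 * 64);
  //   long blockingSize = flushSize *
  //       conf.getInt("hbase.hregion.memstore.block.multiplier", 2);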
1644 
1645   /**
1646    * @throws IOException Throws exception if region is in read-only mode.
1647    */
1648   protected void checkReadOnly() throws IOException {
1649     if (this.writestate.isReadOnly()) {
1650       throw new IOException("region is read only");
1651     }
1652   }
1653 
1654   /**
1655    * Add updates first to the hlog and then add values to memstore.
1656    * Warning: Assumption is caller has lock on passed in row.
1657    * @param family the column family that the passed
1658    * edits belong to
1659    * @param edits Cell updates by column
1660    * @throws IOException
1661    */
1662   private void put(final byte [] family, final List<KeyValue> edits)
1663   throws IOException {
1664     Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
1665     familyMap.put(family, edits);
1666     this.put(familyMap, true);
1667   }
1668 
1669   /**
1670    * Add updates first to the hlog (if writeToWal) and then add values to memstore.
1671    * Warning: Assumption is caller has lock on passed in row.
1672    * @param familyMap map of family to edits for the given family.
1673    * @param writeToWAL if true, then we should write to the log
1674    * @throws IOException
1675    */
1676   private void put(final Map<byte [], List<KeyValue>> familyMap,
1677       boolean writeToWAL) throws IOException {
1678     long now = EnvironmentEdgeManager.currentTimeMillis();
1679     byte[] byteNow = Bytes.toBytes(now);
1680     boolean flush = false;
1681     this.updatesLock.readLock().lock();
1682     try {
1683       checkFamilies(familyMap.keySet());
1684       updateKVTimestamps(familyMap.values(), byteNow);
1685       // write/sync to WAL should happen before we touch memstore.
1686       //
1687       // If order is reversed, i.e. we write to memstore first, and
1688       // for some reason fail to write/sync to commit log, the memstore
1689       // will contain uncommitted transactions.
1690       if (writeToWAL) {
1691         WALEdit walEdit = new WALEdit();
1692         addFamilyMapToWALEdit(familyMap, walEdit);
1693         this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
1694            walEdit, now);
1695       }
1696 
1697       long addedSize = applyFamilyMapToMemstore(familyMap);
1698       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
1699     } finally {
1700       this.updatesLock.readLock().unlock();
1701     }
1702     if (flush) {
1703       // Request a cache flush.  Do it outside update lock.
1704       requestFlush();
1705     }
1706   }
1707 
1708   /**
1709    * Atomically apply the given map of family->edits to the memstore.
1710    * This handles the consistency control on its own, but the caller
1711    * should already have locked updatesLock.readLock(). This also does
1712    * <b>not</b> check the families for validity.
1713    *
1714    * @return the additional memory usage of the memstore caused by the
1715    * new entries.
1716    */
1717   private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
1718     ReadWriteConsistencyControl.WriteEntry w = null;
1719     long size = 0;
1720     try {
1721       w = rwcc.beginMemstoreInsert();
1722 
1723       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
1724         byte[] family = e.getKey();
1725         List<KeyValue> edits = e.getValue();
1726 
1727         Store store = getStore(family);
1728         for (KeyValue kv: edits) {
1729           kv.setMemstoreTS(w.getWriteNumber());
1730           size += store.add(kv);
1731         }
1732       }
1733     } finally {
1734       rwcc.completeMemstoreInsert(w);
1735     }
1736     return size;
1737   }
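
  // Caller contract sketch (illustrative only): hold the updates read lock
  // around the call and fold the returned size into the region's accounting,
  // as the put path above does.
  //
  //   this.updatesLock.readLock().lock();
  //   try {
  //     long addedSize = applyFamilyMapToMemstore(familyMap);
  //     this.memstoreSize.addAndGet(addedSize);
  //   } finally {
  //     this.updatesLock.readLock().unlock();
  //   }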
1738 
1739   /**
1740    * Check the collection of families for validity.
1741    * @throws NoSuchColumnFamilyException if a family does not exist.
1742    */
1743   private void checkFamilies(Collection<byte[]> families)
1744   throws NoSuchColumnFamilyException {
1745     for (byte[] family : families) {
1746       checkFamily(family);
1747     }
1748   }
1749 
1750   /**
1751    * Append the given map of family->edits to a WALEdit data structure.
1752    * This does not write to the HLog itself.
1753    * @param familyMap map of family->edits
1754    * @param walEdit the destination entry to append into
1755    */
1756   private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
1757       WALEdit walEdit) {
1758     for (List<KeyValue> edits : familyMap.values()) {
1759       for (KeyValue kv : edits) {
1760         walEdit.add(kv);
1761       }
1762     }
1763   }
1764 
1765   private void requestFlush() {
1766     if (this.flushRequester == null) {
1767       return;
1768     }
1769     synchronized (writestate) {
1770       if (this.writestate.isFlushRequested()) {
1771         return;
1772       }
1773       writestate.flushRequested = true;
1774     }
1775     // Make request outside of synchronize block; HBASE-818.
1776     this.flushRequester.requestFlush(this);
1777     if (LOG.isDebugEnabled()) {
1778       LOG.debug("Flush requested on " + this);
1779     }
1780   }
1781 
1782   /*
1783    * @param size
1784    * @return True if size is over the flush threshold
1785    */
1786   private boolean isFlushSize(final long size) {
1787     return size > this.memstoreFlushSize;
1788   }
1789 
1790   /**
1791    * Read the edits log put under this region by wal log splitting process.  Put
1792    * the recovered edits back up into this region.
1793    *
1794    * <p>We can ignore any log message that has a sequence ID that's equal to or
1795    * lower than minSeqId.  (Because we know such log messages are already
1796    * reflected in the HFiles.)
1797    *
1798    * <p>While this is running we are putting pressure on memory yet we are
1799    * outside of our usual accounting because we are not yet an onlined region
1800    * (this stuff is being run as part of Region initialization).  This means
1801    * that if we're up against global memory limits, we'll not be flagged to flush
1802    * because we are not online. We can't be flushed by usual mechanisms anyways;
1803    * we're not yet online so our relative sequenceids are not yet aligned with
1804    * HLog sequenceids -- not till we come up online, post processing of split
1805    * edits.
1806    *
1807    * <p>But to help relieve memory pressure, at least manage our own heap size
1808    * flushing if we are in excess of per-region limits.  When flushing, though,
1809    * we have to be careful to avoid using the regionserver/hlog sequenceid.  It
1810    * runs on a different timeline from what is going on here in this region
1811    * context, so if we crashed while replaying these edits but in the midst had
1812    * a flush that used the regionserver log with a sequenceid in excess of what
1813    * this region and its split editlogs were at, then we could miss edits the
1814    * next time we go to recover. So, we have to flush inline, using seqids that
1815    * make sense in this single region context only -- until we are online.
1816    *
1817    * @param regiondir the region directory holding recovered edits
1818    * @param minSeqId Any edit found in split editlogs needs to be in excess of
1819    * this minSeqId to be applied, else it is skipped.
1820    * @param reporter
1821    * @return the sequence id of the last edit added to this region out of the
1822    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
1823    * @throws UnsupportedEncodingException
1824    * @throws IOException
1825    */
1826   protected long replayRecoveredEditsIfAny(final Path regiondir,
1827       final long minSeqId, final CancelableProgressable reporter)
1828   throws UnsupportedEncodingException, IOException {
1829     long seqid = minSeqId;
1830     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
1831     if (files == null || files.isEmpty()) return seqid;
1832     for (Path edits: files) {
1833       if (edits == null || !this.fs.exists(edits)) {
1834         LOG.warn("Null or non-existent edits file: " + edits);
1835         continue;
1836       }
1837       if (isZeroLengthThenDelete(this.fs, edits)) continue;
1838       try {
1839         seqid = replayRecoveredEdits(edits, seqid, reporter);
1840       } catch (IOException e) {
1841         boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
1842         if (skipErrors) {
1843           Path p = HLog.moveAsideBadEditsFile(fs, edits);
1844           LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
1845             " as " + p, e);
1846         } else {
1847           throw e;
1848         }
1849       }
1850     }
1851     if (seqid > minSeqId) {
1852       // Then we added some edits to memory. Flush and cleanup split edit files.
1853       internalFlushcache(null, seqid);
1854     }
1855     // Now delete the content of recovered edits.  We're done w/ them.
1856     for (Path file: files) {
1857       if (!this.fs.delete(file, false)) {
1858         LOG.error("Failed delete of " + file);
1859       } else {
1860         LOG.debug("Deleted recovered.edits file=" + file);
1861       }
1862     }
1863     return seqid;
1864   }
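
  // Configuration sketch (illustrative only): with hbase.skip.errors set to
  // true, a corrupt recovered-edits file is moved aside and replay continues
  // rather than failing the region open.
  //
  //   conf.setBoolean("hbase.skip.errors", true);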
1865 
1866   /*
1867    * @param edits File of recovered edits.
1868    * @param minSeqId Minimum sequenceid found in a store file.  Edits in log
1869    * must be larger than this to be replayed.
1870    * @param reporter
1871    * @return the sequence id of the last edit added to this region out of the
1872    * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
1873    * @throws IOException
1874    */
1875   private long replayRecoveredEdits(final Path edits,
1876       final long minSeqId, final CancelableProgressable reporter)
1877     throws IOException {
1878     LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
1879     HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
1880     try {
1881       long currentEditSeqId = minSeqId;
1882       long firstSeqIdInLog = -1;
1883       long skippedEdits = 0;
1884       long editsCount = 0;
1885       long intervalEdits = 0;
1886       HLog.Entry entry;
1887       Store store = null;
1888 
1889       try {
1890         // How many edits seen before we check elapsed time
1891         int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
1892             2000);
1893         // How often to send a progress report (default 1/2 master timeout)
1894         int period = this.conf.getInt("hbase.hstore.report.period",
1895             this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
1896                 180000) / 2);
1897         long lastReport = EnvironmentEdgeManager.currentTimeMillis();
1898 
1899         while ((entry = reader.next()) != null) {
1900           HLogKey key = entry.getKey();
1901           WALEdit val = entry.getEdit();
1902 
1903           if (reporter != null) {
1904             intervalEdits += val.size();
1905             if (intervalEdits >= interval) {
1906               // Number of edits interval reached
1907               intervalEdits = 0;
1908               long cur = EnvironmentEdgeManager.currentTimeMillis();
1909               if (lastReport + period <= cur) {
1910                 // Timeout reached
1911                 if (!reporter.progress()) {
1912                   String msg = "Progressable reporter failed, stopping replay";
1913                   LOG.warn(msg);
1914                   throw new IOException(msg);
1915                 }
1916                 lastReport = cur;
1917               }
1918             }
1919           }
1920 
1921           if (firstSeqIdInLog == -1) {
1922             firstSeqIdInLog = key.getLogSeqNum();
1923           }
1924           // Now, figure if we should skip this edit.
1925           if (key.getLogSeqNum() <= currentEditSeqId) {
1926             skippedEdits++;
1927             continue;
1928           }
1929           currentEditSeqId = key.getLogSeqNum();
1930           boolean flush = false;
1931           for (KeyValue kv: val.getKeyValues()) {
1932             // Check this edit is for me. Also, guard against writing the special
1933             // METACOLUMN info such as HBASE::CACHEFLUSH entries
1934             if (kv.matchingFamily(HLog.METAFAMILY) ||
1935                 !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
1936               skippedEdits++;
1937               continue;
1938             }
1939             // Figure which store the edit is meant for.
1940             if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
1941               store = this.stores.get(kv.getFamily());
1942             }
1943             if (store == null) {
1944               // This should never happen.  Perhaps schema was changed between
1945               // crash and redeploy?
1946               LOG.warn("No family for " + kv);
1947               skippedEdits++;
1948               continue;
1949             }
1950             // Once we are over the limit, restoreEdit will keep returning true to
1951             // flush -- but don't flush until we've played all the kvs that make up
1952             // the WALEdit.
1953             flush = restoreEdit(store, kv);
1954             editsCount++;
1955           }
1956           if (flush) internalFlushcache(null, currentEditSeqId);
1957         }
1958       } catch (EOFException eof) {
1959         Path p = HLog.moveAsideBadEditsFile(fs, edits);
1960         LOG.warn("Encountered EOF. Most likely due to Master failure during " +
1961             "log splitting, so we have this data in another edit.  " +
1962             "Continuing, but renaming " + edits + " as " + p, eof);
1963       } catch (IOException ioe) {
1964         // If the IOE resulted from bad file format,
1965         // then this problem is idempotent and retrying won't help
1966         if (ioe.getCause() instanceof ParseException) {
1967           Path p = HLog.moveAsideBadEditsFile(fs, edits);
1968           LOG.warn("File corruption encountered!  " +
1969               "Continuing, but renaming " + edits + " as " + p, ioe);
1970         } else {
1971           // other IO errors may be transient (bad network connection,
1972           // checksum exception on one datanode, etc).  throw & retry
1973           throw ioe;
1974         }
1975       }
1976       if (LOG.isDebugEnabled()) {
1977         LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
1978             ", firstSequenceidInLog=" + firstSeqIdInLog +
1979             ", maxSequenceidInLog=" + currentEditSeqId);
1980       }
1981       return currentEditSeqId;
1982     } finally {
1983       reader.close();
1984     }
1985   }
1986 
1987   /**
1988    * Used by tests
1989    * @param s Store to add edit to.
1990    * @param kv KeyValue to add.
1991    * @return True if we should flush.
1992    */
1993   protected boolean restoreEdit(final Store s, final KeyValue kv) {
1994     return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
1995   }
1996 
1997   /*
1998    * @param fs
1999    * @param p File to check.
2000    * @return True if file was zero-length (and if so, we'll delete it in here).
2001    * @throws IOException
2002    */
2003   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
2004   throws IOException {
2005     FileStatus stat = fs.getFileStatus(p);
2006     if (stat.getLen() > 0) return false;
2007     LOG.warn("File " + p + " is zero-length, deleting.");
2008     fs.delete(p, false);
2009     return true;
2010   }
2011 
2012   protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
2013   throws IOException {
2014     return new Store(tableDir, this, c, this.fs, this.conf);
2015   }
2016 
2017   /**
2018    * Return HStore instance.
2019    * Use with caution.  Exposed for use of fixup utilities.
2020    * @param column Name of column family hosted by this region.
2021    * @return Store that goes with the family on passed <code>column</code>.
2022    * TODO: Make this lookup faster.
2023    */
2024   public Store getStore(final byte [] column) {
2025     return this.stores.get(column);
2026   }
2027 
2028   //////////////////////////////////////////////////////////////////////////////
2029   // Support code
2030   //////////////////////////////////////////////////////////////////////////////
2031 
2032   /** Make sure this is a valid row for the HRegion */
2033   private void checkRow(final byte [] row) throws IOException {
2034     if (!rowIsInRange(regionInfo, row)) {
2035       throw new WrongRegionException("Requested row out of range for " +
2036           "HRegion " + this + ", startKey='" +
2037           Bytes.toStringBinary(regionInfo.getStartKey()) + "', endKey='" +
2038           Bytes.toStringBinary(regionInfo.getEndKey()) + "', row='" +
2039           Bytes.toStringBinary(row) + "'");
2040     }
2041   }
2042 
2043   /**
2044    * Obtain a lock on the given row.  Blocks until success.
2045    *
2046    * I know it's strange to have two mappings:
2047    * <pre>
2048    *   ROWS  ==> LOCKS
2049    * </pre>
2050    * as well as
2051    * <pre>
2052    *   LOCKS ==> ROWS
2053    * </pre>
2054    *
2055    * But it acts as a guard on the client; a miswritten client just can't
2056    * submit the name of a row and start writing to it; it must know the correct
2057    * lockid, which matches the lock list in memory.
2058    *
2059    * <p>It would be more memory-efficient to assume a correctly-written client,
2060    * which maybe we'll do in the future.
2061    *
2062    * @param row Name of row to lock.
2063    * @throws IOException
2064    * @return The id of the held lock.
2065    */
2066   public Integer obtainRowLock(final byte [] row) throws IOException {
2067     startRegionOperation();
2068     try {
2069       return internalObtainRowLock(row, true);
2070     } finally {
2071       closeRegionOperation();
2072     }
2073   }
2074 
2075   /**
2076    * Tries to obtain a row lock on the given row, but does not block if the
2077    * row lock is not available. If the lock is not available, returns null.
2078    * Otherwise behaves the same as the above method.
2079    * @see HRegion#obtainRowLock(byte[])
2080    */
2081   public Integer tryObtainRowLock(final byte[] row) throws IOException {
2082     startRegionOperation();
2083     try {
2084       return internalObtainRowLock(row, false);
2085     } finally {
2086       closeRegionOperation();
2087     }
2088   }
2089 
2090   /**
2091    * Obtains or tries to obtain the given row lock.
2092    * @param waitForLock if true, will block until the lock is available.
2093    *        Otherwise, just tries to obtain the lock and returns
2094    *        null if unavailable.
2095    */
2096   private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
2097       throws IOException {
2098     checkRow(row);
2099     startRegionOperation();
2100     try {
2101       HashedBytes rowKey = new HashedBytes(row);
2102       CountDownLatch rowLatch = new CountDownLatch(1);
2103       
2104       // loop until we acquire the row lock (unless !waitForLock)
2105       while (true) {
2106         CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
2107         if (existingLatch == null) {
2108           break;
2109         } else {
2110           // row already locked
2111           if (!waitForLock) {
2112             return null;
2113           }
2114           try {
2115             if (!existingLatch.await(this.rowLockWaitDuration,
2116                 TimeUnit.MILLISECONDS)) {
2117               return null;
2118             }
2119           } catch (InterruptedException ie) {
2120             // Empty
2121           }
2122         }
2123       }
2124        
2125       // loop until we generate an unused lock id
2126       while (true) {
2127         Integer lockId = lockIdGenerator.incrementAndGet();
2128         HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
2129         if (existingRowKey == null) {
2130           return lockId;
2131         } else {
2132           // lockId already in use, jump generator to a new spot
2133           lockIdGenerator.set(rand.nextInt());
2134         }
2135       }
2136     } finally {
2137       closeRegionOperation();
2138     }
2139   }
2140 
2141   /**
2142    * Used by unit tests.
2143    * @param lockid
2144    * @return Row that goes with <code>lockid</code>
2145    */
2146   byte[] getRowFromLock(final Integer lockid) {
2147     HashedBytes rowKey = lockIds.get(lockid);
2148     return rowKey == null ? null : rowKey.getBytes();
2149   }
2150 
2151   /**
2152    * Release the row lock!
2153    * @param lockId  The lock ID to release.
2154    */
2155   void releaseRowLock(final Integer lockId) {
2156     HashedBytes rowKey = lockIds.remove(lockId);
2157     if (rowKey == null) {
2158       LOG.warn("Release of unknown lockId: " + lockId);
2159       return;
2160     }
2161     CountDownLatch rowLatch = lockedRows.remove(rowKey);
2162     if (rowLatch == null) {
2163       LOG.error("Released lockId " + lockId + " for row " + rowKey +
2164           " but the row was not locked");
2165       return;
2166     }
2167     rowLatch.countDown();
2168   }
2169 
2170   /**
2171    * See if row is currently locked.
2172    * @param lockId the lock id to check
2173    * @return true if the given lock id is currently held
2174    */
2175   boolean isRowLocked(final Integer lockId) {
2176     return lockIds.containsKey(lockId);
2177   }
2178 
2179   /**
2180    * Returns existing row lock if found, otherwise
2181    * obtains a new row lock and returns it.
2182    * @param lockid requested by the user, or null if the user didn't already hold lock
2183    * @param row the row to lock
2184    * @param waitForLock if true, will block until the lock is available, otherwise will
2185    * simply return null if it could not acquire the lock.
2186    * @return lockid or null if waitForLock is false and the lock was unavailable.
2187    */
2188   private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
2189   throws IOException {
2190     Integer lid = null;
2191     if (lockid == null) {
2192       lid = internalObtainRowLock(row, waitForLock);
2193     } else {
2194       if (!isRowLocked(lockid)) {
2195         throw new IOException("Invalid row lock");
2196       }
2197       lid = lockid;
2198     }
2199     return lid;
2200   }
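
  // Usage sketch (illustrative only): explicit row locking around a
  // read-modify-write.  "region" and "row" are hypothetical.
  //
  //   Integer lockId = region.obtainRowLock(row);   // blocks until acquired
  //   try {
  //     // ... read and mutate while holding the row lock ...
  //   } finally {
  //     region.releaseRowLock(lockId);
  //   }
  //
  //   Integer maybeId = region.tryObtainRowLock(row); // null if unavailable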
2201 
2202   public void bulkLoadHFile(String hfilePath, byte[] familyName)
2203   throws IOException {
2204     startRegionOperation();
2205     try {
2206       Store store = getStore(familyName);
2207       if (store == null) {
2208         throw new DoNotRetryIOException(
2209             "No such column family " + Bytes.toStringBinary(familyName));
2210       }
2211       store.bulkLoadHFile(hfilePath);
2212     } finally {
2213       closeRegionOperation();
2214     }
2215 
2216   }
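
  // Usage sketch (illustrative only): load a pre-built HFile into a column
  // family.  The path and family name are hypothetical.
  //
  //   region.bulkLoadHFile("/hbase/staging/cf/myfile", Bytes.toBytes("cf"));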
2217 
2218 
2219   @Override
2220   public boolean equals(Object o) {
2221     if (!(o instanceof HRegion)) {
2222       return false;
2223     }
2224     return this.hashCode() == ((HRegion)o).hashCode();
2225   }
2226 
2227   @Override
2228   public int hashCode() {
2229     return Bytes.hashCode(this.regionInfo.getRegionName());
2230   }
2231 
2232   @Override
2233   public String toString() {
2234     return this.regionInfo.getRegionNameAsString();
2235   }
2236 
2237   /** @return Path of region base directory */
2238   public Path getTableDir() {
2239     return this.tableDir;
2240   }
2241 
2242   /**
2243    * RegionScanner is an iterator through a bunch of rows in an HRegion.
2244    * <p>
2245    * It is used to combine scanners from multiple Stores (aka column families).
2246    */
2247   class RegionScanner implements InternalScanner {
2248     // Package local for testability
2249     KeyValueHeap storeHeap = null;
2250     private final byte [] stopRow;
2251     private Filter filter;
2252     private List<KeyValue> results = new ArrayList<KeyValue>();
2253     private int batch;
2254     private int isScan;
2255     private boolean filterClosed = false;
2256     private long readPt;
2257 
2258     public HRegionInfo getRegionName() {
2259       return regionInfo;
2260     }
2261     RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
2262       //DebugPrint.println("HRegionScanner.<init>");
2263       this.filter = scan.getFilter();
2264       this.batch = scan.getBatch();
2265       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
2266         this.stopRow = null;
2267       } else {
2268         this.stopRow = scan.getStopRow();
2269       }
2270       // If we are doing a get, we want to be [startRow,endRow].  Normally
2271       // it is [startRow,endRow) and if startRow=endRow we get nothing.
2272       this.isScan = scan.isGetScan() ? -1 : 0;
2273 
2274       this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
2275 
2276       List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
2277       if (additionalScanners != null) {
2278         scanners.addAll(additionalScanners);
2279       }
2280 
2281       for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
2282           scan.getFamilyMap().entrySet()) {
2283         Store store = stores.get(entry.getKey());
2284         scanners.add(store.getScanner(scan, entry.getValue()));
2285       }
2286       this.storeHeap = new KeyValueHeap(scanners, comparator);
2287     }
2288 
2289     RegionScanner(Scan scan) throws IOException {
2290       this(scan, null);
2291     }
2292 
2293     /**
2294      * Reset the filter.
2295      */
2296     protected void resetFilters() {
2297       if (filter != null) {
2298         filter.reset();
2299       }
2300     }
2301 
2302     @Override
2303     public synchronized boolean next(List<KeyValue> outResults, int limit)
2304         throws IOException {
2305       if (this.filterClosed) {
2306         throw new UnknownScannerException("Scanner was closed (timed out?) " +
2307             "after we renewed it. Could be caused by a very slow scanner " +
2308             "or a lengthy garbage collection");
2309       }
2310       startRegionOperation();
2311       try {
2312 
2313         // This could be a new thread from the last time we called next().
2314         ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
2315 
2316         results.clear();
2317         boolean returnResult = nextInternal(limit);
2318 
2319         outResults.addAll(results);
2320         resetFilters();
2321         if (isFilterDone()) {
2322           return false;
2323         }
2324         return returnResult;
2325       } finally {
2326         closeRegionOperation();
2327       }
2328     }
2329 
2330     @Override
2331     public synchronized boolean next(List<KeyValue> outResults)
2332         throws IOException {
2333       // apply the batching limit by default
2334       return next(outResults, batch);
2335     }
2336 
2337     /*
2338      * @return True if the filter rules that the scan is done.
2339      */
2340     synchronized boolean isFilterDone() {
2341       return this.filter != null && this.filter.filterAllRemaining();
2342     }
2343 
2344     private boolean nextInternal(int limit) throws IOException {
2345       while (true) {
2346         byte [] currentRow = peekRow();
2347         if (isStopRow(currentRow)) {
2348           if (filter != null && filter.hasFilterRow()) {
2349             filter.filterRow(results);
2350           }
2351           if (filter != null && filter.filterRow()) {
2352             results.clear();
2353           }
2354 
2355           return false;
2356         } else if (filterRowKey(currentRow)) {
2357           nextRow(currentRow);
2358         } else {
2359           byte [] nextRow;
2360           do {
2361             this.storeHeap.next(results, limit - results.size());
2362             if (limit > 0 && results.size() == limit) {
2363               if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
2364                   "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
2365               return true; // more results are expected, but we are limited by the batch size
2366             }
2367           } while (Bytes.equals(currentRow, nextRow = peekRow()));
2368 
2369           final boolean stopRow = isStopRow(nextRow);
2370 
2371           // now that we have an entire row, let's process it with the filters:
2372 
2373           // first filter with the filterRow(List)
2374           if (filter != null && filter.hasFilterRow()) {
2375             filter.filterRow(results);
2376           }
2377 
2378           if (results.isEmpty() || filterRow()) {
2379             // this seems like a redundant step - we already consumed the row;
2380             // there are no leftovers.
2381             // the reasons for calling this method are:
2382             // 1. reset the filters.
2383             // 2. provide a hook to fast forward the row (used by subclasses)
2384             nextRow(currentRow);
2385 
2386             // This row was totally filtered out, if this is NOT the last row,
2387             // we should continue on.
2388 
2389             if (!stopRow) continue;
2390           }
2391           return !stopRow;
2392         }
2393       }
2394     }
2395 
2396     private boolean filterRow() {
2397       return filter != null
2398           && filter.filterRow();
2399     }
2400     private boolean filterRowKey(byte[] row) {
2401       return filter != null
2402           && filter.filterRowKey(row, 0, row.length);
2403     }
2404 
2405     protected void nextRow(byte [] currentRow) throws IOException {
2406       while (Bytes.equals(currentRow, peekRow())) {
2407         this.storeHeap.next(MOCKED_LIST);
2408       }
2409       results.clear();
2410       resetFilters();
2411     }
2412 
2413     private byte[] peekRow() {
2414       KeyValue kv = this.storeHeap.peek();
2415       return kv == null ? null : kv.getRow();
2416     }
2417 
2418     private boolean isStopRow(byte [] currentRow) {
2419       return currentRow == null ||
2420           (stopRow != null &&
2421           comparator.compareRows(stopRow, 0, stopRow.length,
2422               currentRow, 0, currentRow.length) <= isScan);
2423     }
2424 
2425     @Override
2426     public synchronized void close() {
2427       if (storeHeap != null) {
2428         storeHeap.close();
2429         storeHeap = null;
2430       }
2431       this.filterClosed = true;
2432     }
2433   }
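
  // Usage sketch (illustrative only): drive a region scan through the
  // InternalScanner interface implemented above; getScanner(Scan) is defined
  // elsewhere in this class.  "region" is hypothetical.
  //
  //   Scan scan = new Scan();
  //   scan.addFamily(Bytes.toBytes("cf"));
  //   InternalScanner scanner = region.getScanner(scan);
  //   try {
  //     List<KeyValue> kvs = new ArrayList<KeyValue>();
  //     boolean more;
  //     do {
  //       more = scanner.next(kvs);
  //       // ... process one row's worth of KeyValues ...
  //       kvs.clear();
  //     } while (more);
  //   } finally {
  //     scanner.close();
  //   }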
2434 
2435   // Utility methods
2436   /**
2437    * A utility method to create new instances of HRegion based on the
2438    * {@link HConstants#REGION_IMPL} configuration property.
2439    * @param tableDir qualified path of directory where region should be located,
2440    * usually the table directory.
2441    * @param log The HLog is the outbound log for any updates to the HRegion
2442    * (There's a single HLog for all the HRegions on a single HRegionServer.)
2443    * The log file is a logfile from the previous execution that's
2444    * custom-computed for this HRegion. The HRegionServer computes and sorts the
2445    * appropriate log info for this HRegion. If there is a previous log file
2446    * (implying that the HRegion has been written-to before), then read it from
2447    * the supplied path.
2448    * @param fs is the filesystem.
2449    * @param conf is global configuration settings.
2450    * @param regionInfo HRegionInfo that describes the region
2451    * @param flushListener an object that implements CacheFlushListener,
2452    * used to request flushes of the region memstore as it fills up.
2453    * Can be null.
2454    *
2455    * @return the new instance
2456    */
2457   public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
2458                                    HRegionInfo regionInfo, FlushRequester flushListener) {
2459     try {
2460       @SuppressWarnings("unchecked")
2461       Class<? extends HRegion> regionClass =
2462           (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
2463 
2464       Constructor<? extends HRegion> c =
2465           regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
2466               Configuration.class, HRegionInfo.class, FlushRequester.class);
2467 
2468       return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
2469     } catch (Throwable e) {
2470       // todo: what should I throw here?
2471       throw new IllegalStateException("Could not instantiate a region instance.", e);
2472     }
2473   }
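
  // Configuration sketch (illustrative only): plugging a custom region
  // implementation in via HConstants.REGION_IMPL.  The subclass must provide
  // the six-argument constructor looked up above; MyRegion is hypothetical.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setClass(HConstants.REGION_IMPL, MyRegion.class, HRegion.class);
  //   HRegion r = HRegion.newHRegion(tableDir, log, fs, conf, info, flusher);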
2474 
2475   /**
2476    * Convenience method creating new HRegions. Used by createTable and by the
2477    * bootstrap code in the HMaster constructor.
2478    * Note, this method creates an {@link HLog} for the created region. It
2479    * needs to be closed explicitly.  Use {@link HRegion#getLog()} to get
2480    * access.
2481    * @param info Info for region to create.
2482    * @param rootDir Root directory for HBase instance
2483    * @param conf
2484    * @return new HRegion
2485    *
2486    * @throws IOException
2487    */
2488   public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
2489     final Configuration conf)
2490   throws IOException {
2491     Path tableDir =
2492       HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName());
2493     Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
2494     FileSystem fs = FileSystem.get(conf);
2495     fs.mkdirs(regionDir);
2496     HRegion region = HRegion.newHRegion(tableDir,
2497       new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
2498           new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf),
2499       fs, conf, info, null);
2500     region.initialize();
2501     return region;
2502   }
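
  // Usage sketch (illustrative only): create a region and close the HLog this
  // helper opened, as the javadoc above requires.  closeAndDelete() is an
  // assumption about the HLog API.
  //
  //   HRegion r = HRegion.createHRegion(info, rootDir, conf);
  //   try {
  //     // ... use the region ...
  //   } finally {
  //     r.close();
  //     r.getLog().closeAndDelete();
  //   }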
2503 
2504   /**
2505    * Open a Region.
2506    * @param info Info for region to be opened.
2507    * @param wal HLog for region to use. This method will call
2508    * HLog#setSequenceNumber(long) passing the result of the call to
2509    * HRegion#getMinSequenceId() to ensure the log id is properly kept
2510    * up.  HRegionStore does this every time it opens a new region.
2511    * @param conf
2512    * @return new HRegion
2513    *
2514    * @throws IOException
2515    */
2516   public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
2517       final Configuration conf)
2518   throws IOException {
2519     return openHRegion(info, wal, conf, null, null);
2520   }
2521 
2522   /**
2523    * Open a Region.
2524    * @param info Info for region to be opened.
2525    * @param wal HLog for region to use. This method will call
2526    * HLog#setSequenceNumber(long) passing the result of the call to
2527    * HRegion#getMinSequenceId() to ensure the log id is properly kept
2528    * up.  HRegionStore does this every time it opens a new region.
2529    * @param conf
2530    * @param flusher An interface we can request flushes against.
2531    * @param reporter An interface we can report progress against.
2532    * @return new HRegion
2533    *
2534    * @throws IOException
2535    */
2536   public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
2537     final Configuration conf, final FlushRequester flusher,
2538     final CancelableProgressable reporter)
2539   throws IOException {
2540     if (LOG.isDebugEnabled()) {
2541       LOG.debug("Opening region: " + info);
2542     }
2543     if (info == null) {
2544       throw new NullPointerException("Passed region info is null");
2545     }
2546     Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
2547       info.getTableDesc().getName());
2548     HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
2549       flusher);
2550     return r.openHRegion(reporter);
2551   }
2552 
2553   /**
2554    * Open HRegion.
2555    * Calls initialize and sets sequenceid.
2556    * @param reporter
2557    * @return Returns <code>this</code>
2558    * @throws IOException
2559    */
2560   protected HRegion openHRegion(final CancelableProgressable reporter)
2561   throws IOException {
2562     checkCompressionCodecs();
2563 
2564     long seqid = initialize(reporter);
2565     if (this.log != null) {
2566       this.log.setSequenceNumber(seqid);
2567     }
2568     return this;
2569   }
2570 
2571   private void checkCompressionCodecs() throws IOException {
2572     for (HColumnDescriptor fam: regionInfo.getTableDesc().getColumnFamilies()) {
2573       CompressionTest.testCompression(fam.getCompression());
2574       CompressionTest.testCompression(fam.getCompactionCompression());
2575     }
2576   }
2577 
2578   /**
2579    * Inserts a new region's meta information into the passed
2580    * <code>meta</code> region. Used by the HMaster bootstrap code when adding
2581    * a new table to the ROOT table.
2582    *
2583    * @param meta META HRegion to be updated
2584    * @param r HRegion to add to <code>meta</code>
2585    *
2586    * @throws IOException
2587    */
2588   public static void addRegionToMETA(HRegion meta, HRegion r)
2589   throws IOException {
2590     meta.checkResources();
2591     // The row key is the region name
2592     byte[] row = r.getRegionName();
2593     Integer lid = meta.obtainRowLock(row);
2594     try {
2595       final List<KeyValue> edits = new ArrayList<KeyValue>(1);
2596       edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
2597           HConstants.REGIONINFO_QUALIFIER,
2598           EnvironmentEdgeManager.currentTimeMillis(),
2599           Writables.getBytes(r.getRegionInfo())));
2600       meta.put(HConstants.CATALOG_FAMILY, edits);
2601     } finally {
2602       meta.releaseRowLock(lid);
2603     }
2604   }
2605 
2606   /**
2607    * Deletes all the files for a HRegion
2608    *
2609    * @param fs the file system object
2610    * @param rootdir qualified path of HBase root directory
2611    * @param info HRegionInfo for region to be deleted
2612    * @throws IOException
2613    */
2614   public static void deleteRegion(FileSystem fs, Path rootdir, HRegionInfo info)
2615   throws IOException {
2616     deleteRegion(fs, HRegion.getRegionDir(rootdir, info));
2617   }
2618 
2619   private static void deleteRegion(FileSystem fs, Path regiondir)
2620   throws IOException {
2621     if (LOG.isDebugEnabled()) {
2622       LOG.debug("DELETING region " + regiondir.toString());
2623     }
2624     if (!fs.delete(regiondir, true)) {
2625       LOG.warn("Failed delete of " + regiondir);
2626     }
2627   }
2628 
2629   /**
2630    * Computes the Path of the HRegion
2631    *
2632    * @param rootdir qualified path of HBase root directory
2633    * @param info HRegionInfo for the region
2634    * @return qualified path of region directory
2635    */
2636   public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
2637     return new Path(
2638       HTableDescriptor.getTableDir(rootdir, info.getTableDesc().getName()),
2639                                    info.getEncodedName());
2640   }
2641 
2642   /**
2643    * Determines if the specified row is within the row range specified by the
2644    * specified HRegionInfo
2645    *
2646    * @param info HRegionInfo that specifies the row range
2647    * @param row row to be checked
2648    * @return true if the row is within the range specified by the HRegionInfo
2649    */
2650   public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
2651     return ((info.getStartKey().length == 0) ||
2652         (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
2653         ((info.getEndKey().length == 0) ||
2654             (Bytes.compareTo(info.getEndKey(), row) > 0));
2655   }
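
  // Worked example (illustrative only): the start key is inclusive and the
  // end key exclusive, with empty keys meaning "unbounded".  For a region
  // spanning [b, d):
  //
  //   rowIsInRange(info, Bytes.toBytes("b")); // true
  //   rowIsInRange(info, Bytes.toBytes("c")); // true
  //   rowIsInRange(info, Bytes.toBytes("d")); // false (end key is exclusive)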
2656 
2657   /**
2658    * Make the directories for a specific column family
2659    *
2660    * @param fs the file system
2661    * @param tabledir base directory where region will live (usually the table dir)
2662    * @param hri
2663    * @param colFamily the column family
2664    * @throws IOException
2665    */
2666   public static void makeColumnFamilyDirs(FileSystem fs, Path tabledir,
2667     final HRegionInfo hri, byte [] colFamily)
2668   throws IOException {
2669     Path dir = Store.getStoreHomedir(tabledir, hri.getEncodedName(), colFamily);
2670     if (!fs.mkdirs(dir)) {
2671       LOG.warn("Failed to create " + dir);
2672     }
2673   }
2674 
2675   /**
2676    * Merge two HRegions.  The regions must be adjacent and must not overlap.
2677    *
2678    * @param srcA
2679    * @param srcB
2680    * @return new merged HRegion
2681    * @throws IOException
2682    */
2683   public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
2684   throws IOException {
2685     HRegion a = srcA;
2686     HRegion b = srcB;
2687 
2688     // Make sure that srcA comes first; important for key-ordering during
2689     // write of the merged file.
2690     if (srcA.getStartKey() == null) {
2691       if (srcB.getStartKey() == null) {
2692         throw new IOException("Cannot merge two regions with null start key");
2693       }
2694       // A's start key is null but B's isn't. Assume A comes before B
2695     } else if ((srcB.getStartKey() == null) ||
2696       (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
2697       a = srcB;
2698       b = srcA;
2699     }
2700 
2701     if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
2702       throw new IOException("Cannot merge non-adjacent regions");
2703     }
2704     return merge(a, b);
2705   }
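
  // Usage sketch (illustrative only): merge two neighbouring regions of the
  // same table.  Note merge() below closes both source regions and deletes
  // their directories.
  //
  //   HRegion merged = HRegion.mergeAdjacent(regionA, regionB);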
2706 
2707   /**
2708    * Merge two regions whether they are adjacent or not.
2709    *
2710    * @param a region a
2711    * @param b region b
2712    * @return new merged region
2713    * @throws IOException
2714    */
2715   public static HRegion merge(HRegion a, HRegion b) throws IOException {
2716     if (!a.getRegionInfo().getTableDesc().getNameAsString().equals(
2717         b.getRegionInfo().getTableDesc().getNameAsString())) {
2718       throw new IOException("Regions do not belong to the same table");
2719     }
2720 
2721     FileSystem fs = a.getFilesystem();
2722 
2723     // Make sure each region's cache is empty
2724 
2725     a.flushcache();
2726     b.flushcache();
2727 
2728     // Compact each region so we only have one store file per family
2729 
2730     a.compactStores(true);
2731     if (LOG.isDebugEnabled()) {
2732       LOG.debug("Files for region: " + a);
2733       listPaths(fs, a.getRegionDir());
2734     }
2735     b.compactStores(true);
2736     if (LOG.isDebugEnabled()) {
2737       LOG.debug("Files for region: " + b);
2738       listPaths(fs, b.getRegionDir());
2739     }
2740 
2741     Configuration conf = a.getConf();
2742     HTableDescriptor tabledesc = a.getTableDesc();
2743     HLog log = a.getLog();
2744     Path tableDir = a.getTableDir();
2745     // Presume both are of same region type -- i.e. both user or catalog
2746     // table regions.  This way can use comparator.
2747     final byte[] startKey =
2748       (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
2749            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
2750        || b.comparator.matchingRows(b.getStartKey(), 0,
2751               b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
2752               HConstants.EMPTY_BYTE_ARRAY.length))
2753       ? HConstants.EMPTY_BYTE_ARRAY
2754       : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
2755              b.getStartKey(), 0, b.getStartKey().length) <= 0
2756          ? a.getStartKey()
2757          : b.getStartKey());
2758     final byte[] endKey =
2759       (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
2760            HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
2761        || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
2762               HConstants.EMPTY_BYTE_ARRAY, 0,
2763               HConstants.EMPTY_BYTE_ARRAY.length))
2764       ? HConstants.EMPTY_BYTE_ARRAY
2765       : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
2766              b.getEndKey(), 0, b.getEndKey().length) <= 0
2767          ? b.getEndKey()
2768          : a.getEndKey());
2769 
2770     HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
2771     LOG.info("Creating new region " + newRegionInfo.toString());
2772     String encodedName = newRegionInfo.getEncodedName();
2773     Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
2774     if (fs.exists(newRegionDir)) {
2775       throw new IOException("Cannot merge; target file collision at " +
2776           newRegionDir);
2777     }
2778     fs.mkdirs(newRegionDir);
2779 
2780     LOG.info("starting merge of regions: " + a + " and " + b +
2781       " into new region " + newRegionInfo.toString() +
2782         " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
2783         Bytes.toStringBinary(endKey) + ">");
2784 
2785     // Move HStoreFiles under new region directory
2786     Map<byte [], List<StoreFile>> byFamily =
2787       new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
2788     byFamily = filesByFamily(byFamily, a.close());
2789     byFamily = filesByFamily(byFamily, b.close());
2790     for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
2791       byte [] colFamily = es.getKey();
2792       makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
2793       // Because we compacted the source regions we should have no more than two
2794       // HStoreFiles per family and there will be no reference store
2795       List<StoreFile> srcFiles = es.getValue();
2796       if (srcFiles.size() == 2) {
2797         long seqA = srcFiles.get(0).getMaxSequenceId();
2798         long seqB = srcFiles.get(1).getMaxSequenceId();
2799         if (seqA == seqB) {
2800           // Can't have same sequenceid since on open of a store, this is what
2801           // distinguishes the files (see the map of stores and how it is keyed
2802           // by sequenceid).
2803           throw new IOException("Files have same sequenceid: " + seqA);
2804         }
2805       }
2806       for (StoreFile hsf: srcFiles) {
2807         StoreFile.rename(fs, hsf.getPath(),
2808           StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
2809             newRegionInfo.getEncodedName(), colFamily)));
2810       }
2811     }
2812     if (LOG.isDebugEnabled()) {
2813       LOG.debug("Files for new region");
2814       listPaths(fs, newRegionDir);
2815     }
2816     HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, null);
2817     dstRegion.initialize();
2818     dstRegion.compactStores();
2819     if (LOG.isDebugEnabled()) {
2820       LOG.debug("Files for new region");
2821       listPaths(fs, dstRegion.getRegionDir());
2822     }
2823     deleteRegion(fs, a.getRegionDir());
2824     deleteRegion(fs, b.getRegionDir());
2825 
2826     LOG.info("merge completed. New region is " + dstRegion);
2827 
2828     return dstRegion;
2829   }
2830 
2831   /*
2832    * Fills a map with a vector of store files keyed by
2833    * column family.
2834    * @param byFamily Map to fill.
2835    * @param storeFiles Store files to process.
2836    * @return Returns <code>byFamily</code>
2837    */
2838   private static Map<byte [], List<StoreFile>> filesByFamily(
2839       Map<byte [], List<StoreFile>> byFamily, List<StoreFile> storeFiles) {
2840     for (StoreFile src: storeFiles) {
2841       byte [] family = src.getFamily();
2842       List<StoreFile> v = byFamily.get(family);
2843       if (v == null) {
2844         v = new ArrayList<StoreFile>();
2845         byFamily.put(family, v);
2846       }
2847       v.add(src);
2848     }
2849     return byFamily;
2850   }
2851 
2852   /**
2853    * @return True if needs a major compaction.
2854    * @throws IOException
2855    */
2856   boolean isMajorCompaction() throws IOException {
2857     for (Store store: this.stores.values()) {
2858       if (store.isMajorCompaction()) {
2859         return true;
2860       }
2861     }
2862     return false;
2863   }
2864 
2865   /*
2866    * List the files under the specified directory
2867    *
2868    * @param fs
2869    * @param dir
2870    * @throws IOException
2871    */
2872   private static void listPaths(FileSystem fs, Path dir) throws IOException {
2873     if (LOG.isDebugEnabled()) {
2874       FileStatus[] stats = fs.listStatus(dir);
2875       if (stats == null || stats.length == 0) {
2876         return;
2877       }
2878       for (int i = 0; i < stats.length; i++) {
2879         String path = stats[i].getPath().toString();
2880         if (stats[i].isDir()) {
2881           LOG.debug("d " + path);
2882           listPaths(fs, stats[i].getPath());
2883         } else {
2884           LOG.debug("f " + path + " size=" + stats[i].getLen());
2885         }
2886       }
2887     }
2888   }
2889 
2890 
2891   //
2892   // HBASE-880
2893   //
2894   /**
2895    * @param get get object
2896    * @param lockid existing lock id, or null for no previous lock
2897    * @return result
2898    * @throws IOException read exceptions
2899    */
2900   public Result get(final Get get, final Integer lockid) throws IOException {
2901     // Verify families are all valid
2902     if (get.hasFamilies()) {
2903       for (byte [] family: get.familySet()) {
2904         checkFamily(family);
2905       }
2906     } else { // Adding all families to scanner
2907       for (byte[] family: regionInfo.getTableDesc().getFamiliesKeys()) {
2908         get.addFamily(family);
2909       }
2910     }
2911     List<KeyValue> result = get(get);
2912 
2913     return new Result(result);
2914   }
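
  /**
   * Illustrative sketch, not part of the original class: a minimal caller of
   * {@link #get(Get, Integer)}. The family and qualifier names used here are
   * hypothetical placeholders.
   */
  private Result exampleGetUsage(final byte [] row) throws IOException {
    Get g = new Get(row);
    // Ask for a single hypothetical column; with no addColumn/addFamily
    // calls, get() would add every family of the table (see above).
    g.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"));
    return get(g, null); // null lockid: no previously-held row lock
  }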
2915 
2916   /**
2917    * An optimized version of {@link #get(Get)} that checks MemStore first for
2918    * the specified query.
2919    * <p>
2920    * This is intended for use by increment operations where we have the
2921    * guarantee that versions are never inserted out-of-order so if a value
2922    * exists in MemStore it is the latest value.
2923    * <p>
2924    * It only makes sense to use this method on a Get with no TimeRange set
2925    * and maxVersions equal to 1.
2926    * @param get
2927    * @return result
2928    * @throws IOException
2929    */
2930   private List<KeyValue> getLastIncrement(final Get get) throws IOException {
2931     InternalScan iscan = new InternalScan(get);
2932 
2933     List<KeyValue> results = new ArrayList<KeyValue>();
2934 
2935     // memstore scan
2936     iscan.checkOnlyMemStore();
2937     InternalScanner scanner = null;
2938     try {
2939       scanner = getScanner(iscan);
2940       scanner.next(results);
2941     } finally {
2942       if (scanner != null)
2943         scanner.close();
2944     }
2945 
2946     // count how many columns we're looking for
2947     int expected = 0;
2948     Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
2949     for (NavigableSet<byte[]> qfs : familyMap.values()) {
2950       expected += qfs.size();
2951     }
2952 
2953     // found everything we were looking for, done
2954     if (results.size() == expected) {
2955       return results;
2956     }
2957 
2958     // still have more columns to find
2959     if (!results.isEmpty()) {
2960       // subtract what was found in memstore
2961       for (KeyValue kv : results) {
2962         byte [] family = kv.getFamily();
2963         NavigableSet<byte[]> qfs = familyMap.get(family);
2964         qfs.remove(kv.getQualifier());
2965         if (qfs.isEmpty()) familyMap.remove(family);
2966         expected--;
2967       }
2968       // make a new get for just what is left
2969       Get newGet = new Get(get.getRow());
2970       for (Map.Entry<byte[], NavigableSet<byte[]>> f : familyMap.entrySet()) {
2971         byte [] family = f.getKey();
2972         for (byte [] qualifier : f.getValue()) {
2973           newGet.addColumn(family, qualifier);
2974         }
2975       }
2976       newGet.setTimeRange(get.getTimeRange().getMin(),
2977           get.getTimeRange().getMax());
2978       iscan = new InternalScan(newGet);
2979     }
2980 
2981     // check store files for what is left
2982     List<KeyValue> fileResults = new ArrayList<KeyValue>();
2983     iscan.checkOnlyStoreFiles();
2984     scanner = null;
2985     try {
2986       scanner = getScanner(iscan);
2987       scanner.next(fileResults);
2988     } finally {
2989       if (scanner != null)
2990         scanner.close();
2991     }
2992 
2993     // combine and return
2994     results.addAll(fileResults);
2995     Collections.sort(results, KeyValue.COMPARATOR);
2996     return results;
2997   }
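
  /**
   * Illustrative sketch, not part of the original class: builds a Get the way
   * {@link #increment} does -- explicit columns and default max versions --
   * which is the shape {@link #getLastIncrement} expects.
   */
  private List<KeyValue> exampleLastIncrementUsage(final byte [] row,
      final byte [] family, final byte [] qualifier) throws IOException {
    Get g = new Get(row);
    g.addColumn(family, qualifier);
    return getLastIncrement(g); // memstore first, then store files
  }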
2998 
2999   /*
3000    * Do a get based on the get parameter.
3001    */
3002   private List<KeyValue> get(final Get get) throws IOException {
3003     Scan scan = new Scan(get);
3004 
3005     List<KeyValue> results = new ArrayList<KeyValue>();
3006 
3007     InternalScanner scanner = null;
3008     try {
3009       scanner = getScanner(scan);
3010       scanner.next(results);
3011     } finally {
3012       if (scanner != null)
3013         scanner.close();
3014     }
3015     return results;
3016   }
3017 
3018   /**
3019    * Perform one or more increment operations on a row.
3020    * <p>
3021    * Increments are performed under a row lock, but reads take no locks, so
3022    * a concurrent get or scan may observe a partially complete increment.
3023    * @param increment
3024    * @param lockid
3025    * @param writeToWAL
3026    * @return new keyvalues after increment
3027    * @throws IOException
3028    */
3029   public Result increment(Increment increment, Integer lockid,
3030       boolean writeToWAL)
3031   throws IOException {
3032     // TODO: Use RWCC to make this set of increments atomic to reads
3033     byte [] row = increment.getRow();
3034     checkRow(row);
3035     TimeRange tr = increment.getTimeRange();
3036     boolean flush = false;
3037     WALEdit walEdits = null;
3038     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
3039     List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
3040     long now = EnvironmentEdgeManager.currentTimeMillis();
3041     long size = 0;
3042 
3043     // Lock row
3044     startRegionOperation();
3045     try {
3046       Integer lid = getLock(lockid, row, true);
3047       this.updatesLock.readLock().lock();
3048       try {
3049         // Process each family
3050         for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
3051           increment.getFamilyMap().entrySet()) {
3052 
3053           Store store = stores.get(family.getKey());
3054 
3055           // Get previous values for all columns in this family
3056           Get get = new Get(row);
3057           for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
3058             get.addColumn(family.getKey(), column.getKey());
3059           }
3060           get.setTimeRange(tr.getMin(), tr.getMax());
3061           List<KeyValue> results = getLastIncrement(get);
3062 
3063           // Iterate the input columns and update existing values if they were
3064           // found, otherwise add new column initialized to the increment amount
3065           int idx = 0;
3066           for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
3067             long amount = column.getValue();
3068             if (idx < results.size() &&
3069                 results.get(idx).matchingQualifier(column.getKey())) {
3070               amount += Bytes.toLong(results.get(idx).getValue());
3071               idx++;
3072             }
3073 
3074             // Append new incremented KeyValue to list
3075             KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
3076                 now, Bytes.toBytes(amount));
3077             kvs.add(newKV);
3078 
3079             // Append update to WAL
3080             if (writeToWAL) {
3081               if (walEdits == null) {
3082                 walEdits = new WALEdit();
3083               }
3084               walEdits.add(newKV);
3085             }
3086           }
3087 
3088           // Write the KVs for this family into the store
3089           size += store.upsert(kvs);
3090           allKVs.addAll(kvs);
3091           kvs.clear();
3092         }
3093 
3094         // Actually write to WAL now
3095         if (writeToWAL) {
3096           this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
3097             walEdits, now);
3098         }
3099 
3100         size = this.memstoreSize.addAndGet(size);
3101         flush = isFlushSize(size);
3102       } finally {
3103         this.updatesLock.readLock().unlock();
3104         releaseRowLock(lid);
3105       }
3106     } finally {
3107       closeRegionOperation();
3108     }
3109 
3110     if (flush) {
3111       // Request a cache flush.  Do it outside update lock.
3112       requestFlush();
3113     }
3114 
3115     return new Result(allKVs);
3116   }
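
  /**
   * Illustrative sketch, not part of the original class: a minimal caller of
   * {@link #increment}. The family and qualifier names are hypothetical.
   */
  private Result exampleIncrementUsage(final byte [] row) throws IOException {
    Increment inc = new Increment(row);
    // Bump two hypothetical counters under one row lock.
    inc.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("hits"), 1L);
    inc.addColumn(Bytes.toBytes("counters"), Bytes.toBytes("bytes"), 512L);
    return increment(inc, null, true); // no pre-held lock, write to WAL
  }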
3117 
3118   /**
3119    * Atomically increments a single column value and returns the new value.
3120    * @param row
3121    * @param family
3122    * @param qualifier
3123    * @param amount
3124    * @param writeToWAL
3125    * @return The new value.
3126    * @throws IOException
3127    */
3128   public long incrementColumnValue(byte [] row, byte [] family,
3129       byte [] qualifier, long amount, boolean writeToWAL)
3130   throws IOException {
3131     checkRow(row);
3132     boolean flush = false;
3133     // Lock row
3134     long result = amount;
3135     startRegionOperation();
3136     try {
3137       Integer lid = obtainRowLock(row);
3138       this.updatesLock.readLock().lock();
3139       try {
3140         Store store = stores.get(family);
3141 
3142         // Get the old value:
3143         Get get = new Get(row);
3144         get.addColumn(family, qualifier);
3145 
3146         List<KeyValue> results = getLastIncrement(get);
3147 
3148         if (!results.isEmpty()) {
3149           KeyValue kv = results.get(0);
3150           byte [] buffer = kv.getBuffer();
3151           int valueOffset = kv.getValueOffset();
3152           result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
3153         }
3154 
3155         // build the KeyValue now:
3156         KeyValue newKv = new KeyValue(row, family,
3157             qualifier, EnvironmentEdgeManager.currentTimeMillis(),
3158             Bytes.toBytes(result));
3159 
3160         // now log it:
3161         if (writeToWAL) {
3162           long now = EnvironmentEdgeManager.currentTimeMillis();
3163           WALEdit walEdit = new WALEdit();
3164           walEdit.add(newKv);
3165           this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
3166             walEdit, now);
3167         }
3168 
3169         // Now apply the increment to the store; the store sets the timestamp
3170         // appropriately depending on whether a value already exists in the
3171         // memstore. Returns the change in memstore size from the operation.
3172         long size = store.updateColumnValue(row, family, qualifier, result);
3173 
3174         size = this.memstoreSize.addAndGet(size);
3175         flush = isFlushSize(size);
3176       } finally {
3177         this.updatesLock.readLock().unlock();
3178         releaseRowLock(lid);
3179       }
3180     } finally {
3181       closeRegionOperation();
3182     }
3183 
3184     if (flush) {
3185       // Request a cache flush.  Do it outside update lock.
3186       requestFlush();
3187     }
3188 
3189     return result;
3190   }
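
  /**
   * Illustrative sketch, not part of the original class: the single-column
   * fast path via {@link #incrementColumnValue}. Names are hypothetical.
   */
  private long exampleIcvUsage(final byte [] row) throws IOException {
    // Returns the post-increment value of the counter column.
    return incrementColumnValue(row, Bytes.toBytes("counters"),
      Bytes.toBytes("hits"), 1L, true);
  }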
3191 
3192 
3193   //
3194   // New HBASE-880 Helpers
3195   //
3196 
3197   private void checkFamily(final byte [] family)
3198   throws NoSuchColumnFamilyException {
3199     if(!regionInfo.getTableDesc().hasFamily(family)) {
3200       throw new NoSuchColumnFamilyException("Column family " +
3201           Bytes.toStringBinary(family) + " does not exist in region " + this
3202             + " in table " + regionInfo.getTableDesc());
3203     }
3204   }
3205 
3206   public static final long FIXED_OVERHEAD = ClassSize.align(
3207       ClassSize.OBJECT + // this
3208       (4 * Bytes.SIZEOF_LONG) + // memstoreFlushSize, lastFlushTime, blockingMemStoreSize, threadWakeFrequency 
3209       Bytes.SIZEOF_INT + // rowLockWaitDuration
3210       (2 * Bytes.SIZEOF_BOOLEAN) + // forceMajorCompaction, splitRequest 
3211       ClassSize.ARRAY + // splitPoint
3212       (23 * ClassSize.REFERENCE));
3213 
3214   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
3215       ClassSize.OBJECT + // closeLock
3216       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
3217       ClassSize.ATOMIC_LONG + // memStoreSize 
3218       ClassSize.ATOMIC_INTEGER + // lockIdGenerator
3219       (2 * ClassSize.CONCURRENT_HASHMAP) +  // lockedRows, lockIds
3220       WriteState.HEAP_SIZE + // writestate
3221       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
3222       (2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
3223       ClassSize.ARRAYLIST + // recentFlushes
3224       ReadWriteConsistencyControl.FIXED_SIZE // rwcc
3225       ;
3226 
3227   @Override
3228   public long heapSize() {
3229     long heapSize = DEEP_OVERHEAD;
3230     for(Store store : this.stores.values()) {
3231       heapSize += store.heapSize();
3232     }
3233     // this does not take into account row locks, recent flushes, rwcc entries
3234     return heapSize;
3235   }
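
  /**
   * Illustrative sketch, not part of the original class: how the overhead
   * constants above relate to the value {@link #heapSize()} reports.
   */
  private void exampleHeapSizeBreakdown() {
    long size = heapSize();
    long storeContribution = size - DEEP_OVERHEAD; // sum of store heap sizes
    LOG.debug("Region " + this + " heapSize=" + size +
      " (deep overhead=" + DEEP_OVERHEAD + ", stores=" + storeContribution + ")");
  }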
3236 
3237   /*
3238    * This method calls System.exit.
3239    * @param message Message to print out.  May be null.
3240    */
3241   private static void printUsageAndExit(final String message) {
3242     if (message != null && message.length() > 0) System.out.println(message);
3243     System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
3244     System.out.println("Options:");
3245     System.out.println(" major_compact  Pass this option to major compact " +
3246       "the passed region.");
3247     System.out.println("By default, outputs a scan of the passed region.");
3248     System.exit(1);
3249   }
3250 
3251   /*
3252    * Process table.
3253    * Do major compaction or list content.
3254    * @param fs
3255    * @param p
3256    * @param log
3257    * @param c
3258    * @param majorCompact
3259    * @throws IOException
3260    */
3261   private static void processTable(final FileSystem fs, final Path p,
3262       final HLog log, final Configuration c,
3263       final boolean majorCompact)
3264   throws IOException {
3265     HRegion region = null;
3266     String rootStr = Bytes.toString(HConstants.ROOT_TABLE_NAME);
3267     String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
3268     // Currently expects the table to have one region only.
3269     if (p.getName().startsWith(rootStr)) {
3270       region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
3271     } else if (p.getName().startsWith(metaStr)) {
3272       region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
3273           null);
3274     } else {
3275       throw new IOException("Not a known catalog table: " + p.toString());
3276     }
3277     try {
3278       region.initialize();
3279       if (majorCompact) {
3280         region.compactStores(true);
3281       } else {
3282         // Default behavior
3283         Scan scan = new Scan();
3284         // scan.addFamily(HConstants.CATALOG_FAMILY);
3285         InternalScanner scanner = region.getScanner(scan);
3286         try {
3287           List<KeyValue> kvs = new ArrayList<KeyValue>();
3288           boolean more = false;
3289           do {
3290             kvs.clear();
3291             more = scanner.next(kvs);
3292             if (kvs.size() > 0) LOG.info(kvs);
3293           } while (more);
3294         } finally {
3295           scanner.close();
3296         }
3298       }
3299     } finally {
3300       region.close();
3301     }
3302   }
3303 
3304   boolean shouldForceSplit() {
3305     return this.splitRequest;
3306   }
3307 
3308   byte[] getSplitPoint() {
3309     return this.splitPoint;
3310   }
3311 
3312   void forceSplit(byte[] sp) {
3313     // NOTE: this HRegion will go away after the forced split is successful,
3314     //       therefore there is no reason to clear this value.
3315     this.splitRequest = true;
3316     if (sp != null) {
3317       this.splitPoint = sp;
3318     }
3319   }
3320 
3321   /**
3322    * Give the region a chance to prepare before it is split.
3323    */
3324   protected void prepareToSplit() {
3325     // nothing
3326   }
3327 
3328   /**
3329    * @return The priority that this region should have in the compaction queue
3330    */
3331   public int getCompactPriority() {
3332     int count = Integer.MAX_VALUE;
3333     for(Store store : stores.values()) {
3334       count = Math.min(count, store.getCompactPriority());
3335     }
3336     return count;
3337   }
3338 
3339   /**
3340    * Checks every store to see if one has too many
3341    * store files
3342    * @return true if any store has too many store files
3343    */
3344   public boolean hasTooManyStoreFiles() {
3345     for(Store store : stores.values()) {
3346       if(store.hasTooManyStoreFiles()) {
3347         return true;
3348       }
3349     }
3350     return false;
3351   }
3352 
3353   /**
3354    * This method must be called before any public call that reads or
3355    * modifies data.  It has to be called just before a try block;
3356    * {@link #closeRegionOperation} must be called in that try's finally block.
3357    * Acquires a read lock and checks if the region is closing or closed.
3358    * @throws NotServingRegionException when the region is closing or closed
3359    */
3360   private void startRegionOperation() throws NotServingRegionException {
3361     if (this.closing.get()) {
3362       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
3363           " is closing");
3364     }
3365     lock.readLock().lock();
3366     if (this.closed.get()) {
3367       lock.readLock().unlock();
3368       throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
3369           " is closed");
3370     }
3371   }
3372 
3373   /**
3374    * Releases the read lock taken by {@link #startRegionOperation}. This must
3375    * be called in the finally block corresponding to that try block.
3376    */
3377   private void closeRegionOperation(){
3378     lock.readLock().unlock();
3379   }
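
  /**
   * Illustrative sketch, not part of the original class: the calling
   * convention the two methods above require of every public read/write path.
   */
  private void exampleRegionOperationPattern() throws IOException {
    startRegionOperation(); // may throw NotServingRegionException
    try {
      // ... read or mutate region data here ...
    } finally {
      closeRegionOperation(); // always releases the read lock
    }
  }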
3380 
3381   /**
3382    * A mocked list implementation - discards all updates.
3383    */
3384   private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
3385 
3386     @Override
3387     public void add(int index, KeyValue element) {
3388       // do nothing
3389     }
3390 
3391     @Override
3392     public boolean addAll(int index, Collection<? extends KeyValue> c) {
3393       return false; // this list is never changed as a result of an update
3394     }
3395 
3396     @Override
3397     public KeyValue get(int index) {
3398       throw new UnsupportedOperationException();
3399     }
3400 
3401     @Override
3402     public int size() {
3403       return 0;
3404     }
3405   };
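
  /**
   * Illustrative sketch, not part of the original class: MOCKED_LIST lets a
   * caller drive an API that insists on an output list while discarding the
   * results it would collect.
   */
  private boolean exampleMockedListUsage(final InternalScanner scanner)
  throws IOException {
    // Advances the scanner one row; every KeyValue handed back is dropped.
    return scanner.next(MOCKED_LIST);
  }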
3406 
3407 
3408   /**
3409    * Facility for dumping and compacting catalog tables.
3410    * Only handles catalog tables, since those are the only tables whose
3411    * schema we know for sure.  For usage run:
3412    * <pre>
3413    *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
3414    * </pre>
3415    * @param args
3416    * @throws IOException
3417    */
3418   public static void main(String[] args) throws IOException {
3419     if (args.length < 1) {
3420       printUsageAndExit(null);
3421     }
3422     boolean majorCompact = false;
3423     if (args.length > 1) {
3424       if (!args[1].toLowerCase().startsWith("major")) {
3425         printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
3426       }
3427       majorCompact = true;
3428     }
3429     final Path tableDir = new Path(args[0]);
3430     final Configuration c = HBaseConfiguration.create();
3431     final FileSystem fs = FileSystem.get(c);
3432     final Path logdir = new Path(c.get("hbase.tmp.dir"),
3433         "hlog" + tableDir.getName()
3434         + EnvironmentEdgeManager.currentTimeMillis());
3435     final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
3436         HConstants.HREGION_OLDLOGDIR_NAME);
3437     final HLog log = new HLog(fs, logdir, oldLogDir, c);
3438     try {
3439       processTable(fs, tableDir, log, c, majorCompact);
3440     } finally {
3441       log.close();
3442       BlockCache bc = StoreFile.getBlockCache(c);
3443       if (bc != null) bc.shutdown();
3444     }
3445   }
3446 }