/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * <p/>
 * The replication process currently waits for the edits to be applied
 * before the method returns. This means that the replication of edits
 * is synchronous (after reading from HLogs in ReplicationSource) and that a
 * single region server cannot receive edits from two sources at the same time.
 * <p/>
 * This class uses the native HBase client in order to replicate entries.
 * <p/>
 *
 * TODO make this class more like ReplicationSource with respect to log handling
 */
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  // Name of the HDFS directory that contains the temporary rep logs
  public static final String REPLICATION_LOG_DIR = ".replogs";
  private final Configuration conf;
  // Pool used to replicate
  private final HTablePool pool;
  // Chain to pull on when we want all to stop.
  private final Stoppable stopper;
  private final ReplicationSinkMetrics metrics;

  /**
   * Create a sink for replication
   *
   * @param conf    conf object
   * @param stopper the stopper to check for shutdown requests
   * @throws IOException thrown when HDFS goes bad or on a bad file name
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = conf;
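    // The capacity below bounds the number of cached HTable references kept
    // per table name; override it with replication.sink.htablepool.capacity.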
    this.pool = new HTablePool(this.conf,
        conf.getInt("replication.sink.htablepool.capacity", 10));
    this.stopper = stopper;
    this.metrics = new ReplicationSinkMetrics();
  }

  /**
   * Replicate this array of entries directly into the local cluster
   * using the native client.
   *
   * @param entries the WAL entries, read by a ReplicationSource, to apply
   * @throws IOException if applying an edit to the local cluster fails
   */
  public void replicateEntries(HLog.Entry[] entries)
      throws IOException {
    if (entries.length == 0) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of puts, we only want to flushCommits once per
      // invocation of this method per table.
      Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
      for (HLog.Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        List<KeyValue> kvs = edit.getKeyValues();
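        // An edit is treated as homogeneous: if its first KeyValue is a
        // delete marker, the whole edit is applied as a single Delete.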
        if (kvs.get(0).isDelete()) {
          Delete delete = new Delete(kvs.get(0).getRow(),
              kvs.get(0).getTimestamp(), null);
          for (KeyValue kv : kvs) {
            if (kv.isDeleteFamily()) {
              delete.deleteFamily(kv.getFamily());
            } else if (!kv.isEmptyColumn()) {
              delete.deleteColumn(kv.getFamily(),
                  kv.getQualifier());
            }
          }
          delete(entry.getKey().getTablename(), delete);
        } else {
          byte[] table = entry.getKey().getTablename();
          List<Put> tableList = puts.get(table);
          if (tableList == null) {
            tableList = new ArrayList<Put>();
            puts.put(table, tableList);
          }
          // With mini-batching, we need to expect multiple rows per edit
          byte[] lastKey = kvs.get(0).getRow();
          Put put = new Put(kvs.get(0).getRow(),
              kvs.get(0).getTimestamp());
          for (KeyValue kv : kvs) {
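            // Row boundary: close out the current Put and start a new one
            // for the next row in this edit.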
            if (!Bytes.equals(lastKey, kv.getRow())) {
              tableList.add(put);
              put = new Put(kv.getRow(), kv.getTimestamp());
            }
            put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
            lastKey = kv.getRow();
          }
          tableList.add(put);
        }
        totalReplicated++;
      }
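      // Apply the batched Puts, one flush per destination table.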
      for (byte[] table : puts.keySet()) {
        put(table, puts.get(table));
      }
      this.metrics.setAgeOfLastAppliedOp(
          entries[entries.length - 1].getKey().getWriteTime());
      this.metrics.appliedBatchesRate.inc(1);
      LOG.info("Total replicated: " + totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  /**
   * Do the puts and handle the pool
   * @param tableName table to insert into
   * @param puts list of puts
   * @throws IOException if the puts on the local cluster fail
   */
  private void put(byte[] tableName, List<Put> puts) throws IOException {
    if (puts.isEmpty()) {
      return;
    }
    HTableInterface table = null;
    try {
      table = this.pool.getTable(tableName);
      table.put(puts);
      this.metrics.appliedOpsRate.inc(puts.size());
    } finally {
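      // Always hand the table back to the pool, even when the put failed.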
      if (table != null) {
        this.pool.putTable(table);
      }
    }
  }

  /**
   * Do the delete and handle the pool
   * @param tableName table to delete in
   * @param delete the delete to use
   * @throws IOException if the delete on the local cluster fails
   */
  private void delete(byte[] tableName, Delete delete) throws IOException {
    HTableInterface table = null;
    try {
      table = this.pool.getTable(tableName);
      table.delete(delete);
      this.metrics.appliedOpsRate.inc(1);
    } finally {
      if (table != null) {
        this.pool.putTable(table);
      }
    }
  }
}