View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import java.io.IOException;
23  import java.util.NavigableMap;
24  import java.util.TreeMap;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.regionserver.wal.HLog;
34  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
35  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
36  import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
37  import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
38  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.zookeeper.KeeperException;
41  
42  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
43  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
44  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
45  
46  /**
47   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
48   */
49  public class Replication implements WALObserver {
50    private final boolean replication;
51    private final ReplicationSourceManager replicationManager;
52    private final AtomicBoolean replicating = new AtomicBoolean(true);
53    private final ReplicationZookeeper zkHelper;
54    private final Configuration conf;
55    private ReplicationSink replicationSink;
56    // Hosting server
57    private final Server server;
58  
59    /**
60     * Instantiate the replication management (if rep is enabled).
61     * @param server Hosting server
62     * @param fs handle to the filesystem
63     * @param logDir
64     * @param oldLogDir directory where logs are archived
65     * @throws IOException
66     * @throws KeeperException 
67     */
68    public Replication(final Server server, final FileSystem fs,
69        final Path logDir, final Path oldLogDir)
70    throws IOException, KeeperException {
71      this.server = server;
72      this.conf = this.server.getConfiguration();
73      this.replication = isReplication(this.conf);
74      if (replication) {
75        this.zkHelper = new ReplicationZookeeper(server, this.replicating);
76        this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
77            this.server, fs, this.replicating, logDir, oldLogDir) ;
78      } else {
79        this.replicationManager = null;
80        this.zkHelper = null;
81      }
82    }
83  
84    /**
85     * @param c Configuration to look at
86     * @return True if replication is enabled.
87     */
88    public static boolean isReplication(final Configuration c) {
89      return c.getBoolean(REPLICATION_ENABLE_KEY, false);
90    }
91  
92    /**
93     * Join with the replication threads
94     */
95    public void join() {
96      if (this.replication) {
97        this.replicationManager.join();
98      }
99    }
100 
101   /**
102    * Carry on the list of log entries down to the sink
103    * @param entries list of entries to replicate
104    * @throws IOException
105    */
106   public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
107     if (this.replication) {
108       this.replicationSink.replicateEntries(entries);
109     }
110   }
111 
112   /**
113    * If replication is enabled and this cluster is a master,
114    * it starts
115    * @throws IOException
116    */
117   public void startReplicationServices() throws IOException {
118     if (this.replication) {
119       this.replicationManager.init();
120       this.replicationSink = new ReplicationSink(this.conf, this.server);
121     }
122   }
123 
124   /**
125    * Get the replication sources manager
126    * @return the manager if replication is enabled, else returns false
127    */
128   public ReplicationSourceManager getReplicationManager() {
129     return this.replicationManager;
130   }
131 
132   @Override
133   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
134       WALEdit logEdit) {
135     NavigableMap<byte[], Integer> scopes =
136         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
137     byte[] family;
138     for (KeyValue kv : logEdit.getKeyValues()) {
139       family = kv.getFamily();
140       int scope = info.getTableDesc().getFamily(family).getScope();
141       if (scope != REPLICATION_SCOPE_LOCAL &&
142           !scopes.containsKey(family)) {
143         scopes.put(family, scope);
144       }
145     }
146     if (!scopes.isEmpty()) {
147       logEdit.setScopes(scopes);
148     }
149   }
150 
151   @Override
152   public void logRolled(Path p) {
153     getReplicationManager().logRolled(p);
154   }
155 
156   /**
157    * This method modifies the master's configuration in order to inject
158    * replication-related features
159    * @param conf
160    */
161   public static void decorateMasterConfiguration(Configuration conf) {
162     if (!isReplication(conf)) {
163       return;
164     }
165     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
166     if (!plugins.contains(ReplicationLogCleaner.class.toString())) {
167       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS,
168           plugins + "," + ReplicationLogCleaner.class.getCanonicalName());
169     }
170   }
171 
172   @Override
173   public void logRollRequested() {
174     // Not interested
175   }
176 
177   @Override
178   public void logCloseRequested() {
179     // not interested
180   }
181 }