View Javadoc

1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.OutputStream;
27  import java.io.Reader;
28  import java.net.BindException;
29  import java.net.InetSocketAddress;
30  import java.net.Socket;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FileUtil;
36  import org.apache.hadoop.hbase.HBaseConfiguration;
37  import org.apache.zookeeper.server.NIOServerCnxn;
38  import org.apache.zookeeper.server.ZooKeeperServer;
39  import org.apache.zookeeper.server.persistence.FileTxnLog;
40  
41  /**
42   * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
43   * of redoing it, we should contribute updates to their code which let us more
44   * easily access testing helper objects.
45   */
46  public class MiniZooKeeperCluster {
47    private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
48  
49    private static final int TICK_TIME = 2000;
50    private static final int CONNECTION_TIMEOUT = 30000;
51  
52    private boolean started;
53    private int clientPort = 21818; // use non-standard port
54  
55    private NIOServerCnxn.Factory standaloneServerFactory;
56    private int tickTime = 0;
57  
58    private Configuration configuration;
59  
60    /** Create mini ZooKeeper cluster. */
61    public MiniZooKeeperCluster() {
62      this(HBaseConfiguration.create());
63    }
64  
65    /** Create mini ZooKeeper cluster with configuration (usually from test environment) */
66    public MiniZooKeeperCluster(Configuration configuration) {
67      this.started = false;
68      this.configuration = configuration;
69    }
70  
71    public void setClientPort(int clientPort) {
72      this.clientPort = clientPort;
73    }
74  
75    public int getClientPort() {
76      return clientPort;
77    }
78  
79    public void setTickTime(int tickTime) {
80      this.tickTime = tickTime;
81    }
82  
83    // / XXX: From o.a.zk.t.ClientBase
84    private static void setupTestEnv() {
85      // during the tests we run with 100K prealloc in the logs.
86      // on windows systems prealloc of 64M was seen to take ~15seconds
87      // resulting in test failure (client timeout on first session).
88      // set env and directly in order to handle static init/gc issues
89      System.setProperty("zookeeper.preAllocSize", "100");
90      FileTxnLog.setPreallocSize(100);
91    }
92  
93    /**
94     * @param baseDir
95     * @return ClientPort server bound to.
96     * @throws IOException
97     * @throws InterruptedException
98     */
99    public int startup(File baseDir) throws IOException,
100       InterruptedException {
101 
102     setupTestEnv();
103 
104     shutdown();
105 
106     File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
107     recreateDir(dir);
108 
109     int tickTimeToUse;
110     if (this.tickTime > 0) {
111       tickTimeToUse = this.tickTime;
112     } else {
113       tickTimeToUse = TICK_TIME;
114     }
115     ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
116     while (true) {
117       try {
118         int numberOfConnections = this.configuration.getInt("hbase.zookeeper.property.maxClientCnxns",5000);
119         standaloneServerFactory =
120 	  new NIOServerCnxn.Factory(new InetSocketAddress(clientPort), numberOfConnections);
121       } catch (BindException e) {
122         LOG.info("Failed binding ZK Server to client port: " + clientPort);
123         //this port is already in use. try to use another
124         clientPort++;
125         continue;
126       }
127       break;
128     }
129     standaloneServerFactory.startup(server);
130 
131     if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
132       throw new IOException("Waiting for startup of standalone server");
133     }
134 
135     started = true;
136     LOG.info("Started MiniZK Server on client port: " + clientPort);
137     return clientPort;
138   }
139 
140   private void recreateDir(File dir) throws IOException {
141     if (dir.exists()) {
142       FileUtil.fullyDelete(dir);
143     }
144     try {
145       dir.mkdirs();
146     } catch (SecurityException e) {
147       throw new IOException("creating dir: " + dir, e);
148     }
149   }
150 
151   /**
152    * @throws IOException
153    */
154   public void shutdown() throws IOException {
155     if (!started) {
156       return;
157     }
158 
159     standaloneServerFactory.shutdown();
160     if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
161       throw new IOException("Waiting for shutdown of standalone server");
162     }
163 
164     started = false;
165   }
166 
167   // XXX: From o.a.zk.t.ClientBase
168   private static boolean waitForServerDown(int port, long timeout) {
169     long start = System.currentTimeMillis();
170     while (true) {
171       try {
172         Socket sock = new Socket("localhost", port);
173         try {
174           OutputStream outstream = sock.getOutputStream();
175           outstream.write("stat".getBytes());
176           outstream.flush();
177         } finally {
178           sock.close();
179         }
180       } catch (IOException e) {
181         return true;
182       }
183 
184       if (System.currentTimeMillis() > start + timeout) {
185         break;
186       }
187       try {
188         Thread.sleep(250);
189       } catch (InterruptedException e) {
190         // ignore
191       }
192     }
193     return false;
194   }
195 
196   // XXX: From o.a.zk.t.ClientBase
197   private static boolean waitForServerUp(int port, long timeout) {
198     long start = System.currentTimeMillis();
199     while (true) {
200       try {
201         Socket sock = new Socket("localhost", port);
202         BufferedReader reader = null;
203         try {
204           OutputStream outstream = sock.getOutputStream();
205           outstream.write("stat".getBytes());
206           outstream.flush();
207 
208           Reader isr = new InputStreamReader(sock.getInputStream());
209           reader = new BufferedReader(isr);
210           String line = reader.readLine();
211           if (line != null && line.startsWith("Zookeeper version:")) {
212             return true;
213           }
214         } finally {
215           sock.close();
216           if (reader != null) {
217             reader.close();
218           }
219         }
220       } catch (IOException e) {
221         // ignore as this is expected
222         LOG.info("server localhost:" + port + " not up " + e);
223       }
224 
225       if (System.currentTimeMillis() > start + timeout) {
226         break;
227       }
228       try {
229         Thread.sleep(250);
230       } catch (InterruptedException e) {
231         // ignore
232       }
233     }
234     return false;
235   }
236 }