1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.zookeeper;
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.Reader;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.hbase.HBaseConfiguration;
37 import org.apache.zookeeper.server.NIOServerCnxn;
38 import org.apache.zookeeper.server.ZooKeeperServer;
39 import org.apache.zookeeper.server.persistence.FileTxnLog;
40
41
42
43
44
45
46 public class MiniZooKeeperCluster {
47 private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
48
49 private static final int TICK_TIME = 2000;
50 private static final int CONNECTION_TIMEOUT = 30000;
51
52 private boolean started;
53 private int clientPort = 21818;
54
55 private NIOServerCnxn.Factory standaloneServerFactory;
56 private int tickTime = 0;
57
58 private Configuration configuration;
59
60
61 public MiniZooKeeperCluster() {
62 this(HBaseConfiguration.create());
63 }
64
65
66 public MiniZooKeeperCluster(Configuration configuration) {
67 this.started = false;
68 this.configuration = configuration;
69 }
70
71 public void setClientPort(int clientPort) {
72 this.clientPort = clientPort;
73 }
74
75 public int getClientPort() {
76 return clientPort;
77 }
78
79 public void setTickTime(int tickTime) {
80 this.tickTime = tickTime;
81 }
82
83
84 private static void setupTestEnv() {
85
86
87
88
89 System.setProperty("zookeeper.preAllocSize", "100");
90 FileTxnLog.setPreallocSize(100);
91 }
92
93
94
95
96
97
98
99 public int startup(File baseDir) throws IOException,
100 InterruptedException {
101
102 setupTestEnv();
103
104 shutdown();
105
106 File dir = new File(baseDir, "zookeeper").getAbsoluteFile();
107 recreateDir(dir);
108
109 int tickTimeToUse;
110 if (this.tickTime > 0) {
111 tickTimeToUse = this.tickTime;
112 } else {
113 tickTimeToUse = TICK_TIME;
114 }
115 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
116 while (true) {
117 try {
118 int numberOfConnections = this.configuration.getInt("hbase.zookeeper.property.maxClientCnxns",5000);
119 standaloneServerFactory =
120 new NIOServerCnxn.Factory(new InetSocketAddress(clientPort), numberOfConnections);
121 } catch (BindException e) {
122 LOG.info("Failed binding ZK Server to client port: " + clientPort);
123
124 clientPort++;
125 continue;
126 }
127 break;
128 }
129 standaloneServerFactory.startup(server);
130
131 if (!waitForServerUp(clientPort, CONNECTION_TIMEOUT)) {
132 throw new IOException("Waiting for startup of standalone server");
133 }
134
135 started = true;
136 LOG.info("Started MiniZK Server on client port: " + clientPort);
137 return clientPort;
138 }
139
140 private void recreateDir(File dir) throws IOException {
141 if (dir.exists()) {
142 FileUtil.fullyDelete(dir);
143 }
144 try {
145 dir.mkdirs();
146 } catch (SecurityException e) {
147 throw new IOException("creating dir: " + dir, e);
148 }
149 }
150
151
152
153
154 public void shutdown() throws IOException {
155 if (!started) {
156 return;
157 }
158
159 standaloneServerFactory.shutdown();
160 if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
161 throw new IOException("Waiting for shutdown of standalone server");
162 }
163
164 started = false;
165 }
166
167
168 private static boolean waitForServerDown(int port, long timeout) {
169 long start = System.currentTimeMillis();
170 while (true) {
171 try {
172 Socket sock = new Socket("localhost", port);
173 try {
174 OutputStream outstream = sock.getOutputStream();
175 outstream.write("stat".getBytes());
176 outstream.flush();
177 } finally {
178 sock.close();
179 }
180 } catch (IOException e) {
181 return true;
182 }
183
184 if (System.currentTimeMillis() > start + timeout) {
185 break;
186 }
187 try {
188 Thread.sleep(250);
189 } catch (InterruptedException e) {
190
191 }
192 }
193 return false;
194 }
195
196
197 private static boolean waitForServerUp(int port, long timeout) {
198 long start = System.currentTimeMillis();
199 while (true) {
200 try {
201 Socket sock = new Socket("localhost", port);
202 BufferedReader reader = null;
203 try {
204 OutputStream outstream = sock.getOutputStream();
205 outstream.write("stat".getBytes());
206 outstream.flush();
207
208 Reader isr = new InputStreamReader(sock.getInputStream());
209 reader = new BufferedReader(isr);
210 String line = reader.readLine();
211 if (line != null && line.startsWith("Zookeeper version:")) {
212 return true;
213 }
214 } finally {
215 sock.close();
216 if (reader != null) {
217 reader.close();
218 }
219 }
220 } catch (IOException e) {
221
222 LOG.info("server localhost:" + port + " not up " + e);
223 }
224
225 if (System.currentTimeMillis() > start + timeout) {
226 break;
227 }
228 try {
229 Thread.sleep(250);
230 } catch (InterruptedException e) {
231
232 }
233 }
234 return false;
235 }
236 }