1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.ipc;
22
23 import com.google.common.base.Function;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configurable;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
29 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.ipc.VersionedProtocol;
33 import org.apache.hadoop.net.NetUtils;
34 import org.apache.hadoop.security.UserGroupInformation;
35
36 import javax.net.SocketFactory;
37 import java.io.DataInput;
38 import java.io.DataOutput;
39 import java.io.IOException;
40 import java.lang.reflect.Array;
41 import java.lang.reflect.InvocationHandler;
42 import java.lang.reflect.InvocationTargetException;
43 import java.lang.reflect.Method;
44 import java.lang.reflect.Proxy;
45 import java.net.ConnectException;
46 import java.net.InetSocketAddress;
47 import java.net.SocketTimeoutException;
48 import java.util.HashMap;
49 import java.util.Map;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public class HBaseRPC {
77
78
79
/** Shared logger. Its name is a fixed string literal rather than being derived from this class. */
protected static final Log LOG =
    LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");

/** Not instantiable: the members visible in this class are all static. */
private HBaseRPC() {
}
86
87
88
89 public static class Invocation implements Writable, Configurable {
90 private String methodName;
91 @SuppressWarnings("unchecked")
92 private Class[] parameterClasses;
93 private Object[] parameters;
94 private Configuration conf;
95
96
97 public Invocation() {
98 super();
99 }
100
101
102
103
104
105 public Invocation(Method method, Object[] parameters) {
106 this.methodName = method.getName();
107 this.parameterClasses = method.getParameterTypes();
108 this.parameters = parameters;
109 }
110
111
112 public String getMethodName() { return methodName; }
113
114
115 @SuppressWarnings("unchecked")
116 public Class[] getParameterClasses() { return parameterClasses; }
117
118
119 public Object[] getParameters() { return parameters; }
120
121 public void readFields(DataInput in) throws IOException {
122 methodName = in.readUTF();
123 parameters = new Object[in.readInt()];
124 parameterClasses = new Class[parameters.length];
125 HbaseObjectWritable objectWritable = new HbaseObjectWritable();
126 for (int i = 0; i < parameters.length; i++) {
127 parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
128 this.conf);
129 parameterClasses[i] = objectWritable.getDeclaredClass();
130 }
131 }
132
133 public void write(DataOutput out) throws IOException {
134 out.writeUTF(this.methodName);
135 out.writeInt(parameterClasses.length);
136 for (int i = 0; i < parameterClasses.length; i++) {
137 HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
138 conf);
139 }
140 }
141
142 @Override
143 public String toString() {
144 StringBuilder buffer = new StringBuilder(256);
145 buffer.append(methodName);
146 buffer.append("(");
147 for (int i = 0; i < parameters.length; i++) {
148 if (i != 0)
149 buffer.append(", ");
150 buffer.append(parameters[i]);
151 }
152 buffer.append(")");
153 return buffer.toString();
154 }
155
156 public void setConf(Configuration conf) {
157 this.conf = conf;
158 }
159
160 public Configuration getConf() {
161 return this.conf;
162 }
163 }
164
165
166 static private class ClientCache {
167 private Map<SocketFactory, HBaseClient> clients =
168 new HashMap<SocketFactory, HBaseClient>();
169
170 protected ClientCache() {}
171
172
173
174
175
176
177
178
179
180 protected synchronized HBaseClient getClient(Configuration conf,
181 SocketFactory factory) {
182
183
184
185
186
187 HBaseClient client = clients.get(factory);
188 if (client == null) {
189
190 client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
191 clients.put(factory, client);
192 } else {
193 client.incCount();
194 }
195 return client;
196 }
197
198
199
200
201
202
203
204
205 protected synchronized HBaseClient getClient(Configuration conf) {
206 return getClient(conf, SocketFactory.getDefault());
207 }
208
209
210
211
212
213
214 protected void stopClient(HBaseClient client) {
215 synchronized (this) {
216 client.decCount();
217 if (client.isZeroReference()) {
218 clients.remove(client.getSocketFactory());
219 }
220 }
221 if (client.isZeroReference()) {
222 client.stop();
223 }
224 }
225 }
226
227 protected final static ClientCache CLIENTS = new ClientCache();
228
229 private static class Invoker implements InvocationHandler {
230 private InetSocketAddress address;
231 private UserGroupInformation ticket;
232 private HBaseClient client;
233 private boolean isClosed = false;
234 final private int rpcTimeout;
235
236
237
238
239
240
241
242 public Invoker(InetSocketAddress address, UserGroupInformation ticket,
243 Configuration conf, SocketFactory factory, int rpcTimeout) {
244 this.address = address;
245 this.ticket = ticket;
246 this.client = CLIENTS.getClient(conf, factory);
247 this.rpcTimeout = rpcTimeout;
248 }
249
250 public Object invoke(Object proxy, Method method, Object[] args)
251 throws Throwable {
252 final boolean logDebug = LOG.isDebugEnabled();
253 long startTime = 0;
254 if (logDebug) {
255 startTime = System.currentTimeMillis();
256 }
257 HbaseObjectWritable value = (HbaseObjectWritable)
258 client.call(new Invocation(method, args), address, ticket, rpcTimeout);
259 if (logDebug) {
260 long callTime = System.currentTimeMillis() - startTime;
261 LOG.debug("Call: " + method.getName() + " " + callTime);
262 }
263 return value.get();
264 }
265
266
267 synchronized protected void close() {
268 if (!isClosed) {
269 isClosed = true;
270 CLIENTS.stopClient(client);
271 }
272 }
273 }
274
275
276
277
278 @SuppressWarnings("serial")
279 public static class VersionMismatch extends IOException {
280 private String interfaceName;
281 private long clientVersion;
282 private long serverVersion;
283
284
285
286
287
288
289
290 public VersionMismatch(String interfaceName, long clientVersion,
291 long serverVersion) {
292 super("Protocol " + interfaceName + " version mismatch. (client = " +
293 clientVersion + ", server = " + serverVersion + ")");
294 this.interfaceName = interfaceName;
295 this.clientVersion = clientVersion;
296 this.serverVersion = serverVersion;
297 }
298
299
300
301
302
303
304 public String getInterfaceName() {
305 return interfaceName;
306 }
307
308
309
310
311 public long getClientVersion() {
312 return clientVersion;
313 }
314
315
316
317
318 public long getServerVersion() {
319 return serverVersion;
320 }
321 }
322
323
324
325
326
327
328
329
330
331
332
333
334 @SuppressWarnings("unchecked")
335 public static VersionedProtocol waitForProxy(Class protocol,
336 long clientVersion,
337 InetSocketAddress addr,
338 Configuration conf,
339 int maxAttempts,
340 int rpcTimeout,
341 long timeout
342 ) throws IOException {
343
344 long startTime = System.currentTimeMillis();
345 IOException ioe;
346 int reconnectAttempts = 0;
347 while (true) {
348 try {
349 return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
350 } catch(ConnectException se) {
351 ioe = se;
352 if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
353 LOG.info("Server at " + addr + " could not be reached after " +
354 reconnectAttempts + " tries, giving up.");
355 throw new RetriesExhaustedException("Failed setting up proxy " +
356 protocol + " to " + addr.toString() + " after attempts=" +
357 reconnectAttempts, se);
358 }
359 } catch(SocketTimeoutException te) {
360 LOG.info("Problem connecting to server: " + addr);
361 ioe = te;
362 }
363
364 if (System.currentTimeMillis()-timeout >= startTime) {
365 throw ioe;
366 }
367
368
369 try {
370 Thread.sleep(1000);
371 } catch (InterruptedException ie) {
372
373 }
374 }
375 }
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390 public static VersionedProtocol getProxy(Class<?> protocol,
391 long clientVersion, InetSocketAddress addr, Configuration conf,
392 SocketFactory factory, int rpcTimeout) throws IOException {
393 return getProxy(protocol, clientVersion, addr, null, conf, factory,
394 rpcTimeout);
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411 public static VersionedProtocol getProxy(Class<?> protocol,
412 long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
413 Configuration conf, SocketFactory factory, int rpcTimeout)
414 throws IOException {
415 VersionedProtocol proxy =
416 (VersionedProtocol) Proxy.newProxyInstance(
417 protocol.getClassLoader(), new Class[] { protocol },
418 new Invoker(addr, ticket, conf, factory, rpcTimeout));
419 long serverVersion = proxy.getProtocolVersion(protocol.getName(),
420 clientVersion);
421 if (serverVersion == clientVersion) {
422 return proxy;
423 }
424 throw new VersionMismatch(protocol.getName(), clientVersion,
425 serverVersion);
426 }
427
428
429
430
431
432
433
434
435
436
437
438
439 public static VersionedProtocol getProxy(Class<?> protocol,
440 long clientVersion, InetSocketAddress addr, Configuration conf,
441 int rpcTimeout)
442 throws IOException {
443
444 return getProxy(protocol, clientVersion, addr, conf, NetUtils
445 .getDefaultSocketFactory(conf), rpcTimeout);
446 }
447
448
449
450
451
452 public static void stopProxy(VersionedProtocol proxy) {
453 if (proxy!=null) {
454 ((Invoker)Proxy.getInvocationHandler(proxy)).close();
455 }
456 }
457
458
459
460
461
462
463
464
465
466
467
468 public static Object[] call(Method method, Object[][] params,
469 InetSocketAddress[] addrs, Configuration conf)
470 throws IOException {
471
472 Invocation[] invocations = new Invocation[params.length];
473 for (int i = 0; i < params.length; i++)
474 invocations[i] = new Invocation(method, params[i]);
475 HBaseClient client = CLIENTS.getClient(conf);
476 try {
477 Writable[] wrappedValues = client.call(invocations, addrs);
478
479 if (method.getReturnType() == Void.TYPE) {
480 return null;
481 }
482
483 Object[] values =
484 (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
485 for (int i = 0; i < values.length; i++)
486 if (wrappedValues[i] != null)
487 values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
488
489 return values;
490 } finally {
491 CLIENTS.stopClient(client);
492 }
493 }
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508 public static Server getServer(final Object instance,
509 final Class<?>[] ifaces,
510 final String bindAddress, final int port,
511 final int numHandlers,
512 int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
513 throws IOException {
514 return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel);
515 }
516
517
518 public static class Server extends HBaseServer {
519 private Object instance;
520 private Class<?> implementation;
521 private Class<?> ifaces[];
522 private boolean verbose;
523
524 private static String classNameBase(String className) {
525 String[] names = className.split("\\.", -1);
526 if (names == null || names.length == 0) {
527 return className;
528 }
529 return names[names.length-1];
530 }
531
532
533
534
535
536
537
538
539
540
541 public Server(Object instance, final Class<?>[] ifaces,
542 Configuration conf, String bindAddress, int port,
543 int numHandlers, int metaHandlerCount, boolean verbose, int highPriorityLevel) throws IOException {
544 super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel);
545 this.instance = instance;
546 this.implementation = instance.getClass();
547
548 this.verbose = verbose;
549
550 this.ifaces = ifaces;
551
552
553 this.rpcMetrics.createMetrics(this.ifaces);
554 }
555
556 @Override
557 public Writable call(Writable param, long receivedTime) throws IOException {
558 try {
559 Invocation call = (Invocation)param;
560 if(call.getMethodName() == null) {
561 throw new IOException("Could not find requested method, the usual " +
562 "cause is a version mismatch between client and server.");
563 }
564 if (verbose) log("Call: " + call);
565 Method method =
566 implementation.getMethod(call.getMethodName(),
567 call.getParameterClasses());
568
569 long startTime = System.currentTimeMillis();
570 Object value = method.invoke(instance, call.getParameters());
571 int processingTime = (int) (System.currentTimeMillis() - startTime);
572 int qTime = (int) (startTime-receivedTime);
573 if (LOG.isDebugEnabled()) {
574 LOG.debug("Served: " + call.getMethodName() +
575 " queueTime= " + qTime +
576 " procesingTime= " + processingTime);
577 }
578 rpcMetrics.rpcQueueTime.inc(qTime);
579 rpcMetrics.rpcProcessingTime.inc(processingTime);
580 rpcMetrics.inc(call.getMethodName(), processingTime);
581 if (verbose) log("Return: "+value);
582
583 return new HbaseObjectWritable(method.getReturnType(), value);
584
585 } catch (InvocationTargetException e) {
586 Throwable target = e.getTargetException();
587 if (target instanceof IOException) {
588 throw (IOException)target;
589 }
590 IOException ioe = new IOException(target.toString());
591 ioe.setStackTrace(target.getStackTrace());
592 throw ioe;
593 } catch (Throwable e) {
594 IOException ioe = new IOException(e.toString());
595 ioe.setStackTrace(e.getStackTrace());
596 throw ioe;
597 }
598 }
599 }
600
601 protected static void log(String value) {
602 String v = value;
603 if (v != null && v.length() > 55)
604 v = v.substring(0, 55)+"...";
605 LOG.info(v);
606 }
607 }