View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import com.google.common.base.Function;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configurable;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
29  import org.apache.hadoop.hbase.io.HbaseObjectWritable;
30  import org.apache.hadoop.hbase.util.Bytes;
31  import org.apache.hadoop.io.Writable;
32  import org.apache.hadoop.ipc.VersionedProtocol;
33  import org.apache.hadoop.net.NetUtils;
34  import org.apache.hadoop.security.UserGroupInformation;
35  
36  import javax.net.SocketFactory;
37  import java.io.DataInput;
38  import java.io.DataOutput;
39  import java.io.IOException;
40  import java.lang.reflect.Array;
41  import java.lang.reflect.InvocationHandler;
42  import java.lang.reflect.InvocationTargetException;
43  import java.lang.reflect.Method;
44  import java.lang.reflect.Proxy;
45  import java.net.ConnectException;
46  import java.net.InetSocketAddress;
47  import java.net.SocketTimeoutException;
48  import java.util.HashMap;
49  import java.util.Map;
50  
51  /** A simple RPC mechanism.
52   *
53   * This is a local hbase copy of the hadoop RPC so we can do things like
54   * address HADOOP-414 for hbase-only and try other hbase-specific
55   * optimizations like using our own version of ObjectWritable.  Class has been
56   * renamed to avoid confusing it w/ hadoop versions.
57   * <p>
58   *
59   *
60   * A <i>protocol</i> is a Java interface.  All parameters and return types must
61   * be one of:
62   *
63   * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
64   * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
65   * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
66   *
67   * <li>a {@link String}; or</li>
68   *
69   * <li>a {@link Writable}; or</li>
70   *
71   * <li>an array of the above types</li> </ul>
72   *
73   * All methods in the protocol should throw only IOException.  No field data of
74   * the protocol instance is transmitted.
75   */
76  public class HBaseRPC {
77    // Leave this out in the hadoop ipc package but keep class name.  Do this
78    // so that we dont' get the logging of this class's invocations by doing our
79    // blanket enabling DEBUG on the o.a.h.h. package.
80    protected static final Log LOG =
81      LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
82  
83    private HBaseRPC() {
84      super();
85    }                                  // no public ctor
86  
87  
88    /** A method invocation, including the method name and its parameters.*/
89    public static class Invocation implements Writable, Configurable {
90      private String methodName;
91      @SuppressWarnings("unchecked")
92      private Class[] parameterClasses;
93      private Object[] parameters;
94      private Configuration conf;
95  
96      /** default constructor */
97      public Invocation() {
98        super();
99      }
100 
101     /**
102      * @param method method to call
103      * @param parameters parameters of call
104      */
105     public Invocation(Method method, Object[] parameters) {
106       this.methodName = method.getName();
107       this.parameterClasses = method.getParameterTypes();
108       this.parameters = parameters;
109     }
110 
111     /** @return The name of the method invoked. */
112     public String getMethodName() { return methodName; }
113 
114     /** @return The parameter classes. */
115     @SuppressWarnings("unchecked")
116     public Class[] getParameterClasses() { return parameterClasses; }
117 
118     /** @return The parameter instances. */
119     public Object[] getParameters() { return parameters; }
120 
121     public void readFields(DataInput in) throws IOException {
122       methodName = in.readUTF();
123       parameters = new Object[in.readInt()];
124       parameterClasses = new Class[parameters.length];
125       HbaseObjectWritable objectWritable = new HbaseObjectWritable();
126       for (int i = 0; i < parameters.length; i++) {
127         parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
128           this.conf);
129         parameterClasses[i] = objectWritable.getDeclaredClass();
130       }
131     }
132 
133     public void write(DataOutput out) throws IOException {
134       out.writeUTF(this.methodName);
135       out.writeInt(parameterClasses.length);
136       for (int i = 0; i < parameterClasses.length; i++) {
137         HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
138                                    conf);
139       }
140     }
141 
142     @Override
143     public String toString() {
144       StringBuilder buffer = new StringBuilder(256);
145       buffer.append(methodName);
146       buffer.append("(");
147       for (int i = 0; i < parameters.length; i++) {
148         if (i != 0)
149           buffer.append(", ");
150         buffer.append(parameters[i]);
151       }
152       buffer.append(")");
153       return buffer.toString();
154     }
155 
156     public void setConf(Configuration conf) {
157       this.conf = conf;
158     }
159 
160     public Configuration getConf() {
161       return this.conf;
162     }
163   }
164 
165   /* Cache a client using its socket factory as the hash key */
166   static private class ClientCache {
167     private Map<SocketFactory, HBaseClient> clients =
168       new HashMap<SocketFactory, HBaseClient>();
169 
170     protected ClientCache() {}
171 
172     /**
173      * Construct & cache an IPC client with the user-provided SocketFactory
174      * if no cached client exists.
175      *
176      * @param conf Configuration
177      * @param factory socket factory
178      * @return an IPC client
179      */
180     protected synchronized HBaseClient getClient(Configuration conf,
181         SocketFactory factory) {
182       // Construct & cache client.  The configuration is only used for timeout,
183       // and Clients have connection pools.  So we can either (a) lose some
184       // connection pooling and leak sockets, or (b) use the same timeout for all
185       // configurations.  Since the IPC is usually intended globally, not
186       // per-job, we choose (a).
187       HBaseClient client = clients.get(factory);
188       if (client == null) {
189         // Make an hbase client instead of hadoop Client.
190         client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
191         clients.put(factory, client);
192       } else {
193         client.incCount();
194       }
195       return client;
196     }
197 
198     /**
199      * Construct & cache an IPC client with the default SocketFactory
200      * if no cached client exists.
201      *
202      * @param conf Configuration
203      * @return an IPC client
204      */
205     protected synchronized HBaseClient getClient(Configuration conf) {
206       return getClient(conf, SocketFactory.getDefault());
207     }
208 
209     /**
210      * Stop a RPC client connection
211      * A RPC client is closed only when its reference count becomes zero.
212      * @param client client to stop
213      */
214     protected void stopClient(HBaseClient client) {
215       synchronized (this) {
216         client.decCount();
217         if (client.isZeroReference()) {
218           clients.remove(client.getSocketFactory());
219         }
220       }
221       if (client.isZeroReference()) {
222         client.stop();
223       }
224     }
225   }
226 
227   protected final static ClientCache CLIENTS = new ClientCache();
228 
229   private static class Invoker implements InvocationHandler {
230     private InetSocketAddress address;
231     private UserGroupInformation ticket;
232     private HBaseClient client;
233     private boolean isClosed = false;
234     final private int rpcTimeout;
235 
236     /**
237      * @param address address for invoker
238      * @param ticket ticket
239      * @param conf configuration
240      * @param factory socket factory
241      */
242     public Invoker(InetSocketAddress address, UserGroupInformation ticket,
243                    Configuration conf, SocketFactory factory, int rpcTimeout) {
244       this.address = address;
245       this.ticket = ticket;
246       this.client = CLIENTS.getClient(conf, factory);
247       this.rpcTimeout = rpcTimeout;
248     }
249 
250     public Object invoke(Object proxy, Method method, Object[] args)
251         throws Throwable {
252       final boolean logDebug = LOG.isDebugEnabled();
253       long startTime = 0;
254       if (logDebug) {
255         startTime = System.currentTimeMillis();
256       }
257       HbaseObjectWritable value = (HbaseObjectWritable)
258         client.call(new Invocation(method, args), address, ticket, rpcTimeout);
259       if (logDebug) {
260         long callTime = System.currentTimeMillis() - startTime;
261         LOG.debug("Call: " + method.getName() + " " + callTime);
262       }
263       return value.get();
264     }
265 
266     /* close the IPC client that's responsible for this invoker's RPCs */
267     synchronized protected void close() {
268       if (!isClosed) {
269         isClosed = true;
270         CLIENTS.stopClient(client);
271       }
272     }
273   }
274 
275   /**
276    * A version mismatch for the RPC protocol.
277    */
278   @SuppressWarnings("serial")
279   public static class VersionMismatch extends IOException {
280     private String interfaceName;
281     private long clientVersion;
282     private long serverVersion;
283 
284     /**
285      * Create a version mismatch exception
286      * @param interfaceName the name of the protocol mismatch
287      * @param clientVersion the client's version of the protocol
288      * @param serverVersion the server's version of the protocol
289      */
290     public VersionMismatch(String interfaceName, long clientVersion,
291                            long serverVersion) {
292       super("Protocol " + interfaceName + " version mismatch. (client = " +
293             clientVersion + ", server = " + serverVersion + ")");
294       this.interfaceName = interfaceName;
295       this.clientVersion = clientVersion;
296       this.serverVersion = serverVersion;
297     }
298 
299     /**
300      * Get the interface name
301      * @return the java class name
302      *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
303      */
304     public String getInterfaceName() {
305       return interfaceName;
306     }
307 
308     /**
309      * @return the client's preferred version
310      */
311     public long getClientVersion() {
312       return clientVersion;
313     }
314 
315     /**
316      * @return the server's agreed to version.
317      */
318     public long getServerVersion() {
319       return serverVersion;
320     }
321   }
322 
323   /**
324    * @param protocol protocol interface
325    * @param clientVersion which client version we expect
326    * @param addr address of remote service
327    * @param conf configuration
328    * @param maxAttempts max attempts
329    * @param rpcTimeout timeout for each RPC
330    * @param timeout timeout in milliseconds
331    * @return proxy
332    * @throws IOException e
333    */
334   @SuppressWarnings("unchecked")
335   public static VersionedProtocol waitForProxy(Class protocol,
336                                                long clientVersion,
337                                                InetSocketAddress addr,
338                                                Configuration conf,
339                                                int maxAttempts,
340                                                int rpcTimeout,
341                                                long timeout
342                                                ) throws IOException {
343     // HBase does limited number of reconnects which is different from hadoop.
344     long startTime = System.currentTimeMillis();
345     IOException ioe;
346     int reconnectAttempts = 0;
347     while (true) {
348       try {
349         return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
350       } catch(ConnectException se) {  // namenode has not been started
351         ioe = se;
352         if (maxAttempts >= 0 && ++reconnectAttempts >= maxAttempts) {
353           LOG.info("Server at " + addr + " could not be reached after " +
354             reconnectAttempts + " tries, giving up.");
355           throw new RetriesExhaustedException("Failed setting up proxy " +
356             protocol + " to " + addr.toString() + " after attempts=" +
357             reconnectAttempts, se);
358       }
359       } catch(SocketTimeoutException te) {  // namenode is busy
360         LOG.info("Problem connecting to server: " + addr);
361         ioe = te;
362       }
363       // check if timed out
364       if (System.currentTimeMillis()-timeout >= startTime) {
365         throw ioe;
366       }
367 
368       // wait for retry
369       try {
370         Thread.sleep(1000);
371       } catch (InterruptedException ie) {
372         // IGNORE
373       }
374     }
375   }
376 
377   /**
378    * Construct a client-side proxy object that implements the named protocol,
379    * talking to a server at the named address.
380    *
381    * @param protocol interface
382    * @param clientVersion version we are expecting
383    * @param addr remote address
384    * @param conf configuration
385    * @param factory socket factory
386    * @param rpcTimeout timeout for each RPC
387    * @return proxy
388    * @throws IOException e
389    */
390   public static VersionedProtocol getProxy(Class<?> protocol,
391       long clientVersion, InetSocketAddress addr, Configuration conf,
392       SocketFactory factory, int rpcTimeout) throws IOException {
393     return getProxy(protocol, clientVersion, addr, null, conf, factory,
394         rpcTimeout);
395   }
396 
397   /**
398    * Construct a client-side proxy object that implements the named protocol,
399    * talking to a server at the named address.
400    *
401    * @param protocol interface
402    * @param clientVersion version we are expecting
403    * @param addr remote address
404    * @param ticket ticket
405    * @param conf configuration
406    * @param factory socket factory
407    * @param rpcTimeout timeout for each RPC
408    * @return proxy
409    * @throws IOException e
410    */
411   public static VersionedProtocol getProxy(Class<?> protocol,
412       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
413       Configuration conf, SocketFactory factory, int rpcTimeout)
414   throws IOException {
415     VersionedProtocol proxy =
416         (VersionedProtocol) Proxy.newProxyInstance(
417             protocol.getClassLoader(), new Class[] { protocol },
418             new Invoker(addr, ticket, conf, factory, rpcTimeout));
419     long serverVersion = proxy.getProtocolVersion(protocol.getName(),
420                                                   clientVersion);
421     if (serverVersion == clientVersion) {
422       return proxy;
423     }
424     throw new VersionMismatch(protocol.getName(), clientVersion,
425                               serverVersion);
426   }
427 
428   /**
429    * Construct a client-side proxy object with the default SocketFactory
430    *
431    * @param protocol interface
432    * @param clientVersion version we are expecting
433    * @param addr remote address
434    * @param conf configuration
435    * @param rpcTimeout timeout for each RPC
436    * @return a proxy instance
437    * @throws IOException e
438    */
439   public static VersionedProtocol getProxy(Class<?> protocol,
440       long clientVersion, InetSocketAddress addr, Configuration conf,
441       int rpcTimeout)
442       throws IOException {
443 
444     return getProxy(protocol, clientVersion, addr, conf, NetUtils
445         .getDefaultSocketFactory(conf), rpcTimeout);
446   }
447 
448   /**
449    * Stop this proxy and release its invoker's resource
450    * @param proxy the proxy to be stopped
451    */
452   public static void stopProxy(VersionedProtocol proxy) {
453     if (proxy!=null) {
454       ((Invoker)Proxy.getInvocationHandler(proxy)).close();
455     }
456   }
457 
458   /**
459    * Expert: Make multiple, parallel calls to a set of servers.
460    *
461    * @param method method to invoke
462    * @param params array of parameters
463    * @param addrs array of addresses
464    * @param conf configuration
465    * @return values
466    * @throws IOException e
467    */
468   public static Object[] call(Method method, Object[][] params,
469                               InetSocketAddress[] addrs, Configuration conf)
470     throws IOException {
471 
472     Invocation[] invocations = new Invocation[params.length];
473     for (int i = 0; i < params.length; i++)
474       invocations[i] = new Invocation(method, params[i]);
475     HBaseClient client = CLIENTS.getClient(conf);
476     try {
477     Writable[] wrappedValues = client.call(invocations, addrs);
478 
479     if (method.getReturnType() == Void.TYPE) {
480       return null;
481     }
482 
483     Object[] values =
484       (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
485     for (int i = 0; i < values.length; i++)
486       if (wrappedValues[i] != null)
487         values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
488 
489     return values;
490     } finally {
491       CLIENTS.stopClient(client);
492     }
493   }
494 
495   /**
496    * Construct a server for a protocol implementation instance listening on a
497    * port and address.
498    *
499    * @param instance instance
500    * @param bindAddress bind address
501    * @param port port to bind to
502    * @param numHandlers number of handlers to start
503    * @param verbose verbose flag
504    * @param conf configuration
505    * @return Server
506    * @throws IOException e
507    */
508   public static Server getServer(final Object instance,
509                                  final Class<?>[] ifaces,
510                                  final String bindAddress, final int port,
511                                  final int numHandlers,
512                                  int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
513     throws IOException {
514     return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel);
515   }
516 
517   /** An RPC Server. */
518   public static class Server extends HBaseServer {
519     private Object instance;
520     private Class<?> implementation;
521     private Class<?> ifaces[];
522     private boolean verbose;
523 
524     private static String classNameBase(String className) {
525       String[] names = className.split("\\.", -1);
526       if (names == null || names.length == 0) {
527         return className;
528       }
529       return names[names.length-1];
530     }
531 
532     /** Construct an RPC server.
533      * @param instance the instance whose methods will be called
534      * @param conf the configuration to use
535      * @param bindAddress the address to bind on to listen for connection
536      * @param port the port to listen for connections on
537      * @param numHandlers the number of method handler threads to run
538      * @param verbose whether each call should be logged
539      * @throws IOException e
540      */
541     public Server(Object instance, final Class<?>[] ifaces,
542                   Configuration conf, String bindAddress, int port,
543                   int numHandlers, int metaHandlerCount, boolean verbose, int highPriorityLevel) throws IOException {
544       super(bindAddress, port, Invocation.class, numHandlers, metaHandlerCount, conf, classNameBase(instance.getClass().getName()), highPriorityLevel);
545       this.instance = instance;
546       this.implementation = instance.getClass();
547 
548       this.verbose = verbose;
549 
550       this.ifaces = ifaces;
551 
552       // create metrics for the advertised interfaces this server implements.
553       this.rpcMetrics.createMetrics(this.ifaces);
554     }
555 
556     @Override
557     public Writable call(Writable param, long receivedTime) throws IOException {
558       try {
559         Invocation call = (Invocation)param;
560         if(call.getMethodName() == null) {
561           throw new IOException("Could not find requested method, the usual " +
562               "cause is a version mismatch between client and server.");
563         }
564         if (verbose) log("Call: " + call);
565         Method method =
566           implementation.getMethod(call.getMethodName(),
567                                    call.getParameterClasses());
568 
569         long startTime = System.currentTimeMillis();
570         Object value = method.invoke(instance, call.getParameters());
571         int processingTime = (int) (System.currentTimeMillis() - startTime);
572         int qTime = (int) (startTime-receivedTime);
573         if (LOG.isDebugEnabled()) {
574           LOG.debug("Served: " + call.getMethodName() +
575             " queueTime= " + qTime +
576             " procesingTime= " + processingTime);
577         }
578         rpcMetrics.rpcQueueTime.inc(qTime);
579         rpcMetrics.rpcProcessingTime.inc(processingTime);
580         rpcMetrics.inc(call.getMethodName(), processingTime);
581         if (verbose) log("Return: "+value);
582 
583         return new HbaseObjectWritable(method.getReturnType(), value);
584 
585       } catch (InvocationTargetException e) {
586         Throwable target = e.getTargetException();
587         if (target instanceof IOException) {
588           throw (IOException)target;
589         }
590         IOException ioe = new IOException(target.toString());
591         ioe.setStackTrace(target.getStackTrace());
592         throw ioe;
593       } catch (Throwable e) {
594         IOException ioe = new IOException(e.toString());
595         ioe.setStackTrace(e.getStackTrace());
596         throw ioe;
597       }
598     }
599   }
600 
601   protected static void log(String value) {
602     String v = value;
603     if (v != null && v.length() > 55)
604       v = v.substring(0, 55)+"...";
605     LOG.info(v);
606   }
607 }