/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.io.DataOutputBuffer;
28  import org.apache.hadoop.io.IOUtils;
29  import org.apache.hadoop.io.ObjectWritable;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.io.WritableUtils;
32  import org.apache.hadoop.ipc.RemoteException;
33  import org.apache.hadoop.net.NetUtils;
34  import org.apache.hadoop.security.UserGroupInformation;
35  import org.apache.hadoop.util.ReflectionUtils;
36  
37  import javax.net.SocketFactory;
38  import java.io.BufferedInputStream;
39  import java.io.BufferedOutputStream;
40  import java.io.DataInputStream;
41  import java.io.DataOutputStream;
42  import java.io.FilterInputStream;
43  import java.io.IOException;
44  import java.io.InputStream;
45  import java.net.ConnectException;
46  import java.net.InetSocketAddress;
47  import java.net.Socket;
48  import java.net.SocketTimeoutException;
49  import java.net.UnknownHostException;
50  import java.util.Hashtable;
51  import java.util.Iterator;
52  import java.util.Map.Entry;
53  import java.util.concurrent.atomic.AtomicBoolean;
54  import java.util.concurrent.atomic.AtomicLong;
55  
56  /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
57   * parameter, and return a {@link Writable} as their value.  A service runs on
58   * a port and is defined by a parameter class and a value class.
59   *
60   * <p>This is the org.apache.hadoop.ipc.Client renamed as HBaseClient and
61   * moved into this package so can access package-private methods.
62   *
63   * @see HBaseServer
64   */
65  public class HBaseClient {
66  
67    private static final Log LOG =
68      LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
69    protected final Hashtable<ConnectionId, Connection> connections =
70      new Hashtable<ConnectionId, Connection>();
71  
72    protected final Class<? extends Writable> valueClass;   // class of call values
73    protected int counter;                            // counter for call ids
74    protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
75    final protected Configuration conf;
76    final protected int maxIdleTime; // connections will be culled if it was idle for
77                             // maxIdleTime microsecs
78    final protected int maxRetries; //the max. no. of retries for socket connections
79    final protected long failureSleep; // Time to sleep before retry on failure.
80    protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
81    protected final boolean tcpKeepAlive; // if T then use keepalives
82    protected int pingInterval; // how often sends ping to the server in msecs
83    protected int socketTimeout; // socket timeout
84  
85    protected final SocketFactory socketFactory;           // how to create sockets
86    private int refCount = 1;
87  
88    final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
89    final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
90    final static int DEFAULT_PING_INTERVAL = 60000;  // 1 min
91    final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
92    final static int PING_CALL_ID = -1;
93  
94    /**
95     * set the ping interval value in configuration
96     *
97     * @param conf Configuration
98     * @param pingInterval the ping interval
99     */
100   public static void setPingInterval(Configuration conf, int pingInterval) {
101     conf.setInt(PING_INTERVAL_NAME, pingInterval);
102   }
103 
104   /**
105    * Get the ping interval from configuration;
106    * If not set in the configuration, return the default value.
107    *
108    * @param conf Configuration
109    * @return the ping interval
110    */
111   static int getPingInterval(Configuration conf) {
112     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
113   }
114 
115   /**
116    * Set the socket timeout
117    * @param conf Configuration
118    * @param socketTimeout the socket timeout
119    */
120   public static void setSocketTimeout(Configuration conf, int socketTimeout) {
121     conf.setInt(SOCKET_TIMEOUT, socketTimeout);
122   }
123 
124   /**
125    * @return the socket timeout
126    */
127   static int getSocketTimeout(Configuration conf) {
128     return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
129   }
130 
131   /**
132    * Increment this client's reference count
133    *
134    */
135   synchronized void incCount() {
136     refCount++;
137   }
138 
139   /**
140    * Decrement this client's reference count
141    *
142    */
143   synchronized void decCount() {
144     refCount--;
145   }
146 
147   /**
148    * Return if this client has no reference
149    *
150    * @return true if this client has no reference; false otherwise
151    */
152   synchronized boolean isZeroReference() {
153     return refCount==0;
154   }
155 
156   /** A call waiting for a value. */
157   private class Call {
158     final int id;                                       // call id
159     final Writable param;                               // parameter
160     Writable value;                               // value, null if error
161     IOException error;                            // exception, null if value
162     boolean done;                                 // true when call is done
163 
164     protected Call(Writable param) {
165       this.param = param;
166       synchronized (HBaseClient.this) {
167         this.id = counter++;
168       }
169     }
170 
171     /** Indicate when the call is complete and the
172      * value or error are available.  Notifies by default.  */
173     protected synchronized void callComplete() {
174       this.done = true;
175       notify();                                 // notify caller
176     }
177 
178     /** Set the exception when there is an error.
179      * Notify the caller the call is done.
180      *
181      * @param error exception thrown by the call; either local or remote
182      */
183     public synchronized void setException(IOException error) {
184       this.error = error;
185       callComplete();
186     }
187 
188     /** Set the return value when there is no error.
189      * Notify the caller the call is done.
190      *
191      * @param value return value of the call.
192      */
193     public synchronized void setValue(Writable value) {
194       this.value = value;
195       callComplete();
196     }
197   }
198 
199   /** Thread that reads responses and notifies callers.  Each connection owns a
200    * socket connected to a remote address.  Calls are multiplexed through this
201    * socket: responses may be delivered out of order. */
202   private class Connection extends Thread {
203     private ConnectionId remoteId;
204     private Socket socket = null;                 // connected socket
205     private DataInputStream in;
206     private DataOutputStream out;
207 
208     // currently active calls
209     private final Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
210     private final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
211     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
212     private IOException closeException; // close reason
213 
214     public Connection(InetSocketAddress address) throws IOException {
215       this(new ConnectionId(address, null, 0));
216     }
217 
218     public Connection(ConnectionId remoteId) throws IOException {
219       if (remoteId.getAddress().isUnresolved()) {
220         throw new UnknownHostException("unknown host: " +
221                                        remoteId.getAddress().getHostName());
222       }
223       this.remoteId = remoteId;
224       UserGroupInformation ticket = remoteId.getTicket();
225       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
226         remoteId.getAddress().toString() +
227         ((ticket==null)?" from an unknown user": (" from " + ticket.getUserName())));
228       this.setDaemon(true);
229     }
230 
231     /** Update lastActivity with the current time. */
232     private void touch() {
233       lastActivity.set(System.currentTimeMillis());
234     }
235 
236     /**
237      * Add a call to this connection's call queue and notify
238      * a listener; synchronized.
239      * Returns false if called during shutdown.
240      * @param call to add
241      * @return true if the call was added.
242      */
243     protected synchronized boolean addCall(Call call) {
244       if (shouldCloseConnection.get())
245         return false;
246       calls.put(call.id, call);
247       notify();
248       return true;
249     }
250 
251     /** This class sends a ping to the remote side when timeout on
252      * reading. If no failure is detected, it retries until at least
253      * a byte is read.
254      */
255     private class PingInputStream extends FilterInputStream {
256       /* constructor */
257       protected PingInputStream(InputStream in) {
258         super(in);
259       }
260 
261       /* Process timeout exception
262        * if the connection is not going to be closed, send a ping.
263        * otherwise, throw the timeout exception.
264        */
265       private void handleTimeout(SocketTimeoutException e) throws IOException {
266         if (shouldCloseConnection.get() || !running.get() || 
267             remoteId.rpcTimeout > 0) {
268           throw e;
269         }
270         sendPing();
271       }
272 
273       /** Read a byte from the stream.
274        * Send a ping if timeout on read. Retries if no failure is detected
275        * until a byte is read.
276        * @throws IOException for any IO problem other than socket timeout
277        */
278       @Override
279       public int read() throws IOException {
280         do {
281           try {
282             return super.read();
283           } catch (SocketTimeoutException e) {
284             handleTimeout(e);
285           }
286         } while (true);
287       }
288 
289       /** Read bytes into a buffer starting from offset <code>off</code>
290        * Send a ping if timeout on read. Retries if no failure is detected
291        * until a byte is read.
292        *
293        * @return the total number of bytes read; -1 if the connection is closed.
294        */
295       @Override
296       public int read(byte[] buf, int off, int len) throws IOException {
297         do {
298           try {
299             return super.read(buf, off, len);
300           } catch (SocketTimeoutException e) {
301             handleTimeout(e);
302           }
303         } while (true);
304       }
305     }
306 
307     /** Connect to the server and set up the I/O streams. It then sends
308      * a header to the server and starts
309      * the connection thread that waits for responses.
310      * @throws java.io.IOException e
311      */
312     protected synchronized void setupIOstreams() throws IOException {
313       if (socket != null || shouldCloseConnection.get()) {
314         return;
315       }
316 
317       short ioFailures = 0;
318       short timeoutFailures = 0;
319       try {
320         if (LOG.isDebugEnabled()) {
321           LOG.debug("Connecting to "+remoteId.getAddress());
322         }
323         while (true) {
324           try {
325             this.socket = socketFactory.createSocket();
326             this.socket.setTcpNoDelay(tcpNoDelay);
327             this.socket.setKeepAlive(tcpKeepAlive);
328             NetUtils.connect(this.socket, remoteId.getAddress(),
329               getSocketTimeout(conf));
330             if (remoteId.rpcTimeout > 0) {
331               pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
332             }
333             this.socket.setSoTimeout(pingInterval);
334             break;
335           } catch (SocketTimeoutException toe) {
336             handleConnectionFailure(timeoutFailures++, maxRetries, toe);
337           } catch (IOException ie) {
338             handleConnectionFailure(ioFailures++, maxRetries, ie);
339           }
340         }
341         this.in = new DataInputStream(new BufferedInputStream
342             (new PingInputStream(NetUtils.getInputStream(socket))));
343         this.out = new DataOutputStream
344             (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
345         writeHeader();
346 
347         // update last activity time
348         touch();
349 
350         // start the receiver thread after the socket connection has been set up
351         start();
352       } catch (IOException e) {
353         markClosed(e);
354         close();
355 
356         throw e;
357       }
358     }
359 
360     /* Handle connection failures
361      *
362      * If the current number of retries is equal to the max number of retries,
363      * stop retrying and throw the exception; Otherwise backoff N seconds and
364      * try connecting again.
365      *
366      * This Method is only called from inside setupIOstreams(), which is
367      * synchronized. Hence the sleep is synchronized; the locks will be retained.
368      *
369      * @param curRetries current number of retries
370      * @param maxRetries max number of retries allowed
371      * @param ioe failure reason
372      * @throws IOException if max number of retries is reached
373      */
374     private void handleConnectionFailure(
375         int curRetries, int maxRetries, IOException ioe) throws IOException {
376       // close the current connection
377       if (socket != null) { // could be null if the socket creation failed
378         try {
379           socket.close();
380         } catch (IOException e) {
381           LOG.warn("Not able to close a socket", e);
382         }
383       }
384       // set socket to null so that the next call to setupIOstreams
385       // can start the process of connect all over again.
386       socket = null;
387 
388       // throw the exception if the maximum number of retries is reached
389       if (curRetries >= maxRetries) {
390         throw ioe;
391       }
392 
393       // otherwise back off and retry
394       try {
395         Thread.sleep(failureSleep);
396       } catch (InterruptedException ignored) {}
397 
398       LOG.info("Retrying connect to server: " + remoteId.getAddress() +
399         " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
400         " time(s).");
401     }
402 
403     /* Write the header for each connection
404      * Out is not synchronized because only the first thread does this.
405      */
406     private void writeHeader() throws IOException {
407       out.write(HBaseServer.HEADER.array());
408       out.write(HBaseServer.CURRENT_VERSION);
409       //When there are more fields we can have ConnectionHeader Writable.
410       DataOutputBuffer buf = new DataOutputBuffer();
411       ObjectWritable.writeObject(buf, remoteId.getTicket(),
412                                  UserGroupInformation.class, conf);
413       int bufLen = buf.getLength();
414       out.writeInt(bufLen);
415       out.write(buf.getData(), 0, bufLen);
416     }
417 
418     /* wait till someone signals us to start reading RPC response or
419      * it is idle too long, it is marked as to be closed,
420      * or the client is marked as not running.
421      *
422      * Return true if it is time to read a response; false otherwise.
423      */
424     @SuppressWarnings({"ThrowableInstanceNeverThrown"})
425     private synchronized boolean waitForWork() {
426       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
427         long timeout = maxIdleTime-
428               (System.currentTimeMillis()-lastActivity.get());
429         if (timeout>0) {
430           try {
431             wait(timeout);
432           } catch (InterruptedException ignored) {}
433         }
434       }
435 
436       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
437         return true;
438       } else if (shouldCloseConnection.get()) {
439         return false;
440       } else if (calls.isEmpty()) { // idle connection closed or stopped
441         markClosed(null);
442         return false;
443       } else { // get stopped but there are still pending requests
444         markClosed((IOException)new IOException().initCause(
445             new InterruptedException()));
446         return false;
447       }
448     }
449 
450     public InetSocketAddress getRemoteAddress() {
451       return remoteId.getAddress();
452     }
453 
454     /* Send a ping to the server if the time elapsed
455      * since last I/O activity is equal to or greater than the ping interval
456      */
457     protected synchronized void sendPing() throws IOException {
458       long curTime = System.currentTimeMillis();
459       if ( curTime - lastActivity.get() >= pingInterval) {
460         lastActivity.set(curTime);
461         //noinspection SynchronizeOnNonFinalField
462         synchronized (this.out) {
463           out.writeInt(PING_CALL_ID);
464           out.flush();
465         }
466       }
467     }
468 
469     @Override
470     public void run() {
471       if (LOG.isDebugEnabled())
472         LOG.debug(getName() + ": starting, having connections "
473             + connections.size());
474 
475       try {
476         while (waitForWork()) {//wait here for work - read or close connection
477           receiveResponse();
478         }
479       } catch (Throwable t) {
480         LOG.warn("Unexpected exception receiving call responses", t);
481         markClosed(new IOException("Unexpected exception receiving call responses", t));
482       }
483 
484       close();
485 
486       if (LOG.isDebugEnabled())
487         LOG.debug(getName() + ": stopped, remaining connections "
488             + connections.size());
489     }
490 
491     /* Initiates a call by sending the parameter to the remote server.
492      * Note: this is not called from the Connection thread, but by other
493      * threads.
494      */
495     protected void sendParam(Call call) {
496       if (shouldCloseConnection.get()) {
497         return;
498       }
499 
500       DataOutputBuffer d=null;
501       try {
502         //noinspection SynchronizeOnNonFinalField
503         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
504           if (LOG.isDebugEnabled())
505             LOG.debug(getName() + " sending #" + call.id);
506 
507           //for serializing the
508           //data to be written
509           d = new DataOutputBuffer();
510           d.writeInt(0xdeadbeef); // placeholder for data length
511           d.writeInt(call.id);
512           call.param.write(d);
513           byte[] data = d.getData();
514           int dataLength = d.getLength();
515           // fill in the placeholder
516           Bytes.putInt(data, 0, dataLength - 4);
517           out.write(data, 0, dataLength);
518           out.flush();
519         }
520       } catch(IOException e) {
521         markClosed(e);
522       } finally {
523         //the buffer is just an in-memory buffer, but it is still polite to
524         // close early
525         IOUtils.closeStream(d);
526       }
527     }
528 
529     /* Receive a response.
530      * Because only one receiver, so no synchronization on in.
531      */
532     private void receiveResponse() {
533       if (shouldCloseConnection.get()) {
534         return;
535       }
536       touch();
537 
538       try {
539         int id = in.readInt();                    // try to read an id
540 
541         if (LOG.isDebugEnabled())
542           LOG.debug(getName() + " got value #" + id);
543 
544         Call call = calls.get(id);
545 
546         boolean isError = in.readBoolean();     // read if error
547         if (isError) {
548           //noinspection ThrowableInstanceNeverThrown
549           call.setException(new RemoteException( WritableUtils.readString(in),
550               WritableUtils.readString(in)));
551           calls.remove(id);
552         } else {
553           Writable value = ReflectionUtils.newInstance(valueClass, conf);
554           value.readFields(in);                 // read value
555           call.setValue(value);
556           calls.remove(id);
557         }
558       } catch (IOException e) {
559         markClosed(e);
560       }
561     }
562 
563     private synchronized void markClosed(IOException e) {
564       if (shouldCloseConnection.compareAndSet(false, true)) {
565         closeException = e;
566         notifyAll();
567       }
568     }
569 
570     /** Close the connection. */
571     private synchronized void close() {
572       if (!shouldCloseConnection.get()) {
573         LOG.error("The connection is not in the closed state");
574         return;
575       }
576 
577       // release the resources
578       // first thing to do;take the connection out of the connection list
579       synchronized (connections) {
580         if (connections.get(remoteId) == this) {
581           connections.remove(remoteId);
582         }
583       }
584 
585       // close the streams and therefore the socket
586       IOUtils.closeStream(out);
587       IOUtils.closeStream(in);
588 
589       // clean up all calls
590       if (closeException == null) {
591         if (!calls.isEmpty()) {
592           LOG.warn(
593               "A connection is closed for no cause and calls are not empty");
594 
595           // clean up calls anyway
596           closeException = new IOException("Unexpected closed connection");
597           cleanupCalls();
598         }
599       } else {
600         // log the info
601         if (LOG.isDebugEnabled()) {
602           LOG.debug("closing ipc connection to " + remoteId.address + ": " +
603               closeException.getMessage(),closeException);
604         }
605 
606         // cleanup calls
607         cleanupCalls();
608       }
609       if (LOG.isDebugEnabled())
610         LOG.debug(getName() + ": closed");
611     }
612 
613     /* Cleanup all calls and mark them as done */
614     private void cleanupCalls() {
615       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
616       while (itor.hasNext()) {
617         Call c = itor.next().getValue();
618         c.setException(closeException); // local exception
619         itor.remove();
620       }
621     }
622   }
623 
624   /** Call implementation used for parallel calls. */
625   private class ParallelCall extends Call {
626     private final ParallelResults results;
627     protected final int index;
628 
629     public ParallelCall(Writable param, ParallelResults results, int index) {
630       super(param);
631       this.results = results;
632       this.index = index;
633     }
634 
635     /** Deliver result to result collector. */
636     @Override
637     protected void callComplete() {
638       results.callComplete(this);
639     }
640   }
641 
642   /** Result collector for parallel calls. */
643   private static class ParallelResults {
644     protected final Writable[] values;
645     protected int size;
646     protected int count;
647 
648     public ParallelResults(int size) {
649       this.values = new Writable[size];
650       this.size = size;
651     }
652 
653     /*
654      * Collect a result.
655      */
656     synchronized void callComplete(ParallelCall call) {
657       // FindBugs IS2_INCONSISTENT_SYNC
658       values[call.index] = call.value;            // store the value
659       count++;                                    // count it
660       if (count == size)                          // if all values are in
661         notify();                                 // then notify waiting caller
662     }
663   }
664 
665   /**
666    * Construct an IPC client whose values are of the given {@link Writable}
667    * class.
668    * @param valueClass value class
669    * @param conf configuration
670    * @param factory socket factory
671    */
672   public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
673       SocketFactory factory) {
674     this.valueClass = valueClass;
675     this.maxIdleTime =
676       conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
677     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
678     this.failureSleep = conf.getInt("hbase.client.pause", 1000);
679     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false);
680     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
681     this.pingInterval = getPingInterval(conf);
682     if (LOG.isDebugEnabled()) {
683       LOG.debug("The ping interval is" + this.pingInterval + "ms.");
684     }
685     this.conf = conf;
686     this.socketFactory = factory;
687   }
688 
689   /**
690    * Construct an IPC client with the default SocketFactory
691    * @param valueClass value class
692    * @param conf configuration
693    */
694   public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
695     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
696   }
697 
698   /** Return the socket factory of this client
699    *
700    * @return this client's socket factory
701    */
702   SocketFactory getSocketFactory() {
703     return socketFactory;
704   }
705 
706   /** Stop all threads related to this client.  No further calls may be made
707    * using this client. */
708   public void stop() {
709     if (LOG.isDebugEnabled()) {
710       LOG.debug("Stopping client");
711     }
712 
713     if (!running.compareAndSet(true, false)) {
714       return;
715     }
716 
717     // wake up all connections
718     synchronized (connections) {
719       for (Connection conn : connections.values()) {
720         conn.interrupt();
721       }
722     }
723 
724     // wait until all connections are closed
725     while (!connections.isEmpty()) {
726       try {
727         Thread.sleep(100);
728       } catch (InterruptedException ignored) {
729       }
730     }
731   }
732 
733   /** Make a call, passing <code>param</code>, to the IPC server running at
734    * <code>address</code>, returning the value.  Throws exceptions if there are
735    * network problems or if the remote code threw an exception.
736    * @param param writable parameter
737    * @param address network address
738    * @return Writable
739    * @throws IOException e
740    */
741   public Writable call(Writable param, InetSocketAddress address)
742   throws IOException {
743       return call(param, address, null, 0);
744   }
745 
746   public Writable call(Writable param, InetSocketAddress addr,
747                        UserGroupInformation ticket, int rpcTimeout)
748                        throws IOException {
749     Call call = new Call(param);
750     Connection connection = getConnection(addr, ticket, rpcTimeout, call);
751     connection.sendParam(call);                 // send the parameter
752     boolean interrupted = false;
753     //noinspection SynchronizationOnLocalVariableOrMethodParameter
754     synchronized (call) {
755       while (!call.done) {
756         try {
757           call.wait();                           // wait for the result
758         } catch (InterruptedException ignored) {
759           // save the fact that we were interrupted
760           interrupted = true;
761         }
762       }
763 
764       if (interrupted) {
765         // set the interrupt flag now that we are done waiting
766         Thread.currentThread().interrupt();
767       }
768 
769       if (call.error != null) {
770         if (call.error instanceof RemoteException) {
771           call.error.fillInStackTrace();
772           throw call.error;
773         }
774         // local exception
775         throw wrapException(addr, call.error);
776       }
777       return call.value;
778     }
779   }
780 
781   /**
782    * Take an IOException and the address we were trying to connect to
783    * and return an IOException with the input exception as the cause.
784    * The new exception provides the stack trace of the place where
785    * the exception is thrown and some extra diagnostics information.
786    * If the exception is ConnectException or SocketTimeoutException,
787    * return a new one of the same type; Otherwise return an IOException.
788    *
789    * @param addr target address
790    * @param exception the relevant exception
791    * @return an exception to throw
792    */
793   @SuppressWarnings({"ThrowableInstanceNeverThrown"})
794   private IOException wrapException(InetSocketAddress addr,
795                                          IOException exception) {
796     if (exception instanceof ConnectException) {
797       //connection refused; include the host:port in the error
798       return (ConnectException)new ConnectException(
799            "Call to " + addr + " failed on connection exception: " + exception)
800                     .initCause(exception);
801     } else if (exception instanceof SocketTimeoutException) {
802       return (SocketTimeoutException)new SocketTimeoutException(
803            "Call to " + addr + " failed on socket timeout exception: "
804                       + exception).initCause(exception);
805     } else {
806       return (IOException)new IOException(
807            "Call to " + addr + " failed on local exception: " + exception)
808                                  .initCause(exception);
809 
810     }
811   }
812 
813   /** Makes a set of calls in parallel.  Each parameter is sent to the
814    * corresponding address.  When all values are available, or have timed out
815    * or errored, the collected results are returned in an array.  The array
816    * contains nulls for calls that timed out or errored.
817    * @param params writable parameters
818    * @param addresses socket addresses
819    * @return  Writable[]
820    * @throws IOException e
821    */
822   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
823     throws IOException {
824     if (addresses.length == 0) return new Writable[0];
825 
826     ParallelResults results = new ParallelResults(params.length);
827     // TODO this synchronization block doesnt make any sense, we should possibly fix it
828     //noinspection SynchronizationOnLocalVariableOrMethodParameter
829     synchronized (results) {
830       for (int i = 0; i < params.length; i++) {
831         ParallelCall call = new ParallelCall(params[i], results, i);
832         try {
833           Connection connection = getConnection(addresses[i], null, 0, call);
834           connection.sendParam(call);             // send each parameter
835         } catch (IOException e) {
836           // log errors
837           LOG.info("Calling "+addresses[i]+" caught: " +
838                    e.getMessage(),e);
839           results.size--;                         //  wait for one fewer result
840         }
841       }
842       while (results.count != results.size) {
843         try {
844           results.wait();                    // wait for all results
845         } catch (InterruptedException ignored) {}
846       }
847 
848       return results.values;
849     }
850   }
851 
852   /* Get a connection from the pool, or create a new one and add it to the
853    * pool.  Connections to a given host/port are reused. */
854   private Connection getConnection(InetSocketAddress addr,
855                                    UserGroupInformation ticket,
856                                    int rpcTimeout,
857                                    Call call)
858                                    throws IOException {
859     if (!running.get()) {
860       // the client is stopped
861       throw new IOException("The client is stopped");
862     }
863     Connection connection;
864     /* we could avoid this allocation for each RPC by having a
865      * connectionsId object and with set() method. We need to manage the
866      * refs for keys in HashMap properly. For now its ok.
867      */
868     ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
869     do {
870       synchronized (connections) {
871         connection = connections.get(remoteId);
872         if (connection == null) {
873           connection = new Connection(remoteId);
874           connections.put(remoteId, connection);
875         }
876       }
877     } while (!connection.addCall(call));
878 
879     //we don't invoke the method below inside "synchronized (connections)"
880     //block above. The reason for that is if the server happens to be slow,
881     //it will take longer to establish a connection and that will slow the
882     //entire system down.
883     connection.setupIOstreams();
884     return connection;
885   }
886 
887   /**
888    * This class holds the address and the user ticket. The client connections
889    * to servers are uniquely identified by <remoteAddress, ticket>
890    */
891   private static class ConnectionId {
892     final InetSocketAddress address;
893     final UserGroupInformation ticket;
894     final private int rpcTimeout;
895 
896     ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
897         int rpcTimeout) {
898       this.address = address;
899       this.ticket = ticket;
900       this.rpcTimeout = rpcTimeout;
901     }
902 
903     InetSocketAddress getAddress() {
904       return address;
905     }
906     UserGroupInformation getTicket() {
907       return ticket;
908     }
909 
910     @Override
911     public boolean equals(Object obj) {
912      if (obj instanceof ConnectionId) {
913        ConnectionId id = (ConnectionId) obj;
914        return address.equals(id.address) && ticket == id.ticket && 
915        rpcTimeout == id.rpcTimeout;
916        //Note : ticket is a ref comparision.
917      }
918      return false;
919     }
920 
921     @Override
922     public int hashCode() {
923       return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
924     }
925   }
926 }