View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.ipc;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.DataInputStream;
26  import java.io.DataOutputStream;
27  import java.io.IOException;
28  import java.net.BindException;
29  import java.net.InetAddress;
30  import java.net.InetSocketAddress;
31  import java.net.ServerSocket;
32  import java.net.Socket;
33  import java.net.SocketException;
34  import java.net.UnknownHostException;
35  import java.nio.ByteBuffer;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.ReadableByteChannel;
39  import java.nio.channels.SelectionKey;
40  import java.nio.channels.Selector;
41  import java.nio.channels.ServerSocketChannel;
42  import java.nio.channels.SocketChannel;
43  import java.nio.channels.WritableByteChannel;
44  import java.util.ArrayList;
45  import java.util.Collections;
46  import java.util.Iterator;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Random;
50  import java.util.concurrent.BlockingQueue;
51  import java.util.concurrent.ExecutorService;
52  import java.util.concurrent.Executors;
53  import java.util.concurrent.LinkedBlockingQueue;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.hbase.io.WritableWithSize;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.io.ObjectWritable;
61  import org.apache.hadoop.io.Writable;
62  import org.apache.hadoop.io.WritableUtils;
63  import org.apache.hadoop.security.UserGroupInformation;
64  import org.apache.hadoop.util.ReflectionUtils;
65  import org.apache.hadoop.util.StringUtils;
66  
67  import com.google.common.base.Function;
68  import com.google.common.util.concurrent.ThreadFactoryBuilder;
69  
70  /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
71   * parameter, and return a {@link Writable} as their value.  A service runs on
72   * a port and is defined by a parameter class and a value class.
73   *
74   *
75   * <p>Copied local so can fix HBASE-900.
76   *
77   * @see HBaseClient
78   */
79  public abstract class HBaseServer {
80  
81    /**
82     * The first four bytes of Hadoop RPC connections
      * (the characters "hrpc", encoded with the no-argument
      * {@code String.getBytes()}, i.e. the JVM's default charset).
83     */
84    public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
85  
      // History of CURRENT_VERSION values:
86    // 1 : Introduce ping and server does not throw away RPCs
87    // 3 : RPC was refactored in 0.19
88    public static final byte CURRENT_VERSION = 3;
89  
90    /**
91     * How many calls/handler are allowed in the queue.
92     */
93    private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10;
94  
      // Configuration key; its default is DEFAULT_WARN_RESPONSE_SIZE below.
95    private static final String WARN_RESPONSE_SIZE =
96        "hbase.ipc.warn.response.size";
97  
98    /** Default value for above param (100 * 1024 * 1024; presumably bytes). */
99    private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
100 
101   private final int warnResponseSize;
102 
      // NOTE: the logger is registered under "org.apache.hadoop.ipc.HBaseServer",
      // which is not this class's own package (org.apache.hadoop.hbase.ipc).
103   public static final Log LOG =
104     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
105 
      // Per-thread reference to the owning server. The Listener and Responder
      // threads set it at the start of their run() methods; get() reads it.
106   protected static final ThreadLocal<HBaseServer> SERVER =
107     new ThreadLocal<HBaseServer>();
108   private volatile boolean started = false;
109 
110   /** Returns the server instance called under or null.  May be called under
111    * {@link #call(Writable, long)} implementations, and under {@link Writable}
112    * methods of paramters and return values.  Permits applications to access
113    * the server context.
114    * @return HBaseServer
115    */
116   public static HBaseServer get() {
117     return SERVER.get();
118   }
119 
120   /** Per-thread holder for the call currently being served.  This is set
121    * to the Call object before a Handler invokes an RPC and reset after the
      * call returns; {@code getRemoteIp()} reads it to find the caller's
      * connection.
122    */
123   protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
124 
125   /** Returns the remote side ip address when invoked inside an RPC
126    *  Returns null incase of an error.
127    *  @return InetAddress
128    */
129   public static InetAddress getRemoteIp() {
130     Call call = CurCall.get();
131     if (call != null) {
132       return call.connection.socket.getInetAddress();
133     }
134     return null;
135   }
136   /** Returns remote address as a string when invoked inside an RPC.
137    *  Returns null in case of an error.
138    *  @return String
139    */
140   public static String getRemoteAddress() {
141     InetAddress addr = getRemoteIp();
142     return (addr == null) ? null : addr.getHostAddress();
143   }
144 
      // Host/address the Listener binds its accept socket to (together with port).
145   protected String bindAddress;
      // The Listener constructor overwrites this with the actually bound local port.
146   protected int port;                             // port we listen on
147   private int handlerCount;                       // number of handler threads
148   private int priorityHandlerCount;
149   private int readThreads;                        // number of read threads
150   protected Class<? extends Writable> paramClass; // class of call parameters
151   protected int maxIdleTime;                      // the maximum idle time after
152                                                   // which a client may be
153                                                   // disconnected
154   protected int thresholdIdleConnections;         // the number of idle
155                                                   // connections after which we
156                                                   // will start cleaning up idle
157                                                   // connections
158   int maxConnectionsToNuke;                       // the max number of
159                                                   // connections to nuke
160                                                   // during a cleanup
161 
162   protected HBaseRpcMetrics  rpcMetrics;
163 
164   protected Configuration conf;
165 
166   private int maxQueueSize;
      // Applied to each new Connection's socket; a value of 0 skips the call
      // to setSendBufferSize.
167   protected int socketSendBufferSize;
168   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
169   protected final boolean tcpKeepAlive; // if T then use keepalives
170 
      // Loop condition of the Listener, Reader and Responder threads.
171   volatile protected boolean running = true;         // true while server runs
172   protected BlockingQueue<Call> callQueue; // queued calls
173   protected BlockingQueue<Call> priorityCallQueue;
174 
175   private int highPriorityLevel;  // what level a high priority call is at
176 
177   protected final List<Connection> connectionList =
178     Collections.synchronizedList(new LinkedList<Connection>());
179   //maintain a list
180   //of client connections
181   private Listener listener = null;
182   protected Responder responder = null;
      // Incremented in Listener.doAccept while holding the connectionList lock.
183   protected int numConnections = 0;
184   private Handler[] handlers = null;
185   private Handler[] priorityHandlers = null;
      // Consulted by the Listener and Responder loops on OutOfMemoryError; may be null.
186   protected HBaseRPCErrorHandler errorHandler = null;
187 
188   /**
189    * A convenience method to bind to a given address and report
190    * better exceptions if the address is not a valid host.
191    * @param socket the socket to bind
192    * @param address the address to bind to
193    * @param backlog the number of connections allowed in the queue
194    * @throws BindException if the address can't be bound
195    * @throws UnknownHostException if the address isn't a valid host name
196    * @throws IOException other random errors from bind
197    */
198   public static void bind(ServerSocket socket, InetSocketAddress address,
199                           int backlog) throws IOException {
200     try {
201       socket.bind(address, backlog);
202     } catch (BindException e) {
203       BindException bindException =
204         new BindException("Problem binding to " + address + " : " +
205             e.getMessage());
206       bindException.initCause(e);
207       throw bindException;
208     } catch (SocketException e) {
209       // If they try to bind to a different host's address, give a better
210       // error message.
211       if ("Unresolved address".equals(e.getMessage())) {
212         throw new UnknownHostException("Invalid hostname for server: " +
213                                        address.getHostName());
214       }
215       throw e;
216     }
217   }
218 
219   /** A call queued for handling. */
220   private static class Call {
221     protected int id;                             // the client's call id
222     protected Writable param;                     // the parameter passed
223     protected Connection connection;              // connection to client
224     protected long timestamp;      // the time received when response is null
225                                    // the time served when response is not null
226     protected ByteBuffer response;                // the response for this call
227 
228     public Call(int id, Writable param, Connection connection) {
229       this.id = id;
230       this.param = param;
231       this.connection = connection;
232       this.timestamp = System.currentTimeMillis();
233       this.response = null;
234     }
235 
236     @Override
237     public String toString() {
238       return param.toString() + " from " + connection.toString();
239     }
240 
241     public void setResponse(ByteBuffer response) {
242       this.response = response;
243     }
244   }
245 
246   /** Listens on the socket. Creates jobs for the handler threads*/
247   private class Listener extends Thread {
248 
249     private ServerSocketChannel acceptChannel = null; //the accept channel
250     private Selector selector = null; //the selector that we use for the server
        // One Reader per read thread; the constructor creates each with its own
        // selector and submits it to readPool.
251     private Reader[] readers = null;
        // Index of the reader most recently handed out by getReader().
252     private int currentReader = 0;
253     private InetSocketAddress address; //the address we bind at
        // Picks the random index range scanned by cleanupConnections().
254     private Random rand = new Random();
255     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
256                                          //-tion (for idle connections) ran
257     private long cleanupInterval = 10000; //the minimum interval between
258                                           //two cleanup runs
        // (compared against System.currentTimeMillis() differences, i.e. ms)
        // Accept backlog passed to bind(); read from configuration, default 128.
259     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
260 
        // Fixed-size pool running the Reader tasks; shut down in doStop().
261     private ExecutorService readPool;
262 
263     public Listener() throws IOException {
264       address = new InetSocketAddress(bindAddress, port);
265       // Create a new server socket and set to non blocking mode
266       acceptChannel = ServerSocketChannel.open();
267       acceptChannel.configureBlocking(false);
268 
269       // Bind the server socket to the local host and port
270       bind(acceptChannel.socket(), address, backlogLength);
271       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
272       // create a selector;
273       selector= Selector.open();
274 
275       readers = new Reader[readThreads];
276       readPool = Executors.newFixedThreadPool(readThreads,
277         new ThreadFactoryBuilder().setNameFormat(
278           "IPC Reader %d on port " + port).build());
279       for (int i = 0; i < readThreads; ++i) {
280         Selector readSelector = Selector.open();
281         Reader reader = new Reader(readSelector);
282         readers[i] = reader;
283         readPool.execute(reader);
284       }
285 
286       // Register accepts on the server socket with the selector.
287       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
288       this.setName("IPC Server listener on " + port);
289       this.setDaemon(true);
290     }
291 
292 
293     private class Reader implements Runnable {
294       private volatile boolean adding = false;
295       private Selector readSelector = null;
296 
297       Reader(Selector readSelector) {
298         this.readSelector = readSelector;
299       }
300       public void run() {
301         synchronized(this) {
302           while (running) {
303             SelectionKey key = null;
304             try {
305               readSelector.select();
306               while (adding) {
307                 this.wait(1000);
308               }
309 
310               Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
311               while (iter.hasNext()) {
312                 key = iter.next();
313                 iter.remove();
314                 if (key.isValid()) {
315                   if (key.isReadable()) {
316                     doRead(key);
317                   }
318                 }
319                 key = null;
320               }
321             } catch (InterruptedException e) {
322               if (running) {                     // unexpected -- log it
323                 LOG.info(getName() + "caught: " +
324                     StringUtils.stringifyException(e));
325               }
326             } catch (IOException ex) {
327                LOG.error("Error in Reader", ex);
328             }
329           }
330         }
331       }
332 
333       /**
334        * This gets reader into the state that waits for the new channel
335        * to be registered with readSelector. If it was waiting in select()
336        * the thread will be woken up, otherwise whenever select() is called
337        * it will return even if there is nothing to read and wait
338        * in while(adding) for finishAdd call
339        */
340       public void startAdd() {
341         adding = true;
342         readSelector.wakeup();
343       }
344 
345       public synchronized SelectionKey registerChannel(SocketChannel channel)
346         throws IOException {
347         return channel.register(readSelector, SelectionKey.OP_READ);
348       }
349 
350       public synchronized void finishAdd() {
351         adding = false;
352         this.notify();
353       }
354     }
355 
356     /** cleanup connections from connectionList. Choose a random range
357      * to scan and also have a limit on the number of the connections
358      * that will be cleanedup per run. The criteria for cleanup is the time
359      * for which the connection was idle. If 'force' is true then all
360      * connections will be looked at for the cleanup.
361      * @param force all connections will be looked at for cleanup
362      */
363     private void cleanupConnections(boolean force) {
364       if (force || numConnections > thresholdIdleConnections) {
365         long currentTime = System.currentTimeMillis();
366         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
367           return;
368         }
369         int start = 0;
370         int end = numConnections - 1;
371         if (!force) {
372           start = rand.nextInt() % numConnections;
373           end = rand.nextInt() % numConnections;
374           int temp;
375           if (end < start) {
376             temp = start;
377             start = end;
378             end = temp;
379           }
380         }
381         int i = start;
382         int numNuked = 0;
383         while (i <= end) {
384           Connection c;
385           synchronized (connectionList) {
386             try {
387               c = connectionList.get(i);
388             } catch (Exception e) {return;}
389           }
390           if (c.timedOut(currentTime)) {
391             if (LOG.isDebugEnabled())
392               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
393             closeConnection(c);
394             numNuked++;
395             end--;
396             //noinspection UnusedAssignment
397             c = null;
398             if (!force && numNuked == maxConnectionsToNuke) break;
399           }
400           else i++;
401         }
402         lastCleanupRunTime = System.currentTimeMillis();
403       }
404     }
405 
406     @Override
407     public void run() {
408       LOG.info(getName() + ": starting");
409       SERVER.set(HBaseServer.this);
410 
411       while (running) {
412         SelectionKey key = null;
413         try {
414           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
415           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
416           while (iter.hasNext()) {
417             key = iter.next();
418             iter.remove();
419             try {
420               if (key.isValid()) {
421                 if (key.isAcceptable())
422                   doAccept(key);
423               }
424             } catch (IOException ignored) {
425             }
426             key = null;
427           }
428         } catch (OutOfMemoryError e) {
429           if (errorHandler != null) {
430             if (errorHandler.checkOOME(e)) {
431               LOG.info(getName() + ": exiting on OOME");
432               closeCurrentConnection(key);
433               cleanupConnections(true);
434               return;
435             }
436           } else {
437             // we can run out of memory if we have too many threads
438             // log the event and sleep for a minute and give
439             // some thread(s) a chance to finish
440             LOG.warn("Out of Memory in server select", e);
441             closeCurrentConnection(key);
442             cleanupConnections(true);
443             try { Thread.sleep(60000); } catch (Exception ignored) {}
444       }
445         } catch (Exception e) {
446           closeCurrentConnection(key);
447         }
448         cleanupConnections(false);
449       }
450       LOG.info("Stopping " + this.getName());
451 
452       synchronized (this) {
453         try {
454           acceptChannel.close();
455           selector.close();
456         } catch (IOException ignored) { }
457 
458         selector= null;
459         acceptChannel= null;
460 
461         // clean up all connections
462         while (!connectionList.isEmpty()) {
463           closeConnection(connectionList.remove(0));
464         }
465       }
466     }
467 
468     private void closeCurrentConnection(SelectionKey key) {
469       if (key != null) {
470         Connection c = (Connection)key.attachment();
471         if (c != null) {
472           if (LOG.isDebugEnabled())
473             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
474           closeConnection(c);
475         }
476       }
477     }
478 
479     InetSocketAddress getAddress() {
480       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
481     }
482 
483     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
484       Connection c;
485       ServerSocketChannel server = (ServerSocketChannel) key.channel();
486 
487       SocketChannel channel;
488       while ((channel = server.accept()) != null) {
489         channel.configureBlocking(false);
490         channel.socket().setTcpNoDelay(tcpNoDelay);
491         channel.socket().setKeepAlive(tcpKeepAlive);
492 
493         Reader reader = getReader();
494         try {
495           reader.startAdd();
496           SelectionKey readKey = reader.registerChannel(channel);
497           c = new Connection(channel, System.currentTimeMillis());
498           readKey.attach(c);
499           synchronized (connectionList) {
500             connectionList.add(numConnections, c);
501             numConnections++;
502           }
503           if (LOG.isDebugEnabled())
504             LOG.debug("Server connection from " + c.toString() +
505                 "; # active connections: " + numConnections +
506                 "; # queued calls: " + callQueue.size());
507         } finally {
508           reader.finishAdd();
509         }
510       }
511     }
512 
513     void doRead(SelectionKey key) throws InterruptedException {
514       int count = 0;
515       Connection c = (Connection)key.attachment();
516       if (c == null) {
517         return;
518       }
519       c.setLastContact(System.currentTimeMillis());
520 
521       try {
522         count = c.readAndProcess();
523       } catch (InterruptedException ieo) {
524         throw ieo;
525       } catch (Exception e) {
526         LOG.warn(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
527         count = -1; //so that the (count < 0) block is executed
528       }
529       if (count < 0) {
530         if (LOG.isDebugEnabled())
531           LOG.debug(getName() + ": disconnecting client " +
532                     c.getHostAddress() + ". Number of active connections: "+
533                     numConnections);
534         closeConnection(c);
535         // c = null;
536       }
537       else {
538         c.setLastContact(System.currentTimeMillis());
539       }
540     }
541 
542     synchronized void doStop() {
543       if (selector != null) {
544         selector.wakeup();
545         Thread.yield();
546       }
547       if (acceptChannel != null) {
548         try {
549           acceptChannel.socket().close();
550         } catch (IOException e) {
551           LOG.info(getName() + ":Exception in closing listener socket. " + e);
552         }
553       }
554       readPool.shutdownNow();
555     }
556 
557     // The method that will return the next reader to work with
558     // Simplistic implementation of round robin for now
559     Reader getReader() {
560       currentReader = (currentReader + 1) % readers.length;
561       return readers[currentReader];
562     }
563   }
564 
565   // Sends responses of RPC back to clients.
566   private class Responder extends Thread {
        // Selector on which channels with unsent response data are registered
        // for OP_WRITE (see processResponse).
567     private Selector writeSelector;
        // Modified only through the synchronized incPending()/decPending();
        // run() blocks in waitPending() while it is above zero.
568     private int pending;         // connections waiting to register
569 
        // In milliseconds: used as the select() timeout and as the age limit
        // checked in doPurge().
570     final static int PURGE_INTERVAL = 900000; // 15mins
571 
/**
 * Names the thread, marks it as a daemon and opens the write selector.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setName("IPC Server Responder");
  setDaemon(true);
  this.writeSelector = Selector.open();
  this.pending = 0;
}
578 
        /**
         * Responder main loop. While the server is running it waits for
         * writable keys on writeSelector and flushes queued responses through
         * doAsyncWrite; at most once per PURGE_INTERVAL it also collects the
         * calls attached to registered keys and passes each to doPurge.
         */
579     @Override
580     public void run() {
581       LOG.info(getName() + ": starting");
582       SERVER.set(HBaseServer.this);
583       long lastPurgeTime = 0;   // last check for old calls.
584 
585       while (running) {
586         try {
587           waitPending();     // If a channel is being registered, wait.
              // Returns when a key is ready, on wakeup(), or after the timeout.
588           writeSelector.select(PURGE_INTERVAL);
589           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
590           while (iter.hasNext()) {
591             SelectionKey key = iter.next();
592             iter.remove();
593             try {
594               if (key.isValid() && key.isWritable()) {
595                   doAsyncWrite(key);
596               }
597             } catch (IOException e) {
                  // A failed write on one key does not stop the loop.
598               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
599             }
600           }
601           long now = System.currentTimeMillis();
              // Skip the purge scan until PURGE_INTERVAL has elapsed since the last one.
602           if (now < lastPurgeTime + PURGE_INTERVAL) {
603             continue;
604           }
605           lastPurgeTime = now;
606           //
607           // If there were some calls that have not been sent out for a
608           // long time, discard them.
609           //
610           LOG.debug("Checking for old call responses.");
611           ArrayList<Call> calls;
612 
613           // get the list of channels from list of keys.
              // The calls are copied out first so that doPurge below runs
              // without holding the lock on the selector's key set.
614           synchronized (writeSelector.keys()) {
615             calls = new ArrayList<Call>(writeSelector.keys().size());
616             iter = writeSelector.keys().iterator();
617             while (iter.hasNext()) {
618               SelectionKey key = iter.next();
619               Call call = (Call)key.attachment();
620               if (call != null && key.channel() == call.connection.channel) {
621                 calls.add(call);
622               }
623             }
624           }
625 
626           for(Call call : calls) {
627             doPurge(call, now);
628           }
629         } catch (OutOfMemoryError e) {
630           if (errorHandler != null) {
                // If checkOOME returns false the loop simply continues.
631             if (errorHandler.checkOOME(e)) {
632               LOG.info(getName() + ": exiting on OOME");
633               return;
634             }
635           } else {
636             //
637             // we can run out of memory if we have too many threads
638             // log the event and sleep for a minute and give
639             // some thread(s) a chance to finish
640             //
641             LOG.warn("Out of Memory in server select", e);
642             try { Thread.sleep(60000); } catch (Exception ignored) {}
              // The brace below closes the else branch (its indentation is off).
643       }
644         } catch (Exception e) {
              // Any other exception is logged and the loop goes around again.
645           LOG.warn("Exception in Responder " +
646                    StringUtils.stringifyException(e));
647         }
648       }
649       LOG.info("Stopping " + this.getName());
650     }
651 
652     private void doAsyncWrite(SelectionKey key) throws IOException {
653       Call call = (Call)key.attachment();
654       if (call == null) {
655         return;
656       }
657       if (key.channel() != call.connection.channel) {
658         throw new IOException("doAsyncWrite: bad channel");
659       }
660 
661       synchronized(call.connection.responseQueue) {
662         if (processResponse(call.connection.responseQueue, false)) {
663           try {
664             key.interestOps(0);
665           } catch (CancelledKeyException e) {
666             /* The Listener/reader might have closed the socket.
667              * We don't explicitly cancel the key, so not sure if this will
668              * ever fire.
669              * This warning could be removed.
670              */
671             LOG.warn("Exception while changing ops : " + e);
672           }
673         }
674       }
675     }
676 
677     //
678     // Remove calls that have been pending in the responseQueue
679     // for a long time.
680     //
681     private void doPurge(Call call, long now) {
682       synchronized (call.connection.responseQueue) {
683         Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
684         while (iter.hasNext()) {
685           Call nextCall = iter.next();
686           if (now > nextCall.timestamp + PURGE_INTERVAL) {
687             closeConnection(nextCall.connection);
688             break;
689           }
690         }
691       }
692     }
693 
694     // Processes one response. Returns true if there are no more pending
695     // data for this channel.
696     //
        // responseQueue: the per-connection queue of calls awaiting a write.
        // inHandler: true when called from doRespond, false when called from
        //   doAsyncWrite; only the inHandler path registers the channel with
        //   writeSelector after a partial write.
        // Any exit with 'error' still true (an exception, or a negative
        // write count) closes the call's connection in the finally block.
697     @SuppressWarnings({"ConstantConditions"})
698     private boolean processResponse(final LinkedList<Call> responseQueue,
699                                     boolean inHandler) throws IOException {
700       boolean error = true;
701       boolean done = false;       // there is more data for this channel.
702       int numElements;
703       Call call = null;
704       try {
705         //noinspection SynchronizationOnLocalVariableOrMethodParameter
706         synchronized (responseQueue) {
707           //
708           // If there are no items for this channel, then we are done
709           //
710           numElements = responseQueue.size();
711           if (numElements == 0) {
712             error = false;
713             return true;              // no more data for this channel.
714           }
715           //
716           // Extract the first call
717           //
718           call = responseQueue.removeFirst();
719           SocketChannel channel = call.connection.channel;
720           if (LOG.isDebugEnabled()) {
721             LOG.debug(getName() + ": responding to #" + call.id + " from " +
722                       call.connection);
723           }
724           //
725           // Send as much data as we can in the non-blocking fashion
726           //
              // channelWrite is defined outside this block.
727           int numBytes = channelWrite(channel, call.response);
728           if (numBytes < 0) {
                // 'error' is still true here, so the finally block logs the
                // call and closes its connection.
729             return true;
730           }
731           if (!call.response.hasRemaining()) {
                // The whole response went out.
732             call.connection.decRpcCount();
733             //noinspection RedundantIfStatement
734             if (numElements == 1) {    // last call fully processes.
735               done = true;             // no more data for this channel.
736             } else {
737               done = false;            // more calls pending to be sent.
738             }
739             if (LOG.isDebugEnabled()) {
740               LOG.debug(getName() + ": responding to #" + call.id + " from " +
741                         call.connection + " Wrote " + numBytes + " bytes.");
742             }
743           } else {
744             //
745             // If we were unable to write the entire response out, then
746             // insert in Selector queue.
747             //
                // Put the call back at the head so it is the next one written.
748             call.connection.responseQueue.addFirst(call);
749 
750             if (inHandler) {
751               // set the serve time when the response has to be sent later
752               call.timestamp = System.currentTimeMillis();
753 
                  // While pending > 0 the responder loop waits in waitPending()
                  // before it selects again.
754               incPending();
755               try {
756                 // Wakeup the thread blocked on select, only then can the call
757                 // to channel.register() complete.
758                 writeSelector.wakeup();
759                 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
760               } catch (ClosedChannelException e) {
761                 //Its ok. channel might be closed else where.
762                 done = true;
763               } finally {
764                 decPending();
765               }
766             }
767             if (LOG.isDebugEnabled()) {
768               LOG.debug(getName() + ": responding to #" + call.id + " from " +
769                         call.connection + " Wrote partial " + numBytes +
770                         " bytes.");
771             }
772           }
773           error = false;              // everything went off well
774         }
775       } finally {
776         if (error && call != null) {
777           LOG.warn(getName()+", call " + call + ": output error");
778           done = true;               // error. no more data for this channel.
779           closeConnection(call.connection);
780         }
781       }
782       return done;
783     }
784 
785     //
786     // Enqueue a response from the application.
787     //
788     void doRespond(Call call) throws IOException {
789       synchronized (call.connection.responseQueue) {
790         call.connection.responseQueue.addLast(call);
791         if (call.connection.responseQueue.size() == 1) {
792           processResponse(call.connection.responseQueue, true);
793         }
794       }
795     }
796 
797     private synchronized void incPending() {   // call waiting to be enqueued.
798       pending++;
799     }
800 
801     private synchronized void decPending() { // call done enqueueing.
802       pending--;
803       notify();
804     }
805 
806     private synchronized void waitPending() throws InterruptedException {
807       while (pending > 0) {
808         wait();
809       }
810     }
811   }
812 
813   /** Reads calls from a connection and queues them for handling. */
814   private class Connection {
815     private boolean versionRead = false; //if initial signature and
816                                          //version are read
817     private boolean headerRead = false;  //if the connection header that
818                                          //follows version is read.
819     protected SocketChannel channel;
820     private ByteBuffer data;
821     private ByteBuffer dataLengthBuffer;
822     protected final LinkedList<Call> responseQueue;
823     private volatile int rpcCount = 0; // number of outstanding rpcs
824     private long lastContact;
825     private int dataLength;
826     protected Socket socket;
827     // Cache the remote host & port info so that even if the socket is
828     // disconnected, we can say where it used to connect to.
829     private String hostAddress;
830     private int remotePort;
831     protected UserGroupInformation ticket = null;
832 
833     public Connection(SocketChannel channel, long lastContact) {
834       this.channel = channel;
835       this.lastContact = lastContact;
836       this.data = null;
837       this.dataLengthBuffer = ByteBuffer.allocate(4);
838       this.socket = channel.socket();
839       InetAddress addr = socket.getInetAddress();
840       if (addr == null) {
841         this.hostAddress = "*Unknown*";
842       } else {
843         this.hostAddress = addr.getHostAddress();
844       }
845       this.remotePort = socket.getPort();
846       this.responseQueue = new LinkedList<Call>();
847       if (socketSendBufferSize != 0) {
848         try {
849           socket.setSendBufferSize(socketSendBufferSize);
850         } catch (IOException e) {
851           LOG.warn("Connection: unable to set socket send buffer size to " +
852                    socketSendBufferSize);
853         }
854       }
855     }
856 
857     @Override
858     public String toString() {
859       return getHostAddress() + ":" + remotePort;
860     }
861 
862     public String getHostAddress() {
863       return hostAddress;
864     }
865 
866     public void setLastContact(long lastContact) {
867       this.lastContact = lastContact;
868     }
869 
870     public long getLastContact() {
871       return lastContact;
872     }
873 
874     /* Return true if the connection has no outstanding rpc */
875     private boolean isIdle() {
876       return rpcCount == 0;
877     }
878 
879     /* Decrement the outstanding RPC count */
880     protected void decRpcCount() {
881       rpcCount--;
882     }
883 
884     /* Increment the outstanding RPC count */
885     private void incRpcCount() {
886       rpcCount++;
887     }
888 
889     protected boolean timedOut(long currentTime) {
890       return isIdle() && currentTime - lastContact > maxIdleTime;
891     }
892 
893     public int readAndProcess() throws IOException, InterruptedException {
894       while (true) {
895         /* Read at most one RPC. If the header is not read completely yet
896          * then iterate until we read first RPC or until there is no data left.
897          */
898         int count;
899         if (dataLengthBuffer.remaining() > 0) {
900           count = channelRead(channel, dataLengthBuffer);
901           if (count < 0 || dataLengthBuffer.remaining() > 0)
902             return count;
903         }
904 
905         if (!versionRead) {
906           //Every connection is expected to send the header.
907           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
908           count = channelRead(channel, versionBuffer);
909           if (count <= 0) {
910             return count;
911           }
912           int version = versionBuffer.get(0);
913 
914           dataLengthBuffer.flip();
915           if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
916             //Warning is ok since this is not supposed to happen.
917             LOG.warn("Incorrect header or version mismatch from " +
918                      hostAddress + ":" + remotePort +
919                      " got version " + version +
920                      " expected version " + CURRENT_VERSION);
921             return -1;
922           }
923           dataLengthBuffer.clear();
924           versionRead = true;
925           continue;
926         }
927 
928         if (data == null) {
929           dataLengthBuffer.flip();
930           dataLength = dataLengthBuffer.getInt();
931 
932           if (dataLength == HBaseClient.PING_CALL_ID) {
933             dataLengthBuffer.clear();
934             return 0;  //ping message
935           }
936           data = ByteBuffer.allocate(dataLength);
937           incRpcCount();  // Increment the rpc count
938         }
939 
940         count = channelRead(channel, data);
941 
942         if (data.remaining() == 0) {
943           dataLengthBuffer.clear();
944           data.flip();
945           if (headerRead) {
946             processData();
947             data = null;
948             return count;
949           }
950           processHeader();
951           headerRead = true;
952           data = null;
953           continue;
954         }
955         return count;
956       }
957     }
958 
959     /// Reads the header following version
960     private void processHeader() throws IOException {
961       /* In the current version, it is just a ticket.
962        * Later we could introduce a "ConnectionHeader" class.
963        */
964       DataInputStream in =
965         new DataInputStream(new ByteArrayInputStream(data.array()));
966       ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
967     }
968 
969     private void processData() throws  IOException, InterruptedException {
970       DataInputStream dis =
971         new DataInputStream(new ByteArrayInputStream(data.array()));
972       int id = dis.readInt();                    // try to read an id
973 
974       if (LOG.isDebugEnabled())
975         LOG.debug(" got #" + id);
976 
977       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
978       param.readFields(dis);
979 
980       Call call = new Call(id, param, this);
981 
982       if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
983         priorityCallQueue.put(call);
984       } else {
985         callQueue.put(call);              // queue the call; maybe blocked here
986       }
987     }
988 
989     protected synchronized void close() {
990       data = null;
991       dataLengthBuffer = null;
992       if (!channel.isOpen())
993         return;
994       try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE
995       if (channel.isOpen()) {
996         try {channel.close();} catch(Exception ignored) {}
997       }
998       try {socket.close();} catch(Exception ignored) {}
999     }
1000   }
1001 
1002   /** Handles queued calls . */
1003   private class Handler extends Thread {
1004     private final BlockingQueue<Call> myCallQueue;
1005     static final int BUFFER_INITIAL_SIZE = 1024;
1006 
1007     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
1008       this.myCallQueue = cq;
1009       this.setDaemon(true);
1010 
1011       String threadName = "IPC Server handler " + instanceNumber + " on " + port;
1012       if (cq == priorityCallQueue) {
1013         // this is just an amazing hack, but it works.
1014         threadName = "PRI " + threadName;
1015       }
1016       this.setName(threadName);
1017     }
1018 
1019     @Override
1020     public void run() {
1021       LOG.info(getName() + ": starting");
1022       SERVER.set(HBaseServer.this);
1023       while (running) {
1024         try {
1025           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
1026 
1027           if (LOG.isDebugEnabled())
1028             LOG.debug(getName() + ": has #" + call.id + " from " +
1029                       call.connection);
1030 
1031           String errorClass = null;
1032           String error = null;
1033           Writable value = null;
1034 
1035           CurCall.set(call);
1036           try {
1037             if (!started)
1038               throw new ServerNotRunningException("Server is not running yet");
1039             value = call(call.param, call.timestamp);             // make the call
1040           } catch (Throwable e) {
1041             LOG.debug(getName()+", call "+call+": error: " + e, e);
1042             errorClass = e.getClass().getName();
1043             error = StringUtils.stringifyException(e);
1044           }
1045           CurCall.set(null);
1046 
1047           int size = BUFFER_INITIAL_SIZE;
1048           if (value instanceof WritableWithSize) {
1049             // get the size hint.
1050             WritableWithSize ohint = (WritableWithSize)value;
1051             long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
1052             if (hint > 0) {
1053               if ((hint) > Integer.MAX_VALUE) {
1054                 // oops, new problem.
1055                 IOException ioe =
1056                     new IOException("Result buffer size too large: " + hint);
1057                 errorClass = ioe.getClass().getName();
1058                 error = StringUtils.stringifyException(ioe);
1059               } else {
1060                 size = (int)hint;
1061               }
1062             }
1063           }
1064           ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
1065           DataOutputStream out = new DataOutputStream(buf);
1066           out.writeInt(call.id);                // write call id
1067           out.writeBoolean(error != null);      // write error flag
1068 
1069           if (error == null) {
1070             value.write(out);
1071           } else {
1072             WritableUtils.writeString(out, errorClass);
1073             WritableUtils.writeString(out, error);
1074           }
1075 
1076           if (buf.size() > warnResponseSize) {
1077             LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
1078                      + StringUtils.humanReadableInt(buf.size()));
1079           }
1080 
1081 
1082           call.setResponse(buf.getByteBuffer());
1083           responder.doRespond(call);
1084         } catch (InterruptedException e) {
1085           if (running) {                          // unexpected -- log it
1086             LOG.info(getName() + " caught: " +
1087                      StringUtils.stringifyException(e));
1088           }
1089         } catch (OutOfMemoryError e) {
1090           if (errorHandler != null) {
1091             if (errorHandler.checkOOME(e)) {
1092               LOG.info(getName() + ": exiting on OOME");
1093               return;
1094             }
1095           } else {
1096             // rethrow if no handler
1097             throw e;
1098           }
1099         } catch (Exception e) {
1100           LOG.warn(getName() + " caught: " +
1101                    StringUtils.stringifyException(e));
1102         }
1103       }
1104       LOG.info(getName() + ": exiting");
1105     }
1106 
1107   }
1108 
1109   /**
1110    * Gets the QOS level for this call.  If it is higher than the highPriorityLevel and there
1111    * are priorityHandlers available it will be processed in it's own thread set.
1112    *
1113    * @param param
1114    * @return priority, higher is better
1115    */
1116   private Function<Writable,Integer> qosFunction = null;
1117   public void setQosFunction(Function<Writable, Integer> newFunc) {
1118     qosFunction = newFunc;
1119   }
1120 
1121   protected int getQosLevel(Writable param) {
1122     if (qosFunction == null) {
1123       return 0;
1124     }
1125 
1126     Integer res = qosFunction.apply(param);
1127     if (res == null) {
1128       return 0;
1129     }
1130     return res;
1131   }
1132 
1133   /* Constructs a server listening on the named port and address.  Parameters passed must
1134    * be of the named class.  The <code>handlerCount</handlerCount> determines
1135    * the number of handler threads that will be used to process calls.
1136    *
1137    */
1138   protected HBaseServer(String bindAddress, int port,
1139                         Class<? extends Writable> paramClass, int handlerCount,
1140                         int priorityHandlerCount, Configuration conf, String serverName,
1141                         int highPriorityLevel)
1142     throws IOException {
1143     this.bindAddress = bindAddress;
1144     this.conf = conf;
1145     this.port = port;
1146     this.paramClass = paramClass;
1147     this.handlerCount = handlerCount;
1148     this.priorityHandlerCount = priorityHandlerCount;
1149     this.socketSendBufferSize = 0;
1150     this.maxQueueSize =
1151       this.conf.getInt("ipc.server.max.queue.size",
1152         handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER);
1153      this.readThreads = conf.getInt(
1154         "ipc.server.read.threadpool.size",
1155         10);
1156     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
1157     if (priorityHandlerCount > 0) {
1158       this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize); // TODO hack on size
1159     } else {
1160       this.priorityCallQueue = null;
1161     }
1162     this.highPriorityLevel = highPriorityLevel;
1163     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1164     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1165     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1166 
1167     // Start the listener here and let it bind to the port
1168     listener = new Listener();
1169     this.port = listener.getAddress().getPort();
1170     this.rpcMetrics = new HBaseRpcMetrics(serverName,
1171                           Integer.toString(this.port));
1172     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
1173     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1174 
1175     this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
1176                                         DEFAULT_WARN_RESPONSE_SIZE);
1177 
1178 
1179     // Create the responder here
1180     responder = new Responder();
1181   }
1182 
1183   protected void closeConnection(Connection connection) {
1184     synchronized (connectionList) {
1185       if (connectionList.remove(connection))
1186         numConnections--;
1187     }
1188     connection.close();
1189   }
1190 
1191   /** Sets the socket buffer size used for responding to RPCs.
1192    * @param size send size
1193    */
1194   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1195 
1196   /** Starts the service.  Must be called before any calls will be handled. */
1197   public void start() {
1198     startThreads();
1199     openServer();
1200   }
1201 
1202   /**
1203    * Open a previously started server.
1204    */
1205   public void openServer() {
1206     started = true;
1207   }
1208 
1209   /**
1210    * Starts the service threads but does not allow requests to be responded yet.
1211    * Client will get {@link ServerNotRunningException} instead.
1212    */
1213   public synchronized void startThreads() {
1214     responder.start();
1215     listener.start();
1216     handlers = new Handler[handlerCount];
1217 
1218     for (int i = 0; i < handlerCount; i++) {
1219       handlers[i] = new Handler(callQueue, i);
1220       handlers[i].start();
1221     }
1222 
1223     if (priorityHandlerCount > 0) {
1224       priorityHandlers = new Handler[priorityHandlerCount];
1225       for (int i = 0 ; i < priorityHandlerCount; i++) {
1226         priorityHandlers[i] = new Handler(priorityCallQueue, i);
1227         priorityHandlers[i].start();
1228       }
1229     }
1230   }
1231 
1232   /** Stops the service.  No new calls will be handled after this is called. */
1233   public synchronized void stop() {
1234     LOG.info("Stopping server on " + port);
1235     running = false;
1236     if (handlers != null) {
1237       for (Handler handler : handlers) {
1238         if (handler != null) {
1239           handler.interrupt();
1240         }
1241       }
1242     }
1243     if (priorityHandlers != null) {
1244       for (Handler handler : priorityHandlers) {
1245         if (handler != null) {
1246           handler.interrupt();
1247         }
1248       }
1249     }
1250     listener.interrupt();
1251     listener.doStop();
1252     responder.interrupt();
1253     notifyAll();
1254     if (this.rpcMetrics != null) {
1255       this.rpcMetrics.shutdown();
1256     }
1257   }
1258 
1259   /** Wait for the server to be stopped.
1260    * Does not wait for all subthreads to finish.
1261    *  See {@link #stop()}.
1262    * @throws InterruptedException e
1263    */
1264   public synchronized void join() throws InterruptedException {
1265     while (running) {
1266       wait();
1267     }
1268   }
1269 
1270   /**
1271    * Return the socket (ip+port) on which the RPC server is listening to.
1272    * @return the socket (ip+port) on which the RPC server is listening to.
1273    */
1274   public synchronized InetSocketAddress getListenerAddress() {
1275     return listener.getAddress();
1276   }
1277 
1278   /** Called for each call.
1279    * @param param writable parameter
1280    * @param receiveTime time
1281    * @return Writable
1282    * @throws IOException e
1283    */
1284   public abstract Writable call(Writable param, long receiveTime)
1285                                                 throws IOException;
1286 
1287   /**
1288    * The number of open RPC conections
1289    * @return the number of open rpc connections
1290    */
1291   public int getNumOpenConnections() {
1292     return numConnections;
1293   }
1294 
1295   /**
1296    * The number of rpc calls in the queue.
1297    * @return The number of rpc calls in the queue.
1298    */
1299   public int getCallQueueLen() {
1300     return callQueue.size();
1301   }
1302 
1303   /**
1304    * Set the handler for calling out of RPC for error conditions.
1305    * @param handler the handler implementation
1306    */
1307   public void setErrorHandler(HBaseRPCErrorHandler handler) {
1308     this.errorHandler = handler;
1309   }
1310 
1311   /**
1312    * Returns the metrics instance for reporting RPC call statistics
1313    */
1314   public HBaseRpcMetrics getRpcMetrics() {
1315     return rpcMetrics;
1316   }
1317 
1318   /**
1319    * When the read or write buffer size is larger than this limit, i/o will be
1320    * done in chunks of this size. Most RPC requests and responses would be
1321    * be smaller.
1322    */
1323   private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
1324 
1325   /**
1326    * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
1327    * If the amount of data is large, it writes to channel in smaller chunks.
1328    * This is to avoid jdk from creating many direct buffers as the size of
1329    * buffer increases. This also minimizes extra copies in NIO layer
1330    * as a result of multiple write operations required to write a large
1331    * buffer.
1332    *
1333    * @param channel writable byte channel to write to
1334    * @param buffer buffer to write
1335    * @return number of bytes written
1336    * @throws java.io.IOException e
1337    * @see WritableByteChannel#write(ByteBuffer)
1338    */
1339   protected static int channelWrite(WritableByteChannel channel,
1340                                     ByteBuffer buffer) throws IOException {
1341     return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1342            channel.write(buffer) : channelIO(null, channel, buffer);
1343   }
1344 
1345   /**
1346    * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
1347    * If the amount of data is large, it writes to channel in smaller chunks.
1348    * This is to avoid jdk from creating many direct buffers as the size of
1349    * ByteBuffer increases. There should not be any performance degredation.
1350    *
1351    * @param channel writable byte channel to write on
1352    * @param buffer buffer to write
1353    * @return number of bytes written
1354    * @throws java.io.IOException e
1355    * @see ReadableByteChannel#read(ByteBuffer)
1356    */
1357   protected static int channelRead(ReadableByteChannel channel,
1358                                    ByteBuffer buffer) throws IOException {
1359     return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1360            channel.read(buffer) : channelIO(channel, null, buffer);
1361   }
1362 
1363   /**
1364    * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
1365    * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
1366    * one of readCh or writeCh should be non-null.
1367    *
1368    * @param readCh read channel
1369    * @param writeCh write channel
1370    * @param buf buffer to read or write into/out of
1371    * @return bytes written
1372    * @throws java.io.IOException e
1373    * @see #channelRead(ReadableByteChannel, ByteBuffer)
1374    * @see #channelWrite(WritableByteChannel, ByteBuffer)
1375    */
1376   private static int channelIO(ReadableByteChannel readCh,
1377                                WritableByteChannel writeCh,
1378                                ByteBuffer buf) throws IOException {
1379 
1380     int originalLimit = buf.limit();
1381     int initialRemaining = buf.remaining();
1382     int ret = 0;
1383 
1384     while (buf.remaining() > 0) {
1385       try {
1386         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
1387         buf.limit(buf.position() + ioSize);
1388 
1389         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
1390 
1391         if (ret < ioSize) {
1392           break;
1393         }
1394 
1395       } finally {
1396         buf.limit(originalLimit);
1397       }
1398     }
1399 
1400     int nBytes = initialRemaining - buf.remaining();
1401     return (nBytes > 0) ? nBytes : ret;
1402   }
1403 }