1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.ipc;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataInputStream;
26 import java.io.DataOutputStream;
27 import java.io.IOException;
28 import java.net.BindException;
29 import java.net.InetAddress;
30 import java.net.InetSocketAddress;
31 import java.net.ServerSocket;
32 import java.net.Socket;
33 import java.net.SocketException;
34 import java.net.UnknownHostException;
35 import java.nio.ByteBuffer;
36 import java.nio.channels.CancelledKeyException;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.ReadableByteChannel;
39 import java.nio.channels.SelectionKey;
40 import java.nio.channels.Selector;
41 import java.nio.channels.ServerSocketChannel;
42 import java.nio.channels.SocketChannel;
43 import java.nio.channels.WritableByteChannel;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.Iterator;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Random;
50 import java.util.concurrent.BlockingQueue;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.LinkedBlockingQueue;
54
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57 import org.apache.hadoop.conf.Configuration;
58 import org.apache.hadoop.hbase.io.WritableWithSize;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.io.ObjectWritable;
61 import org.apache.hadoop.io.Writable;
62 import org.apache.hadoop.io.WritableUtils;
63 import org.apache.hadoop.security.UserGroupInformation;
64 import org.apache.hadoop.util.ReflectionUtils;
65 import org.apache.hadoop.util.StringUtils;
66
67 import com.google.common.base.Function;
68 import com.google.common.util.concurrent.ThreadFactoryBuilder;
69
70
71
72
73
74
75
76
77
78
79 public abstract class HBaseServer {
80
81
82
83
84 public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
85
86
87
88 public static final byte CURRENT_VERSION = 3;
89
90
91
92
93 private static final int DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER = 10;
94
95 private static final String WARN_RESPONSE_SIZE =
96 "hbase.ipc.warn.response.size";
97
98
99 private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
100
101 private final int warnResponseSize;
102
103 public static final Log LOG =
104 LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
105
106 protected static final ThreadLocal<HBaseServer> SERVER =
107 new ThreadLocal<HBaseServer>();
108 private volatile boolean started = false;
109
110
111
112
113
114
115
116 public static HBaseServer get() {
117 return SERVER.get();
118 }
119
120
121
122
123 protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
124
125
126
127
128
129 public static InetAddress getRemoteIp() {
130 Call call = CurCall.get();
131 if (call != null) {
132 return call.connection.socket.getInetAddress();
133 }
134 return null;
135 }
136
137
138
139
140 public static String getRemoteAddress() {
141 InetAddress addr = getRemoteIp();
142 return (addr == null) ? null : addr.getHostAddress();
143 }
144
145 protected String bindAddress;
146 protected int port;
147 private int handlerCount;
148 private int priorityHandlerCount;
149 private int readThreads;
150 protected Class<? extends Writable> paramClass;
151 protected int maxIdleTime;
152
153
154 protected int thresholdIdleConnections;
155
156
157
158 int maxConnectionsToNuke;
159
160
161
162 protected HBaseRpcMetrics rpcMetrics;
163
164 protected Configuration conf;
165
166 private int maxQueueSize;
167 protected int socketSendBufferSize;
168 protected final boolean tcpNoDelay;
169 protected final boolean tcpKeepAlive;
170
171 volatile protected boolean running = true;
172 protected BlockingQueue<Call> callQueue;
173 protected BlockingQueue<Call> priorityCallQueue;
174
175 private int highPriorityLevel;
176
177 protected final List<Connection> connectionList =
178 Collections.synchronizedList(new LinkedList<Connection>());
179
180
181 private Listener listener = null;
182 protected Responder responder = null;
183 protected int numConnections = 0;
184 private Handler[] handlers = null;
185 private Handler[] priorityHandlers = null;
186 protected HBaseRPCErrorHandler errorHandler = null;
187
188
189
190
191
192
193
194
195
196
197
198 public static void bind(ServerSocket socket, InetSocketAddress address,
199 int backlog) throws IOException {
200 try {
201 socket.bind(address, backlog);
202 } catch (BindException e) {
203 BindException bindException =
204 new BindException("Problem binding to " + address + " : " +
205 e.getMessage());
206 bindException.initCause(e);
207 throw bindException;
208 } catch (SocketException e) {
209
210
211 if ("Unresolved address".equals(e.getMessage())) {
212 throw new UnknownHostException("Invalid hostname for server: " +
213 address.getHostName());
214 }
215 throw e;
216 }
217 }
218
219
220 private static class Call {
221 protected int id;
222 protected Writable param;
223 protected Connection connection;
224 protected long timestamp;
225
226 protected ByteBuffer response;
227
228 public Call(int id, Writable param, Connection connection) {
229 this.id = id;
230 this.param = param;
231 this.connection = connection;
232 this.timestamp = System.currentTimeMillis();
233 this.response = null;
234 }
235
236 @Override
237 public String toString() {
238 return param.toString() + " from " + connection.toString();
239 }
240
241 public void setResponse(ByteBuffer response) {
242 this.response = response;
243 }
244 }
245
246
247 private class Listener extends Thread {
248
249 private ServerSocketChannel acceptChannel = null;
250 private Selector selector = null;
251 private Reader[] readers = null;
252 private int currentReader = 0;
253 private InetSocketAddress address;
254 private Random rand = new Random();
255 private long lastCleanupRunTime = 0;
256
257 private long cleanupInterval = 10000;
258
259 private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
260
261 private ExecutorService readPool;
262
263 public Listener() throws IOException {
264 address = new InetSocketAddress(bindAddress, port);
265
266 acceptChannel = ServerSocketChannel.open();
267 acceptChannel.configureBlocking(false);
268
269
270 bind(acceptChannel.socket(), address, backlogLength);
271 port = acceptChannel.socket().getLocalPort();
272
273 selector= Selector.open();
274
275 readers = new Reader[readThreads];
276 readPool = Executors.newFixedThreadPool(readThreads,
277 new ThreadFactoryBuilder().setNameFormat(
278 "IPC Reader %d on port " + port).build());
279 for (int i = 0; i < readThreads; ++i) {
280 Selector readSelector = Selector.open();
281 Reader reader = new Reader(readSelector);
282 readers[i] = reader;
283 readPool.execute(reader);
284 }
285
286
287 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
288 this.setName("IPC Server listener on " + port);
289 this.setDaemon(true);
290 }
291
292
293 private class Reader implements Runnable {
294 private volatile boolean adding = false;
295 private Selector readSelector = null;
296
297 Reader(Selector readSelector) {
298 this.readSelector = readSelector;
299 }
300 public void run() {
301 synchronized(this) {
302 while (running) {
303 SelectionKey key = null;
304 try {
305 readSelector.select();
306 while (adding) {
307 this.wait(1000);
308 }
309
310 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
311 while (iter.hasNext()) {
312 key = iter.next();
313 iter.remove();
314 if (key.isValid()) {
315 if (key.isReadable()) {
316 doRead(key);
317 }
318 }
319 key = null;
320 }
321 } catch (InterruptedException e) {
322 if (running) {
323 LOG.info(getName() + "caught: " +
324 StringUtils.stringifyException(e));
325 }
326 } catch (IOException ex) {
327 LOG.error("Error in Reader", ex);
328 }
329 }
330 }
331 }
332
333
334
335
336
337
338
339
340 public void startAdd() {
341 adding = true;
342 readSelector.wakeup();
343 }
344
345 public synchronized SelectionKey registerChannel(SocketChannel channel)
346 throws IOException {
347 return channel.register(readSelector, SelectionKey.OP_READ);
348 }
349
350 public synchronized void finishAdd() {
351 adding = false;
352 this.notify();
353 }
354 }
355
356
357
358
359
360
361
362
363 private void cleanupConnections(boolean force) {
364 if (force || numConnections > thresholdIdleConnections) {
365 long currentTime = System.currentTimeMillis();
366 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
367 return;
368 }
369 int start = 0;
370 int end = numConnections - 1;
371 if (!force) {
372 start = rand.nextInt() % numConnections;
373 end = rand.nextInt() % numConnections;
374 int temp;
375 if (end < start) {
376 temp = start;
377 start = end;
378 end = temp;
379 }
380 }
381 int i = start;
382 int numNuked = 0;
383 while (i <= end) {
384 Connection c;
385 synchronized (connectionList) {
386 try {
387 c = connectionList.get(i);
388 } catch (Exception e) {return;}
389 }
390 if (c.timedOut(currentTime)) {
391 if (LOG.isDebugEnabled())
392 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
393 closeConnection(c);
394 numNuked++;
395 end--;
396
397 c = null;
398 if (!force && numNuked == maxConnectionsToNuke) break;
399 }
400 else i++;
401 }
402 lastCleanupRunTime = System.currentTimeMillis();
403 }
404 }
405
406 @Override
407 public void run() {
408 LOG.info(getName() + ": starting");
409 SERVER.set(HBaseServer.this);
410
411 while (running) {
412 SelectionKey key = null;
413 try {
414 selector.select();
415 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
416 while (iter.hasNext()) {
417 key = iter.next();
418 iter.remove();
419 try {
420 if (key.isValid()) {
421 if (key.isAcceptable())
422 doAccept(key);
423 }
424 } catch (IOException ignored) {
425 }
426 key = null;
427 }
428 } catch (OutOfMemoryError e) {
429 if (errorHandler != null) {
430 if (errorHandler.checkOOME(e)) {
431 LOG.info(getName() + ": exiting on OOME");
432 closeCurrentConnection(key);
433 cleanupConnections(true);
434 return;
435 }
436 } else {
437
438
439
440 LOG.warn("Out of Memory in server select", e);
441 closeCurrentConnection(key);
442 cleanupConnections(true);
443 try { Thread.sleep(60000); } catch (Exception ignored) {}
444 }
445 } catch (Exception e) {
446 closeCurrentConnection(key);
447 }
448 cleanupConnections(false);
449 }
450 LOG.info("Stopping " + this.getName());
451
452 synchronized (this) {
453 try {
454 acceptChannel.close();
455 selector.close();
456 } catch (IOException ignored) { }
457
458 selector= null;
459 acceptChannel= null;
460
461
462 while (!connectionList.isEmpty()) {
463 closeConnection(connectionList.remove(0));
464 }
465 }
466 }
467
468 private void closeCurrentConnection(SelectionKey key) {
469 if (key != null) {
470 Connection c = (Connection)key.attachment();
471 if (c != null) {
472 if (LOG.isDebugEnabled())
473 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
474 closeConnection(c);
475 }
476 }
477 }
478
479 InetSocketAddress getAddress() {
480 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
481 }
482
483 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
484 Connection c;
485 ServerSocketChannel server = (ServerSocketChannel) key.channel();
486
487 SocketChannel channel;
488 while ((channel = server.accept()) != null) {
489 channel.configureBlocking(false);
490 channel.socket().setTcpNoDelay(tcpNoDelay);
491 channel.socket().setKeepAlive(tcpKeepAlive);
492
493 Reader reader = getReader();
494 try {
495 reader.startAdd();
496 SelectionKey readKey = reader.registerChannel(channel);
497 c = new Connection(channel, System.currentTimeMillis());
498 readKey.attach(c);
499 synchronized (connectionList) {
500 connectionList.add(numConnections, c);
501 numConnections++;
502 }
503 if (LOG.isDebugEnabled())
504 LOG.debug("Server connection from " + c.toString() +
505 "; # active connections: " + numConnections +
506 "; # queued calls: " + callQueue.size());
507 } finally {
508 reader.finishAdd();
509 }
510 }
511 }
512
513 void doRead(SelectionKey key) throws InterruptedException {
514 int count = 0;
515 Connection c = (Connection)key.attachment();
516 if (c == null) {
517 return;
518 }
519 c.setLastContact(System.currentTimeMillis());
520
521 try {
522 count = c.readAndProcess();
523 } catch (InterruptedException ieo) {
524 throw ieo;
525 } catch (Exception e) {
526 LOG.warn(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
527 count = -1;
528 }
529 if (count < 0) {
530 if (LOG.isDebugEnabled())
531 LOG.debug(getName() + ": disconnecting client " +
532 c.getHostAddress() + ". Number of active connections: "+
533 numConnections);
534 closeConnection(c);
535
536 }
537 else {
538 c.setLastContact(System.currentTimeMillis());
539 }
540 }
541
542 synchronized void doStop() {
543 if (selector != null) {
544 selector.wakeup();
545 Thread.yield();
546 }
547 if (acceptChannel != null) {
548 try {
549 acceptChannel.socket().close();
550 } catch (IOException e) {
551 LOG.info(getName() + ":Exception in closing listener socket. " + e);
552 }
553 }
554 readPool.shutdownNow();
555 }
556
557
558
559 Reader getReader() {
560 currentReader = (currentReader + 1) % readers.length;
561 return readers[currentReader];
562 }
563 }
564
565
566 private class Responder extends Thread {
567 private Selector writeSelector;
568 private int pending;
569
570 final static int PURGE_INTERVAL = 900000;
571
572 Responder() throws IOException {
573 this.setName("IPC Server Responder");
574 this.setDaemon(true);
575 writeSelector = Selector.open();
576 pending = 0;
577 }
578
579 @Override
580 public void run() {
581 LOG.info(getName() + ": starting");
582 SERVER.set(HBaseServer.this);
583 long lastPurgeTime = 0;
584
585 while (running) {
586 try {
587 waitPending();
588 writeSelector.select(PURGE_INTERVAL);
589 Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
590 while (iter.hasNext()) {
591 SelectionKey key = iter.next();
592 iter.remove();
593 try {
594 if (key.isValid() && key.isWritable()) {
595 doAsyncWrite(key);
596 }
597 } catch (IOException e) {
598 LOG.info(getName() + ": doAsyncWrite threw exception " + e);
599 }
600 }
601 long now = System.currentTimeMillis();
602 if (now < lastPurgeTime + PURGE_INTERVAL) {
603 continue;
604 }
605 lastPurgeTime = now;
606
607
608
609
610 LOG.debug("Checking for old call responses.");
611 ArrayList<Call> calls;
612
613
614 synchronized (writeSelector.keys()) {
615 calls = new ArrayList<Call>(writeSelector.keys().size());
616 iter = writeSelector.keys().iterator();
617 while (iter.hasNext()) {
618 SelectionKey key = iter.next();
619 Call call = (Call)key.attachment();
620 if (call != null && key.channel() == call.connection.channel) {
621 calls.add(call);
622 }
623 }
624 }
625
626 for(Call call : calls) {
627 doPurge(call, now);
628 }
629 } catch (OutOfMemoryError e) {
630 if (errorHandler != null) {
631 if (errorHandler.checkOOME(e)) {
632 LOG.info(getName() + ": exiting on OOME");
633 return;
634 }
635 } else {
636
637
638
639
640
641 LOG.warn("Out of Memory in server select", e);
642 try { Thread.sleep(60000); } catch (Exception ignored) {}
643 }
644 } catch (Exception e) {
645 LOG.warn("Exception in Responder " +
646 StringUtils.stringifyException(e));
647 }
648 }
649 LOG.info("Stopping " + this.getName());
650 }
651
652 private void doAsyncWrite(SelectionKey key) throws IOException {
653 Call call = (Call)key.attachment();
654 if (call == null) {
655 return;
656 }
657 if (key.channel() != call.connection.channel) {
658 throw new IOException("doAsyncWrite: bad channel");
659 }
660
661 synchronized(call.connection.responseQueue) {
662 if (processResponse(call.connection.responseQueue, false)) {
663 try {
664 key.interestOps(0);
665 } catch (CancelledKeyException e) {
666
667
668
669
670
671 LOG.warn("Exception while changing ops : " + e);
672 }
673 }
674 }
675 }
676
677
678
679
680
681 private void doPurge(Call call, long now) {
682 synchronized (call.connection.responseQueue) {
683 Iterator<Call> iter = call.connection.responseQueue.listIterator(0);
684 while (iter.hasNext()) {
685 Call nextCall = iter.next();
686 if (now > nextCall.timestamp + PURGE_INTERVAL) {
687 closeConnection(nextCall.connection);
688 break;
689 }
690 }
691 }
692 }
693
694
695
696
697 @SuppressWarnings({"ConstantConditions"})
698 private boolean processResponse(final LinkedList<Call> responseQueue,
699 boolean inHandler) throws IOException {
700 boolean error = true;
701 boolean done = false;
702 int numElements;
703 Call call = null;
704 try {
705
706 synchronized (responseQueue) {
707
708
709
710 numElements = responseQueue.size();
711 if (numElements == 0) {
712 error = false;
713 return true;
714 }
715
716
717
718 call = responseQueue.removeFirst();
719 SocketChannel channel = call.connection.channel;
720 if (LOG.isDebugEnabled()) {
721 LOG.debug(getName() + ": responding to #" + call.id + " from " +
722 call.connection);
723 }
724
725
726
727 int numBytes = channelWrite(channel, call.response);
728 if (numBytes < 0) {
729 return true;
730 }
731 if (!call.response.hasRemaining()) {
732 call.connection.decRpcCount();
733
734 if (numElements == 1) {
735 done = true;
736 } else {
737 done = false;
738 }
739 if (LOG.isDebugEnabled()) {
740 LOG.debug(getName() + ": responding to #" + call.id + " from " +
741 call.connection + " Wrote " + numBytes + " bytes.");
742 }
743 } else {
744
745
746
747
748 call.connection.responseQueue.addFirst(call);
749
750 if (inHandler) {
751
752 call.timestamp = System.currentTimeMillis();
753
754 incPending();
755 try {
756
757
758 writeSelector.wakeup();
759 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
760 } catch (ClosedChannelException e) {
761
762 done = true;
763 } finally {
764 decPending();
765 }
766 }
767 if (LOG.isDebugEnabled()) {
768 LOG.debug(getName() + ": responding to #" + call.id + " from " +
769 call.connection + " Wrote partial " + numBytes +
770 " bytes.");
771 }
772 }
773 error = false;
774 }
775 } finally {
776 if (error && call != null) {
777 LOG.warn(getName()+", call " + call + ": output error");
778 done = true;
779 closeConnection(call.connection);
780 }
781 }
782 return done;
783 }
784
785
786
787
788 void doRespond(Call call) throws IOException {
789 synchronized (call.connection.responseQueue) {
790 call.connection.responseQueue.addLast(call);
791 if (call.connection.responseQueue.size() == 1) {
792 processResponse(call.connection.responseQueue, true);
793 }
794 }
795 }
796
797 private synchronized void incPending() {
798 pending++;
799 }
800
801 private synchronized void decPending() {
802 pending--;
803 notify();
804 }
805
806 private synchronized void waitPending() throws InterruptedException {
807 while (pending > 0) {
808 wait();
809 }
810 }
811 }
812
813
814 private class Connection {
815 private boolean versionRead = false;
816
817 private boolean headerRead = false;
818
819 protected SocketChannel channel;
820 private ByteBuffer data;
821 private ByteBuffer dataLengthBuffer;
822 protected final LinkedList<Call> responseQueue;
823 private volatile int rpcCount = 0;
824 private long lastContact;
825 private int dataLength;
826 protected Socket socket;
827
828
829 private String hostAddress;
830 private int remotePort;
831 protected UserGroupInformation ticket = null;
832
833 public Connection(SocketChannel channel, long lastContact) {
834 this.channel = channel;
835 this.lastContact = lastContact;
836 this.data = null;
837 this.dataLengthBuffer = ByteBuffer.allocate(4);
838 this.socket = channel.socket();
839 InetAddress addr = socket.getInetAddress();
840 if (addr == null) {
841 this.hostAddress = "*Unknown*";
842 } else {
843 this.hostAddress = addr.getHostAddress();
844 }
845 this.remotePort = socket.getPort();
846 this.responseQueue = new LinkedList<Call>();
847 if (socketSendBufferSize != 0) {
848 try {
849 socket.setSendBufferSize(socketSendBufferSize);
850 } catch (IOException e) {
851 LOG.warn("Connection: unable to set socket send buffer size to " +
852 socketSendBufferSize);
853 }
854 }
855 }
856
857 @Override
858 public String toString() {
859 return getHostAddress() + ":" + remotePort;
860 }
861
862 public String getHostAddress() {
863 return hostAddress;
864 }
865
866 public void setLastContact(long lastContact) {
867 this.lastContact = lastContact;
868 }
869
870 public long getLastContact() {
871 return lastContact;
872 }
873
874
875 private boolean isIdle() {
876 return rpcCount == 0;
877 }
878
879
880 protected void decRpcCount() {
881 rpcCount--;
882 }
883
884
885 private void incRpcCount() {
886 rpcCount++;
887 }
888
889 protected boolean timedOut(long currentTime) {
890 return isIdle() && currentTime - lastContact > maxIdleTime;
891 }
892
893 public int readAndProcess() throws IOException, InterruptedException {
894 while (true) {
895
896
897
898 int count;
899 if (dataLengthBuffer.remaining() > 0) {
900 count = channelRead(channel, dataLengthBuffer);
901 if (count < 0 || dataLengthBuffer.remaining() > 0)
902 return count;
903 }
904
905 if (!versionRead) {
906
907 ByteBuffer versionBuffer = ByteBuffer.allocate(1);
908 count = channelRead(channel, versionBuffer);
909 if (count <= 0) {
910 return count;
911 }
912 int version = versionBuffer.get(0);
913
914 dataLengthBuffer.flip();
915 if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
916
917 LOG.warn("Incorrect header or version mismatch from " +
918 hostAddress + ":" + remotePort +
919 " got version " + version +
920 " expected version " + CURRENT_VERSION);
921 return -1;
922 }
923 dataLengthBuffer.clear();
924 versionRead = true;
925 continue;
926 }
927
928 if (data == null) {
929 dataLengthBuffer.flip();
930 dataLength = dataLengthBuffer.getInt();
931
932 if (dataLength == HBaseClient.PING_CALL_ID) {
933 dataLengthBuffer.clear();
934 return 0;
935 }
936 data = ByteBuffer.allocate(dataLength);
937 incRpcCount();
938 }
939
940 count = channelRead(channel, data);
941
942 if (data.remaining() == 0) {
943 dataLengthBuffer.clear();
944 data.flip();
945 if (headerRead) {
946 processData();
947 data = null;
948 return count;
949 }
950 processHeader();
951 headerRead = true;
952 data = null;
953 continue;
954 }
955 return count;
956 }
957 }
958
959
960 private void processHeader() throws IOException {
961
962
963
964 DataInputStream in =
965 new DataInputStream(new ByteArrayInputStream(data.array()));
966 ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
967 }
968
969 private void processData() throws IOException, InterruptedException {
970 DataInputStream dis =
971 new DataInputStream(new ByteArrayInputStream(data.array()));
972 int id = dis.readInt();
973
974 if (LOG.isDebugEnabled())
975 LOG.debug(" got #" + id);
976
977 Writable param = ReflectionUtils.newInstance(paramClass, conf);
978 param.readFields(dis);
979
980 Call call = new Call(id, param, this);
981
982 if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
983 priorityCallQueue.put(call);
984 } else {
985 callQueue.put(call);
986 }
987 }
988
989 protected synchronized void close() {
990 data = null;
991 dataLengthBuffer = null;
992 if (!channel.isOpen())
993 return;
994 try {socket.shutdownOutput();} catch(Exception ignored) {}
995 if (channel.isOpen()) {
996 try {channel.close();} catch(Exception ignored) {}
997 }
998 try {socket.close();} catch(Exception ignored) {}
999 }
1000 }
1001
1002
1003 private class Handler extends Thread {
1004 private final BlockingQueue<Call> myCallQueue;
1005 static final int BUFFER_INITIAL_SIZE = 1024;
1006
1007 public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
1008 this.myCallQueue = cq;
1009 this.setDaemon(true);
1010
1011 String threadName = "IPC Server handler " + instanceNumber + " on " + port;
1012 if (cq == priorityCallQueue) {
1013
1014 threadName = "PRI " + threadName;
1015 }
1016 this.setName(threadName);
1017 }
1018
1019 @Override
1020 public void run() {
1021 LOG.info(getName() + ": starting");
1022 SERVER.set(HBaseServer.this);
1023 while (running) {
1024 try {
1025 Call call = myCallQueue.take();
1026
1027 if (LOG.isDebugEnabled())
1028 LOG.debug(getName() + ": has #" + call.id + " from " +
1029 call.connection);
1030
1031 String errorClass = null;
1032 String error = null;
1033 Writable value = null;
1034
1035 CurCall.set(call);
1036 try {
1037 if (!started)
1038 throw new ServerNotRunningException("Server is not running yet");
1039 value = call(call.param, call.timestamp);
1040 } catch (Throwable e) {
1041 LOG.debug(getName()+", call "+call+": error: " + e, e);
1042 errorClass = e.getClass().getName();
1043 error = StringUtils.stringifyException(e);
1044 }
1045 CurCall.set(null);
1046
1047 int size = BUFFER_INITIAL_SIZE;
1048 if (value instanceof WritableWithSize) {
1049
1050 WritableWithSize ohint = (WritableWithSize)value;
1051 long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
1052 if (hint > 0) {
1053 if ((hint) > Integer.MAX_VALUE) {
1054
1055 IOException ioe =
1056 new IOException("Result buffer size too large: " + hint);
1057 errorClass = ioe.getClass().getName();
1058 error = StringUtils.stringifyException(ioe);
1059 } else {
1060 size = (int)hint;
1061 }
1062 }
1063 }
1064 ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
1065 DataOutputStream out = new DataOutputStream(buf);
1066 out.writeInt(call.id);
1067 out.writeBoolean(error != null);
1068
1069 if (error == null) {
1070 value.write(out);
1071 } else {
1072 WritableUtils.writeString(out, errorClass);
1073 WritableUtils.writeString(out, error);
1074 }
1075
1076 if (buf.size() > warnResponseSize) {
1077 LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
1078 + StringUtils.humanReadableInt(buf.size()));
1079 }
1080
1081
1082 call.setResponse(buf.getByteBuffer());
1083 responder.doRespond(call);
1084 } catch (InterruptedException e) {
1085 if (running) {
1086 LOG.info(getName() + " caught: " +
1087 StringUtils.stringifyException(e));
1088 }
1089 } catch (OutOfMemoryError e) {
1090 if (errorHandler != null) {
1091 if (errorHandler.checkOOME(e)) {
1092 LOG.info(getName() + ": exiting on OOME");
1093 return;
1094 }
1095 } else {
1096
1097 throw e;
1098 }
1099 } catch (Exception e) {
1100 LOG.warn(getName() + " caught: " +
1101 StringUtils.stringifyException(e));
1102 }
1103 }
1104 LOG.info(getName() + ": exiting");
1105 }
1106
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116 private Function<Writable,Integer> qosFunction = null;
1117 public void setQosFunction(Function<Writable, Integer> newFunc) {
1118 qosFunction = newFunc;
1119 }
1120
1121 protected int getQosLevel(Writable param) {
1122 if (qosFunction == null) {
1123 return 0;
1124 }
1125
1126 Integer res = qosFunction.apply(param);
1127 if (res == null) {
1128 return 0;
1129 }
1130 return res;
1131 }
1132
1133
1134
1135
1136
1137
1138 protected HBaseServer(String bindAddress, int port,
1139 Class<? extends Writable> paramClass, int handlerCount,
1140 int priorityHandlerCount, Configuration conf, String serverName,
1141 int highPriorityLevel)
1142 throws IOException {
1143 this.bindAddress = bindAddress;
1144 this.conf = conf;
1145 this.port = port;
1146 this.paramClass = paramClass;
1147 this.handlerCount = handlerCount;
1148 this.priorityHandlerCount = priorityHandlerCount;
1149 this.socketSendBufferSize = 0;
1150 this.maxQueueSize =
1151 this.conf.getInt("ipc.server.max.queue.size",
1152 handlerCount * DEFAULT_MAX_QUEUE_SIZE_PER_HANDLER);
1153 this.readThreads = conf.getInt(
1154 "ipc.server.read.threadpool.size",
1155 10);
1156 this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
1157 if (priorityHandlerCount > 0) {
1158 this.priorityCallQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
1159 } else {
1160 this.priorityCallQueue = null;
1161 }
1162 this.highPriorityLevel = highPriorityLevel;
1163 this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
1164 this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
1165 this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
1166
1167
1168 listener = new Listener();
1169 this.port = listener.getAddress().getPort();
1170 this.rpcMetrics = new HBaseRpcMetrics(serverName,
1171 Integer.toString(this.port));
1172 this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
1173 this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
1174
1175 this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
1176 DEFAULT_WARN_RESPONSE_SIZE);
1177
1178
1179
1180 responder = new Responder();
1181 }
1182
1183 protected void closeConnection(Connection connection) {
1184 synchronized (connectionList) {
1185 if (connectionList.remove(connection))
1186 numConnections--;
1187 }
1188 connection.close();
1189 }
1190
1191
1192
1193
1194 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
1195
1196
1197 public void start() {
1198 startThreads();
1199 openServer();
1200 }
1201
1202
1203
1204
1205 public void openServer() {
1206 started = true;
1207 }
1208
1209
1210
1211
1212
1213 public synchronized void startThreads() {
1214 responder.start();
1215 listener.start();
1216 handlers = new Handler[handlerCount];
1217
1218 for (int i = 0; i < handlerCount; i++) {
1219 handlers[i] = new Handler(callQueue, i);
1220 handlers[i].start();
1221 }
1222
1223 if (priorityHandlerCount > 0) {
1224 priorityHandlers = new Handler[priorityHandlerCount];
1225 for (int i = 0 ; i < priorityHandlerCount; i++) {
1226 priorityHandlers[i] = new Handler(priorityCallQueue, i);
1227 priorityHandlers[i].start();
1228 }
1229 }
1230 }
1231
1232
1233 public synchronized void stop() {
1234 LOG.info("Stopping server on " + port);
1235 running = false;
1236 if (handlers != null) {
1237 for (Handler handler : handlers) {
1238 if (handler != null) {
1239 handler.interrupt();
1240 }
1241 }
1242 }
1243 if (priorityHandlers != null) {
1244 for (Handler handler : priorityHandlers) {
1245 if (handler != null) {
1246 handler.interrupt();
1247 }
1248 }
1249 }
1250 listener.interrupt();
1251 listener.doStop();
1252 responder.interrupt();
1253 notifyAll();
1254 if (this.rpcMetrics != null) {
1255 this.rpcMetrics.shutdown();
1256 }
1257 }
1258
1259
1260
1261
1262
1263
1264 public synchronized void join() throws InterruptedException {
1265 while (running) {
1266 wait();
1267 }
1268 }
1269
1270
1271
1272
1273
1274 public synchronized InetSocketAddress getListenerAddress() {
1275 return listener.getAddress();
1276 }
1277
1278
1279
1280
1281
1282
1283
1284 public abstract Writable call(Writable param, long receiveTime)
1285 throws IOException;
1286
1287
1288
1289
1290
1291 public int getNumOpenConnections() {
1292 return numConnections;
1293 }
1294
1295
1296
1297
1298
1299 public int getCallQueueLen() {
1300 return callQueue.size();
1301 }
1302
1303
1304
1305
1306
1307 public void setErrorHandler(HBaseRPCErrorHandler handler) {
1308 this.errorHandler = handler;
1309 }
1310
1311
1312
1313
1314 public HBaseRpcMetrics getRpcMetrics() {
1315 return rpcMetrics;
1316 }
1317
1318
1319
1320
1321
1322
1323 private static int NIO_BUFFER_LIMIT = 8*1024;
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339 protected static int channelWrite(WritableByteChannel channel,
1340 ByteBuffer buffer) throws IOException {
1341 return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1342 channel.write(buffer) : channelIO(null, channel, buffer);
1343 }
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 protected static int channelRead(ReadableByteChannel channel,
1358 ByteBuffer buffer) throws IOException {
1359 return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
1360 channel.read(buffer) : channelIO(channel, null, buffer);
1361 }
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376 private static int channelIO(ReadableByteChannel readCh,
1377 WritableByteChannel writeCh,
1378 ByteBuffer buf) throws IOException {
1379
1380 int originalLimit = buf.limit();
1381 int initialRemaining = buf.remaining();
1382 int ret = 0;
1383
1384 while (buf.remaining() > 0) {
1385 try {
1386 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
1387 buf.limit(buf.position() + ioSize);
1388
1389 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
1390
1391 if (ret < ioSize) {
1392 break;
1393 }
1394
1395 } finally {
1396 buf.limit(originalLimit);
1397 }
1398 }
1399
1400 int nBytes = initialRemaining - buf.remaining();
1401 return (nBytes > 0) ? nBytes : ret;
1402 }
1403 }