1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.ipc;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.util.Bytes;
27 import org.apache.hadoop.io.DataOutputBuffer;
28 import org.apache.hadoop.io.IOUtils;
29 import org.apache.hadoop.io.ObjectWritable;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.io.WritableUtils;
32 import org.apache.hadoop.ipc.RemoteException;
33 import org.apache.hadoop.net.NetUtils;
34 import org.apache.hadoop.security.UserGroupInformation;
35 import org.apache.hadoop.util.ReflectionUtils;
36
37 import javax.net.SocketFactory;
38 import java.io.BufferedInputStream;
39 import java.io.BufferedOutputStream;
40 import java.io.DataInputStream;
41 import java.io.DataOutputStream;
42 import java.io.FilterInputStream;
43 import java.io.IOException;
44 import java.io.InputStream;
45 import java.net.ConnectException;
46 import java.net.InetSocketAddress;
47 import java.net.Socket;
48 import java.net.SocketTimeoutException;
49 import java.net.UnknownHostException;
50 import java.util.Hashtable;
51 import java.util.Iterator;
52 import java.util.Map.Entry;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import java.util.concurrent.atomic.AtomicLong;
55
56
57
58
59
60
61
62
63
64
65 public class HBaseClient {
66
67 private static final Log LOG =
68 LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
69 protected final Hashtable<ConnectionId, Connection> connections =
70 new Hashtable<ConnectionId, Connection>();
71
72 protected final Class<? extends Writable> valueClass;
73 protected int counter;
74 protected final AtomicBoolean running = new AtomicBoolean(true);
75 final protected Configuration conf;
76 final protected int maxIdleTime;
77
78 final protected int maxRetries;
79 final protected long failureSleep;
80 protected final boolean tcpNoDelay;
81 protected final boolean tcpKeepAlive;
82 protected int pingInterval;
83 protected int socketTimeout;
84
85 protected final SocketFactory socketFactory;
86 private int refCount = 1;
87
88 final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
89 final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
90 final static int DEFAULT_PING_INTERVAL = 60000;
91 final static int DEFAULT_SOCKET_TIMEOUT = 20000;
92 final static int PING_CALL_ID = -1;
93
94
95
96
97
98
99
100 public static void setPingInterval(Configuration conf, int pingInterval) {
101 conf.setInt(PING_INTERVAL_NAME, pingInterval);
102 }
103
104
105
106
107
108
109
110
111 static int getPingInterval(Configuration conf) {
112 return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
113 }
114
115
116
117
118
119
120 public static void setSocketTimeout(Configuration conf, int socketTimeout) {
121 conf.setInt(SOCKET_TIMEOUT, socketTimeout);
122 }
123
124
125
126
127 static int getSocketTimeout(Configuration conf) {
128 return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
129 }
130
131
132
133
134
135 synchronized void incCount() {
136 refCount++;
137 }
138
139
140
141
142
143 synchronized void decCount() {
144 refCount--;
145 }
146
147
148
149
150
151
152 synchronized boolean isZeroReference() {
153 return refCount==0;
154 }
155
156
157 private class Call {
158 final int id;
159 final Writable param;
160 Writable value;
161 IOException error;
162 boolean done;
163
164 protected Call(Writable param) {
165 this.param = param;
166 synchronized (HBaseClient.this) {
167 this.id = counter++;
168 }
169 }
170
171
172
173 protected synchronized void callComplete() {
174 this.done = true;
175 notify();
176 }
177
178
179
180
181
182
183 public synchronized void setException(IOException error) {
184 this.error = error;
185 callComplete();
186 }
187
188
189
190
191
192
193 public synchronized void setValue(Writable value) {
194 this.value = value;
195 callComplete();
196 }
197 }
198
199
200
201
202 private class Connection extends Thread {
203 private ConnectionId remoteId;
204 private Socket socket = null;
205 private DataInputStream in;
206 private DataOutputStream out;
207
208
209 private final Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
210 private final AtomicLong lastActivity = new AtomicLong();
211 protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
212 private IOException closeException;
213
/**
 * Creates a connection to the given address with no ticket and an rpc
 * timeout of zero; delegates to the {@code ConnectionId} constructor.
 *
 * @param address remote address
 * @throws IOException if the address is unresolved
 */
public Connection(InetSocketAddress address) throws IOException {
  this(new ConnectionId(address, null, 0));
}
217
218 public Connection(ConnectionId remoteId) throws IOException {
219 if (remoteId.getAddress().isUnresolved()) {
220 throw new UnknownHostException("unknown host: " +
221 remoteId.getAddress().getHostName());
222 }
223 this.remoteId = remoteId;
224 UserGroupInformation ticket = remoteId.getTicket();
225 this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
226 remoteId.getAddress().toString() +
227 ((ticket==null)?" from an unknown user": (" from " + ticket.getUserName())));
228 this.setDaemon(true);
229 }
230
231
232 private void touch() {
233 lastActivity.set(System.currentTimeMillis());
234 }
235
236
237
238
239
240
241
242
243 protected synchronized boolean addCall(Call call) {
244 if (shouldCloseConnection.get())
245 return false;
246 calls.put(call.id, call);
247 notify();
248 return true;
249 }
250
251
252
253
254
255 private class PingInputStream extends FilterInputStream {
256
257 protected PingInputStream(InputStream in) {
258 super(in);
259 }
260
261
262
263
264
265 private void handleTimeout(SocketTimeoutException e) throws IOException {
266 if (shouldCloseConnection.get() || !running.get() ||
267 remoteId.rpcTimeout > 0) {
268 throw e;
269 }
270 sendPing();
271 }
272
273
274
275
276
277
278 @Override
279 public int read() throws IOException {
280 do {
281 try {
282 return super.read();
283 } catch (SocketTimeoutException e) {
284 handleTimeout(e);
285 }
286 } while (true);
287 }
288
289
290
291
292
293
294
295 @Override
296 public int read(byte[] buf, int off, int len) throws IOException {
297 do {
298 try {
299 return super.read(buf, off, len);
300 } catch (SocketTimeoutException e) {
301 handleTimeout(e);
302 }
303 } while (true);
304 }
305 }
306
307
308
309
310
311
312 protected synchronized void setupIOstreams() throws IOException {
313 if (socket != null || shouldCloseConnection.get()) {
314 return;
315 }
316
317 short ioFailures = 0;
318 short timeoutFailures = 0;
319 try {
320 if (LOG.isDebugEnabled()) {
321 LOG.debug("Connecting to "+remoteId.getAddress());
322 }
323 while (true) {
324 try {
325 this.socket = socketFactory.createSocket();
326 this.socket.setTcpNoDelay(tcpNoDelay);
327 this.socket.setKeepAlive(tcpKeepAlive);
328 NetUtils.connect(this.socket, remoteId.getAddress(),
329 getSocketTimeout(conf));
330 if (remoteId.rpcTimeout > 0) {
331 pingInterval = remoteId.rpcTimeout;
332 }
333 this.socket.setSoTimeout(pingInterval);
334 break;
335 } catch (SocketTimeoutException toe) {
336 handleConnectionFailure(timeoutFailures++, maxRetries, toe);
337 } catch (IOException ie) {
338 handleConnectionFailure(ioFailures++, maxRetries, ie);
339 }
340 }
341 this.in = new DataInputStream(new BufferedInputStream
342 (new PingInputStream(NetUtils.getInputStream(socket))));
343 this.out = new DataOutputStream
344 (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
345 writeHeader();
346
347
348 touch();
349
350
351 start();
352 } catch (IOException e) {
353 markClosed(e);
354 close();
355
356 throw e;
357 }
358 }
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374 private void handleConnectionFailure(
375 int curRetries, int maxRetries, IOException ioe) throws IOException {
376
377 if (socket != null) {
378 try {
379 socket.close();
380 } catch (IOException e) {
381 LOG.warn("Not able to close a socket", e);
382 }
383 }
384
385
386 socket = null;
387
388
389 if (curRetries >= maxRetries) {
390 throw ioe;
391 }
392
393
394 try {
395 Thread.sleep(failureSleep);
396 } catch (InterruptedException ignored) {}
397
398 LOG.info("Retrying connect to server: " + remoteId.getAddress() +
399 " after sleeping " + failureSleep + "ms. Already tried " + curRetries +
400 " time(s).");
401 }
402
403
404
405
406 private void writeHeader() throws IOException {
407 out.write(HBaseServer.HEADER.array());
408 out.write(HBaseServer.CURRENT_VERSION);
409
410 DataOutputBuffer buf = new DataOutputBuffer();
411 ObjectWritable.writeObject(buf, remoteId.getTicket(),
412 UserGroupInformation.class, conf);
413 int bufLen = buf.getLength();
414 out.writeInt(bufLen);
415 out.write(buf.getData(), 0, bufLen);
416 }
417
418
419
420
421
422
423
424 @SuppressWarnings({"ThrowableInstanceNeverThrown"})
425 private synchronized boolean waitForWork() {
426 if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
427 long timeout = maxIdleTime-
428 (System.currentTimeMillis()-lastActivity.get());
429 if (timeout>0) {
430 try {
431 wait(timeout);
432 } catch (InterruptedException ignored) {}
433 }
434 }
435
436 if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
437 return true;
438 } else if (shouldCloseConnection.get()) {
439 return false;
440 } else if (calls.isEmpty()) {
441 markClosed(null);
442 return false;
443 } else {
444 markClosed((IOException)new IOException().initCause(
445 new InterruptedException()));
446 return false;
447 }
448 }
449
450 public InetSocketAddress getRemoteAddress() {
451 return remoteId.getAddress();
452 }
453
454
455
456
457 protected synchronized void sendPing() throws IOException {
458 long curTime = System.currentTimeMillis();
459 if ( curTime - lastActivity.get() >= pingInterval) {
460 lastActivity.set(curTime);
461
462 synchronized (this.out) {
463 out.writeInt(PING_CALL_ID);
464 out.flush();
465 }
466 }
467 }
468
469 @Override
470 public void run() {
471 if (LOG.isDebugEnabled())
472 LOG.debug(getName() + ": starting, having connections "
473 + connections.size());
474
475 try {
476 while (waitForWork()) {
477 receiveResponse();
478 }
479 } catch (Throwable t) {
480 LOG.warn("Unexpected exception receiving call responses", t);
481 markClosed(new IOException("Unexpected exception receiving call responses", t));
482 }
483
484 close();
485
486 if (LOG.isDebugEnabled())
487 LOG.debug(getName() + ": stopped, remaining connections "
488 + connections.size());
489 }
490
491
492
493
494
495 protected void sendParam(Call call) {
496 if (shouldCloseConnection.get()) {
497 return;
498 }
499
500 DataOutputBuffer d=null;
501 try {
502
503 synchronized (this.out) {
504 if (LOG.isDebugEnabled())
505 LOG.debug(getName() + " sending #" + call.id);
506
507
508
509 d = new DataOutputBuffer();
510 d.writeInt(0xdeadbeef);
511 d.writeInt(call.id);
512 call.param.write(d);
513 byte[] data = d.getData();
514 int dataLength = d.getLength();
515
516 Bytes.putInt(data, 0, dataLength - 4);
517 out.write(data, 0, dataLength);
518 out.flush();
519 }
520 } catch(IOException e) {
521 markClosed(e);
522 } finally {
523
524
525 IOUtils.closeStream(d);
526 }
527 }
528
529
530
531
532 private void receiveResponse() {
533 if (shouldCloseConnection.get()) {
534 return;
535 }
536 touch();
537
538 try {
539 int id = in.readInt();
540
541 if (LOG.isDebugEnabled())
542 LOG.debug(getName() + " got value #" + id);
543
544 Call call = calls.get(id);
545
546 boolean isError = in.readBoolean();
547 if (isError) {
548
549 call.setException(new RemoteException( WritableUtils.readString(in),
550 WritableUtils.readString(in)));
551 calls.remove(id);
552 } else {
553 Writable value = ReflectionUtils.newInstance(valueClass, conf);
554 value.readFields(in);
555 call.setValue(value);
556 calls.remove(id);
557 }
558 } catch (IOException e) {
559 markClosed(e);
560 }
561 }
562
563 private synchronized void markClosed(IOException e) {
564 if (shouldCloseConnection.compareAndSet(false, true)) {
565 closeException = e;
566 notifyAll();
567 }
568 }
569
570
571 private synchronized void close() {
572 if (!shouldCloseConnection.get()) {
573 LOG.error("The connection is not in the closed state");
574 return;
575 }
576
577
578
579 synchronized (connections) {
580 if (connections.get(remoteId) == this) {
581 connections.remove(remoteId);
582 }
583 }
584
585
586 IOUtils.closeStream(out);
587 IOUtils.closeStream(in);
588
589
590 if (closeException == null) {
591 if (!calls.isEmpty()) {
592 LOG.warn(
593 "A connection is closed for no cause and calls are not empty");
594
595
596 closeException = new IOException("Unexpected closed connection");
597 cleanupCalls();
598 }
599 } else {
600
601 if (LOG.isDebugEnabled()) {
602 LOG.debug("closing ipc connection to " + remoteId.address + ": " +
603 closeException.getMessage(),closeException);
604 }
605
606
607 cleanupCalls();
608 }
609 if (LOG.isDebugEnabled())
610 LOG.debug(getName() + ": closed");
611 }
612
613
614 private void cleanupCalls() {
615 Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
616 while (itor.hasNext()) {
617 Call c = itor.next().getValue();
618 c.setException(closeException);
619 itor.remove();
620 }
621 }
622 }
623
624
625 private class ParallelCall extends Call {
626 private final ParallelResults results;
627 protected final int index;
628
629 public ParallelCall(Writable param, ParallelResults results, int index) {
630 super(param);
631 this.results = results;
632 this.index = index;
633 }
634
635
636 @Override
637 protected void callComplete() {
638 results.callComplete(this);
639 }
640 }
641
642
643 private static class ParallelResults {
644 protected final Writable[] values;
645 protected int size;
646 protected int count;
647
648 public ParallelResults(int size) {
649 this.values = new Writable[size];
650 this.size = size;
651 }
652
653
654
655
656 synchronized void callComplete(ParallelCall call) {
657
658 values[call.index] = call.value;
659 count++;
660 if (count == size)
661 notify();
662 }
663 }
664
665
666
667
668
669
670
671
/**
 * Creates a client whose responses are deserialized into
 * {@code valueClass}, reading its tuning parameters from {@code conf}:
 * <ul>
 *   <li>{@code hbase.ipc.client.connection.maxidletime} (default 10000)</li>
 *   <li>{@code hbase.ipc.client.connect.max.retries} (default 0)</li>
 *   <li>{@code hbase.client.pause} (default 1000)</li>
 *   <li>{@code hbase.ipc.client.tcpnodelay} (default false)</li>
 *   <li>{@code hbase.ipc.client.tcpkeepalive} (default true)</li>
 *   <li>the ping interval, via {@code getPingInterval(conf)}</li>
 * </ul>
 *
 * @param valueClass class of the response values
 * @param conf configuration to read settings from
 * @param factory socket factory used for all connections
 */
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf,
    SocketFactory factory) {
  this.valueClass = valueClass;
  this.maxIdleTime =
      conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
  this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
  this.failureSleep = conf.getInt("hbase.client.pause", 1000);
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", false);
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
  this.pingInterval = getPingInterval(conf);
  if (LOG.isDebugEnabled()) {
    // A space after "is" keeps the number from being glued to the text.
    LOG.debug("The ping interval is " + this.pingInterval + "ms.");
  }
  this.conf = conf;
  this.socketFactory = factory;
}
688
689
690
691
692
693
/**
 * Creates a client that uses the default socket factory obtained from
 * {@code NetUtils.getDefaultSocketFactory(conf)}.
 *
 * @param valueClass class of the response values
 * @param conf configuration to read settings from
 */
public HBaseClient(Class<? extends Writable> valueClass, Configuration conf) {
  this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
697
698
699
700
701
702 SocketFactory getSocketFactory() {
703 return socketFactory;
704 }
705
706
707
708 public void stop() {
709 if (LOG.isDebugEnabled()) {
710 LOG.debug("Stopping client");
711 }
712
713 if (!running.compareAndSet(true, false)) {
714 return;
715 }
716
717
718 synchronized (connections) {
719 for (Connection conn : connections.values()) {
720 conn.interrupt();
721 }
722 }
723
724
725 while (!connections.isEmpty()) {
726 try {
727 Thread.sleep(100);
728 } catch (InterruptedException ignored) {
729 }
730 }
731 }
732
733
734
735
736
737
738
739
740
741 public Writable call(Writable param, InetSocketAddress address)
742 throws IOException {
743 return call(param, address, null, 0);
744 }
745
746 public Writable call(Writable param, InetSocketAddress addr,
747 UserGroupInformation ticket, int rpcTimeout)
748 throws IOException {
749 Call call = new Call(param);
750 Connection connection = getConnection(addr, ticket, rpcTimeout, call);
751 connection.sendParam(call);
752 boolean interrupted = false;
753
754 synchronized (call) {
755 while (!call.done) {
756 try {
757 call.wait();
758 } catch (InterruptedException ignored) {
759
760 interrupted = true;
761 }
762 }
763
764 if (interrupted) {
765
766 Thread.currentThread().interrupt();
767 }
768
769 if (call.error != null) {
770 if (call.error instanceof RemoteException) {
771 call.error.fillInStackTrace();
772 throw call.error;
773 }
774
775 throw wrapException(addr, call.error);
776 }
777 return call.value;
778 }
779 }
780
781
782
783
784
785
786
787
788
789
790
791
792
793 @SuppressWarnings({"ThrowableInstanceNeverThrown"})
794 private IOException wrapException(InetSocketAddress addr,
795 IOException exception) {
796 if (exception instanceof ConnectException) {
797
798 return (ConnectException)new ConnectException(
799 "Call to " + addr + " failed on connection exception: " + exception)
800 .initCause(exception);
801 } else if (exception instanceof SocketTimeoutException) {
802 return (SocketTimeoutException)new SocketTimeoutException(
803 "Call to " + addr + " failed on socket timeout exception: "
804 + exception).initCause(exception);
805 } else {
806 return (IOException)new IOException(
807 "Call to " + addr + " failed on local exception: " + exception)
808 .initCause(exception);
809
810 }
811 }
812
813
814
815
816
817
818
819
820
821
822 public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
823 throws IOException {
824 if (addresses.length == 0) return new Writable[0];
825
826 ParallelResults results = new ParallelResults(params.length);
827
828
829 synchronized (results) {
830 for (int i = 0; i < params.length; i++) {
831 ParallelCall call = new ParallelCall(params[i], results, i);
832 try {
833 Connection connection = getConnection(addresses[i], null, 0, call);
834 connection.sendParam(call);
835 } catch (IOException e) {
836
837 LOG.info("Calling "+addresses[i]+" caught: " +
838 e.getMessage(),e);
839 results.size--;
840 }
841 }
842 while (results.count != results.size) {
843 try {
844 results.wait();
845 } catch (InterruptedException ignored) {}
846 }
847
848 return results.values;
849 }
850 }
851
852
853
854 private Connection getConnection(InetSocketAddress addr,
855 UserGroupInformation ticket,
856 int rpcTimeout,
857 Call call)
858 throws IOException {
859 if (!running.get()) {
860
861 throw new IOException("The client is stopped");
862 }
863 Connection connection;
864
865
866
867
868 ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
869 do {
870 synchronized (connections) {
871 connection = connections.get(remoteId);
872 if (connection == null) {
873 connection = new Connection(remoteId);
874 connections.put(remoteId, connection);
875 }
876 }
877 } while (!connection.addCall(call));
878
879
880
881
882
883 connection.setupIOstreams();
884 return connection;
885 }
886
887
888
889
890
891 private static class ConnectionId {
892 final InetSocketAddress address;
893 final UserGroupInformation ticket;
894 final private int rpcTimeout;
895
896 ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
897 int rpcTimeout) {
898 this.address = address;
899 this.ticket = ticket;
900 this.rpcTimeout = rpcTimeout;
901 }
902
903 InetSocketAddress getAddress() {
904 return address;
905 }
906 UserGroupInformation getTicket() {
907 return ticket;
908 }
909
910 @Override
911 public boolean equals(Object obj) {
912 if (obj instanceof ConnectionId) {
913 ConnectionId id = (ConnectionId) obj;
914 return address.equals(id.address) && ticket == id.ticket &&
915 rpcTimeout == id.rpcTimeout;
916
917 }
918 return false;
919 }
920
921 @Override
922 public int hashCode() {
923 return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
924 }
925 }
926 }