package org.apache.avro.ipc.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.avro.Protocol;
import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.netty.NettyTransportCodec;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/avro/ipc/netty/NettyTransceiver.class */
public class NettyTransceiver extends Transceiver {
    public static final long DEFAULT_CONNECTION_TIMEOUT_MILLIS = 60000;
    public static final String NETTY_CONNECT_TIMEOUT_OPTION = "connectTimeoutMillis";
    public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
    public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
    public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
    private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class.getName());
    private final AtomicInteger serialGenerator;
    private final Map<Integer, Callback<List<ByteBuffer>>> requests;
    private final ChannelFactory channelFactory;
    private final long connectTimeoutMillis;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress remoteAddr;
    volatile ChannelFuture channelFuture;
    volatile boolean stopping;
    private final Object channelFutureLock;
    private final ReentrantReadWriteLock stateLock;
    private Channel channel;
    private Protocol remote;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/avro/ipc/netty/NettyTransceiver$NettyClientAvroHandler.class */
    public class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
        protected NettyClientAvroHandler() {
        }

        public void handleUpstream(ChannelHandlerContext channelHandlerContext, ChannelEvent channelEvent) throws Exception {
            if (channelEvent instanceof ChannelStateEvent) {
                NettyTransceiver.LOG.debug(channelEvent.toString());
                ChannelStateEvent channelStateEvent = (ChannelStateEvent) channelEvent;
                if (channelStateEvent.getState() == ChannelState.OPEN && Boolean.FALSE.equals(channelStateEvent.getValue())) {
                    NettyTransceiver.LOG.debug("Remote peer " + NettyTransceiver.this.remoteAddr + " closed connection.");
                    NettyTransceiver.this.disconnect(false, true, null);
                }
            }
            super.handleUpstream(channelHandlerContext, channelEvent);
        }

        public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            super.channelOpen(channelHandlerContext, channelStateEvent);
        }

        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
            NettyTransportCodec.NettyDataPack nettyDataPack = (NettyTransportCodec.NettyDataPack) messageEvent.getMessage();
            Callback callback = (Callback) NettyTransceiver.this.requests.get(Integer.valueOf(nettyDataPack.getSerial()));
            if (callback == null) {
                throw new RuntimeException("Missing previous call info");
            }
            try {
                callback.handleResult(nettyDataPack.getDatas());
                NettyTransceiver.this.requests.remove(Integer.valueOf(nettyDataPack.getSerial()));
            } catch (Throwable th) {
                NettyTransceiver.this.requests.remove(Integer.valueOf(nettyDataPack.getSerial()));
                throw th;
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
            NettyTransceiver.this.disconnect(false, true, exceptionEvent.getCause());
        }
    }

    /* loaded from: input_file:org/apache/avro/ipc/netty/NettyTransceiver$NettyTransceiverThreadFactory.class */
    protected static class NettyTransceiverThreadFactory implements ThreadFactory {
        private final AtomicInteger threadId = new AtomicInteger(0);
        private final String prefix;

        public NettyTransceiverThreadFactory(String str) {
            this.prefix = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName(this.prefix + " " + this.threadId.incrementAndGet());
            return thread;
        }
    }

    /* loaded from: input_file:org/apache/avro/ipc/netty/NettyTransceiver$WriteFutureListener.class */
    protected static class WriteFutureListener implements ChannelFutureListener {
        protected final Callback<List<ByteBuffer>> callback;

        public WriteFutureListener(Callback<List<ByteBuffer>> callback) {
            this.callback = callback;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess() || this.callback == null) {
                return;
            }
            this.callback.handleError(new IOException("Error writing buffers", channelFuture.getCause()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyTransceiver() {
        this.serialGenerator = new AtomicInteger(0);
        this.requests = new ConcurrentHashMap();
        this.channelFutureLock = new Object();
        this.stateLock = new ReentrantReadWriteLock();
        this.channelFactory = null;
        this.connectTimeoutMillis = 0L;
        this.bootstrap = null;
        this.remoteAddr = null;
        this.channelFuture = null;
    }

    public NettyTransceiver(InetSocketAddress inetSocketAddress) throws IOException {
        this(inetSocketAddress, Long.valueOf(DEFAULT_CONNECTION_TIMEOUT_MILLIS));
    }

    public NettyTransceiver(InetSocketAddress inetSocketAddress, Long l) throws IOException {
        this(inetSocketAddress, (ChannelFactory) new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " Boss")), Executors.newCachedThreadPool(new NettyTransceiverThreadFactory("Avro " + NettyTransceiver.class.getSimpleName() + " I/O Worker"))), l);
    }

    public NettyTransceiver(InetSocketAddress inetSocketAddress, ChannelFactory channelFactory) throws IOException {
        this(inetSocketAddress, channelFactory, buildDefaultBootstrapOptions(null));
    }

    public NettyTransceiver(InetSocketAddress inetSocketAddress, ChannelFactory channelFactory, Long l) throws IOException {
        this(inetSocketAddress, channelFactory, buildDefaultBootstrapOptions(l));
    }

    public NettyTransceiver(InetSocketAddress inetSocketAddress, ChannelFactory channelFactory, Map<String, Object> map) throws IOException {
        this.serialGenerator = new AtomicInteger(0);
        this.requests = new ConcurrentHashMap();
        this.channelFutureLock = new Object();
        this.stateLock = new ReentrantReadWriteLock();
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory is null");
        }
        this.channelFactory = channelFactory;
        this.connectTimeoutMillis = ((Long) map.get(NETTY_CONNECT_TIMEOUT_OPTION)).longValue();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.remoteAddr = inetSocketAddress;
        this.bootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("frameDecoder", new NettyTransportCodec.NettyFrameDecoder());
            pipeline.addLast("frameEncoder", new NettyTransportCodec.NettyFrameEncoder());
            pipeline.addLast("handler", createNettyClientAvroHandler());
            return pipeline;
        });
        if (map != null) {
            LOG.debug("Using Netty bootstrap options: " + map);
            this.bootstrap.setOptions(map);
        }
        this.stateLock.readLock().lock();
        try {
            try {
                getChannel();
                this.stateLock.readLock().unlock();
            } catch (Throwable th) {
                if (this.channelFuture != null) {
                    this.channelFuture.getChannel().close();
                }
                if (th instanceof IOException) {
                    throw ((IOException) th);
                }
                if (!(th instanceof RuntimeException)) {
                    throw ((Error) th);
                }
                throw ((RuntimeException) th);
            }
        } catch (Throwable th2) {
            this.stateLock.readLock().unlock();
            throw th2;
        }
    }

    protected ChannelUpstreamHandler createNettyClientAvroHandler() {
        return new NettyClientAvroHandler();
    }

    protected static Map<String, Object> buildDefaultBootstrapOptions(Long l) {
        HashMap hashMap = new HashMap(3);
        hashMap.put(NETTY_TCP_NODELAY_OPTION, true);
        hashMap.put(NETTY_KEEPALIVE_OPTION, true);
        hashMap.put(NETTY_CONNECT_TIMEOUT_OPTION, Long.valueOf(l == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : l.longValue()));
        return hashMap;
    }

    private static boolean isChannelReady(Channel channel) {
        return channel != null && channel.isOpen() && channel.isBound() && channel.isConnected();
    }

    private Channel getChannel() throws IOException {
        if (!isChannelReady(this.channel)) {
            this.stateLock.readLock().unlock();
            this.stateLock.writeLock().lock();
            try {
                if (!isChannelReady(this.channel)) {
                    synchronized (this.channelFutureLock) {
                        if (!this.stopping) {
                            LOG.debug("Connecting to " + this.remoteAddr);
                            this.channelFuture = this.bootstrap.connect(this.remoteAddr);
                        }
                    }
                    if (this.channelFuture != null) {
                        try {
                            this.channelFuture.await(this.connectTimeoutMillis);
                            synchronized (this.channelFutureLock) {
                                if (!this.channelFuture.isSuccess()) {
                                    throw new IOException("Error connecting to " + this.remoteAddr, this.channelFuture.getCause());
                                }
                                this.channel = this.channelFuture.getChannel();
                                this.channelFuture = null;
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IOException("Interrupted while connecting to " + this.remoteAddr);
                        }
                    }
                }
            } finally {
                this.stateLock.readLock().lock();
                this.stateLock.writeLock().unlock();
            }
        }
        return this.channel;
    }

    private void disconnect() {
        disconnect(false, false, null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void disconnect(boolean z, boolean z2, Throwable th) {
        Channel channel = null;
        ConcurrentHashMap concurrentHashMap = null;
        boolean z3 = this.stateLock.getReadHoldCount() != 0;
        ChannelFuture channelFuture = null;
        synchronized (this.channelFutureLock) {
            if (this.stopping && this.channelFuture != null) {
                channelFuture = this.channelFuture;
                this.channelFuture = null;
            }
        }
        if (channelFuture != null) {
            channelFuture.cancel();
        }
        if (z3) {
            this.stateLock.readLock().unlock();
        }
        this.stateLock.writeLock().lock();
        try {
            if (this.channel != null) {
                if (th != null) {
                    LOG.debug("Disconnecting from " + this.remoteAddr, th);
                } else {
                    LOG.debug("Disconnecting from " + this.remoteAddr);
                }
                channel = this.channel;
                this.channel = null;
                this.remote = null;
                if (z2) {
                    concurrentHashMap = new ConcurrentHashMap(this.requests);
                    this.requests.clear();
                }
            }
            if (concurrentHashMap != null && !concurrentHashMap.isEmpty()) {
                LOG.debug("Removing " + concurrentHashMap.size() + " pending request(s).");
                Iterator it = concurrentHashMap.values().iterator();
                while (it.hasNext()) {
                    ((Callback) it.next()).handleError(th != null ? th : new IOException(getClass().getSimpleName() + " closed"));
                }
            }
            if (channel != null) {
                ChannelFuture close = channel.close();
                if (!z || close == null) {
                    return;
                }
                try {
                    close.await(this.connectTimeoutMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.warn("Interrupted while disconnecting", e);
                }
            }
        } finally {
            if (z3) {
                this.stateLock.readLock().lock();
            }
            this.stateLock.writeLock().unlock();
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public void lockChannel() {
    }

    @Override // org.apache.avro.ipc.Transceiver
    public void unlockChannel() {
    }

    @Override // org.apache.avro.ipc.Transceiver, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(true);
    }

    public void close(boolean z) {
        try {
            this.stopping = true;
            disconnect(z, true, null);
        } finally {
            this.channelFactory.releaseExternalResources();
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public String getRemoteName() throws IOException {
        this.stateLock.readLock().lock();
        try {
            return getChannel().getRemoteAddress().toString();
        } finally {
            this.stateLock.readLock().unlock();
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public List<ByteBuffer> transceive(List<ByteBuffer> list) throws IOException {
        try {
            CallFuture callFuture = new CallFuture();
            transceive(list, callFuture);
            return (List) callFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            LOG.debug("failed to get the response", e);
            return null;
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public void transceive(List<ByteBuffer> list, Callback<List<ByteBuffer>> callback) throws IOException {
        this.stateLock.readLock().lock();
        try {
            int incrementAndGet = this.serialGenerator.incrementAndGet();
            NettyTransportCodec.NettyDataPack nettyDataPack = new NettyTransportCodec.NettyDataPack(incrementAndGet, list);
            this.requests.put(Integer.valueOf(incrementAndGet), callback);
            writeDataPack(nettyDataPack);
            this.stateLock.readLock().unlock();
        } catch (Throwable th) {
            this.stateLock.readLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public void writeBuffers(List<ByteBuffer> list) throws IOException {
        this.stateLock.readLock().lock();
        try {
            ChannelFuture writeDataPack = writeDataPack(new NettyTransportCodec.NettyDataPack(this.serialGenerator.incrementAndGet(), list));
            if (!writeDataPack.isDone()) {
                try {
                    writeDataPack.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while writing Netty data pack", e);
                }
            }
            if (!writeDataPack.isSuccess()) {
                throw new IOException("Error writing buffers", writeDataPack.getCause());
            }
        } finally {
            this.stateLock.readLock().unlock();
        }
    }

    private ChannelFuture writeDataPack(NettyTransportCodec.NettyDataPack nettyDataPack) throws IOException {
        return getChannel().write(nettyDataPack);
    }

    @Override // org.apache.avro.ipc.Transceiver
    public List<ByteBuffer> readBuffers() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.avro.ipc.Transceiver
    public Protocol getRemote() {
        this.stateLock.readLock().lock();
        try {
            return this.remote;
        } finally {
            this.stateLock.readLock().unlock();
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public boolean isConnected() {
        this.stateLock.readLock().lock();
        try {
            return this.remote != null;
        } finally {
            this.stateLock.readLock().unlock();
        }
    }

    @Override // org.apache.avro.ipc.Transceiver
    public void setRemote(Protocol protocol) {
        this.stateLock.writeLock().lock();
        try {
            this.remote = protocol;
        } finally {
            this.stateLock.writeLock().unlock();
        }
    }
}
