package org.apache.cassandra.streaming.async;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;
import org.apache.cassandra.net.AsyncChannelPromise;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
import org.apache.cassandra.net.AsyncStreamingOutputPlus;
import org.apache.cassandra.net.GlobalBufferPoolAllocator;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.StreamingDataInputPlus;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.StreamingDataOutputPlusFixed;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/async/NettyStreamingChannel.class */
/**
 * A {@link StreamingChannel} backed by a Netty {@link Channel}.
 *
 * <p>The instance doubles as a Netty inbound handler: {@link ByteBuf} messages arriving on the
 * channel are appended to {@link #in}. Only channels created with
 * {@link StreamingChannel.Kind#CONTROL} get an input; for every other kind {@link #in} is
 * {@code null}.
 *
 * <p>Outbound data is written either through the stream returned by {@link #acquireOut()}
 * (at most one at a time per channel, guarded by {@link #TRANSFERRING_FILE_ATTR}) or as a single
 * buffer via {@link #send(StreamingChannel.Send)}.
 */
public class NettyStreamingChannel extends ChannelInboundHandlerAdapter implements StreamingChannel {
    private static final Logger logger = LoggerFactory.getLogger(NettyStreamingChannel.class);

    /** Whether channels with an input register themselves in {@link #inboundHandlers}. */
    private static volatile boolean trackInboundHandlers = false;

    /** Set of tracked channels; only assigned by {@link #trackInboundHandlers()}. */
    private static Collection<NettyStreamingChannel> inboundHandlers;

    /** Channel attribute that is {@code true} while a stream obtained from {@link #acquireOut()} is open. */
    @VisibleForTesting
    static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile");

    final Channel channel;

    /** Receives inbound buffers; {@code null} unless this channel was created as a CONTROL channel. */
    @VisibleForTesting
    final AsyncStreamingInputPlus in;

    /** Set once by {@link #close()}; inbound buffers are released rather than appended afterwards. */
    private volatile boolean closed;

    /**
     * Supplies the fixed-size output handed to a {@link StreamingChannel.Send} callback and keeps
     * hold of the buffers behind it so that {@link NettyStreamingChannel#send} can write (or
     * release) them afterwards.
     */
    public class C1Factory implements IntFunction<StreamingDataOutputPlus> {
        /** Buffer obtained from {@link GlobalBufferPoolAllocator}; {@code null} until {@link #apply} runs. */
        ByteBuf buf;

        /** NIO view over the writable region of {@link #buf} that the callback fills. */
        ByteBuffer buffer;

        C1Factory() {
        }

        /**
         * Obtains a buffer for {@code i} bytes and wraps its writable region in a fixed-size output.
         *
         * @param i number of bytes the caller intends to write
         * @return an output writing into the NIO view of the obtained buffer
         */
        @Override // java.util.function.IntFunction
        public StreamingDataOutputPlus apply(int i) {
            this.buf = GlobalBufferPoolAllocator.instance.buffer(i);
            this.buffer = this.buf.nioBuffer(this.buf.writerIndex(), i);
            return new StreamingDataOutputPlusFixed(this.buffer);
        }
    }

    /**
     * Wraps {@code channel}, resetting its transferring flag to {@code false}.
     *
     * @param channel the underlying Netty channel
     * @param kind    the channel kind; only {@link StreamingChannel.Kind#CONTROL} creates an input
     */
    public NettyStreamingChannel(Channel channel, StreamingChannel.Kind kind) {
        this.channel = channel;
        channel.attr(TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
        if (kind == StreamingChannel.Kind.CONTROL) {
            this.in = new AsyncStreamingInputPlus(channel);
            // Register only after 'in' is assigned, so code iterating the tracked set
            // (see shutdown()) does not observe an instance whose final field is still unset.
            if (trackInboundHandlers) {
                inboundHandlers.add(this);
            }
        } else {
            this.in = null;
        }
    }

    /** @return the id of the underlying Netty channel */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public Object id() {
        return this.channel.id();
    }

    /** @return a description made of the channel's remote address, local address and id */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public String description() {
        return "channel.remote " + this.channel.remoteAddress() + " channel.local " + this.channel.localAddress() + " channel.id " + this.channel.id();
    }

    /** @return the channel's remote address, cast to {@link InetSocketAddress} */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public InetSocketAddress peer() {
        return (InetSocketAddress) this.channel.remoteAddress();
    }

    /** @return the same address as {@link #peer()} */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public InetSocketAddress connectedTo() {
        return peer();
    }

    /** @return {@code true} while the underlying channel reports itself open */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public boolean connected() {
        return this.channel.isOpen();
    }

    /** @return the input of this channel, or {@code null} if it was not created as a CONTROL channel */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public StreamingDataInputPlus in() {
        return this.in;
    }

    /**
     * Claims the channel for one outbound stream by flipping {@link #TRANSFERRING_FILE_ATTR} from
     * {@code false} to {@code true}. Closing the returned stream resets the flag, even if the
     * underlying close fails.
     *
     * @return an output stream writing to the channel
     * @throws IllegalStateException if the flag is already {@code true}
     */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public StreamingDataOutputPlus acquireOut() {
        if (!this.channel.attr(TRANSFERRING_FILE_ATTR).compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
            throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
        }
        return new AsyncStreamingOutputPlus(this.channel) {
            @Override // org.apache.cassandra.net.AsyncChannelOutputPlus, org.apache.cassandra.io.util.BufferedDataOutputStreamPlus, java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                try {
                    super.close();
                } finally {
                    NettyStreamingChannel.this.channel.attr(NettyStreamingChannel.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
                }
            }
        };
    }

    /**
     * Lets {@code send} fill a buffer obtained through a {@link C1Factory}, then writes and
     * flushes that buffer on the channel.
     *
     * <p>This method does not throw: any {@link Throwable} raised along the way is returned as an
     * already-failed future. If a buffer had been obtained by then, it is released first; this
     * includes the case where the {@code send} callback itself fails after asking for its output.
     *
     * @param send callback that requests an output of the size it needs and fills it completely
     * @return the promise passed to {@code writeAndFlush}, or a failed future on error
     */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public Future<?> send(StreamingChannel.Send send) {
        C1Factory factory = new C1Factory();
        try {
            try {
                send.send(factory);
                ByteBuf buf = factory.buf;
                ByteBuffer buffer = factory.buffer;
                // The callback is expected to have filled exactly the size it asked for.
                assert buffer.position() == buffer.limit();
                buf.writerIndex(buffer.position());
                AsyncChannelPromise promise = new AsyncChannelPromise(this.channel);
                this.channel.writeAndFlush(buf, promise);
                return promise;
            } catch (Throwable t) {
                // Release whatever the factory handed out, whether the callback or the write failed.
                // Kept inside the outer try so that a failure of release() itself is also
                // reported through the returned future rather than thrown to the caller.
                if (factory.buf != null) {
                    factory.buf.release();
                }
                throw t;
            }
        } catch (Throwable t) {
            return ImmediateFuture.failure(t);
        }
    }

    /**
     * Closes the channel. The first call marks this instance closed, asks the input (if any) to
     * close, removes this instance from the tracked set when tracking is enabled, and closes the
     * underlying channel. Later calls only return the channel's close future.
     *
     * @return the future of the underlying channel's close
     */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public synchronized io.netty.util.concurrent.Future<?> close() {
        if (this.closed) {
            return this.channel.closeFuture();
        }
        this.closed = true;
        if (this.in != null) {
            this.in.requestClosure();
            if (trackInboundHandlers) {
                inboundHandlers.remove(this);
            }
        }
        return this.channel.close();
    }

    /**
     * Registers {@code runnable} to run when the underlying channel's close future completes.
     *
     * @param runnable the callback to invoke
     */
    @Override // org.apache.cassandra.streaming.StreamingChannel
    public void onClose(Runnable runnable) {
        this.channel.closeFuture().addListener(future -> runnable.run());
    }

    /**
     * Appends an inbound {@link ByteBuf} to {@link #in}. The message is released instead when this
     * channel is closed, the message is not a {@code ByteBuf}, or {@code append} returns
     * {@code false}.
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        // NOTE(review): 'in' is null for non-CONTROL channels; presumably this handler is only
        // installed on channels that have an input - confirm against the pipeline setup.
        boolean appended = !this.closed && (obj instanceof ByteBuf) && this.in.append((ByteBuf) obj);
        if (!appended) {
            ReferenceCountUtil.release(obj);
        }
    }

    /** Closes this channel, then forwards the inactive event down the pipeline. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        close();
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Logs the failure ({@link IOException}s at trace level, anything else as a warning) and
     * closes this channel. The exception is not forwarded down the pipeline.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        if (th instanceof IOException) {
            logger.trace("connection problem while streaming", th);
        } else {
            logger.warn("exception occurred while in processing streaming data", th);
        }
        close();
    }

    /**
     * Closes every tracked channel and empties the tracked set. Tracking must have been enabled
     * through {@link #trackInboundHandlers()} beforehand.
     */
    @VisibleForTesting
    public static void shutdown() {
        assert trackInboundHandlers : "in-JVM tests required tracking of inbound streaming handlers";
        inboundHandlers.forEach(NettyStreamingChannel::close);
        inboundHandlers.clear();
    }

    /**
     * Enables tracking of channels that have an input, starting from a fresh, empty set.
     */
    public static void trackInboundHandlers() {
        // Assign the set before the volatile flag so a reader that sees the flag also sees the set.
        inboundHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        trackInboundHandlers = true;
    }
}
