package org.apache.cassandra.transport;

import com.google.common.primitives.Ints;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMessageSizeMetrics;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.AbstractMessageHandler;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.ShareableBytes;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.Flusher;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.NonBlockingRateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/transport/CQLMessageHandler.class */
public class CQLMessageHandler<M extends Message> extends AbstractMessageHandler {
    public static final int LARGE_MESSAGE_THRESHOLD = 131071;
    private final Envelope.Decoder envelopeDecoder;
    private final Message.Decoder<M> messageDecoder;
    private final FrameEncoder.PayloadAllocator payloadAllocator;
    private final MessageConsumer<M> dispatcher;
    private final ErrorHandler errorHandler;
    private final boolean throwOnOverload;
    private final ProtocolVersion version;
    private final NonBlockingRateLimiter requestRateLimiter;
    long channelPayloadBytesInFlight;
    private int consecutiveMessageErrors;
    private static final Logger logger = LoggerFactory.getLogger(CQLMessageHandler.class);
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.SECONDS);
    public static final TimeUnit RATE_LIMITER_DELAY_UNIT = TimeUnit.NANOSECONDS;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/transport/CQLMessageHandler$ErrorHandler.class */
    public interface ErrorHandler {
        void accept(Throwable th);
    }

    /* loaded from: input_file:org/apache/cassandra/transport/CQLMessageHandler$LargeMessage.class */
    private class LargeMessage extends AbstractMessageHandler.LargeMessage<Envelope.Header> {
        private static final long EXPIRES_AT = Long.MAX_VALUE;
        private ClientResourceLimits.Overload overload;
        private ClientResourceLimits.Overload backpressure;

        private LargeMessage(Envelope.Header header) {
            super(CQLMessageHandler.envelopeSize(header), header, Long.MAX_VALUE, false);
            this.overload = ClientResourceLimits.Overload.NONE;
            this.backpressure = ClientResourceLimits.Overload.NONE;
        }

        /* JADX WARN: Multi-variable type inference failed */
        private Envelope assembleFrame() {
            ByteBuf wrappedBuffer = Unpooled.wrappedBuffer((ByteBuffer[]) this.buffers.stream().map((v0) -> {
                return v0.get();
            }).toArray(i -> {
                return new ByteBuffer[i];
            }));
            wrappedBuffer.readerIndex(9);
            wrappedBuffer.retain();
            return new Envelope((Envelope.Header) this.header, wrappedBuffer);
        }

        private void markOverloaded(ClientResourceLimits.Overload overload) {
            this.overload = overload;
        }

        private void markBackpressure(ClientResourceLimits.Overload overload) {
            this.backpressure = overload;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.cassandra.net.AbstractMessageHandler.LargeMessage
        protected void onComplete() {
            if (this.overload != ClientResourceLimits.Overload.NONE) {
                CQLMessageHandler.this.handleErrorAndRelease(CQLMessageHandler.this.buildOverloadedException(CQLMessageHandler.this.endpointReserveCapacity, CQLMessageHandler.this.globalReserveCapacity, this.overload), (Envelope.Header) this.header);
            } else {
                if (this.isCorrupt) {
                    return;
                }
                CQLMessageHandler.this.processRequest(assembleFrame(), this.backpressure);
            }
        }

        @Override // org.apache.cassandra.net.AbstractMessageHandler.LargeMessage
        protected void abort() {
            if (this.isCorrupt) {
                return;
            }
            releaseBuffersAndCapacity();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/transport/CQLMessageHandler$MessageConsumer.class */
    public interface MessageConsumer<M extends Message> {
        void accept(Channel channel, M m, Dispatcher.FlushItemConverter flushItemConverter, ClientResourceLimits.Overload overload);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CQLMessageHandler(Channel channel, ProtocolVersion protocolVersion, FrameDecoder frameDecoder, Envelope.Decoder decoder, Message.Decoder<M> decoder2, MessageConsumer<M> messageConsumer, FrameEncoder.PayloadAllocator payloadAllocator, int i, ClientResourceLimits.ResourceProvider resourceProvider, AbstractMessageHandler.OnHandlerClosed onHandlerClosed, ErrorHandler errorHandler, boolean z) {
        super(frameDecoder, channel, LARGE_MESSAGE_THRESHOLD, i, resourceProvider.endpointLimit(), resourceProvider.globalLimit(), resourceProvider.endpointWaitQueue(), resourceProvider.globalWaitQueue(), onHandlerClosed);
        this.consecutiveMessageErrors = 0;
        this.envelopeDecoder = decoder;
        this.messageDecoder = decoder2;
        this.payloadAllocator = payloadAllocator;
        this.dispatcher = messageConsumer;
        this.errorHandler = errorHandler;
        this.throwOnOverload = z;
        this.version = protocolVersion;
        this.requestRateLimiter = resourceProvider.requestRateLimiter();
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler, org.apache.cassandra.net.FrameDecoder.FrameProcessor
    public boolean process(FrameDecoder.Frame frame) throws IOException {
        this.consecutiveMessageErrors = 0;
        return super.process(frame);
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler
    protected boolean processOneContainedMessage(ShareableBytes shareableBytes, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) {
        ByteBuffer byteBuffer = shareableBytes.get();
        Envelope.Decoder.HeaderExtractionResult extractHeader = this.envelopeDecoder.extractHeader(byteBuffer);
        if (!extractHeader.isSuccess()) {
            return handleProtocolException(extractHeader.error(), byteBuffer, extractHeader.streamId(), extractHeader.bodyLength());
        }
        Envelope.Header header = extractHeader.header();
        if (header.version != this.version) {
            return handleProtocolException(new ProtocolException(String.format("Invalid message version. Got %s but previousmessages on this connection had version %s", header.version, this.version)), byteBuffer, header.streamId, header.bodySizeInBytes);
        }
        int checkedCast = Ints.checkedCast(header.bodySizeInBytes);
        if (!this.throwOnOverload) {
            ClientResourceLimits.Overload overload = ClientResourceLimits.Overload.NONE;
            if (!acquireCapacityAndQueueOnFailure(header, limit, limit2)) {
                if (processRequestAndUpdateMetrics(shareableBytes, header, checkedCast, ClientResourceLimits.Overload.BYTES_IN_FLIGHT) && this.decoder.isActive()) {
                    ClientMetrics.instance.pauseConnection();
                }
                overload = ClientResourceLimits.Overload.BYTES_IN_FLIGHT;
            }
            if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled()) {
                long reserveAndGetDelay = this.requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
                if (overload == ClientResourceLimits.Overload.NONE && reserveAndGetDelay > 0) {
                    if (processRequestAndUpdateMetrics(shareableBytes, header, checkedCast, ClientResourceLimits.Overload.REQUESTS)) {
                        if (this.decoder.isActive()) {
                            ClientMetrics.instance.pauseConnection();
                        }
                        scheduleConnectionWakeupTask(reserveAndGetDelay, RATE_LIMITER_DELAY_UNIT);
                    }
                    overload = ClientResourceLimits.Overload.REQUESTS;
                }
            }
            if (overload != ClientResourceLimits.Overload.NONE) {
                return false;
            }
        } else {
            if (!acquireCapacity(header, limit, limit2)) {
                discardAndThrow(limit, limit2, byteBuffer, header, checkedCast, ClientResourceLimits.Overload.BYTES_IN_FLIGHT);
                return true;
            }
            if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && !this.requestRateLimiter.tryReserve()) {
                release(header);
                discardAndThrow(limit, limit2, byteBuffer, header, checkedCast, ClientResourceLimits.Overload.REQUESTS);
                return true;
            }
        }
        return processRequestAndUpdateMetrics(shareableBytes, header, checkedCast, ClientResourceLimits.Overload.NONE);
    }

    private boolean processRequestAndUpdateMetrics(ShareableBytes shareableBytes, Envelope.Header header, int i, ClientResourceLimits.Overload overload) {
        this.channelPayloadBytesInFlight += i;
        incrementReceivedMessageMetrics(i);
        return processRequest(composeRequest(header, shareableBytes), overload);
    }

    private void discardAndThrow(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, ByteBuffer byteBuffer, Envelope.Header header, int i, ClientResourceLimits.Overload overload) {
        ClientMetrics.instance.markRequestDiscarded();
        logOverload(limit, limit2, header, i);
        handleError(buildOverloadedException(limit, limit2, overload), header);
        incrementReceivedMessageMetrics(i);
        byteBuffer.position(byteBuffer.position() + 9 + i);
    }

    private OverloadedException buildOverloadedException(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, ClientResourceLimits.Overload overload) {
        return overload == ClientResourceLimits.Overload.REQUESTS ? new OverloadedException(String.format("Request breached global limit of %d requests/second. Server is currently in an overloaded state and cannot accept more requests.", Integer.valueOf(this.requestRateLimiter.getRate()))) : new OverloadedException(String.format("Request breached limit on bytes in flight. (Endpoint: %d/%d bytes, Global: %d/%d bytes.) Server is currently in an overloaded state and cannot accept more requests.", Long.valueOf(limit.using()), Long.valueOf(limit.limit()), Long.valueOf(limit2.using()), Long.valueOf(limit2.limit())));
    }

    private void logOverload(ResourceLimits.Limit limit, ResourceLimits.Limit limit2, Envelope.Header header, int i) {
        logger.trace("Discarded request of size {} with {} bytes in flight on channel. Using {}/{} bytes of endpoint limit and {}/{} bytes of global limit. Global rate limiter: {} Header: {}", new Object[]{Integer.valueOf(i), Long.valueOf(this.channelPayloadBytesInFlight), Long.valueOf(limit.using()), Long.valueOf(limit.limit()), Long.valueOf(limit2.using()), Long.valueOf(limit2.limit()), this.requestRateLimiter, header});
    }

    private boolean handleProtocolException(ProtocolException protocolException, ByteBuffer byteBuffer, int i, long j) {
        if (j >= 0) {
            int i2 = this.consecutiveMessageErrors + 1;
            this.consecutiveMessageErrors = i2;
            if (i2 <= DatabaseDescriptor.getConsecutiveMessageErrorsThreshold()) {
                handleError(protocolException, i);
                byteBuffer.position(Math.min(byteBuffer.limit(), byteBuffer.position() + 9 + Ints.checkedCast(j)));
                return true;
            }
        }
        if (!protocolException.isFatal()) {
            protocolException = ProtocolException.toFatalException(protocolException);
        }
        handleError(protocolException, i);
        return false;
    }

    private void incrementReceivedMessageMetrics(int i) {
        this.receivedCount++;
        this.receivedBytes += i + 9;
        ClientMessageSizeMetrics.bytesReceived.inc(i + 9);
        ClientMessageSizeMetrics.bytesReceivedPerRequest.update(i + 9);
    }

    private Envelope composeRequest(Envelope.Header header, ShareableBytes shareableBytes) {
        ByteBuffer byteBuffer = shareableBytes.get();
        int position = byteBuffer.position() + 9 + Ints.checkedCast(header.bodySizeInBytes);
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(byteBuffer.slice());
        wrappedBuffer.readerIndex(9);
        wrappedBuffer.retain();
        byteBuffer.position(position);
        return new Envelope(header, wrappedBuffer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean processRequest(Envelope envelope) {
        return processRequest(envelope, ClientResourceLimits.Overload.NONE);
    }

    protected boolean processRequest(Envelope envelope, ClientResourceLimits.Overload overload) {
        M m = null;
        try {
            m = this.messageDecoder.decode(this.channel, envelope);
            this.dispatcher.accept(this.channel, m, this::toFlushItem, overload);
            this.consecutiveMessageErrors = 0;
            return true;
        } catch (Exception e) {
            e = e;
            if (m != null) {
                envelope.release();
            }
            boolean z = true;
            int i = this.consecutiveMessageErrors + 1;
            this.consecutiveMessageErrors = i;
            if (i > DatabaseDescriptor.getConsecutiveMessageErrorsThreshold()) {
                if (!(e instanceof ProtocolException)) {
                    logger.debug("Error decoding CQL message", e);
                    e = new ProtocolException("Error encountered decoding CQL message: " + e.getMessage());
                }
                e = ProtocolException.toFatalException((ProtocolException) e);
                z = false;
            }
            handleErrorAndRelease(e, envelope.header);
            return z;
        }
    }

    private void handleErrorAndRelease(Throwable th, Envelope.Header header) {
        release(header);
        handleError(th, header);
    }

    private void handleError(Throwable th, Envelope.Header header) {
        handleError(th, header.streamId);
    }

    private void handleError(Throwable th, int i) {
        this.errorHandler.accept(ErrorMessage.wrap(th, i));
    }

    private void handleError(Throwable th) {
        this.errorHandler.accept(th);
    }

    private Flusher.FlushItem.Framed toFlushItem(Channel channel, Message.Request request, Message.Response response) {
        Envelope encode = response.encode(request.getSource().header.version);
        int envelopeSize = envelopeSize(encode.header);
        ClientMessageSizeMetrics.bytesSent.inc(envelopeSize);
        ClientMessageSizeMetrics.bytesSentPerResponse.update(envelopeSize);
        return new Flusher.FlushItem.Framed(channel, encode, request.getSource(), this.payloadAllocator, this::release);
    }

    private void release(Flusher.FlushItem<Envelope> flushItem) {
        release(flushItem.request.header);
        flushItem.request.release();
        flushItem.response.release();
    }

    private void release(Envelope.Header header) {
        releaseCapacity(Ints.checkedCast(header.bodySizeInBytes));
        this.channelPayloadBytesInFlight -= header.bodySizeInBytes;
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler
    protected boolean processFirstFrameOfLargeMessage(FrameDecoder.IntactFrame intactFrame, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) throws IOException {
        try {
            Envelope.Decoder.HeaderExtractionResult extractHeader = this.envelopeDecoder.extractHeader(intactFrame.contents.get());
            if (!extractHeader.isSuccess()) {
                handleError(ProtocolException.toFatalException(extractHeader.error()));
                return false;
            }
            Envelope.Header header = extractHeader.header();
            int checkedCast = Ints.checkedCast(header.bodySizeInBytes);
            this.receivedBytes += r0.remaining();
            LargeMessage largeMessage = new LargeMessage(header);
            if (acquireCapacity(header, limit, limit2)) {
                if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled()) {
                    if (!this.throwOnOverload) {
                        long reserveAndGetDelay = this.requestRateLimiter.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT);
                        if (reserveAndGetDelay > 0) {
                            this.largeMessage = largeMessage;
                            largeMessage.markBackpressure(ClientResourceLimits.Overload.REQUESTS);
                            largeMessage.supply(intactFrame);
                            if (this.decoder.isActive()) {
                                ClientMetrics.instance.pauseConnection();
                            }
                            scheduleConnectionWakeupTask(reserveAndGetDelay, RATE_LIMITER_DELAY_UNIT);
                            return false;
                        }
                    } else if (!this.requestRateLimiter.tryReserve()) {
                        ClientMetrics.instance.markRequestDiscarded();
                        logOverload(limit, limit2, header, checkedCast);
                        largeMessage.markOverloaded(ClientResourceLimits.Overload.REQUESTS);
                        this.largeMessage = largeMessage;
                        largeMessage.supply(intactFrame);
                        return true;
                    }
                }
            } else if (this.throwOnOverload) {
                ClientMetrics.instance.markRequestDiscarded();
                logOverload(limit, limit2, header, checkedCast);
                largeMessage.markOverloaded(ClientResourceLimits.Overload.BYTES_IN_FLIGHT);
            }
            this.largeMessage = largeMessage;
            largeMessage.supply(intactFrame);
            return true;
        } catch (Exception e) {
            throw new IOException("Error decoding CQL Message", e);
        }
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler
    protected String id() {
        return this.channel.id().asShortText();
    }

    private void scheduleConnectionWakeupTask(long j, TimeUnit timeUnit) {
        this.channel.eventLoop().schedule(() -> {
            try {
                if (!this.decoder.isActive()) {
                    this.decoder.reactivate();
                    if (this.decoder.isActive()) {
                        ClientMetrics.instance.unpauseConnection();
                    }
                }
            } catch (Throwable th) {
                fatalExceptionCaught(th);
            }
        }, j, timeUnit);
    }

    private boolean acquireCapacityAndQueueOnFailure(Envelope.Header header, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) {
        return acquireCapacity(limit, limit2, Ints.checkedCast(header.bodySizeInBytes), MonotonicClock.Global.approxTime.now(), Murmur3Partitioner.MAXIMUM);
    }

    private boolean acquireCapacity(Envelope.Header header, ResourceLimits.Limit limit, ResourceLimits.Limit limit2) {
        return acquireCapacity(limit, limit2, Ints.checkedCast(header.bodySizeInBytes)) == ResourceLimits.Outcome.SUCCESS;
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler
    protected void processCorruptFrame(FrameDecoder.CorruptFrame corruptFrame) {
        this.corruptFramesUnrecovered++;
        Object[] objArr = new Object[4];
        objArr[0] = id();
        objArr[1] = corruptFrame.isRecoverable() ? "body" : "header";
        objArr[2] = Integer.valueOf(corruptFrame.readCRC);
        objArr[3] = Integer.valueOf(corruptFrame.computedCRC);
        String format = String.format("%s invalid, unrecoverable CRC mismatch detected in frame %s. Read %d, Computed %d", objArr);
        noSpamLogger.error(format, new Object[0]);
        if (!corruptFrame.isSelfContained) {
            if (null == this.largeMessage) {
                this.receivedBytes += corruptFrame.frameSize;
            } else {
                processSubsequentFrameOfLargeMessage(corruptFrame);
            }
        }
        handleError(ProtocolException.toFatalException(new ProtocolException(format)));
    }

    @Override // org.apache.cassandra.net.AbstractMessageHandler
    protected void fatalExceptionCaught(Throwable th) {
        this.decoder.discard();
        logger.warn("Unrecoverable exception caught in CQL message processing pipeline, closing the connection", th);
        this.channel.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static int envelopeSize(Envelope.Header header) {
        return 9 + Ints.checkedCast(header.bodySizeInBytes);
    }
}
