package org.apache.cassandra.transport;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.concurrent.SharedExecutorPool;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.Flusher;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/transport/Dispatcher.class */
/**
 * Hands native-transport requests to the shared request executor, turns each request into a
 * response, and passes that response (or a server-pushed event) to a {@link Flusher} keyed by
 * the channel's Netty {@link EventLoop}.
 *
 * <p>Flushers are cached in a static map, so every {@code Dispatcher} instance shares the
 * flusher already registered for a given event loop; {@code useLegacyFlusher} only matters for
 * the instance that happens to create the entry.
 */
public class Dispatcher {
    private static final Logger logger = LoggerFactory.getLogger(Dispatcher.class);

    /** Executor on which every dispatched request is processed. */
    private static final LocalAwareExecutorPlus requestExecutor = SharedExecutorPool.SHARED.newExecutor(
            DatabaseDescriptor.getNativeTransportMaxThreads(),
            DatabaseDescriptor::setNativeTransportMaxThreads,
            "transport",
            "Native-Transport-Requests");

    /** One flusher per event loop, created lazily by {@link #flush(Flusher.FlushItem)}. */
    private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();

    /** Channel attribute key under which a per-channel event consumer can be stored. */
    static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");

    /** Selects {@code Flusher.legacy} over {@code Flusher.immediate} when a flusher is created. */
    private final boolean useLegacyFlusher;

    /**
     * Converts a processed request/response pair into the item that is queued on a flusher.
     *
     * <p>NOTE(review): the decompiler reported this type as originally package-private; it is
     * kept public here so existing references keep compiling.
     */
    @FunctionalInterface
    public interface FlushItemConverter {
        /**
         * @param channel  channel the response will be written to
         * @param request  the request that was processed
         * @param response the response produced for {@code request}
         * @return the flush item wrapping {@code response}
         */
        Flusher.FlushItem<?> toFlushItem(Channel channel, Message.Request request, Message.Response response);
    }

    /**
     * @param useLegacyFlusher {@code true} to create legacy flushers, {@code false} to create
     *                         immediate flushers, for event loops that have none yet
     */
    public Dispatcher(boolean useLegacyFlusher) {
        this.useLegacyFlusher = useLegacyFlusher;
    }

    /**
     * Submits the request for asynchronous processing and marks it as dispatched in the client
     * metrics. The metric is updated as soon as the task is submitted, not when it completes.
     *
     * @param channel            channel the request arrived on
     * @param request            request to process
     * @param flushItemConverter builds the flush item for the eventual response
     * @param overload           overload condition detected for this request, if any
     */
    public void dispatch(Channel channel, Message.Request request, FlushItemConverter flushItemConverter, ClientResourceLimits.Overload overload) {
        // NOTE(review): the handle returned by submit() is discarded, so a failure while converting
        // or flushing the response is presumably not observed here - confirm executor behaviour.
        requestExecutor.submit(() -> processRequest(channel, request, flushItemConverter, overload));
        ClientMetrics.instance.markRequestDispatched();
    }

    /**
     * Executes the request on behalf of the given connection and decorates the response with the
     * stream id, collected client warnings and the connection itself.
     *
     * <p>Does not reset warning state; the caller is responsible for that.
     */
    private static Message.Response processRequest(ServerConnection serverConnection, Message.Request request, ClientResourceLimits.Overload overload) {
        long startedAtNanos = Clock.Global.nanoTime();

        // Client warnings are only captured for protocol V4 and later.
        if (serverConnection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4)) {
            ClientWarn.instance.captureWarnings();
        }
        if (request.isTrackable()) {
            CoordinatorWarnings.init();
        }

        // Must run after captureWarnings() so the backpressure warning is part of what was captured.
        warnIfOverloaded(overload);

        QueryState queryState = serverConnection.validateNewMessage(request.type, serverConnection.getVersion());
        Message.logger.trace("Received: {}, v={}", request, serverConnection.getVersion());
        serverConnection.requests.inc();

        Message.Response response = request.execute(queryState, startedAtNanos);
        if (request.isTrackable()) {
            CoordinatorWarnings.done();
        }

        response.setStreamId(request.getStreamId());
        response.setWarnings(ClientWarn.instance.getWarnings());
        response.attach(serverConnection);
        serverConnection.applyStateTransition(request.type, response.type);
        return response;
    }

    /**
     * Logs (rate-limited to once per minute) and records a client warning when the request
     * triggered backpressure; does nothing for any other overload value.
     */
    private static void warnIfOverloaded(ClientResourceLimits.Overload overload) {
        String message;
        if (overload == ClientResourceLimits.Overload.REQUESTS) {
            message = String.format("Request breached global limit of %d requests/second and triggered backpressure.",
                                    ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
        } else if (overload == ClientResourceLimits.Overload.BYTES_IN_FLIGHT) {
            message = String.format("Request breached limit(s) on bytes in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
                                    ClientResourceLimits.getEndpointLimit(),
                                    ClientResourceLimits.getGlobalLimit());
        } else {
            return;
        }
        NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1L, TimeUnit.MINUTES, message);
        ClientWarn.instance.warn(message);
    }

    /**
     * Processes the request and always yields a response: any {@link Throwable} raised while
     * processing is inspected and converted into an {@link ErrorMessage} carrying the request's
     * stream id and the warnings collected so far.
     *
     * <p>Coordinator and client warning state is reset on every exit path.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private.
     *
     * @param channel  channel used to obtain the unexpected-exception handler
     * @param request  request to process; its connection is expected to be a {@code ServerConnection}
     * @param overload overload condition detected for this request, if any
     * @return the response produced by the request, or an error response on failure
     */
    public static Message.Response processRequest(Channel channel, Message.Request request, ClientResourceLimits.Overload overload) {
        try {
            // The cast is kept inside the try so a wrong connection type is reported as an error response.
            return processRequest((ServerConnection) request.connection(), request, overload);
        } catch (Throwable t) {
            JVMStabilityInspector.inspectThrowable(t);

            if (request.isTrackable()) {
                CoordinatorWarnings.done();
            }

            ErrorMessage error = ErrorMessage.fromException(t, ExceptionHandlers.getUnexpectedExceptionHandler(channel, true));
            error.setStreamId(request.getStreamId());
            error.setWarnings(ClientWarn.instance.getWarnings());
            return error;
        } finally {
            CoordinatorWarnings.reset();
            ClientWarn.instance.resetWarnings();
        }
    }

    /**
     * Processes the request, converts the response into a flush item and queues it for writing.
     * This is the task body submitted by {@link #dispatch}.
     */
    void processRequest(Channel channel, Message.Request request, FlushItemConverter flushItemConverter, ClientResourceLimits.Overload overload) {
        Message.Response response = processRequest(channel, request, overload);
        Flusher.FlushItem<?> toFlush = flushItemConverter.toFlushItem(channel, request, response);
        Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion());
        flush(toFlush);
    }

    /**
     * Queues the item on the flusher belonging to the item's event loop, creating that flusher
     * on first use, and then starts the flusher.
     */
    private void flush(Flusher.FlushItem<?> flushItem) {
        EventLoop eventLoop = flushItem.channel.eventLoop();

        // Plain get() first: after the first item for a loop, the mapping always exists.
        Flusher flusher = flusherLookup.get(eventLoop);
        if (flusher == null) {
            // computeIfAbsent builds at most one flusher per loop even when threads race here.
            flusher = flusherLookup.computeIfAbsent(eventLoop, this::newFlusher);
        }

        flusher.enqueue(flushItem);
        flusher.start();
    }

    /** Creates the flusher variant selected at construction time for the given event loop. */
    private Flusher newFlusher(EventLoop eventLoop) {
        return useLegacyFlusher ? Flusher.legacy(eventLoop) : Flusher.immediate(eventLoop);
    }

    /** Shuts down the shared request executor. */
    public static void shutdown() {
        // Defensive: the executor is assigned once in a static initializer.
        if (requestExecutor != null) {
            requestExecutor.shutdown();
        }
    }

    /**
     * Builds a consumer that encodes each event for the given protocol version and flushes it to
     * the channel as a framed item; the encoded envelope is released through the item's callback.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private.
     *
     * @param channel          channel events are written to
     * @param protocolVersion  protocol version used to encode events
     * @param payloadAllocator allocator passed through to the framed flush item
     * @return a consumer that flushes every event it receives
     */
    public Consumer<EventMessage> eventDispatcher(Channel channel, ProtocolVersion protocolVersion, FrameEncoder.PayloadAllocator payloadAllocator) {
        return eventMessage -> flush(new Flusher.FlushItem.Framed(
                channel,
                eventMessage.encode(protocolVersion),
                null, // events have no originating request
                payloadAllocator,
                flushed -> ((Envelope) flushed.response).release()));
    }
}
