package org.apache.arrow.flight;

import com.google.common.base.Strings;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServerMiddleware;
import org.apache.arrow.flight.auth.AuthConstants;
import org.apache.arrow.flight.auth.ServerAuthHandler;
import org.apache.arrow.flight.auth.ServerAuthWrapper;
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.apache.arrow.flight.grpc.ContextPropagatingExecutorService;
import org.apache.arrow.flight.grpc.RequestContextAdapter;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/arrow/flight/FlightService.class */
public class FlightService extends FlightServiceGrpc.FlightServiceImplBase {
    private static final Logger logger = LoggerFactory.getLogger(FlightService.class);
    private static final int PENDING_REQUESTS = 5;
    private final BufferAllocator allocator;
    private final FlightProducer producer;
    private final ServerAuthHandler authHandler;
    private final ExecutorService executors;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/arrow/flight/FlightService$CallContext.class */
    public static class CallContext implements FlightProducer.CallContext {
        private final String peerIdentity;
        private final BooleanSupplier isCancelled;

        CallContext(String str, BooleanSupplier booleanSupplier) {
            this.peerIdentity = str;
            this.isCancelled = booleanSupplier;
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public String peerIdentity() {
            return this.peerIdentity;
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public boolean isCancelled() {
            return this.isCancelled.getAsBoolean();
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public <T extends FlightServerMiddleware> T getMiddleware(FlightServerMiddleware.Key<T> key) {
            T t;
            Map map = (Map) ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
            if (map == null || (t = (T) map.get(key)) == null) {
                return null;
            }
            return t;
        }

        @Override // org.apache.arrow.flight.FlightProducer.CallContext
        public Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> getMiddleware() {
            Map<FlightServerMiddleware.Key<?>, FlightServerMiddleware> map = (Map) ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
            return map == null ? Collections.emptyMap() : map;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/arrow/flight/FlightService$ExchangeListener.class */
    public static class ExchangeListener extends GetListener {
        private AutoCloseable resource;
        private boolean closed;
        private Runnable onCancelHandler;

        public ExchangeListener(ServerCallStreamObserver<ArrowMessage> serverCallStreamObserver, Consumer<Throwable> consumer) {
            super(serverCallStreamObserver, consumer);
            this.closed = false;
            this.onCancelHandler = null;
            this.resource = null;
            super.setOnCancelHandler(() -> {
                try {
                    if (this.onCancelHandler != null) {
                        this.onCancelHandler.run();
                    }
                } finally {
                    cleanup();
                }
            });
        }

        private void cleanup() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                AutoCloseables.close(new AutoCloseable[]{this.resource});
            } catch (Exception e) {
                throw CallStatus.INTERNAL.withCause(e).withDescription("Server internal error cleaning up resources").toRuntimeException();
            }
        }

        @Override // org.apache.arrow.flight.FlightService.GetListener, org.apache.arrow.flight.OutboundStreamListenerImpl, org.apache.arrow.flight.OutboundStreamListener
        public void error(Throwable th) {
            try {
                cleanup();
            } finally {
                super.error(th);
            }
        }

        @Override // org.apache.arrow.flight.FlightService.GetListener, org.apache.arrow.flight.OutboundStreamListenerImpl, org.apache.arrow.flight.OutboundStreamListener
        public void completed() {
            try {
                cleanup();
            } finally {
                super.completed();
            }
        }

        @Override // org.apache.arrow.flight.FlightService.GetListener, org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public void setOnCancelHandler(Runnable runnable) {
            this.onCancelHandler = runnable;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/arrow/flight/FlightService$GetListener.class */
    public static class GetListener extends OutboundStreamListenerImpl implements FlightProducer.ServerStreamListener {
        private final ServerCallStreamObserver<ArrowMessage> serverCallResponseObserver;
        private final Consumer<Throwable> errorHandler;
        private Runnable onCancelHandler;
        private Runnable onReadyHandler;
        private boolean completed;

        public GetListener(ServerCallStreamObserver<ArrowMessage> serverCallStreamObserver, Consumer<Throwable> consumer) {
            super(null, serverCallStreamObserver);
            this.onCancelHandler = null;
            this.onReadyHandler = null;
            this.errorHandler = consumer;
            this.completed = false;
            this.serverCallResponseObserver = serverCallStreamObserver;
            this.serverCallResponseObserver.setOnCancelHandler(this::onCancel);
            this.serverCallResponseObserver.setOnReadyHandler(this::onReady);
            this.serverCallResponseObserver.disableAutoInboundFlowControl();
        }

        private void onCancel() {
            FlightService.logger.debug("Stream cancelled by client.");
            if (this.onCancelHandler != null) {
                this.onCancelHandler.run();
            }
        }

        private void onReady() {
            if (this.onReadyHandler != null) {
                this.onReadyHandler.run();
            }
        }

        public void setOnCancelHandler(Runnable runnable) {
            this.onCancelHandler = runnable;
        }

        @Override // org.apache.arrow.flight.OutboundStreamListenerImpl, org.apache.arrow.flight.OutboundStreamListener
        public void setOnReadyHandler(Runnable runnable) {
            this.onReadyHandler = runnable;
        }

        @Override // org.apache.arrow.flight.FlightProducer.ServerStreamListener
        public boolean isCancelled() {
            return this.serverCallResponseObserver.isCancelled();
        }

        @Override // org.apache.arrow.flight.OutboundStreamListenerImpl
        protected void waitUntilStreamReady() {
        }

        @Override // org.apache.arrow.flight.OutboundStreamListenerImpl, org.apache.arrow.flight.OutboundStreamListener
        public void error(Throwable th) {
            if (this.completed) {
                this.errorHandler.accept(th);
            } else {
                this.completed = true;
                super.error(th);
            }
        }

        @Override // org.apache.arrow.flight.OutboundStreamListenerImpl, org.apache.arrow.flight.OutboundStreamListener
        public void completed() {
            if (this.completed) {
                this.errorHandler.accept(new IllegalStateException("Tried to complete already-completed call"));
            } else {
                this.completed = true;
                super.completed();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlightService(BufferAllocator bufferAllocator, FlightProducer flightProducer, ServerAuthHandler serverAuthHandler, ExecutorService executorService) {
        this.allocator = bufferAllocator;
        this.producer = flightProducer;
        this.authHandler = serverAuthHandler;
        this.executors = new ContextPropagatingExecutorService(executorService);
    }

    private CallContext makeContext(ServerCallStreamObserver<?> serverCallStreamObserver) {
        RequestContext requestContext = (RequestContext) RequestContextAdapter.REQUEST_CONTEXT_KEY.get();
        String str = null;
        if (requestContext != null) {
            str = requestContext.get(Auth2Constants.PEER_IDENTITY_KEY);
        }
        if (Strings.isNullOrEmpty(str)) {
            str = (String) AuthConstants.PEER_IDENTITY_KEY.get();
        }
        Objects.requireNonNull(serverCallStreamObserver);
        return new CallContext(str, serverCallStreamObserver::isCancelled);
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> streamObserver) {
        return ServerAuthWrapper.wrapHandshake(this.authHandler, streamObserver, this.executors);
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> streamObserver) {
        StreamPipe wrap = StreamPipe.wrap(streamObserver, (v0) -> {
            return v0.toProtocol();
        }, this::handleExceptionWithMiddleware);
        try {
            this.producer.listFlights(makeContext((ServerCallStreamObserver) streamObserver), new Criteria(criteria), wrap);
        } catch (Exception e) {
            wrap.onError(e);
        }
    }

    public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> streamObserver) {
        ServerCallStreamObserver<?> serverCallStreamObserver = (ServerCallStreamObserver) streamObserver;
        GetListener getListener = new GetListener(serverCallStreamObserver, this::handleExceptionWithMiddleware);
        try {
            this.producer.getStream(makeContext(serverCallStreamObserver), new Ticket(ticket), getListener);
        } catch (Exception e) {
            getListener.error(e);
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void doAction(Flight.Action action, StreamObserver<Flight.Result> streamObserver) {
        StreamPipe wrap = StreamPipe.wrap(streamObserver, (v0) -> {
            return v0.toProtocol();
        }, this::handleExceptionWithMiddleware);
        try {
            this.producer.doAction(makeContext((ServerCallStreamObserver) streamObserver), new Action(action), wrap);
        } catch (Exception e) {
            wrap.onError(e);
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void listActions(Flight.Empty empty, StreamObserver<Flight.ActionType> streamObserver) {
        StreamPipe wrap = StreamPipe.wrap(streamObserver, (v0) -> {
            return v0.toProtocol();
        }, this::handleExceptionWithMiddleware);
        try {
            this.producer.listActions(makeContext((ServerCallStreamObserver) streamObserver), wrap);
        } catch (Exception e) {
            wrap.onError(e);
        }
    }

    public StreamObserver<ArrowMessage> doPutCustom(StreamObserver<Flight.PutResult> streamObserver) {
        ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) streamObserver;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        serverCallStreamObserver.request(1);
        StreamPipe wrap = StreamPipe.wrap(serverCallStreamObserver, (v0) -> {
            return v0.toProtocol();
        }, this::handleExceptionWithMiddleware);
        BufferAllocator bufferAllocator = this.allocator;
        Objects.requireNonNull(serverCallStreamObserver);
        FlightStream flightStream = new FlightStream(bufferAllocator, 5, null, serverCallStreamObserver::request);
        wrap.setAutoCloseable(flightStream);
        StreamObserver<ArrowMessage> asObserver = flightStream.asObserver();
        this.executors.submit(() -> {
            try {
                try {
                    this.producer.acceptPut(makeContext(serverCallStreamObserver), flightStream, wrap).run();
                    wrap.ensureCompleted();
                } catch (Throwable th) {
                    wrap.onError(th);
                    wrap.ensureCompleted();
                }
            } catch (Throwable th2) {
                wrap.ensureCompleted();
                throw th2;
            }
        });
        return asObserver;
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void getFlightInfo(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.FlightInfo> streamObserver) {
        try {
            streamObserver.onNext(this.producer.getFlightInfo(makeContext((ServerCallStreamObserver) streamObserver), new FlightDescriptor(flightDescriptor)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(StatusUtils.toGrpcException(e));
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void pollFlightInfo(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.PollInfo> streamObserver) {
        try {
            streamObserver.onNext(this.producer.pollFlightInfo(makeContext((ServerCallStreamObserver) streamObserver), new FlightDescriptor(flightDescriptor)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(StatusUtils.toGrpcException(e));
        }
    }

    private void handleExceptionWithMiddleware(Throwable th) {
        Map map = (Map) ServerInterceptorAdapter.SERVER_MIDDLEWARE_KEY.get();
        if (map == null || map.isEmpty()) {
            logger.error("Uncaught exception in Flight method body", th);
        } else {
            map.forEach((key, flightServerMiddleware) -> {
                flightServerMiddleware.onCallErrored(th);
            });
        }
    }

    @Override // org.apache.arrow.flight.impl.FlightServiceGrpc.AsyncService
    public void getSchema(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.SchemaResult> streamObserver) {
        try {
            streamObserver.onNext(this.producer.getSchema(makeContext((ServerCallStreamObserver) streamObserver), new FlightDescriptor(flightDescriptor)).toProtocol());
            streamObserver.onCompleted();
        } catch (Exception e) {
            streamObserver.onError(StatusUtils.toGrpcException(e));
        }
    }

    public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage> streamObserver) {
        ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) streamObserver;
        ExchangeListener exchangeListener = new ExchangeListener(serverCallStreamObserver, this::handleExceptionWithMiddleware);
        BufferAllocator bufferAllocator = this.allocator;
        Objects.requireNonNull(serverCallStreamObserver);
        FlightStream flightStream = new FlightStream(bufferAllocator, 5, null, serverCallStreamObserver::request);
        exchangeListener.resource = flightStream;
        serverCallStreamObserver.disableAutoInboundFlowControl();
        serverCallStreamObserver.request(1);
        StreamObserver<ArrowMessage> asObserver = flightStream.asObserver();
        try {
            this.executors.submit(() -> {
                try {
                    this.producer.doExchange(makeContext(serverCallStreamObserver), flightStream, exchangeListener);
                } catch (Exception e) {
                    exchangeListener.error(e);
                }
            });
        } catch (Exception e) {
            exchangeListener.error(e);
        }
        return asObserver;
    }
}
