package org.apache.arrow.flight;

import com.google.common.collect.ImmutableList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.perf.PerformanceTestServer;
import org.apache.arrow.flight.perf.TestPerf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;

/** Tests for Flight backpressure handling: independent streams, and producers that wait for slow consumers. */
public class TestBackPressure {
    private static final int BATCH_SIZE = 4095;

    /**
     * Backpressure strategy that busy-polls the registered listener, sleeping one millisecond
     * between checks, and records how many milliseconds it spent sleeping.
     */
    private static class PollingBackpressureStrategy implements SleepTimeRecordingBackpressureStrategy {
        /** Number of completed one-millisecond sleeps, i.e. the total time slept in milliseconds. */
        private final AtomicLong sleepTime;
        /** Listener polled by {@link #waitForListener(long)}; assigned in {@link #register}. */
        private FlightProducer.ServerStreamListener listener;

        private PollingBackpressureStrategy() {
            this.sleepTime = new AtomicLong(0L);
        }

        /** Returns the total time spent sleeping while polling, in milliseconds. */
        @Override
        public long getSleepTime() {
            return this.sleepTime.get();
        }

        /** Remembers the listener whose readiness will be polled. */
        @Override
        public void register(FlightProducer.ServerStreamListener serverStreamListener) {
            this.listener = serverStreamListener;
        }

        /**
         * Blocks until the registered listener reports that it is ready.
         *
         * <p>The wait is deliberately not cancellable: an interrupt does not end the polling, but
         * the interrupt status is restored before returning so that callers can still observe it.
         *
         * @param j requested timeout; ignored by this implementation, which always waits until ready
         * @return always {@link BackpressureStrategy.WaitResult#READY}
         */
        @Override
        public BackpressureStrategy.WaitResult waitForListener(long j) {
            boolean interrupted = false;
            try {
                while (!this.listener.isReady()) {
                    try {
                        Thread.sleep(1L);
                        // Only count sleeps that ran to completion.
                        this.sleepTime.addAndGet(1L);
                    } catch (InterruptedException e) {
                        // Thread.sleep cleared the flag; remember it and keep polling.
                        interrupted = true;
                    }
                }
                return BackpressureStrategy.WaitResult.READY;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Callback-based backpressure strategy that additionally records how long callers were blocked
     * inside {@link #waitForListener(long)}.
     */
    private static class RecordingCallbackBackpressureStrategy extends BackpressureStrategy.CallbackBackpressureStrategy implements SleepTimeRecordingBackpressureStrategy {
        private static final long NANOS_PER_MILLI = 1_000_000L;

        /** Total time spent inside the superclass wait, in nanoseconds. */
        private final AtomicLong waitedNanos;

        private RecordingCallbackBackpressureStrategy() {
            this.waitedNanos = new AtomicLong(0L);
        }

        /** Returns the total time spent waiting, in milliseconds. */
        @Override
        public long getSleepTime() {
            return this.waitedNanos.get() / NANOS_PER_MILLI;
        }

        /**
         * Delegates to the superclass wait and adds the elapsed time to the running total.
         *
         * <p>Elapsed time is taken from {@link System#nanoTime()} rather than the wall clock so that
         * system clock adjustments cannot distort (or make negative) the recorded duration.
         *
         * @param j timeout forwarded unchanged to the superclass
         * @return whatever the superclass wait returned
         */
        @Override
        public BackpressureStrategy.WaitResult waitForListener(long j) {
            final long startNanos = System.nanoTime();
            final BackpressureStrategy.WaitResult result = super.waitForListener(j);
            this.waitedNanos.addAndGet(System.nanoTime() - startNanos);
            return result;
        }
    }

    /**
     * A {@link BackpressureStrategy} that also reports how long it has made its caller wait.
     *
     * <p>NOTE(review): the decompiler reported that this type's access modifier was changed from
     * {@code private}; confirm the intended visibility against the original source.
     */
    public interface SleepTimeRecordingBackpressureStrategy extends BackpressureStrategy {
        /** Returns the accumulated wait time recorded by this strategy, in milliseconds. */
        long getSleepTime();
    }

    /**
     * Runs the independent-streams scenario against a {@link PerformanceTestServer} created with
     * its two-argument (allocator, location) constructor.
     */
    @Disabled
    @org.junit.jupiter.api.Test
    public void ensureIndependentSteams() throws Exception {
        Function<BufferAllocator, Function<Location, PerformanceTestServer>> serverFactory =
                allocator -> location -> new PerformanceTestServer(allocator, location);
        ensureIndependentSteams(serverFactory);
    }

    /**
     * Runs the independent-streams scenario against a {@link PerformanceTestServer} configured with
     * a fresh {@link BackpressureStrategy.CallbackBackpressureStrategy} and the boolean flag set to
     * {@code true}.
     */
    @Disabled
    @org.junit.jupiter.api.Test
    public void ensureIndependentSteamsWithCallbacks() throws Exception {
        Function<BufferAllocator, Function<Location, PerformanceTestServer>> serverFactory =
                allocator -> location -> new PerformanceTestServer(
                        allocator, location, new BackpressureStrategy.CallbackBackpressureStrategy(), true);
        ensureIndependentSteams(serverFactory);
    }

    /** Runs the wait-until-proceed scenario with the polling strategy, producing data inline. */
    @Disabled
    @org.junit.jupiter.api.Test
    public void ensureWaitUntilProceed() throws Exception {
        SleepTimeRecordingBackpressureStrategy pollingStrategy = new PollingBackpressureStrategy();
        ensureWaitUntilProceed(pollingStrategy, false);
    }

    /**
     * Runs the wait-until-proceed scenario with the callback-based recording strategy, producing
     * data on a separate executor thread.
     */
    @Disabled
    @org.junit.jupiter.api.Test
    public void ensureWaitUntilProceedWithCallbacks() throws Exception {
        SleepTimeRecordingBackpressureStrategy callbackStrategy = new RecordingCallbackBackpressureStrategy();
        ensureWaitUntilProceed(callbackStrategy, true);
    }

    /**
     * Starts a performance test server, opens two streams on one client, and checks that the second
     * stream can be consumed while the first one is left only partially read.
     *
     * <p>Every resource (allocator, server, client and both streams) is managed by
     * try-with-resources, so each is closed exactly once, in reverse order of creation, on both the
     * success and the failure path.
     *
     * @param function given an allocator, yields a factory that builds the (not yet started) server
     *     for a location
     * @throws Exception if starting the server, talking to it, or closing a resource fails
     */
    private static void ensureIndependentSteams(Function<BufferAllocator, Function<Location, PerformanceTestServer>> function) throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             PerformanceTestServer server = function.apply(allocator)
                     .apply(Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0))
                     .start();
             FlightClient client = FlightClient.builder(allocator, server.getLocation()).build();
             // 110 batches' worth of records.
             FlightStream firstStream = openPerfStream(client, 110L * BATCH_SIZE)) {
            consume(firstStream, 10);

            // Stop reading the first stream and make sure a large part of the second can still be read.
            // NOTE(review): the alternating order below is reconstructed; the decompiled code had
            // collapsed both streams into one variable. Confirm against the original test source.
            // 200 batches' worth of records.
            try (FlightStream secondStream = openPerfStream(client, 200L * BATCH_SIZE)) {
                consume(secondStream, 100);

                consume(firstStream, 100);
                consume(secondStream, 100);

                consume(firstStream);
                consume(secondStream);
            }
        }
    }

    /** Opens a stream for the first endpoint of a perf flight with the given total record count. */
    private static FlightStream openPerfStream(FlightClient client, long recordCount) {
        FlightInfo info = client.getInfo(TestPerf.getPerfFlightDescriptor(recordCount, BATCH_SIZE, 1));
        return client.getStream(info.getEndpoints().get(0).getTicket());
    }

    /**
     * Checks that a producer using the given backpressure strategy waits for a stalled client.
     *
     * <p>A server is started whose {@code getStream} registers the strategy on the listener and
     * then sends 101 batches of {@code BATCH_SIZE} rows, calling
     * {@code waitForListener(0)} before each batch and once more before completing. The client
     * opens the stream, stalls for three seconds, drains the stream, and finally asserts that the
     * strategy recorded more than two seconds of waiting.
     *
     * <p>All allocators, the server, the client and the stream are managed by try-with-resources so
     * that each is closed exactly once on both the success and the failure path.
     *
     * @param sleepTimeRecordingBackpressureStrategy strategy under test; also supplies the recorded
     *     wait time for the final assertion
     * @param z when {@code false} the producer sends data inline on the thread that called
     *     {@code getStream}; when {@code true} it sends from a single-thread executor
     * @throws Exception if the server or client fails, or the calling thread is interrupted
     */
    private static void ensureWaitUntilProceed(final SleepTimeRecordingBackpressureStrategy sleepTimeRecordingBackpressureStrategy, final boolean z) throws Exception {
        final SleepTimeRecordingBackpressureStrategy strategy = sleepTimeRecordingBackpressureStrategy;
        final boolean produceOnExecutor = z;
        // How long the client stalls before reading, and the least wait the strategy must report.
        final long clientStallMillis = 3000L;
        final long minExpectedSleepMillis = 2000L;

        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
            final FlightProducer producer = new NoOpFlightProducer() {
                @Override
                public void getStream(FlightProducer.CallContext callContext, Ticket ticket, FlightProducer.ServerStreamListener serverStreamListener) {
                    strategy.register(serverStreamListener);
                    final Runnable loadData = () -> {
                        final Schema schema = new Schema(ImmutableList.of(Field.nullable("a", Types.MinorType.BIGINT.getType())));
                        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
                            serverStreamListener.start(root);
                            int batchesSent = 0;
                            while (true) {
                                strategy.waitForListener(0L);
                                if (batchesSent > 100) {
                                    break;
                                }
                                root.allocateNew();
                                root.setRowCount(TestBackPressure.BATCH_SIZE);
                                serverStreamListener.putNext();
                                batchesSent++;
                            }
                            root.clear();
                            serverStreamListener.completed();
                        }
                    };

                    if (!produceOnExecutor) {
                        loadData.run();
                        return;
                    }
                    // shutdown() lets the already-submitted task finish and then releases the thread.
                    final ExecutorService executor = Executors.newSingleThreadExecutor();
                    executor.submit(loadData);
                    executor.shutdown();
                }
            };

            try (BufferAllocator serverAllocator = allocator.newChildAllocator("server", 0L, Long.MAX_VALUE);
                 FlightServer server = FlightServer.builder(serverAllocator, Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0), producer).build().start();
                 BufferAllocator clientAllocator = allocator.newChildAllocator("client", 0L, Long.MAX_VALUE);
                 FlightClient client = FlightClient.builder(clientAllocator, server.getLocation()).build();
                 FlightStream stream = client.getStream(new Ticket(new byte[1]))) {
                final VectorSchemaRoot root = stream.getRoot();
                root.clear();
                // Stall so the producer has to wait on the backpressure strategy.
                Thread.sleep(clientStallMillis);
                while (stream.next()) {
                    root.clear();
                }
                final long slept = strategy.getSleepTime();
                Assertions.assertTrue(
                        slept > minExpectedSleepMillis,
                        String.format("Expected a sleep of at least %dms but only slept for %d", minExpectedSleepMillis, slept));
            }
        }
    }

    /** Reads the stream to its end, clearing the stream's root after every batch. */
    private static void consume(FlightStream flightStream) {
        final VectorSchemaRoot batchRoot = flightStream.getRoot();
        for (boolean hasBatch = flightStream.next(); hasBatch; hasBatch = flightStream.next()) {
            batchRoot.clear();
        }
    }

    /**
     * Reads at most {@code i} batches from the stream, clearing the stream's root after each one;
     * stops early if the stream ends first.
     */
    private static void consume(FlightStream flightStream, int i) {
        final VectorSchemaRoot batchRoot = flightStream.getRoot();
        for (int remaining = i; remaining > 0 && flightStream.next(); remaining--) {
            batchRoot.clear();
        }
    }
}
