package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Future over the {@link StreamState} of a single streaming plan.
 *
 * <p>The future completes successfully once every session known to the coordinator has reached a
 * final state and none of them failed (aborted sessions still count as success); it fails with a
 * {@link StreamException} when at least one session failed. Registered
 * {@link StreamEventHandler}s are notified of intermediate {@link StreamEvent}s as well as of the
 * final outcome.
 *
 * <p>Two futures are equal when their {@link #planId}s are equal.
 */
public final class StreamResultFuture extends AsyncFuture<StreamState> {
    private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class);

    /** Identifier of the streaming plan tracked by this future; sole basis of equals/hashCode. */
    public final TimeUUID planId;

    /** Operation this plan was started for; its description is used in log messages. */
    public final StreamOperation streamOperation;

    private final StreamCoordinator coordinator;

    /** Listeners notified by {@link #fireStreamEvent(StreamEvent)}; may be added from any thread. */
    private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>();

    /** Threshold above which a single round of listener notification is reported as slow. */
    private final long slowEventsLogTimeoutNanos = DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toNanoseconds();

    /**
     * Creates a future for the given plan.
     *
     * <p>When the coordinator is not a follower and has no active sessions there is nothing to
     * wait for, so the future is completed immediately with the current state.
     *
     * @param planId          identifier of the streaming plan
     * @param streamOperation operation the plan belongs to
     * @param coordinator     coordinator owning the plan's sessions
     */
    public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) {
        this.planId = planId;
        this.streamOperation = streamOperation;
        this.coordinator = coordinator;
        if (!coordinator.isFollower() && !coordinator.hasActiveSessions()) {
            trySuccess(getCurrentState());
        }
    }

    /**
     * Creates a future backed by a freshly constructed {@link StreamCoordinator}.
     *
     * <p>NOTE(review): the literal coordinator arguments ({@code 0}, {@code true}, {@code false})
     * are presumably connections-per-host, follower and connect-sequentially, and
     * {@code pendingRepair} presumably identifies a pending repair session - confirm against
     * {@code StreamCoordinator}'s constructor.
     *
     * @param planId          identifier of the streaming plan
     * @param streamOperation operation the plan belongs to
     * @param pendingRepair   forwarded unchanged to the coordinator
     * @param previewKind     forwarded unchanged to the coordinator
     */
    @VisibleForTesting
    public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, TimeUUID pendingRepair, PreviewKind previewKind) {
        this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, StreamingChannel.Factory.Global.streamingFactory(), true, false, pendingRepair, previewKind));
    }

    /**
     * Creates, registers and starts the initiator-side future for a plan.
     *
     * <p>The future is registered with {@link StreamManager}, the given listeners are attached,
     * every session of the coordinator is initialised with the future, and finally the
     * coordinator is asked to connect.
     *
     * @param planId          identifier of the streaming plan
     * @param streamOperation operation the plan belongs to
     * @param listeners       handlers to attach before streaming starts; may be {@code null}
     * @param coordinator     coordinator owning the plan's sessions
     * @return the registered future
     */
    public static StreamResultFuture createInitiator(TimeUUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) {
        StreamResultFuture future = createAndRegisterInitiator(planId, streamOperation, coordinator);
        if (listeners != null) {
            for (StreamEventHandler listener : listeners) {
                future.addEventListener(listener);
            }
        }
        logger.info("[Stream #{}] Executing streaming plan for {}", planId, streamOperation.getDescription());
        // Every session must know its future before the coordinator starts connecting.
        for (StreamSession session : coordinator.getAllStreamSessions()) {
            session.init(future);
        }
        coordinator.connect(future);
        return future;
    }

    /**
     * Returns the follower-side future for a plan, creating and registering it on first use, and
     * initialises the inbound session for the given peer and channel.
     *
     * <p>The method is {@code static synchronized}, so the lookup-or-create step is serialised
     * across all callers.
     *
     * <p>NOTE(review): {@code messagingVersion} and {@code pendingRepair} are named after how they
     * appear to be used downstream; this class only forwards them - confirm against
     * {@code StreamCoordinator}.
     *
     * @param sessionIndex     index of the session, logged as {@code ID#}
     * @param planId           identifier of the streaming plan
     * @param streamOperation  operation the plan belongs to
     * @param from             peer that sent the plan
     * @param channel          channel the plan arrived on
     * @param messagingVersion forwarded to the coordinator when creating the inbound session
     * @param pendingRepair    forwarded to the coordinator of a newly created future
     * @param previewKind      forwarded to the coordinator of a newly created future
     * @return the existing or newly registered future for {@code planId}
     */
    public static synchronized StreamResultFuture createFollower(int sessionIndex, TimeUUID planId, StreamOperation streamOperation, InetAddressAndPort from, StreamingChannel channel, int messagingVersion, TimeUUID pendingRepair, PreviewKind previewKind) {
        StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
        if (future == null) {
            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} {}", planId, sessionIndex, streamOperation.getDescription(), from, channel.description());
            future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
            StreamManager.instance.registerFollower(future);
        }
        future.initInbound(from, channel, messagingVersion, sessionIndex);
        logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} {}", planId, sessionIndex, streamOperation.getDescription(), from, channel.description());
        return future;
    }

    /** Builds a future for the plan and registers it with {@link StreamManager} as an initiator. */
    private static StreamResultFuture createAndRegisterInitiator(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) {
        StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);
        StreamManager.instance.registerInitiator(future);
        return future;
    }

    /** @return the coordinator owning this plan's sessions */
    public StreamCoordinator getCoordinator() {
        return coordinator;
    }

    /** Obtains (or creates) the inbound session for the peer and binds it to this future. */
    private void initInbound(InetAddressAndPort from, StreamingChannel channel, int messagingVersion, int sessionIndex) {
        coordinator.getOrCreateInboundSession(from, channel, messagingVersion, sessionIndex).init(this);
    }

    /**
     * Registers a handler both as a completion callback of this future and as a receiver of
     * intermediate stream events.
     *
     * @param listener handler to register
     */
    public void addEventListener(StreamEventHandler listener) {
        // Parameterized (rather than raw) cast keeps the call free of raw-type usage while
        // still selecting the FutureCallback overload of addCallback.
        addCallback((FutureCallback<? super StreamState>) listener);
        eventListeners.add(listener);
    }

    /** @return a new snapshot of the plan's state built from the coordinator's session info */
    public StreamState getCurrentState() {
        return new StreamState(planId, streamOperation, coordinator.getAllSessionInfo());
    }

    /** Two futures are equal when they are of the same class and track the same plan id. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return planId.equals(((StreamResultFuture) obj).planId);
    }

    @Override
    public int hashCode() {
        return planId.hashCode();
    }

    /**
     * Records the prepared session's info with the coordinator and notifies listeners with a
     * {@link StreamEvent.SessionPreparedEvent}.
     *
     * <p>Decompiler note: access was widened from package-private; kept public for compatibility.
     *
     * @param session          session that finished preparing
     * @param prepareDirection direction reported in the emitted event
     */
    public void handleSessionPrepared(StreamSession session, StreamSession.PrepareDirection prepareDirection) {
        SessionInfo sessionInfo = session.getSessionInfo();
        logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({}), sending {} files({})",
                    session.planId(),
                    session.sessionIndex(),
                    sessionInfo.getTotalFilesToReceive(),
                    FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()),
                    sessionInfo.getTotalFilesToSend(),
                    FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend()));
        StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo, prepareDirection);
        coordinator.addSessionInfo(sessionInfo);
        fireStreamEvent(event);
    }

    /**
     * Notifies listeners that a session completed, records its final info with the coordinator
     * and completes this future if every session is now in a final state.
     *
     * <p>Decompiler note: access was widened from package-private; kept public for compatibility.
     *
     * @param session session that reached its end
     */
    public void handleSessionComplete(StreamSession session) {
        // Locale.ROOT keeps the logged state name stable regardless of the JVM default locale
        // (e.g. a Turkish locale would otherwise turn 'I' into a dotless 'ı').
        logger.info("[Stream #{}] Session with {} is {}", session.planId(), session.peer, session.state().name().toLowerCase(Locale.ROOT));
        fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
        coordinator.addSessionInfo(session.getSessionInfo());
        maybeComplete();
    }

    /**
     * Forwards a progress update to the coordinator and notifies listeners with a
     * {@link StreamEvent.ProgressEvent}.
     *
     * @param progress progress update to record and publish
     */
    public void handleProgress(ProgressInfo progress) {
        coordinator.updateProgress(progress);
        fireStreamEvent(new StreamEvent.ProgressEvent(planId, progress));
    }

    /**
     * Delivers an event to every registered listener.
     *
     * <p>The method is synchronized, so deliveries on one future do not interleave. A listener
     * that throws is logged and skipped; remaining listeners are still notified. When the whole
     * round takes longer than the configured slow-event timeout a warning is emitted through
     * {@link NoSpamLogger} (presumably rate-limited to one message per minute - confirm against
     * {@code NoSpamLogger.log}).
     *
     * @param event event to deliver
     */
    synchronized void fireStreamEvent(StreamEvent event) {
        long startNanos = Clock.Global.nanoTime();
        for (StreamEventHandler listener : eventListeners) {
            try {
                listener.handleStreamEvent(event);
            } catch (Throwable t) {
                logger.warn("Unexpected exception in listener while calling handleStreamEvent", t);
            }
        }
        long elapsedNanos = Clock.Global.nanoTime() - startNanos;
        if (elapsedNanos > slowEventsLogTimeoutNanos) {
            // The supplier defers building the Duration arguments until the message is actually logged.
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES,
                             "Handling streaming events took longer than {}; took {}",
                             (Supplier<Object[]>) () -> new Object[]{Duration.ofNanos(slowEventsLogTimeoutNanos), Duration.ofNanos(elapsedNanos)});
        }
    }

    /**
     * Completes this future once every session is in a final state: failure with a
     * {@link StreamException} listing each failed session if any session failed, success with
     * the final state otherwise (including when sessions were aborted).
     */
    private synchronized void maybeComplete() {
        if (!finishedAllSessions()) {
            return;
        }
        StreamState finalState = getCurrentState();
        if (finalState.hasFailedSession()) {
            StringBuilder message = new StringBuilder("Stream failed: ");
            for (SessionInfo info : finalState.sessions()) {
                if (info.isFailed()) {
                    message.append("\nSession peer ").append(info.peer).append(' ').append(info.failureReason);
                }
            }
            String failure = message.toString();
            logger.warn("[Stream #{}] {}", planId, failure);
            tryFailure(new StreamException(finalState, failure));
            return;
        }
        if (finalState.hasAbortedSession()) {
            logger.info("[Stream #{}] Stream aborted", planId);
        } else {
            logger.info("[Stream #{}] All sessions completed", planId);
        }
        trySuccess(finalState);
    }

    /**
     * Looks up a session of this plan through the coordinator.
     *
     * @param peer         peer of the session
     * @param sessionIndex index of the session for that peer
     * @return whatever {@code StreamCoordinator.getSessionById} returns for the pair
     */
    public StreamSession getSession(InetAddressAndPort peer, int sessionIndex) {
        return coordinator.getSessionById(peer, sessionIndex);
    }

    /** @return {@code true} when every session info held by the coordinator is in a final state */
    private boolean finishedAllSessions() {
        return coordinator.getAllSessionInfo().stream().allMatch(info -> info.state.isFinalState());
    }
}
