package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.utils.TimeUUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamCoordinator.class */
public class StreamCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
    private final boolean connectSequentially;
    private final StreamOperation streamOperation;
    private final int connectionsPerHost;
    private final boolean follower;
    private StreamingChannel.Factory factory;
    private final TimeUUID pendingRepair;
    private final PreviewKind previewKind;
    private final Map<InetSocketAddress, HostStreamingData> peerSessions = new ConcurrentHashMap();
    private Iterator<StreamSession> sessionsToConnect = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/streaming/StreamCoordinator$HostStreamingData.class */
    public class HostStreamingData {
        private final Map<Integer, StreamSession> streamSessions = new HashMap();
        private final Map<Integer, SessionInfo> sessionInfos = new HashMap();
        private int lastReturned = -1;

        private HostStreamingData() {
        }

        public boolean hasActiveSessions() {
            Iterator<StreamSession> it = this.streamSessions.values().iterator();
            while (it.hasNext()) {
                if (!it.next().state().isFinalState()) {
                    return true;
                }
            }
            return false;
        }

        public StreamSession getOrCreateOutboundSession(InetAddressAndPort inetAddressAndPort) {
            if (this.streamSessions.size() >= StreamCoordinator.this.connectionsPerHost) {
                if (this.lastReturned >= this.streamSessions.size() - 1) {
                    this.lastReturned = 0;
                }
                Map<Integer, StreamSession> map = this.streamSessions;
                int i = this.lastReturned;
                this.lastReturned = i + 1;
                return map.get(Integer.valueOf(i));
            }
            StreamSession streamSession = new StreamSession(StreamCoordinator.this.streamOperation, inetAddressAndPort, StreamCoordinator.this.factory, null, MessagingService.current_version, StreamCoordinator.this.isFollower(), this.streamSessions.size(), StreamCoordinator.this.pendingRepair, StreamCoordinator.this.previewKind);
            Map<Integer, StreamSession> map2 = this.streamSessions;
            int i2 = this.lastReturned + 1;
            this.lastReturned = i2;
            map2.put(Integer.valueOf(i2), streamSession);
            this.sessionInfos.put(Integer.valueOf(this.lastReturned), streamSession.getSessionInfo());
            return streamSession;
        }

        public void connectAllStreamSessions() {
            Iterator<StreamSession> it = this.streamSessions.values().iterator();
            while (it.hasNext()) {
                StreamCoordinator.this.startSession(it.next());
            }
        }

        public Collection<StreamSession> getAllStreamSessions() {
            return Collections.unmodifiableCollection(this.streamSessions.values());
        }

        public StreamSession getOrCreateInboundSession(InetAddressAndPort inetAddressAndPort, StreamingChannel streamingChannel, int i, int i2) {
            StreamSession streamSession = this.streamSessions.get(Integer.valueOf(i2));
            if (streamSession == null) {
                streamSession = new StreamSession(StreamCoordinator.this.streamOperation, inetAddressAndPort, StreamCoordinator.this.factory, streamingChannel, i, StreamCoordinator.this.isFollower(), i2, StreamCoordinator.this.pendingRepair, StreamCoordinator.this.previewKind);
                this.streamSessions.put(Integer.valueOf(i2), streamSession);
                this.sessionInfos.put(Integer.valueOf(i2), streamSession.getSessionInfo());
            }
            return streamSession;
        }

        public StreamSession getSessionById(int i) {
            return this.streamSessions.get(Integer.valueOf(i));
        }

        public void updateProgress(ProgressInfo progressInfo) {
            this.sessionInfos.get(Integer.valueOf(progressInfo.sessionIndex)).updateProgress(progressInfo);
        }

        public void addSessionInfo(SessionInfo sessionInfo) {
            this.sessionInfos.put(Integer.valueOf(sessionInfo.sessionIndex), sessionInfo);
        }

        public Collection<SessionInfo> getAllSessionInfo() {
            return this.sessionInfos.values();
        }

        @VisibleForTesting
        public void shutdown() {
            this.streamSessions.values().forEach(streamSession -> {
                streamSession.sessionFailed();
            });
        }
    }

    public StreamCoordinator(StreamOperation streamOperation, int i, StreamingChannel.Factory factory, boolean z, boolean z2, TimeUUID timeUUID, PreviewKind previewKind) {
        this.streamOperation = streamOperation;
        this.connectionsPerHost = i;
        this.factory = factory;
        this.follower = z;
        this.connectSequentially = z2;
        this.pendingRepair = timeUUID;
        this.previewKind = previewKind;
    }

    public void setConnectionFactory(StreamingChannel.Factory factory) {
        this.factory = factory;
    }

    public synchronized boolean hasActiveSessions() {
        Iterator<HostStreamingData> it = this.peerSessions.values().iterator();
        while (it.hasNext()) {
            if (it.next().hasActiveSessions()) {
                return true;
            }
        }
        return false;
    }

    public synchronized Collection<StreamSession> getAllStreamSessions() {
        ArrayList arrayList = new ArrayList();
        Iterator<HostStreamingData> it = this.peerSessions.values().iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next().getAllStreamSessions());
        }
        return arrayList;
    }

    public boolean isFollower() {
        return this.follower;
    }

    public void connect(StreamResultFuture streamResultFuture) {
        if (this.connectSequentially) {
            connectSequentially(streamResultFuture);
        } else {
            connectAllStreamSessions();
        }
    }

    private void connectAllStreamSessions() {
        Iterator<HostStreamingData> it = this.peerSessions.values().iterator();
        while (it.hasNext()) {
            it.next().connectAllStreamSessions();
        }
    }

    private void connectSequentially(StreamResultFuture streamResultFuture) {
        this.sessionsToConnect = getAllStreamSessions().iterator();
        streamResultFuture.addEventListener(new StreamEventHandler() { // from class: org.apache.cassandra.streaming.StreamCoordinator.1
            @Override // org.apache.cassandra.streaming.StreamEventHandler
            public void handleStreamEvent(StreamEvent streamEvent) {
                if (streamEvent.eventType == StreamEvent.Type.STREAM_PREPARED || streamEvent.eventType == StreamEvent.Type.STREAM_COMPLETE) {
                    StreamCoordinator.this.connectNext();
                }
            }

            public void onSuccess(StreamState streamState) {
            }

            public void onFailure(Throwable th) {
            }
        });
        connectNext();
    }

    private void connectNext() {
        if (this.sessionsToConnect == null) {
            return;
        }
        if (!this.sessionsToConnect.hasNext()) {
            logger.debug("Finished connecting all sessions");
            return;
        }
        StreamSession next = this.sessionsToConnect.next();
        if (logger.isDebugEnabled()) {
            logger.debug("Connecting next session {} with {}.", next.planId(), next.peer.toString());
        }
        startSession(next);
    }

    public synchronized Set<InetSocketAddress> getPeers() {
        return new HashSet(this.peerSessions.keySet());
    }

    public synchronized StreamSession getOrCreateOutboundSession(InetAddressAndPort inetAddressAndPort) {
        return getOrCreateHostData(inetAddressAndPort).getOrCreateOutboundSession(inetAddressAndPort);
    }

    public synchronized StreamSession getOrCreateInboundSession(InetAddressAndPort inetAddressAndPort, StreamingChannel streamingChannel, int i, int i2) {
        return getOrCreateHostData(inetAddressAndPort).getOrCreateInboundSession(inetAddressAndPort, streamingChannel, i, i2);
    }

    public StreamSession getSessionById(InetAddressAndPort inetAddressAndPort, int i) {
        return getHostData(inetAddressAndPort).getSessionById(i);
    }

    public synchronized void updateProgress(ProgressInfo progressInfo) {
        getHostData(progressInfo.peer).updateProgress(progressInfo);
    }

    public synchronized void addSessionInfo(SessionInfo sessionInfo) {
        getOrCreateHostData(sessionInfo.peer).addSessionInfo(sessionInfo);
    }

    public synchronized Set<SessionInfo> getAllSessionInfo() {
        HashSet hashSet = new HashSet();
        Iterator<HostStreamingData> it = this.peerSessions.values().iterator();
        while (it.hasNext()) {
            hashSet.addAll(it.next().getAllSessionInfo());
        }
        return hashSet;
    }

    public synchronized void transferStreams(InetAddressAndPort inetAddressAndPort, Collection<OutgoingStream> collection) {
        HostStreamingData orCreateHostData = getOrCreateHostData(inetAddressAndPort);
        if (this.connectionsPerHost <= 1) {
            orCreateHostData.getOrCreateOutboundSession(inetAddressAndPort).addTransferStreams(collection);
            return;
        }
        Iterator<Collection<OutgoingStream>> it = bucketStreams(collection).iterator();
        while (it.hasNext()) {
            orCreateHostData.getOrCreateOutboundSession(inetAddressAndPort).addTransferStreams(it.next());
        }
    }

    private List<Collection<OutgoingStream>> bucketStreams(Collection<OutgoingStream> collection) {
        int round = Math.round(collection.size() / Math.min(collection.size(), this.connectionsPerHost));
        int i = 0;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = null;
        for (OutgoingStream outgoingStream : collection) {
            if (i % round == 0) {
                arrayList2 = new ArrayList();
                arrayList.add(arrayList2);
            }
            arrayList2.add(outgoingStream);
            i++;
        }
        return arrayList;
    }

    private HostStreamingData getHostData(InetAddressAndPort inetAddressAndPort) {
        HostStreamingData hostStreamingData = this.peerSessions.get(inetAddressAndPort);
        if (hostStreamingData == null) {
            throw new IllegalArgumentException("Unknown peer requested: " + inetAddressAndPort);
        }
        return hostStreamingData;
    }

    private HostStreamingData getOrCreateHostData(InetSocketAddress inetSocketAddress) {
        HostStreamingData hostStreamingData = this.peerSessions.get(inetSocketAddress);
        if (hostStreamingData == null) {
            hostStreamingData = new HostStreamingData();
            this.peerSessions.put(inetSocketAddress, hostStreamingData);
        }
        return hostStreamingData;
    }

    public TimeUUID getPendingRepair() {
        return this.pendingRepair;
    }

    private void startSession(StreamSession streamSession) {
        streamSession.start();
        logger.info("[Stream #{}, ID#{}] Beginning stream session with {}", new Object[]{streamSession.planId(), Integer.valueOf(streamSession.sessionIndex()), streamSession.peer});
    }
}
