package org.apache.cassandra.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Multimap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RangeStreamer;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@VisibleForTesting
/* loaded from: input_file:org/apache/cassandra/service/RangeRelocator.class */
public class RangeRelocator {
    private static final Logger logger;
    private final StreamPlan streamPlan;
    private final InetAddressAndPort localAddress;
    private final TokenMetadata tokenMetaCloneAllSettled;
    private final TokenMetadata tokenMetaClone;
    private final Collection<Token> tokens;
    private final List<String> keyspaceNames;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RangeRelocator(Collection<Token> collection, List<String> list, TokenMetadata tokenMetadata) {
        this.streamPlan = new StreamPlan(StreamOperation.RELOCATION);
        this.localAddress = FBUtilities.getBroadcastAddressAndPort();
        this.tokens = collection;
        this.keyspaceNames = list;
        this.tokenMetaCloneAllSettled = tokenMetadata.cloneAfterAllSettled();
        this.tokenMetaClone = tokenMetadata.cloneOnlyTokenMap();
    }

    @VisibleForTesting
    public RangeRelocator() {
        this.streamPlan = new StreamPlan(StreamOperation.RELOCATION);
        this.localAddress = FBUtilities.getBroadcastAddressAndPort();
        this.tokens = null;
        this.keyspaceNames = null;
        this.tokenMetaCloneAllSettled = null;
        this.tokenMetaClone = null;
    }

    private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint rangesAtEndpoint, AbstractReplicationStrategy abstractReplicationStrategy, String str, TokenMetadata tokenMetadata, TokenMetadata tokenMetadata2) {
        IEndpointSnitch endpointSnitch = DatabaseDescriptor.getEndpointSnitch();
        Objects.requireNonNull(endpointSnitch);
        return RangeStreamer.convertPreferredEndpointsToWorkMap(RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((v1, v2) -> {
            return r0.sortedByProximity(v1, v2);
        }, abstractReplicationStrategy, rangesAtEndpoint, StorageService.useStrictConsistency, tokenMetadata, tokenMetadata2, str, Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance), new RangeStreamer.ExcludeLocalNodeFilter())));
    }

    public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint rangesAtEndpoint, AbstractReplicationStrategy abstractReplicationStrategy, TokenMetadata tokenMetadata, TokenMetadata tokenMetadata2) {
        RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder();
        Iterator<Replica> it = rangesAtEndpoint.iterator();
        while (it.hasNext()) {
            Replica next = it.next();
            EndpointsForRange calculateNaturalReplicas = abstractReplicationStrategy.calculateNaturalReplicas(next.range().right, tokenMetadata);
            EndpointsForRange calculateNaturalReplicas2 = abstractReplicationStrategy.calculateNaturalReplicas(next.range().right, tokenMetadata2);
            logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", new Object[]{next, calculateNaturalReplicas, calculateNaturalReplicas2});
            Iterator<Replica> it2 = calculateNaturalReplicas2.iterator();
            while (it2.hasNext()) {
                Replica next2 = it2.next();
                Replica replica = calculateNaturalReplicas.byEndpoint().get(next2.endpoint());
                if (!next2.equals(replica)) {
                    if (replica != null) {
                        Set<Range<Token>> singleton = Collections.singleton(next.range());
                        if (replica.isFull() == next2.isFull() || replica.isFull()) {
                            singleton = next.range().subtract(replica.range());
                        }
                        singleton.stream().flatMap(range -> {
                            return range.intersectionWith(next2.range()).stream();
                        }).forEach(range2 -> {
                            builder.put(next2.endpoint(), next2.decorateSubrange(range2));
                        });
                    } else {
                        if (next.isTransient() && next2.isFull()) {
                            throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", next2, next));
                        }
                        Iterator<Range<Token>> it3 = next2.range().intersectionWith(next.range()).iterator();
                        while (it3.hasNext()) {
                            builder.put(next2.endpoint(), next2.decorateSubrange(it3.next()));
                        }
                    }
                }
            }
        }
        return builder.build();
    }

    public void calculateToFromStreams() {
        logger.debug("Current tmd: {}, Updated tmd: {}", this.tokenMetaClone, this.tokenMetaCloneAllSettled);
        for (String str : this.keyspaceNames) {
            AbstractReplicationStrategy replicationStrategy = Keyspace.open(str).getReplicationStrategy();
            logger.info("Calculating ranges to stream and request for keyspace {}", str);
            for (Token token : this.tokens) {
                Collection<Token> tokens = this.tokenMetaClone.getTokens(this.localAddress);
                if (tokens.size() > 1 || tokens.isEmpty()) {
                    throw new AssertionError("Unexpected current tokens: " + tokens);
                }
                Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges = this.tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter()).size() > 1 ? calculateStreamAndFetchRanges(replicationStrategy.getAddressReplicas(this.localAddress), replicationStrategy.getPendingAddressRanges(this.tokenMetaClone, token, this.localAddress)) : Pair.create(RangesAtEndpoint.empty(this.localAddress), RangesAtEndpoint.empty(this.localAddress));
                RangesByEndpoint calculateRangesToStreamWithEndpoints = calculateRangesToStreamWithEndpoints(calculateStreamAndFetchRanges.left, replicationStrategy, this.tokenMetaClone, this.tokenMetaCloneAllSettled);
                logger.info("Endpoint ranges to stream to " + calculateRangesToStreamWithEndpoints);
                for (InetAddressAndPort inetAddressAndPort : calculateRangesToStreamWithEndpoints.keySet()) {
                    logger.debug("Will stream range {} of keyspace {} to endpoint {}", new Object[]{calculateRangesToStreamWithEndpoints.get(inetAddressAndPort), str, inetAddressAndPort});
                    this.streamPlan.transferRanges(inetAddressAndPort, str, calculateRangesToStreamWithEndpoints.get(inetAddressAndPort), new String[0]);
                }
                Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints = calculateRangesToFetchWithPreferredEndpoints(calculateStreamAndFetchRanges.right, replicationStrategy, str, this.tokenMetaClone, this.tokenMetaCloneAllSettled);
                calculateRangesToFetchWithPreferredEndpoints.asMap().forEach((inetAddressAndPort2, collection) -> {
                    RangesAtEndpoint rangesAtEndpoint = (RangesAtEndpoint) collection.stream().filter(fetchReplica -> {
                        return fetchReplica.remote.isFull();
                    }).map(fetchReplica2 -> {
                        return fetchReplica2.local;
                    }).collect(RangesAtEndpoint.collector(this.localAddress));
                    RangesAtEndpoint rangesAtEndpoint2 = (RangesAtEndpoint) collection.stream().filter(fetchReplica3 -> {
                        return fetchReplica3.remote.isTransient();
                    }).map(fetchReplica4 -> {
                        return fetchReplica4.local;
                    }).collect(RangesAtEndpoint.collector(this.localAddress));
                    logger.debug("Will request range {} of keyspace {} from endpoint {}", new Object[]{calculateRangesToFetchWithPreferredEndpoints.get(inetAddressAndPort2), str, inetAddressAndPort2});
                    this.streamPlan.requestRanges(inetAddressAndPort2, str, rangesAtEndpoint, rangesAtEndpoint2);
                });
                logger.debug("Keyspace {}: work map {}.", str, calculateRangesToFetchWithPreferredEndpoints);
            }
        }
    }

    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint rangesAtEndpoint, RangesAtEndpoint rangesAtEndpoint2) {
        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(rangesAtEndpoint.endpoint());
        RangesAtEndpoint.Builder builder2 = RangesAtEndpoint.builder(rangesAtEndpoint.endpoint());
        logger.debug("Calculating toStream");
        computeRanges(rangesAtEndpoint, rangesAtEndpoint2, builder);
        logger.debug("Calculating toFetch");
        computeRanges(rangesAtEndpoint2, rangesAtEndpoint, builder2);
        logger.debug("To stream {}", builder);
        logger.debug("To fetch {}", builder2);
        return Pair.create(builder.build(), builder2.build());
    }

    private static void computeRanges(RangesAtEndpoint rangesAtEndpoint, RangesAtEndpoint rangesAtEndpoint2, RangesAtEndpoint.Builder builder) {
        RangesAtEndpoint build;
        Iterator<Replica> it = rangesAtEndpoint.iterator();
        while (it.hasNext()) {
            Replica next = it.next();
            boolean z = false;
            RangesAtEndpoint rangesAtEndpoint3 = null;
            Iterator<Replica> it2 = rangesAtEndpoint2.iterator();
            while (it2.hasNext()) {
                Replica next2 = it2.next();
                logger.debug("Comparing {} and {}", next, next2);
                if (next.intersectsOnRange(next2) && (!next.isFull() || !next2.isTransient())) {
                    if (rangesAtEndpoint3 == null) {
                        build = next.subtractIgnoreTransientStatus(next2.range());
                    } else {
                        RangesAtEndpoint.Builder builder2 = new RangesAtEndpoint.Builder(rangesAtEndpoint3.endpoint());
                        Iterator<Replica> it3 = rangesAtEndpoint3.iterator();
                        while (it3.hasNext()) {
                            builder2.addAll(it3.next().subtractIgnoreTransientStatus(next2.range()));
                        }
                        build = builder2.build();
                    }
                    rangesAtEndpoint3 = build;
                    z = true;
                }
            }
            if (z) {
                builder.addAll(rangesAtEndpoint3);
                logger.debug("    Intersects adding {}", rangesAtEndpoint3);
            } else {
                if (!$assertionsDisabled && rangesAtEndpoint3 != null) {
                    throw new AssertionError();
                }
                logger.debug("    Doesn't intersect adding {}", next);
                builder.add2(next);
            }
        }
    }

    public Future<StreamState> stream() {
        return this.streamPlan.execute();
    }

    public boolean streamsNeeded() {
        return !this.streamPlan.isEmpty();
    }

    static {
        $assertionsDisabled = !RangeRelocator.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(StorageService.class);
    }
}
