package org.apache.cassandra.tcm.sequences;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.EndpointsByReplica;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.SystemStrategy;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.MovementMap;
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/tcm/sequences/UnbootstrapStreams.class */
/**
 * {@link LeaveStreams} implementation of kind {@link LeaveStreams.Kind#UNBOOTSTRAP}.
 *
 * <p>From the placement deltas handed to {@link #execute} it derives, per replication params, which
 * replica currently held by the leaving endpoint maps to which newly added replica, and then streams
 * the corresponding ranges with a {@link StreamOperation#DECOMMISSION} plan. Batchlog replay and hint
 * streaming are started alongside the data streams.
 */
public class UnbootstrapStreams implements LeaveStreams {
    private static final Logger logger = LoggerFactory.getLogger(UnbootstrapStreams.class);

    /** Flipped to {@code true} by {@link #execute} just before streaming work begins; read by {@link #status()}. */
    private final AtomicBoolean started = new AtomicBoolean();

    @Override // org.apache.cassandra.tcm.sequences.LeaveStreams
    public LeaveStreams.Kind kind() {
        return LeaveStreams.Kind.UNBOOTSTRAP;
    }

    /**
     * Computes the movements for the leaving node and runs the unbootstrap procedure.
     *
     * @param nodeId           id of the leaving node; resolved to an endpoint through the current
     *                         {@link ClusterMetadata} directory
     * @param placementDeltas  deltas whose write additions/removals identify the new replicas
     * @param placementDeltas2 not used by this implementation
     * @param placementDeltas3 deltas whose write removals identify the ranges held by the leaving endpoint
     * @throws ExecutionException   if any of the awaited futures fails; logged before being rethrown
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.cassandra.tcm.sequences.LeaveStreams
    public void execute(NodeId nodeId, PlacementDeltas placementDeltas, PlacementDeltas placementDeltas2, PlacementDeltas placementDeltas3) throws ExecutionException, InterruptedException {
        InetAddressAndPort leaving = ClusterMetadata.current().directory.endpoint(nodeId);
        MovementMap movements = movementMap(leaving, placementDeltas, placementDeltas3);
        movements.forEach((params, endpoints) -> logger.info("Unbootstrap movements: {}: {}", params, endpoints));
        this.started.set(true);
        try {
            unbootstrap(Schema.instance.getNonLocalStrategyKeyspaces(), movements);
        } catch (ExecutionException e) {
            logger.error("Error while decommissioning node", e);
            throw e;
        }
    }

    /**
     * Builds the map of movements (replica on the leaving endpoint -> newly added replica), keyed by
     * replication params.
     *
     * @param leaving     endpoint of the node that is leaving
     * @param startDelta  deltas whose write additions supply the destination replicas
     * @param finishDelta deltas whose write removals for {@code leaving} supply the source replicas
     * @return the assembled movement map
     */
    private static MovementMap movementMap(InetAddressAndPort leaving, PlacementDeltas startDelta, PlacementDeltas finishDelta) {
        MovementMap.Builder allMovements = MovementMap.builder();
        finishDelta.forEach((params, delta) -> {
            // Replication strategies deriving from SystemStrategy are skipped entirely.
            if (SystemStrategy.class.isAssignableFrom(params.klass)) {
                return;
            }
            // Replicas being removed from the leaving endpoint, indexed by their range.
            Map<Range<Token>, Replica> sourcesByRange = delta.writes.removals.get(leaving).byRange();
            EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder();
            RangesByEndpoint startAdditions = startDelta.get(params).writes.additions;
            RangesByEndpoint startRemovals = startDelta.get(params).writes.removals;
            startAdditions.flattenValues().forEach(destination -> {
                Replica source = sourcesByRange.get(destination.range());
                // A range that is both added to and removed from the same endpoint is only reported; it is
                // still recorded as a movement below.
                if (startRemovals.get(destination.endpoint()).contains(destination.range(), false)) {
                    logger.debug("Streaming transient -> full conversion to {} from {}", destination, source);
                }
                movements.put(source, destination);
            });
            allMovements.put(params, movements.build());
        });
        return allMovements.build();
    }

    /**
     * Runs the unbootstrap steps in order: paxos repair, batchlog replay plus data streaming, hint
     * streaming, then waits for the data and hint streams to finish.
     *
     * @param keyspaces keyspaces whose ranges are to be streamed
     * @param movements movements computed by {@link #movementMap}
     * @throws ExecutionException   if batchlog replay, data streaming or hint streaming fails
     * @throws InterruptedException if interrupted while waiting on any of those futures
     */
    private static void unbootstrap(Keyspaces keyspaces, MovementMap movements) throws ExecutionException, InterruptedException {
        // The per-keyspace lookup happens here; the streams themselves only start when the supplier is invoked.
        Supplier<Future<StreamState>> startStreaming = prepareUnbootstrapStreaming(keyspaces, movements);
        StorageService.instance.repairPaxosForTopologyChange("decommission");
        logger.info("replaying batch log and streaming data to other nodes");
        Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay();
        Future<StreamState> streamSuccess = startStreaming.get();
        logger.debug("waiting for batch log processing.");
        batchlogReplay.get();
        // Hints are streamed only after the batchlog replay has completed.
        logger.info("streaming hints to other nodes");
        Future<?> hintsSuccess = StorageService.instance.streamHints();
        logger.debug("waiting for stream acks.");
        streamSuccess.get();
        hintsSuccess.get();
        logger.debug("stream acks all received.");
    }

    /**
     * Resolves the movements for every keyspace eagerly and returns a supplier that starts streaming
     * them when invoked.
     *
     * @param keyspaces keyspaces to stream
     * @param movements movements keyed by replication params
     * @return supplier that calls {@link #streamRanges(Map)} with the keyspace-name -> movements map
     */
    private static Supplier<Future<StreamState>> prepareUnbootstrapStreaming(Keyspaces keyspaces, MovementMap movements) {
        // NOTE(review): Collectors.toMap rejects null values, so this relies on movements.get(...)
        // returning non-null for every keyspace's replication params - confirm against MovementMap.
        Map<String, EndpointsByReplica> rangesToStream = keyspaces.stream()
                .collect(Collectors.toMap(keyspace -> keyspace.name,
                                          keyspace -> movements.get(keyspace.params.replication)));
        return () -> streamRanges(rangesToStream);
    }

    /**
     * Builds and executes a {@link StreamOperation#DECOMMISSION} stream plan for the given movements.
     *
     * <p>Ranges that {@link SystemKeyspace#getTransferredRanges} already reports for the
     * {@code "Unbootstrap"} operation and the destination endpoint are skipped.
     *
     * @param map movements (source replica -> destination replica) keyed by keyspace name; keyspaces
     *            with empty movements are ignored
     * @return the future returned by {@link StreamPlan#execute()}
     */
    public static Future<StreamState> streamRanges(Map<String, EndpointsByReplica> map) {
        Map<String, RangesByEndpoint> pendingByKeyspace = new HashMap<>();
        for (Map.Entry<String, EndpointsByReplica> keyspaceEntry : map.entrySet()) {
            String keyspace = keyspaceEntry.getKey();
            EndpointsByReplica movements = keyspaceEntry.getValue();
            if (movements.isEmpty()) {
                continue;
            }
            Map<InetAddressAndPort, Set<Range<Token>>> transferred = SystemKeyspace.getTransferredRanges("Unbootstrap", keyspace, ClusterMetadata.current().tokenMap.partitioner());
            RangesByEndpoint.Builder pending = new RangesByEndpoint.Builder();
            for (Map.Entry<Replica, Replica> movement : movements.flattenEntries()) {
                Replica source = movement.getKey();
                Replica destination = movement.getValue();
                Set<Range<Token>> alreadySent = transferred.get(destination.endpoint());
                if (alreadySent != null && alreadySent.contains(source.range())) {
                    logger.debug("Skipping transferred range {} of keyspace {}, endpoint {}", source, keyspace, destination);
                } else {
                    // Stream the source's range, decorated via the destination replica.
                    pending.put(destination.endpoint(), destination.decorateSubrange(source.range()));
                }
            }
            pendingByKeyspace.put(keyspace, pending.build());
        }

        StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION);
        streamPlan.listeners(StorageService.instance.streamStateStore(), new StreamEventHandler[0]);
        for (Map.Entry<String, RangesByEndpoint> keyspaceEntry : pendingByKeyspace.entrySet()) {
            String keyspace = keyspaceEntry.getKey();
            for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> endpointEntry : keyspaceEntry.getValue().asMap().entrySet()) {
                streamPlan.transferRanges(endpointEntry.getKey(), keyspace, endpointEntry.getValue(), new String[0]);
            }
        }
        return streamPlan.execute();
    }

    /**
     * @return {@code "streams started"} once {@link #execute} has reached the streaming step,
     *         {@code "streams not started"} before that
     */
    @Override // org.apache.cassandra.tcm.sequences.LeaveStreams
    public String status() {
        return "streams" + (this.started.get() ? "" : " not") + " started";
    }
}
