package org.apache.cassandra.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.asymmetric.DifferenceHolder;
import org.apache.cassandra.repair.asymmetric.HostDifferences;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
import org.apache.cassandra.repair.state.JobState;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.service.paxos.Paxos;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanup;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/repair/RepairJob.class */
public class RepairJob extends AsyncFuture<RepairResult> implements Runnable {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Logger logger;
    // Shared context copied from the owning session; source of the clock, snitch, repair service and broadcast address used below.
    private final SharedContext ctx;
    // Observable job state; its phase is advanced at each step of run().
    public final JobState state;
    // Descriptor of this job, built in the constructor from the session's identifiers, keyspace and ranges.
    private final RepairJobDesc desc;
    // The repair session that owns this job.
    private final RepairSession session;
    // Parallelism mode copied from the session; selects snapshotting and the validation request strategy.
    private final RepairParallelism parallelismDegree;
    // Executor on which all snapshot, validation and sync tasks (and their callbacks) are run.
    private final Executor taskExecutor;

    // Every validation task created through newValidationTask(); iterated by abort().
    @VisibleForTesting
    final List<ValidationTask> validationTasks = new CopyOnWriteArrayList();

    // Every sync task handed to executeTasks(); iterated by abort().
    @VisibleForTesting
    final List<SyncTask> syncTasks = new CopyOnWriteArrayList();
    // Decompiler-visible form of the compiler-generated assertion switch; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/cassandra/repair/RepairJob$NoSuchRepairSessionExceptionWrapper.class */
    /**
     * Unchecked carrier for a checked {@link NoSuchRepairSessionException}, used so the failure can
     * travel through a future chain. The job's failure callback unwraps it again via {@link #wrapped}.
     */
    private static class NoSuchRepairSessionExceptionWrapper extends RuntimeException {
        private final NoSuchRepairSessionException wrapped;

        /**
         * @param noSuchRepairSessionException the checked exception being carried
         */
        private NoSuchRepairSessionExceptionWrapper(NoSuchRepairSessionException noSuchRepairSessionException) {
            // Chain the cause: the wrapper itself is handed to state/failure reporting before it is
            // unwrapped, and without a cause it would carry no message and no useful stack trace.
            super(noSuchRepairSessionException);
            this.wrapped = noSuchRepairSessionException;
        }
    }

    public RepairJob(RepairSession repairSession, String str) {
        this.ctx = repairSession.ctx;
        this.session = repairSession;
        this.taskExecutor = repairSession.taskExecutor;
        this.parallelismDegree = repairSession.parallelismDegree;
        this.desc = new RepairJobDesc(repairSession.state.parentRepairSession, repairSession.getId(), repairSession.state.keyspace, str, repairSession.state.commonRange.ranges);
        this.state = new JobState(this.ctx.clock(), this.desc, repairSession.state.commonRange.endpoints);
    }

    /**
     * Returns the "now" timestamp (seconds) to use for validation.
     *
     * @return the context clock's current seconds, shifted forward by the configured
     *         validation-preview purge head start when the session's preview kind is {@code REPAIRED}
     */
    public long getNowInSeconds() {
        long now = ctx.clock().nowInSeconds();
        if (session.previewKind == PreviewKind.REPAIRED) {
            return now + DatabaseDescriptor.getValidationPreviewPurgeHeadStartInSec();
        }
        return now;
    }

    /**
     * Drives the job: optional paxos repair, optional snapshots, validation, sync-task creation and
     * sync-task execution. The outcome is published on this future through
     * {@code trySuccess}/{@code tryFailure}.
     *
     * <p>When the session is paxos-only, the job completes as soon as the paxos step finishes and
     * no validation or streaming is scheduled.
     */
    @Override // java.lang.Runnable
    public void run() {
        state.phase.start();
        final ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
        cfs.metric.repairsStarted.inc();

        // All participants: the session's common-range endpoints plus this node.
        final List<InetAddressAndPort> allEndpoints = new ArrayList<>(session.state.commonRange.endpoints);
        allEndpoints.add(ctx.broadcastAddressAndPort());

        final Future<Void> paxosRepair;
        if (DatabaseDescriptor.paxosRepairEnabled() && (((Paxos.useV2() || isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly)) {
            logger.info("{} {}.{} starting paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            paxosRepair = PaxosCleanup.cleanup(ctx,
                                               allEndpoints,
                                               Schema.instance.getTableMetadata(desc.keyspace, desc.columnFamily),
                                               desc.ranges,
                                               session.state.commonRange.hasSkippedReplicas,
                                               taskExecutor);
        } else {
            logger.info("{} {}.{} not running paxos repair", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
            paxosRepair = ImmediateFuture.success(null);
        }

        if (session.paxosOnly) {
            // Nothing else to do for a paxos-only session: the paxos outcome is the job outcome.
            paxosRepair.addCallback(new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void unused) {
                    logger.info("{} {}.{} paxos repair completed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    trySuccess(new RepairResult(desc, Collections.emptyList()));
                }

                @Override
                public void onFailure(Throwable th) {
                    logger.warn("{} {}.{} paxos repair failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    tryFailure(th);
                }
            }, taskExecutor);
            return;
        }

        // Snapshot stage; stays null for PARALLEL, which tells createSyncTasks to validate directly.
        final Future<?> allSnapshotTasks;
        if (parallelismDegree == RepairParallelism.PARALLEL) {
            allSnapshotTasks = null;
        } else if (session.isIncremental) {
            // No SnapshotTask is issued for incremental sessions; just wait for the paxos step.
            allSnapshotTasks = paxosRepair.map(ignored -> allEndpoints);
        } else {
            allSnapshotTasks = paxosRepair.flatMap(ignored -> {
                List<Future<InetAddressAndPort>> snapshotTasks = new ArrayList<>(allEndpoints.size());
                state.phase.snapshotsSubmitted();
                for (InetAddressAndPort endpoint : allEndpoints) {
                    SnapshotTask snapshotTask = new SnapshotTask(ctx, desc, endpoint);
                    snapshotTasks.add(snapshotTask);
                    taskExecutor.execute(snapshotTask);
                }
                return FutureCombiner.allOf(snapshotTasks).map(snapshotted -> {
                    state.phase.snapshotsCompleted();
                    return snapshotted;
                });
            });
        }

        // Validation and sync-task creation go through the session's validation scheduler;
        // the resulting sync tasks are then executed on the task executor.
        Future<List<SyncStat>> syncResults =
                session.validationScheduler.schedule(() -> createSyncTasks(paxosRepair, allSnapshotTasks, allEndpoints), taskExecutor)
                                           .flatMap(this::executeTasks, taskExecutor);

        syncResults.addCallback(new FutureCallback<List<SyncStat>>() {
            @Override
            public void onSuccess(List<SyncStat> stats) {
                state.phase.success();
                if (!session.previewKind.isPreview()) {
                    logger.info("{} {}.{} is fully synced", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    SystemDistributedKeyspace.successfulRepairJob(session.getId(), desc.keyspace, desc.columnFamily);
                }
                cfs.metric.repairsCompleted.inc();
                trySuccess(new RepairResult(desc, stats));
            }

            @Override
            public void onFailure(Throwable th) {
                state.phase.fail(th);
                abort(th);
                if (!session.previewKind.isPreview()) {
                    logger.warn("{} {}.{} sync failed", session.previewKind.logPrefix(session.getId()), desc.keyspace, desc.columnFamily);
                    SystemDistributedKeyspace.failedRepairJob(session.getId(), desc.keyspace, desc.columnFamily, th);
                }
                cfs.metric.repairsCompleted.inc();
                // Surface the original checked exception rather than the internal wrapper.
                Throwable reported = th;
                if (th instanceof NoSuchRepairSessionExceptionWrapper) {
                    reported = ((NoSuchRepairSessionExceptionWrapper) th).wrapped;
                }
                tryFailure(reported);
            }
        }, taskExecutor);
    }

    /**
     * Chains validation after the preceding stage and turns the tree responses into sync tasks.
     *
     * @param future  the paxos-repair stage; used as the trigger when there is no snapshot stage
     * @param future2 the snapshot stage, or {@code null} when none was set up
     * @param list    endpoints to request validation from
     * @return a future of the sync tasks derived from the validation responses
     */
    private Future<List<SyncTask>> createSyncTasks(Future<Void> future, Future<?> future2, List<InetAddressAndPort> list) {
        Future<List<TreeResponse>> treeResponses;
        if (future2 == null) {
            // No snapshot stage: request validation from every endpoint at once.
            treeResponses = future.flatMap(ignored -> sendValidationRequest(list));
        } else {
            treeResponses = future2.flatMap(ignored -> {
                if (parallelismDegree == RepairParallelism.SEQUENTIAL) {
                    return sendSequentialValidationRequest(list);
                }
                return sendDCAwareValidationRequest(list);
            }, taskExecutor);
        }

        Future<List<TreeResponse>> validated = treeResponses.map(responses -> {
            state.phase.validationCompleted();
            return responses;
        });

        // The optimised planner is only used when requested and the session is not a pull repair.
        boolean optimised = session.optimiseStreams && !session.pullRepair;
        return validated.map(optimised ? this::createOptimisedSyncingSyncTasks : this::createStandardSyncTasks, taskExecutor);
    }

    /**
     * Aborts every validation task and sync task registered with this job so far.
     *
     * @param th the reason to pass to each task; when {@code null}, a
     *           {@code RuntimeException("Abort")} is used instead
     */
    public synchronized void abort(@Nullable Throwable th) {
        Throwable reason = th != null ? th : new RuntimeException("Abort");
        for (ValidationTask validation : validationTasks) {
            validation.abort(reason);
        }
        for (SyncTask sync : syncTasks) {
            sync.abort(reason);
        }
    }

    /** @return whether this job's keyspace equals {@code SchemaConstants.METADATA_KEYSPACE_NAME} */
    private boolean isMetadataKeyspace() {
        String keyspace = desc.keyspace;
        return keyspace.equals(SchemaConstants.METADATA_KEYSPACE_NAME);
    }

    /**
     * @param inetAddressAndPort endpoint to look up
     * @return whether the endpoint is listed in the session common range's {@code transEndpoints}
     */
    private boolean isTransient(InetAddressAndPort inetAddressAndPort) {
        boolean listedAsTransient = session.state.commonRange.transEndpoints.contains(inetAddressAndPort);
        return listedAsTransient;
    }

    /**
     * Instance-level entry point for the standard planner: fills in this job's context, descriptor,
     * broadcast address, transient check and session flags, then delegates to the static overload.
     *
     * @param list validation responses to compare
     * @return the sync tasks produced by the static overload
     */
    private List<SyncTask> createStandardSyncTasks(List<TreeResponse> list) {
        InetAddressAndPort self = ctx.broadcastAddressAndPort();
        return createStandardSyncTasks(ctx,
                                       desc,
                                       list,
                                       self,
                                       this::isTransient,
                                       session.isIncremental,
                                       session.pullRepair,
                                       session.previewKind);
    }

    /**
     * Compares every pair of tree responses and creates one sync task per pair that differs.
     *
     * <p>Pairs where both endpoints satisfy {@code predicate} are skipped, as are pairs whose trees
     * show no difference. For a pair involving {@code inetAddressAndPort} a {@link LocalSyncTask}
     * is created, unless neither direction applies, in which case the pair is skipped. For remote
     * pairs an {@link AsymmetricRemoteSyncTask} is created when exactly one side satisfies
     * {@code predicate}, otherwise a {@link SymmetricRemoteSyncTask}.
     *
     * <p>Each response's trees are released once it has been compared with all later responses.
     *
     * @param sharedContext      context supplying the clock and passed to the created tasks
     * @param repairJobDesc      job descriptor passed to the created tasks
     * @param list               tree responses to compare; must be non-empty
     * @param inetAddressAndPort address treated as the local node
     * @param predicate          endpoint test; the instance caller passes the job's transient check
     * @param z                  when {@code true}, the parent session id is passed to local tasks,
     *                           otherwise {@code null}
     * @param z2                 when {@code true}, disables the second boolean flag of local tasks
     *                           (the instance caller passes the session's {@code pullRepair})
     * @param previewKind        preview kind passed to the created tasks
     * @return the created sync tasks, in pair-iteration order
     */
    @VisibleForTesting
    static List<SyncTask> createStandardSyncTasks(SharedContext sharedContext, RepairJobDesc repairJobDesc, List<TreeResponse> list, InetAddressAndPort inetAddressAndPort, Predicate<InetAddressAndPort> predicate, boolean z, boolean z2, PreviewKind previewKind) {
        long startedAt = sharedContext.clock().currentTimeMillis();
        List<SyncTask> tasks = new ArrayList<>();
        for (int i = 0; i < list.size() - 1; i++) {
            TreeResponse r1 = list.get(i);
            for (int j = i + 1; j < list.size(); j++) {
                TreeResponse r2 = list.get(j);

                // Never sync a pair in which both endpoints match the predicate.
                if (predicate.test(r1.endpoint) && predicate.test(r2.endpoint)) {
                    continue;
                }

                List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
                if (differences.isEmpty()) {
                    continue;
                }

                // Declared per pair and as the common supertype, so every branch below either
                // assigns it or skips the pair entirely.
                SyncTask task;
                if (r1.endpoint.equals(inetAddressAndPort) || r2.endpoint.equals(inetAddressAndPort)) {
                    TreeResponse self = r1.endpoint.equals(inetAddressAndPort) ? r1 : r2;
                    TreeResponse remote = r2.endpoint.equals(inetAddressAndPort) ? r1 : r2;

                    // Presumably "request ranges" / "transfer ranges" — confirm against the
                    // LocalSyncTask constructor.
                    boolean requestRanges = !predicate.test(self.endpoint);
                    boolean transferRanges = !predicate.test(remote.endpoint) && !z2;

                    // Neither direction applies: there is no task to create for this pair.
                    if (!requestRanges && !transferRanges) {
                        continue;
                    }

                    task = new LocalSyncTask(sharedContext, repairJobDesc, self.endpoint, remote.endpoint, differences,
                                             z ? repairJobDesc.parentSessionId : null,
                                             requestRanges, transferRanges, previewKind);
                } else if (predicate.test(r1.endpoint) || predicate.test(r2.endpoint)) {
                    // Exactly one side matches the predicate here (the both-match case was skipped above).
                    boolean firstMatches = predicate.test(r1.endpoint);
                    TreeResponse matching = firstMatches ? r1 : r2;
                    TreeResponse other = firstMatches ? r2 : r1;
                    task = new AsymmetricRemoteSyncTask(sharedContext, repairJobDesc, other.endpoint, matching.endpoint, differences, previewKind);
                } else {
                    task = new SymmetricRemoteSyncTask(sharedContext, repairJobDesc, r1.endpoint, r2.endpoint, differences, previewKind);
                }
                tasks.add(task);
            }
            // r1 has now been compared with every later response.
            list.get(i).trees.release();
        }
        list.get(list.size() - 1).trees.release();
        logger.info("Created {} sync tasks based on {} merkle tree responses for {} (took: {}ms)",
                    tasks.size(), list.size(), repairJobDesc.parentSessionId, sharedContext.clock().currentTimeMillis() - startedAt);
        return tasks;
    }

    /**
     * Registers and starts the given sync tasks.
     *
     * <p>The parent repair session is looked up first; if that lookup throws
     * {@link NoSuchRepairSessionException}, nothing is started and a failed future carrying a
     * {@link NoSuchRepairSessionExceptionWrapper} is returned.
     *
     * @param list sync tasks to run
     * @return a future combining the results of all given tasks
     */
    @VisibleForTesting
    Future<List<SyncStat>> executeTasks(List<SyncTask> list) {
        try {
            // The returned session is not used; the call is made for its exception only.
            ctx.repair().getParentRepairSession(desc.parentSessionId);

            syncTasks.addAll(list);
            boolean anythingToStream = !list.isEmpty();
            if (anythingToStream) {
                state.phase.streamSubmitted();
            }

            for (SyncTask task : list) {
                boolean remote = !task.isLocal();
                if (remote) {
                    // Non-local tasks are registered with the session before they start.
                    session.trackSyncCompletion(Pair.create(desc, task.nodePair()), (CompletableRemoteSyncTask) task);
                }
                taskExecutor.execute(task);
            }
            return FutureCombiner.allOf(list);
        } catch (NoSuchRepairSessionException missingSession) {
            return ImmediateFuture.failure(new NoSuchRepairSessionExceptionWrapper(missingSession));
        }
    }

    /**
     * Instance-level entry point for the optimised planner: fills in this job's context, descriptor,
     * the local address from {@code FBUtilities}, the transient check, the datacenter lookup and
     * session flags, then delegates to the static overload.
     *
     * @param list validation responses to compare
     * @return the sync tasks produced by the static overload
     */
    private List<SyncTask> createOptimisedSyncingSyncTasks(List<TreeResponse> list) {
        InetAddressAndPort self = FBUtilities.getLocalAddressAndPort();
        return createOptimisedSyncingSyncTasks(ctx,
                                               desc,
                                               list,
                                               self,
                                               this::isTransient,
                                               this::getDC,
                                               session.isIncremental,
                                               session.previewKind);
    }

    /**
     * Builds sync tasks from a reduced set of differences, so that each endpoint fetches each
     * differing range from a chosen source rather than syncing pairwise with every other endpoint.
     *
     * <p>The preferred-source filter passed to {@link ReduceHelper#reduce} keeps only those
     * candidates that {@code function} maps to the same value as the fetching endpoint (the
     * instance caller passes the datacenter lookup).
     *
     * <p>Endpoints satisfying {@code predicate} get no fetch tasks of their own.
     *
     * @param sharedContext      context supplying the clock and passed to the created tasks
     * @param repairJobDesc      job descriptor passed to the created tasks
     * @param list               tree responses to reduce
     * @param inetAddressAndPort address treated as the local node; its fetches become
     *                           {@link LocalSyncTask}s, all others {@link AsymmetricRemoteSyncTask}s
     * @param predicate          endpoint test; the instance caller passes the job's transient check
     * @param function           maps an endpoint to its grouping key (datacenter for the instance caller)
     * @param z                  when {@code true}, the parent session id is passed to local tasks,
     *                           otherwise {@code null}
     * @param previewKind        preview kind passed to the created tasks
     * @return the created sync tasks
     */
    @VisibleForTesting
    static List<SyncTask> createOptimisedSyncingSyncTasks(SharedContext sharedContext, RepairJobDesc repairJobDesc, List<TreeResponse> list, InetAddressAndPort inetAddressAndPort, Predicate<InetAddressAndPort> predicate, Function<InetAddressAndPort, String> function, boolean z, PreviewKind previewKind) {
        long startedAt = sharedContext.clock().currentTimeMillis();
        List<SyncTask> tasks = new ArrayList<>();
        DifferenceHolder diffHolder = new DifferenceHolder(list);
        logger.trace("diffs = {}", diffHolder);

        // `address` is the node that will fetch; `node` is a candidate source. The two lambda
        // parameters must be distinct, otherwise the filter compares a key with itself.
        ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, (address, streaming) ->
                streaming.stream()
                         .filter(node -> function.apply(address).equals(function.apply(node)))
                         .collect(Collectors.toSet()));

        for (TreeResponse response : list) {
            InetAddressAndPort fetcher = response.endpoint;
            if (predicate.test(fetcher)) {
                continue;
            }

            HostDifferences streamsFor = reducedDifferences.get(fetcher);
            if (streamsFor == null) {
                logger.trace("Node {} has nothing to stream", fetcher);
                continue;
            }

            Preconditions.checkArgument(streamsFor.get(fetcher).isEmpty(), "We should not fetch ranges from ourselves");
            for (InetAddressAndPort fetchFrom : streamsFor.hosts()) {
                List<Range<Token>> toFetch = new ArrayList<>(streamsFor.get(fetchFrom));
                // Kept in the explicit flag form because this class declares $assertionsDisabled itself.
                if (!$assertionsDisabled && toFetch.isEmpty()) {
                    throw new AssertionError();
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("{} is about to fetch {} from {}", fetcher, toFetch, fetchFrom);
                }

                SyncTask task;
                if (fetcher.equals(inetAddressAndPort)) {
                    task = new LocalSyncTask(sharedContext, repairJobDesc, fetcher, fetchFrom, toFetch,
                                             z ? repairJobDesc.parentSessionId : null,
                                             true, false, previewKind);
                } else {
                    task = new AsymmetricRemoteSyncTask(sharedContext, repairJobDesc, fetcher, fetchFrom, toFetch, previewKind);
                }
                tasks.add(task);
            }
        }

        logger.info("Created {} optimised sync tasks based on {} merkle tree responses for {} (took: {}ms)",
                    tasks.size(), list.size(), repairJobDesc.parentSessionId, sharedContext.clock().currentTimeMillis() - startedAt);
        logger.trace("Optimised sync tasks for {}: {}", repairJobDesc.parentSessionId, tasks);
        return tasks;
    }

    /**
     * @param inetAddressAndPort endpoint to look up
     * @return the datacenter reported for the endpoint by the context's snitch
     */
    private String getDC(InetAddressAndPort inetAddressAndPort) {
        String datacenter = ctx.snitch().getDatacenter(inetAddressAndPort);
        return datacenter;
    }

    /**
     * Creates and starts a validation task for every endpoint at once.
     *
     * @param collection endpoints to validate
     * @return a future combining the tree responses of all created tasks
     */
    private Future<List<TreeResponse>> sendValidationRequest(Collection<InetAddressAndPort> collection) {
        state.phase.validationSubmitted();

        String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, collection);
        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message);
        Tracing.traceRepair(message);

        // One timestamp shared by all tasks of this round.
        long nowInSec = getNowInSeconds();
        List<ValidationTask> tasks = new ArrayList<>(collection.size());
        for (InetAddressAndPort endpoint : collection) {
            ValidationTask task = newValidationTask(endpoint, nowInSec);
            tasks.add(task);
            session.trackValidationCompletion(Pair.create(desc, endpoint), task);
            taskExecutor.execute(task);
        }
        return FutureCombiner.allOf(tasks);
    }

    /**
     * Creates one validation task per endpoint and chains them so that each task is started only
     * after the previous one has succeeded. Only the first task is started directly.
     *
     * @param collection endpoints to validate, processed in iteration order
     * @return a future combining the tree responses of all created tasks
     */
    private Future<List<TreeResponse>> sendSequentialValidationRequest(Collection<InetAddressAndPort> collection) {
        state.phase.validationSubmitted();

        String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, collection);
        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message);
        Tracing.traceRepair(message);

        long nowInSec = getNowInSeconds();
        List<ValidationTask> tasks = new ArrayList<>(collection.size());
        Queue<InetAddressAndPort> pending = new LinkedList<>(collection);

        InetAddressAndPort firstEndpoint = pending.poll();
        ValidationTask firstTask = newValidationTask(firstEndpoint, nowInSec);
        logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), firstEndpoint);
        session.trackValidationCompletion(Pair.create(desc, firstEndpoint), firstTask);
        tasks.add(firstTask);

        ValidationTask previous = firstTask;
        while (pending.size() > 0) {
            final InetAddressAndPort nextEndpoint = pending.poll();
            final ValidationTask nextTask = newValidationTask(nextEndpoint, nowInSec);
            tasks.add(nextTask);
            previous.addCallback(new FutureCallback<TreeResponse>() {
                @Override
                public void onSuccess(TreeResponse treeResponse) {
                    logger.info("{} Validating {}", session.previewKind.logPrefix(desc.sessionId), nextEndpoint);
                    session.trackValidationCompletion(Pair.create(desc, nextEndpoint), nextTask);
                    taskExecutor.execute(nextTask);
                }

                @Override
                public void onFailure(Throwable th) {
                    // Intentionally empty: nothing is done here when the previous task fails.
                }
            });
            previous = nextTask;
        }

        // The chain is fully wired; kick it off.
        taskExecutor.execute(firstTask);
        return FutureCombiner.allOf(tasks);
    }

    /**
     * Creates one validation task per endpoint, grouped by datacenter. Within a datacenter the
     * tasks are chained so that each starts only after the previous one has succeeded; the first
     * task of every datacenter is started directly.
     *
     * @param collection endpoints to validate
     * @return a future combining the tree responses of all created tasks
     */
    private Future<List<TreeResponse>> sendDCAwareValidationRequest(Collection<InetAddressAndPort> collection) {
        state.phase.validationSubmitted();

        String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, collection);
        logger.info("{} {}", session.previewKind.logPrefix(desc.sessionId), message);
        Tracing.traceRepair(message);

        long nowInSec = getNowInSeconds();
        List<ValidationTask> tasks = new ArrayList<>(collection.size());

        // Group endpoints by the datacenter reported by the configured endpoint snitch.
        Map<String, Queue<InetAddressAndPort>> requestsByDatacenter = new HashMap<>();
        for (InetAddressAndPort endpoint : collection) {
            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
            requestsByDatacenter.computeIfAbsent(dc, ignored -> new LinkedList<>()).add(endpoint);
        }

        for (Queue<InetAddressAndPort> requests : requestsByDatacenter.values()) {
            InetAddressAndPort firstEndpoint = requests.poll();
            ValidationTask firstTask = newValidationTask(firstEndpoint, nowInSec);
            logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), firstEndpoint);
            session.trackValidationCompletion(Pair.create(desc, firstEndpoint), firstTask);
            tasks.add(firstTask);

            ValidationTask previous = firstTask;
            // Must terminate once this datacenter's queue is drained, so the first task below is
            // actually started and the remaining datacenters are processed.
            while (!requests.isEmpty()) {
                final InetAddressAndPort nextEndpoint = requests.poll();
                final ValidationTask nextTask = newValidationTask(nextEndpoint, nowInSec);
                tasks.add(nextTask);
                previous.addCallback(new FutureCallback<TreeResponse>() {
                    @Override
                    public void onSuccess(TreeResponse treeResponse) {
                        logger.info("{} Validating {}", session.previewKind.logPrefix(session.getId()), nextEndpoint);
                        session.trackValidationCompletion(Pair.create(desc, nextEndpoint), nextTask);
                        taskExecutor.execute(nextTask);
                    }

                    @Override
                    public void onFailure(Throwable th) {
                        // Intentionally empty: nothing is done here when the previous task fails.
                    }
                });
                previous = nextTask;
            }

            // This datacenter's chain is fully wired; kick it off.
            taskExecutor.execute(firstTask);
        }
        return FutureCombiner.allOf(tasks);
    }

    /**
     * Creates a validation task for the endpoint and records it in {@code validationTasks} so that
     * {@code abort} can reach it. The task is not started here.
     *
     * @param inetAddressAndPort endpoint the task is created for
     * @param j                  timestamp passed to the task (callers pass {@code getNowInSeconds()})
     * @return the newly created, registered task
     */
    private ValidationTask newValidationTask(InetAddressAndPort inetAddressAndPort, long j) {
        ValidationTask created = new ValidationTask(session.ctx, desc, inetAddressAndPort, j, session.previewKind);
        validationTasks.add(created);
        return created;
    }

    static {
        // Explicit form of the assertion switch: true when assertions are NOT enabled for this class.
        // Read by createOptimisedSyncingSyncTasks before throwing AssertionError.
        $assertionsDisabled = !RepairJob.class.desiredAssertionStatus();
        // Class-wide SLF4J logger.
        logger = LoggerFactory.getLogger(RepairJob.class);
    }
}
