package org.apache.cassandra.service.reads.repair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InOurDc;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

/* loaded from: input_file:org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.class */
public class BlockingPartitionRepair extends AsyncFuture<Object> implements RequestCallback<Object> {
    private final DecoratedKey key;
    private final ReplicaPlan.ForWrite writePlan;
    private final Map<Replica, Mutation> pendingRepairs;
    private final CountDownLatch latch;
    private final Predicate<InetAddressAndPort> shouldBlockOn;
    private volatile long mutationsSentTime;

    public BlockingPartitionRepair(DecoratedKey decoratedKey, Map<Replica, Mutation> map, ReplicaPlan.ForWrite forWrite) {
        this(decoratedKey, map, forWrite, forWrite.consistencyLevel().isDatacenterLocal() ? InOurDc.endpoints() : Predicates.alwaysTrue());
    }

    public BlockingPartitionRepair(DecoratedKey decoratedKey, Map<Replica, Mutation> map, ReplicaPlan.ForWrite forWrite, Predicate<InetAddressAndPort> predicate) {
        this.key = decoratedKey;
        this.pendingRepairs = new ConcurrentHashMap(map);
        this.writePlan = forWrite;
        this.shouldBlockOn = predicate;
        int writeQuorum = forWrite.writeQuorum();
        Iterator<Replica> it = forWrite.contacts().iterator();
        while (it.hasNext()) {
            Replica next = it.next();
            if (!map.containsKey(next) && predicate.test(next.endpoint())) {
                writeQuorum--;
            }
        }
        this.latch = CountDownLatch.newCountDownLatch(Math.max(writeQuorum, 0));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int blockFor() {
        return this.writePlan.writeQuorum();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public int waitingOn() {
        return this.latch.count();
    }

    @VisibleForTesting
    void ack(InetAddressAndPort inetAddressAndPort) {
        if (this.shouldBlockOn.test(inetAddressAndPort)) {
            this.pendingRepairs.remove(this.writePlan.lookup(inetAddressAndPort));
            this.latch.decrement();
        }
    }

    @Override // org.apache.cassandra.net.RequestCallback
    public void onResponse(Message<Object> message) {
        ack(message.from());
    }

    private static PartitionUpdate extractUpdate(Mutation mutation) {
        return (PartitionUpdate) Iterables.getOnlyElement(mutation.mo411getPartitionUpdates());
    }

    private PartitionUpdate mergeUnackedUpdates() {
        ArrayList newArrayList = Lists.newArrayList(Iterables.transform(this.pendingRepairs.values(), BlockingPartitionRepair::extractUpdate));
        if (newArrayList.isEmpty()) {
            return null;
        }
        return PartitionUpdate.merge(newArrayList);
    }

    @VisibleForTesting
    protected void sendRR(Message<Mutation> message, InetAddressAndPort inetAddressAndPort) {
        MessagingService.instance().sendWithCallback(message, inetAddressAndPort, this);
    }

    public void sendInitialRepairs() {
        this.mutationsSentTime = Clock.Global.nanoTime();
        Replicas.assertFull(this.pendingRepairs.keySet());
        for (Map.Entry<Replica, Mutation> entry : this.pendingRepairs.entrySet()) {
            Replica key = entry.getKey();
            Preconditions.checkArgument(key.isFull(), "Can't send repairs to transient replicas: %s", key);
            Mutation value = entry.getValue();
            TableId tableId = extractUpdate(value).metadata().id;
            Tracing.trace("Sending read-repair-mutation to {}", key);
            sendRR(Message.out(Verb.READ_REPAIR_REQ, value), key.endpoint());
            ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
            if (!this.shouldBlockOn.test(key.endpoint())) {
                this.pendingRepairs.remove(key);
            }
            ReadRepairDiagnostics.sendInitialRepair(this, key.endpoint(), value);
        }
    }

    public boolean awaitRepairsUntil(long j, TimeUnit timeUnit) {
        try {
            return this.latch.await(timeUnit.toNanos(j) - Clock.Global.nanoTime(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            throw new UncheckedInterruptedException(e);
        }
    }

    private static int msgVersionIdx(int i) {
        return i - 12;
    }

    public void maybeSendAdditionalWrites(long j, TimeUnit timeUnit) {
        PartitionUpdate mergeUnackedUpdates;
        if (awaitRepairsUntil(j + timeUnit.convert(this.mutationsSentTime, TimeUnit.NANOSECONDS), timeUnit)) {
            return;
        }
        EndpointsForToken liveUncontacted = this.writePlan.liveUncontacted();
        if (liveUncontacted.isEmpty() || (mergeUnackedUpdates = mergeUnackedUpdates()) == null) {
            return;
        }
        ReadRepairMetrics.speculatedWrite.mark();
        Mutation[] mutationArr = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
        Iterator<Replica> it = liveUncontacted.iterator();
        while (it.hasNext()) {
            Replica next = it.next();
            int msgVersionIdx = msgVersionIdx(MessagingService.instance().versions.get(next.endpoint()));
            Mutation mutation = mutationArr[msgVersionIdx];
            if (mutation == null) {
                mutation = BlockingReadRepairs.createRepairMutation(mergeUnackedUpdates, this.writePlan.consistencyLevel(), next.endpoint(), true);
                mutationArr[msgVersionIdx] = mutation;
            }
            if (mutation == null) {
                ReadRepairDiagnostics.speculatedWriteOversized(this, next.endpoint());
            } else {
                Tracing.trace("Sending speculative read-repair-mutation to {}", next);
                sendRR(Message.out(Verb.READ_REPAIR_REQ, mutation), next.endpoint());
                ReadRepairDiagnostics.speculatedWrite(this, next.endpoint(), mutation);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Keyspace getKeyspace() {
        return this.writePlan.keyspace();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DecoratedKey getKey() {
        return this.key;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsistencyLevel getConsistency() {
        return this.writePlan.consistencyLevel();
    }
}
