package org.apache.cassandra.tcm;

import com.codahale.metrics.Timer;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.tcm.RemoteProcessor;
import org.apache.cassandra.tcm.Retry;
import org.apache.cassandra.tcm.log.LocalLog;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/tcm/PeerLogFetcher.class */
public class PeerLogFetcher {
    private static final Logger logger = LoggerFactory.getLogger(RemoteProcessor.class);
    private final LocalLog log;

    public PeerLogFetcher(LocalLog localLog) {
        this.log = localLog;
    }

    public ClusterMetadata fetchLogEntriesAndWait(InetAddressAndPort inetAddressAndPort, Epoch epoch) {
        ClusterMetadata current = ClusterMetadata.current();
        if (current.epoch.isEqualOrAfter(epoch)) {
            return current;
        }
        try {
            return (ClusterMetadata) asyncFetchLog(inetAddressAndPort, epoch).get(DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException("Can not fetch log entries during shutdown", e);
        } catch (ExecutionException | TimeoutException e2) {
            logger.warn("Could not fetch log entries from peer, remote = {}, await = {}", inetAddressAndPort, epoch);
            return current;
        }
    }

    public Future<ClusterMetadata> asyncFetchLog(InetAddressAndPort inetAddressAndPort, Epoch epoch) {
        return EpochAwareDebounce.instance.getAsync(() -> {
            return fetchLogEntriesAndWaitInternal(inetAddressAndPort, epoch);
        }, epoch);
    }

    private Future<ClusterMetadata> fetchLogEntriesAndWaitInternal(InetAddressAndPort inetAddressAndPort, Epoch epoch) {
        Epoch epoch2 = ClusterMetadata.current().epoch;
        if (epoch2.isEqualOrAfter(epoch)) {
            AsyncPromise asyncPromise = new AsyncPromise();
            asyncPromise.m1369setSuccess((AsyncPromise) ClusterMetadata.current());
            return asyncPromise;
        }
        AsyncPromise asyncPromise2 = new AsyncPromise();
        logger.info("Fetching log from {}, at least {}", inetAddressAndPort, epoch);
        try {
            Timer.Context time = TCMMetrics.instance.fetchPeerLogLatency.time();
            try {
                RemoteProcessor.sendWithCallbackAsync(asyncPromise2, Verb.TCM_FETCH_PEER_LOG_REQ, new FetchPeerLog(epoch2), new RemoteProcessor.CandidateIterator(Collections.singletonList(inetAddressAndPort), false), Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
                Future map = asyncPromise2.map(logState -> {
                    this.log.append(logState);
                    ClusterMetadata waitForHighestConsecutive = this.log.waitForHighestConsecutive();
                    if (!waitForHighestConsecutive.epoch.isEqualOrAfter(epoch)) {
                        throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", epoch));
                    }
                    TCMMetrics.instance.peerLogEntriesFetched(epoch2, logState.latestEpoch());
                    return waitForHighestConsecutive;
                });
                if (time != null) {
                    time.close();
                }
                return map;
            } finally {
            }
        } catch (Throwable th) {
            asyncPromise2.cancel(true);
            JVMStabilityInspector.inspectThrowable(th);
            logger.warn("Unable to fetch log entries from " + inetAddressAndPort, th);
            AsyncPromise asyncPromise3 = new AsyncPromise();
            asyncPromise3.m1368setFailure((Throwable) new IllegalStateException("Unable to fetch log entries from " + inetAddressAndPort, th));
            return asyncPromise3;
        }
    }
}
