package org.apache.cassandra.db;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.CoordinatorBehindException;
import org.apache.cassandra.exceptions.InvalidRoutingException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/AbstractMutationVerbHandler.class */
/**
 * Base verb handler for messages whose payload is an {@link IMutation}.
 *
 * <p>For messages carrying an epoch after {@link Epoch#EMPTY}, the handler first verifies that this
 * node is among the write placements for the mutation's token and then compares the message epoch
 * against the local schema, before delegating to {@link #applyMutation}. Messages whose epoch is
 * not after {@link Epoch#EMPTY} skip both checks and are applied directly.
 *
 * @param <T> concrete mutation type carried by the handled messages
 */
public abstract class AbstractMutationVerbHandler<T extends IMutation> implements IVerbHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMutationVerbHandler.class);
    private static final String OUT_OF_RANGE_LOG_TEMPLATE = "Received mutation from {} for token {} outside valid range for keyspace {}";

    /**
     * Handles an incoming mutation message, using the message's own {@code respondTo()} address
     * as the reply target.
     *
     * @param message the incoming mutation message
     * @throws IOException declared by the handler contract; nothing in this body throws it directly
     */
    @Override
    public void doVerb(Message<T> message) throws IOException {
        InetAddressAndPort replyTarget = message.respondTo();
        processMessage(message, replyTarget);
    }

    /**
     * Runs the ownership and schema checks (only when the message epoch is after
     * {@link Epoch#EMPTY}) and then applies the mutation.
     *
     * <p>Note: a decompiler annotation on this method states that its access modifier was changed
     * from {@code protected}; it is kept {@code public} here so existing callers are unaffected.
     *
     * @param message            the incoming mutation message
     * @param inetAddressAndPort address handed through to {@link #applyMutation}
     */
    public void processMessage(Message<T> message, InetAddressAndPort inetAddressAndPort) {
        boolean carriesEpoch = message.epoch().isAfter(Epoch.EMPTY);
        if (carriesEpoch) {
            ClusterMetadata local = ClusterMetadata.current();
            // The ownership check may hand back newer metadata; the schema check must see that one.
            ClusterMetadata afterOwnershipCheck = checkTokenOwnership(local, message);
            checkSchemaVersion(afterOwnershipCheck, message);
        }
        applyMutation(message, inetAddressAndPort);
    }

    /**
     * Applies the mutation carried by {@code message}. Called by {@link #processMessage} after
     * any applicable checks have passed.
     *
     * @param message            the mutation message to apply
     * @param inetAddressAndPort address passed through from {@link #processMessage}
     */
    abstract void applyMutation(Message<T> message, InetAddressAndPort inetAddressAndPort);

    /**
     * Verifies that this node is part of the write placements for the mutation's token.
     *
     * <p>If the message epoch is after the local one, the log is fetched from the sender or CMS:
     * via the {@code Async} variant (result unused) when the local placements already contain this
     * node, otherwise via the variant that returns updated metadata, after which placements are
     * recomputed.
     *
     * @param metadata local cluster metadata at the time of the check
     * @param message  the incoming mutation message
     * @return the metadata the decision was based on (possibly newer than {@code metadata})
     * @throws InvalidRoutingException    if this node is not in the write placements
     * @throws CoordinatorBehindException if the placements were last modified after the message epoch
     */
    private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<T> message) {
        final String ksName = message.payload.getKeyspaceName();
        final DecoratedKey partitionKey = message.payload.key();
        VersionedEndpoints.ForToken replicas = writePlacements(metadata, ksName, partitionKey);

        if (message.epoch().isAfter(metadata.epoch)) {
            if (!replicas.get().containsSelf()) {
                // Not a replica according to our view: fetch the log and re-evaluate with the result.
                metadata = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch());
                replicas = writePlacements(metadata, ksName, partitionKey);
            } else {
                // Already a replica in our view: the fetch result is not needed for this decision.
                ClusterMetadataService.instance().fetchLogFromPeerOrCMSAsync(metadata, message.from(), message.epoch());
            }
        }

        if (!replicas.get().containsSelf()) {
            StorageService.instance.incOutOfRangeOperationCount();
            Keyspace.open(message.payload.getKeyspaceName()).metric.outOfRangeTokenWrites.inc();
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.SECONDS, OUT_OF_RANGE_LOG_TEMPLATE, message.from(), key(partitionKey), message.payload.getKeyspaceName());
            throw InvalidRoutingException.forWrite(message.from(), partitionKey.getToken(), metadata.epoch, message.payload);
        }

        if (replicas.lastModified().isAfter(message.epoch())) {
            TCMMetrics.instance.coordinatorBehindPlacements.mark();
            throw new CoordinatorBehindException(String.format("Routing is correct, but coordinator needs to catch-up at least to epoch %s to maintain consistency. Current coordinator epoch is %s", replicas.lastModified(), message.epoch()));
        }
        return metadata;
    }

    /** Returns the token of {@code partitionKey}; used as the token argument of the out-of-range log line. */
    private static Object key(DecoratedKey partitionKey) {
        return partitionKey.getToken();
    }

    /**
     * Compares the message epoch with the local schema for non-system keyspaces.
     *
     * <p>Returns immediately for system keyspaces or when the message epoch equals the local epoch.
     * Otherwise exactly one of three cases is handled: keyspace unknown locally, message epoch ahead
     * of the local epoch, or message epoch before the last schema modification.
     *
     * @param metadata cluster metadata to check against
     * @param message  the incoming mutation message
     * @return the metadata in effect after the check (possibly newer than {@code metadata})
     * @throws CoordinatorBehindException if the sender's schema view is found to be behind
     * @throws IllegalStateException      if a partition update is still ahead after fetching the log
     */
    private ClusterMetadata checkSchemaVersion(ClusterMetadata metadata, Message<T> message) {
        boolean systemKeyspace = SchemaConstants.isSystemKeyspace(message.payload.getKeyspaceName());
        if (systemKeyspace || message.epoch().is(metadata.epoch)) {
            return metadata;
        }

        String ksName = message.payload.getKeyspaceName();
        Keyspace ks = metadata.schema.getKeyspace(ksName);

        if (ks == null) {
            return handleUnknownKeyspace(metadata, message, ksName);
        }
        if (message.epoch().isAfter(metadata.epoch)) {
            return catchUpForNewerUpdates(metadata, message);
        }
        if (message.epoch().isBefore(metadata.schema.lastModified())) {
            rejectUpdatesFromStaleSchema(metadata, message, ks);
        }
        return metadata;
    }

    /**
     * Keyspace is not present locally: reject if the message epoch is before the last schema
     * modification, otherwise fetch the log and return the resulting metadata.
     */
    private ClusterMetadata handleUnknownKeyspace(ClusterMetadata metadata, Message<T> message, String ksName) {
        if (!message.epoch().isBefore(metadata.schema.lastModified())) {
            return ClusterMetadataService.instance().fetchLogFromPeerOrCMS(metadata, message.from(), message.epoch());
        }
        TCMMetrics.instance.coordinatorBehindSchema.mark();
        throw new CoordinatorBehindException(String.format("Schema mismatch, coordinator %s is behind, we're missing keyspace %s, our epoch = %s, their epoch = %s", message.from(), ksName, metadata.epoch, message.epoch()));
    }

    /**
     * Message epoch is ahead of ours: for every partition update serialized at an epoch after the
     * current metadata epoch, fetch the log and fail if the update is still ahead afterwards.
     */
    private ClusterMetadata catchUpForNewerUpdates(ClusterMetadata metadata, Message<T> message) {
        ClusterMetadata latest = metadata;
        for (PartitionUpdate update : message.payload.mo430getPartitionUpdates()) {
            Epoch serializedAt = update.serializedAtEpoch;
            if (serializedAt == null || !serializedAt.isAfter(latest.epoch)) {
                continue;
            }
            // Later iterations compare against the refreshed metadata, not the one passed in.
            latest = ClusterMetadataService.instance().fetchLogFromPeerOrCMS(latest, message.from(), message.epoch());
            if (update.serializedAtEpoch.isAfter(latest.epoch)) {
                throw new IllegalStateException(String.format("Coordinator %s is still ahead after fetching log, our epoch = %s, their epoch = %s", message.from(), latest.epoch, message.epoch()));
            }
        }
        return latest;
    }

    /**
     * Message epoch is before the last schema modification: reject the message if any partition
     * update targets a table missing locally, or was serialized before that table's epoch.
     */
    private void rejectUpdatesFromStaleSchema(ClusterMetadata metadata, Message<T> message, Keyspace ks) {
        for (PartitionUpdate update : message.payload.mo430getPartitionUpdates()) {
            ColumnFamilyStore cfs = ks.getColumnFamilyStore(update.metadata().id);
            if (cfs == null) {
                TCMMetrics.instance.coordinatorBehindSchema.mark();
                throw new CoordinatorBehindException(String.format("Schema mismatch, coordinator %s is behind, we're missing table %s.%s, our epoch = %s, their epoch = %s", message.from(), update.metadata().keyspace, update.metadata().name, metadata.epoch, message.epoch()));
            }
            Epoch serializedAt = update.serializedAtEpoch;
            boolean olderThanTable = serializedAt != null && serializedAt.isBefore(cfs.metadata().epoch);
            if (olderThanTable) {
                TCMMetrics.instance.coordinatorBehindSchema.mark();
                throw new CoordinatorBehindException(String.format("Coordinator %s is behind, our epoch = %s, their epoch = %s", message.from(), metadata.epoch, message.epoch()));
            }
        }
    }

    /**
     * Looks up the write placements for the token of {@code decoratedKey}, using the replication
     * params of keyspace {@code str} as found in {@code clusterMetadata}.
     *
     * <p>NOTE(review): no null check is made on the keyspace lookup, so an unknown keyspace would
     * surface as a {@link NullPointerException} here — confirm callers guarantee it exists.
     */
    private static VersionedEndpoints.ForToken writePlacements(ClusterMetadata clusterMetadata, String str, DecoratedKey decoratedKey) {
        Keyspace ks = clusterMetadata.schema.getKeyspace(str);
        return clusterMetadata.placements
                .get(ks.getMetadata().params.replication)
                .writes
                .forToken(decoratedKey.getToken());
    }
}
