package org.apache.cassandra.schema;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.SchemaTransformation;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Awaitable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/schema/DefaultSchemaUpdateHandler.class */
/**
 * {@link SchemaUpdateHandler} that keeps an in-memory {@link DistributedSchema} snapshot and
 * updates it from three sources visible in this class: local transformations ({@link #apply}),
 * mutations delivered through {@link SchemaPushVerbHandler}, and mutations handed over by the
 * {@link MigrationCoordinator}.
 *
 * <p>The instance registers itself with {@link Gossiper} and forwards {@code SCHEMA} state
 * changes of live member endpoints to the migration coordinator.
 *
 * <p>State-changing methods synchronize on {@code this}; {@link #schema} and
 * {@link #requestedReset} are additionally {@code volatile} because
 * {@link #getSchemaVersionForCoordinator()} reads them without holding the lock.
 */
public class DefaultSchemaUpdateHandler implements SchemaUpdateHandler, IEndpointStateChangeSubscriber {
    private static final Logger logger = LoggerFactory.getLogger(DefaultSchemaUpdateHandler.class);

    @VisibleForTesting
    final MigrationCoordinator migrationCoordinator;
    /** When true, {@link #waitUntilReady(Duration)} reports failure if schemas were not received in time. */
    private final boolean requireSchemas;
    /** Invoked from {@link #updateSchema} after every non-empty schema change. */
    private final BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> updateCallback;
    /** Current schema snapshot; starts as {@link DistributedSchema#EMPTY}. */
    private volatile DistributedSchema schema;
    /** Non-null while a reset requested through {@link #clear()} has not yet been fulfilled. */
    private volatile AsyncPromise<Void> requestedReset;

    /**
     * Builds the coordinator used when the caller of the constructor does not supply one. It is
     * wired to this handler through {@link #getSchemaVersionForCoordinator()} and
     * {@link #applyMutationsFromCoordinator(InetAddressAndPort, Collection)}.
     */
    private MigrationCoordinator createMigrationCoordinator(MessagingService messagingService) {
        return new MigrationCoordinator(messagingService,
                                        Stage.MIGRATION.executor(),
                                        ScheduledExecutors.scheduledTasks,
                                        // NOTE(review): magic number; its meaning is defined by the
                                        // MigrationCoordinator constructor - confirm and name it.
                                        3,
                                        Gossiper.instance,
                                        this::getSchemaVersionForCoordinator,
                                        this::applyMutationsFromCoordinator);
    }

    /**
     * Creates a handler with a freshly built coordinator on top of {@link MessagingService#instance()}.
     * Schemas are required unless {@code BOOTSTRAP_SKIP_SCHEMA_CHECK} is set.
     *
     * @param biConsumer callback notified about every non-empty schema update
     */
    public DefaultSchemaUpdateHandler(BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> biConsumer) {
        this(null, MessagingService.instance(), !CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getBoolean(), biConsumer);
    }

    /**
     * Creates a handler, registers it as a gossip subscriber and installs the schema push/pull
     * verb handlers.
     *
     * @param migrationCoordinator coordinator to use, or {@code null} to create a default one
     * @param messagingService     used to build the default coordinator and to answer pull requests
     * @param z                    whether {@link #waitUntilReady(Duration)} must fail when schemas
     *                             were not received in time (stored as {@code requireSchemas})
     * @param biConsumer           callback notified about every non-empty schema update
     */
    public DefaultSchemaUpdateHandler(MigrationCoordinator migrationCoordinator, MessagingService messagingService, boolean z, BiConsumer<SchemaTransformation.SchemaTransformationResult, Boolean> biConsumer) {
        this.schema = DistributedSchema.EMPTY;
        this.requireSchemas = z;
        this.updateCallback = biConsumer;
        this.migrationCoordinator = migrationCoordinator == null ? createMigrationCoordinator(messagingService) : migrationCoordinator;
        Gossiper.instance.register(this);

        // Pushed mutations are dropped while a reset is pending.
        SchemaPushVerbHandler.instance.register(message -> {
            synchronized (this) {
                if (this.requestedReset == null) {
                    @SuppressWarnings("unchecked")
                    Collection<Mutation> pushedMutations = (Collection<Mutation>) message.payload;
                    applyMutations(pushedMutations);
                }
            }
        });

        // Pull requests are answered with the current schema mutations; send failures are logged only.
        SchemaPullVerbHandler.instance.register(message2 -> {
            try {
                messagingService.send(message2.responseWith(getSchemaMutations()), message2.from());
            } catch (RuntimeException e) {
                logger.error("Failed to send schema mutations to {}", message2.from(), e);
            }
        });
    }

    /**
     * Starts schema coordination. When this node is replacing another one, the replaced address
     * is first removed from (and ignored by) the migration coordinator; then the system keyspaces
     * schema is saved and the coordinator is started.
     */
    @Override // org.apache.cassandra.schema.SchemaUpdateHandler
    public synchronized void start() {
        if (StorageService.instance.isReplacing()) {
            onRemove(DatabaseDescriptor.getReplaceAddress());
        }
        SchemaKeyspace.saveSystemKeyspacesSchema();
        this.migrationCoordinator.start();
    }

    /**
     * Waits for the migration coordinator's outstanding schema requests.
     *
     * @param duration maximum time to wait
     * @return {@code true} if the requests completed in time, or if they did not but schemas are
     *         not required; {@code false} if they timed out and schemas are required
     */
    @Override // org.apache.cassandra.schema.SchemaUpdateHandler
    public boolean waitUntilReady(Duration duration) {
        logger.debug("Waiting for schema to be ready (max {})", duration);
        boolean schemasReceived = this.migrationCoordinator.awaitSchemaRequests(duration.toMillis());
        if (schemasReceived) {
            return true;
        }

        logger.warn("There are nodes in the cluster with a different schema version than us, from which we did not merge schemas: "
                    + "our version: ({}), outstanding versions -> endpoints: {}. Use -D{}=true to ignore this, "
                    + "-D{}=<ep1[,epN]> to skip specific endpoints, or -D{}=<ver1[,verN]> to skip specific schema versions",
                    Schema.instance.getVersion(),
                    this.migrationCoordinator.outstandingVersions(),
                    CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey(),
                    CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS.getKey(),
                    CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS.getKey());

        if (!this.requireSchemas) {
            return true;
        }
        logger.error("Didn't receive schemas for all known versions within the {}. Use -D{}=true to skip this check.", duration, CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK.getKey());
        return false;
    }

    /** Tells the migration coordinator to remove and ignore the given endpoint. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRemove(InetAddressAndPort inetAddressAndPort) {
        this.migrationCoordinator.removeAndIgnoreEndpoint(inetAddressAndPort);
    }

    /**
     * Reports an endpoint's schema version to the migration coordinator. Only {@code SCHEMA}
     * state changes are considered, and only for endpoints that have a known, non-dead gossip
     * state and are members of the token metadata.
     */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onChange(InetAddressAndPort inetAddressAndPort, ApplicationState applicationState, VersionedValue versionedValue) {
        if (applicationState != ApplicationState.SCHEMA) {
            return;
        }
        EndpointState endpointState = Gossiper.instance.getEndpointStateForEndpoint(inetAddressAndPort);
        if (endpointState == null || Gossiper.instance.isDeadState(endpointState)) {
            return;
        }
        if (!StorageService.instance.getTokenMetadata().isMember(inetAddressAndPort)) {
            return;
        }
        this.migrationCoordinator.reportEndpointVersion(inetAddressAndPort, UUID.fromString(versionedValue.value));
    }

    /** No-op. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onJoin(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
    }

    /** No-op. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void beforeChange(InetAddressAndPort inetAddressAndPort, EndpointState endpointState, ApplicationState applicationState, VersionedValue versionedValue) {
    }

    /** No-op. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onAlive(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
    }

    /** No-op. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onDead(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
    }

    /** No-op. */
    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRestart(InetAddressAndPort inetAddressAndPort, EndpointState endpointState) {
    }

    /**
     * Persists the given schema mutations and rebuilds the in-memory schema for the keyspaces
     * they affect.
     *
     * @param collection schema mutations to apply
     * @return the before/after schemas and their diff
     */
    private synchronized SchemaTransformation.SchemaTransformationResult applyMutations(Collection<Mutation> collection) {
        DistributedSchema before = this.schema;

        SchemaKeyspace.applyChanges(collection);

        // Re-read only the keyspaces the mutations touched.
        Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(collection);
        Keyspaces fetchedKeyspaces = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces);

        // An affected keyspace that can no longer be fetched is dropped from the snapshot.
        Set<String> removedKeyspaces = affectedKeyspaces.stream()
                                                        .filter(name -> !fetchedKeyspaces.containsKeyspace(name))
                                                        .collect(Collectors.toSet());
        Keyspaces afterKeyspaces = before.getKeyspaces().withAddedOrReplaced(fetchedKeyspaces).without(removedKeyspaces);

        DistributedSchema after = new DistributedSchema(afterKeyspaces, SchemaKeyspace.calculateSchemaDigest());
        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces);
        SchemaTransformation.SchemaTransformationResult result = new SchemaTransformation.SchemaTransformationResult(before, after, diff);

        logger.info("Applying schema change due to received mutations: {}", result);
        updateSchema(result, false);
        return result;
    }

    /**
     * Applies a schema transformation to the current schema.
     *
     * @param schemaTransformation transformation to apply
     * @param z                    when {@code true} the new version is not announced through the
     *                             migration coordinator and the mutations are not pushed to other nodes
     * @return the before/after schemas and their diff; when the diff is empty nothing is persisted
     *         and both schemas are the current one
     */
    @Override // org.apache.cassandra.schema.SchemaUpdateHandler
    public synchronized SchemaTransformation.SchemaTransformationResult apply(SchemaTransformation schemaTransformation, boolean z) {
        DistributedSchema before = this.schema;
        Keyspaces afterKeyspaces = schemaTransformation.apply(before.getKeyspaces());
        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), afterKeyspaces);
        if (diff.isEmpty()) {
            return new SchemaTransformation.SchemaTransformationResult(before, before, diff);
        }

        // Use the transformation's fixed timestamp when it has one, otherwise the current time.
        long timestampMicros = schemaTransformation.fixedTimestampMicros().orElse(FBUtilities.timestampMicros());
        Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, timestampMicros);
        SchemaKeyspace.applyChanges(mutations);

        DistributedSchema after = new DistributedSchema(afterKeyspaces, SchemaKeyspace.calculateSchemaDigest());
        SchemaTransformation.SchemaTransformationResult result = new SchemaTransformation.SchemaTransformationResult(before, after, diff);
        updateSchema(result, z);

        if (!z) {
            // Pushing happens on the coordinator's executor, outside this method's lock.
            this.migrationCoordinator.executor.submit(() -> {
                Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> endpoints = this.migrationCoordinator.pushSchemaMutations(mutations);
                SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(endpoints.left(), endpoints.right(), schemaTransformation);
            });
        }
        return result;
    }

    /**
     * Installs {@code schemaTransformationResult.after} as the current schema and notifies the
     * update callback. Empty diffs are skipped entirely.
     *
     * @param schemaTransformationResult the update to install
     * @param z                          when {@code true} the new version is not announced
     *                                   through the migration coordinator
     */
    private void updateSchema(SchemaTransformation.SchemaTransformationResult schemaTransformationResult, boolean z) {
        if (schemaTransformationResult.diff.isEmpty()) {
            logger.debug("Schema update is empty - skipping");
            return;
        }
        this.schema = schemaTransformationResult.after;
        logger.debug("Schema updated: {}", schemaTransformationResult);
        // The callback's second argument is always true here, independent of z.
        this.updateCallback.accept(schemaTransformationResult, true);
        if (!z) {
            this.migrationCoordinator.announce(schemaTransformationResult.after.getVersion());
        }
    }

    /** Rebuilds the in-memory schema from the non-system keyspaces stored in {@link SchemaKeyspace}. */
    private synchronized void reload() {
        DistributedSchema before = this.schema;
        DistributedSchema after = new DistributedSchema(SchemaKeyspace.fetchNonSystemKeyspaces(), SchemaKeyspace.calculateSchemaDigest());
        Keyspaces.KeyspacesDiff diff = Keyspaces.diff(before.getKeyspaces(), after.getKeyspaces());
        updateSchema(new SchemaTransformation.SchemaTransformationResult(before, after, diff), false);
    }

    /**
     * Resets the schema.
     *
     * @param z when {@code true} the schema is reloaded from {@link SchemaKeyspace}; otherwise the
     *          migration coordinator is reset and this call waits up to {@code MIGRATION_DELAY}
     *          for its schema requests, logging an error on timeout
     */
    @Override // org.apache.cassandra.schema.SchemaUpdateHandler
    public void reset(boolean z) {
        if (z) {
            reload();
            return;
        }
        this.migrationCoordinator.reset();
        boolean schemasReceived = this.migrationCoordinator.awaitSchemaRequests(CassandraRelevantProperties.MIGRATION_DELAY.getLong());
        if (!schemasReceived) {
            logger.error("Timeout exceeded when waiting for schema from other nodes");
        }
    }

    /**
     * Requests a schema reset. The first call creates the pending promise and resets the
     * migration coordinator; later calls made while the reset is pending return the same promise.
     *
     * @return the promise completed by {@link #applyMutationsFromCoordinator} once non-empty
     *         mutations arrive
     */
    @Override // org.apache.cassandra.schema.SchemaUpdateHandler
    public Awaitable clear() {
        synchronized (this) {
            if (this.requestedReset == null) {
                this.requestedReset = new AsyncPromise<>();
                this.migrationCoordinator.reset();
            }
            return this.requestedReset;
        }
    }

    /**
     * Version reported to the migration coordinator: {@link SchemaConstants#emptyVersion} while a
     * reset is pending, otherwise the current schema's version.
     */
    private UUID getSchemaVersionForCoordinator() {
        if (this.requestedReset != null) {
            return SchemaConstants.emptyVersion;
        }
        return this.schema.getVersion();
    }

    /**
     * Applies mutations handed over by the migration coordinator. If a reset is pending and the
     * mutations are non-empty, the in-memory schema is emptied, {@link SchemaKeyspace} is
     * truncated and the reset promise is completed before the mutations are applied.
     *
     * @param inetAddressAndPort source endpoint; not used by this method
     * @param collection         schema mutations to apply
     */
    private synchronized void applyMutationsFromCoordinator(InetAddressAndPort inetAddressAndPort, Collection<Mutation> collection) {
        if (this.requestedReset != null && !collection.isEmpty()) {
            this.schema = DistributedSchema.EMPTY;
            SchemaKeyspace.truncate();
            // NOTE(review): "m1152setSuccess" and the cast on null look like decompiler renaming of
            // AsyncPromise#setSuccess(null) - confirm against the AsyncPromise sources before changing.
            this.requestedReset.m1152setSuccess((AsyncPromise<Void>) null);
            this.requestedReset = null;
        }
        applyMutations(collection);
    }

    /** Returns the current schema as mutations, or an empty list while a reset is pending. */
    private synchronized Collection<Mutation> getSchemaMutations() {
        if (this.requestedReset != null) {
            return Collections.emptyList();
        }
        return SchemaKeyspace.convertSchemaToMutations();
    }

    /** Returns the migration coordinator's outstanding schema versions and the endpoints reporting them. */
    public Map<UUID, Set<InetAddressAndPort>> getOutstandingSchemaVersions() {
        return this.migrationCoordinator.outstandingVersions();
    }
}
