package org.apache.cassandra.schema;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.concurrent.FutureTask;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Simulate;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Simulate(with = {Simulate.With.MONITORS})
/* loaded from: input_file:org/apache/cassandra/schema/MigrationCoordinator.class */
public class MigrationCoordinator {
    public static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3;
    private final ScheduledExecutorService periodicCheckExecutor;
    private final MessagingService messagingService;
    private final int maxOutstandingVersionRequests;
    private final Gossiper gossiper;
    private final Supplier<UUID> schemaVersion;
    private final BiConsumer<InetAddressAndPort, Collection<Mutation>> schemaUpdateCallback;
    final ExecutorPlus executor;
    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class);
    private static final Future<Void> FINISHED_FUTURE = ImmediateFuture.success(null);
    private static LongSupplier getUptimeFn = () -> {
        return ManagementFactory.getRuntimeMXBean().getUptime();
    };
    private static final int MIGRATION_DELAY_IN_MS = CassandraRelevantProperties.MIGRATION_DELAY.getInt();
    private static final Set<UUID> IGNORED_VERSIONS = getIgnoredVersions();
    private final Map<UUID, VersionInfo> versionInfo = new HashMap();
    private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap();
    private final Set<InetAddressAndPort> ignoredEndpoints = getIgnoredEndpoints();
    private final AtomicReference<ScheduledFuture<?>> periodicPullTask = new AtomicReference<>();

    /* loaded from: input_file:org/apache/cassandra/schema/MigrationCoordinator$Callback.class */
    private class Callback implements RequestCallback<Collection<Mutation>> {
        final InetAddressAndPort endpoint;
        final VersionInfo info;

        public Callback(InetAddressAndPort inetAddressAndPort, VersionInfo versionInfo) {
            this.endpoint = inetAddressAndPort;
            this.info = versionInfo;
        }

        @Override // org.apache.cassandra.net.RequestCallback
        public void onFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason requestFailureReason) {
            fail();
        }

        Future<Void> fail() {
            return MigrationCoordinator.this.pullComplete(this.endpoint, this.info, false);
        }

        @Override // org.apache.cassandra.net.RequestCallback
        public void onResponse(Message<Collection<Mutation>> message) {
            response(message.payload);
        }

        Future<Void> response(Collection<Mutation> collection) {
            Future<Void> pullComplete;
            synchronized (this.info) {
                if (MigrationCoordinator.this.shouldApplySchemaFor(this.info)) {
                    try {
                        MigrationCoordinator.this.schemaUpdateCallback.accept(this.endpoint, collection);
                    } catch (Exception e) {
                        MigrationCoordinator.logger.error(String.format("Unable to merge schema from %s", this.endpoint), e);
                        return fail();
                    }
                }
                pullComplete = MigrationCoordinator.this.pullComplete(this.endpoint, this.info, true);
            }
            return pullComplete;
        }

        public boolean isLatencyForSnitch() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/schema/MigrationCoordinator$VersionInfo.class */
    public static class VersionInfo {
        final UUID version;
        final Set<InetAddressAndPort> endpoints = Sets.newConcurrentHashSet();
        final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet();
        final Deque<InetAddressAndPort> requestQueue = new ArrayDeque();
        private final WaitQueue waitQueue = WaitQueue.newWaitQueue();
        volatile boolean receivedSchema;

        VersionInfo(UUID uuid) {
            this.version = uuid;
        }

        WaitQueue.Signal register() {
            return this.waitQueue.register();
        }

        void markReceived() {
            if (this.receivedSchema) {
                return;
            }
            this.receivedSchema = true;
            this.waitQueue.signalAll();
        }

        boolean wasReceived() {
            return this.receivedSchema;
        }

        public String toString() {
            return "VersionInfo{version=" + this.version + ", outstandingRequests=" + this.outstandingRequests + ", requestQueue=" + this.requestQueue + ", waitQueue.waiting=" + this.waitQueue.getWaiting() + ", receivedSchema=" + this.receivedSchema + "}";
        }
    }

    @VisibleForTesting
    public static void setUptimeFn(LongSupplier longSupplier) {
        getUptimeFn = longSupplier;
    }

    private static ImmutableSet<UUID> getIgnoredVersions() {
        String string = CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_VERSIONS.getString();
        if (string == null || string.isEmpty()) {
            return ImmutableSet.of();
        }
        ImmutableSet.Builder builder = ImmutableSet.builder();
        for (String str : string.split(",")) {
            builder.add(UUID.fromString(str));
        }
        return builder.build();
    }

    private static Set<InetAddressAndPort> getIgnoredEndpoints() {
        HashSet hashSet = new HashSet();
        String string = CassandraRelevantProperties.IGNORED_SCHEMA_CHECK_ENDPOINTS.getString();
        if (string == null || string.isEmpty()) {
            return hashSet;
        }
        for (String str : string.split(",")) {
            try {
                hashSet.add(InetAddressAndPort.getByName(str));
            } catch (UnknownHostException e) {
                throw new RuntimeException(e);
            }
        }
        return hashSet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MigrationCoordinator(MessagingService messagingService, ExecutorPlus executorPlus, ScheduledExecutorService scheduledExecutorService, int i, Gossiper gossiper, Supplier<UUID> supplier, BiConsumer<InetAddressAndPort, Collection<Mutation>> biConsumer) {
        this.messagingService = messagingService;
        this.executor = executorPlus;
        this.periodicCheckExecutor = scheduledExecutorService;
        this.maxOutstandingVersionRequests = i;
        this.gossiper = gossiper;
        this.schemaVersion = supplier;
        this.schemaUpdateCallback = biConsumer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        logger.info("Starting migration coordinator and scheduling pulling schema versions every {}", Duration.ofMillis(CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS.getLong()));
        announce(this.schemaVersion.get());
        this.periodicPullTask.updateAndGet(scheduledFuture -> {
            return scheduledFuture == null ? this.periodicCheckExecutor.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS.getLong(), CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS) : scheduledFuture;
        });
    }

    private synchronized void pullUnreceivedSchemaVersions() {
        logger.debug("Pulling unreceived schema versions...");
        for (VersionInfo versionInfo : this.versionInfo.values()) {
            if (versionInfo.wasReceived() || versionInfo.outstandingRequests.size() > 0) {
                logger.trace("Skipping pull of schema {} because it has been already recevied, or it is being received ({})", versionInfo.version, versionInfo);
            } else {
                maybePullSchema(versionInfo);
            }
        }
    }

    private synchronized Future<Void> maybePullSchema(VersionInfo versionInfo) {
        if (versionInfo.endpoints.isEmpty() || versionInfo.wasReceived() || !shouldPullSchema(versionInfo.version)) {
            logger.trace("Not pulling schema {} because it was received, there is no endpoint to provide it, or we should not pull it ({})", versionInfo.version, versionInfo);
            return FINISHED_FUTURE;
        }
        if (versionInfo.outstandingRequests.size() >= this.maxOutstandingVersionRequests) {
            logger.trace("Not pulling schema {} because the number of outstanding requests has been exceeded ({} >= {})", new Object[]{versionInfo.version, Integer.valueOf(versionInfo.outstandingRequests.size()), Integer.valueOf(this.maxOutstandingVersionRequests)});
            return FINISHED_FUTURE;
        }
        int size = versionInfo.requestQueue.size();
        for (int i = 0; i < size; i++) {
            InetAddressAndPort remove = versionInfo.requestQueue.remove();
            if (!versionInfo.endpoints.contains(remove)) {
                logger.trace("Skipping request of schema {} from {} because the endpoint does not have that schema any longer", versionInfo.version, remove);
            } else {
                if (shouldPullFromEndpoint(remove) && versionInfo.outstandingRequests.add(remove)) {
                    return scheduleSchemaPull(remove, versionInfo);
                }
                logger.trace("Could not pull schema {} from {} - the request will be added back to the queue", versionInfo.version, remove);
                versionInfo.requestQueue.offer(remove);
            }
        }
        return FINISHED_FUTURE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions() {
        HashMap hashMap = new HashMap();
        for (VersionInfo versionInfo : this.versionInfo.values()) {
            if (!versionInfo.wasReceived()) {
                hashMap.put(versionInfo.version, ImmutableSet.copyOf(versionInfo.endpoints));
            }
        }
        return hashMap;
    }

    @VisibleForTesting
    VersionInfo getVersionInfoUnsafe(UUID uuid) {
        return this.versionInfo.get(uuid);
    }

    private boolean shouldPullSchema(UUID uuid) {
        UUID uuid2 = this.schemaVersion.get();
        if (uuid2 == null) {
            logger.debug("Not pulling schema {} because the local schama version is not known yet", uuid);
            return false;
        }
        if (!uuid2.equals(uuid)) {
            return true;
        }
        logger.debug("Not pulling schema {} because it is the same as the local schema", uuid);
        return false;
    }

    private boolean shouldPullFromEndpoint(InetAddressAndPort inetAddressAndPort) {
        if (inetAddressAndPort.equals(FBUtilities.getBroadcastAddressAndPort())) {
            logger.trace("Not pulling schema from local endpoint");
            return false;
        }
        EndpointState endpointStateForEndpoint = this.gossiper.getEndpointStateForEndpoint(inetAddressAndPort);
        if (endpointStateForEndpoint == null) {
            logger.trace("Not pulling schema from endpoint {} because its state is unknown", inetAddressAndPort);
            return false;
        }
        VersionedValue applicationState = endpointStateForEndpoint.getApplicationState(ApplicationState.RELEASE_VERSION);
        if (applicationState == null) {
            return false;
        }
        String str = applicationState.value;
        String releaseVersionMajor = FBUtilities.getReleaseVersionMajor();
        if (!str.startsWith(releaseVersionMajor)) {
            logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}", new Object[]{inetAddressAndPort, releaseVersionMajor, str});
            return false;
        }
        if (!this.messagingService.versions.knows(inetAddressAndPort)) {
            logger.debug("Not pulling schema from {} because their messaging version is unknown", inetAddressAndPort);
            return false;
        }
        if (this.messagingService.versions.getRaw(inetAddressAndPort) != 12) {
            logger.debug("Not pulling schema from {} because their schema format is incompatible", inetAddressAndPort);
            return false;
        }
        if (!this.gossiper.isGossipOnlyMember(inetAddressAndPort)) {
            return true;
        }
        logger.debug("Not pulling schema from {} because it's a gossip only member", inetAddressAndPort);
        return false;
    }

    private boolean shouldPullImmediately(InetAddressAndPort inetAddressAndPort, UUID uuid) {
        UUID uuid2 = this.schemaVersion.get();
        if (!SchemaConstants.emptyVersion.equals(uuid2) && getUptimeFn.getAsLong() >= MIGRATION_DELAY_IN_MS) {
            return false;
        }
        logger.debug("Immediately submitting migration task for {}, schema versions: local={}, remote={}", new Object[]{inetAddressAndPort, DistributedSchema.schemaVersionToString(uuid2), DistributedSchema.schemaVersionToString(uuid)});
        return true;
    }

    private synchronized boolean shouldApplySchemaFor(VersionInfo versionInfo) {
        return (versionInfo.wasReceived() || Objects.equals(this.schemaVersion.get(), versionInfo.version)) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized Future<Void> reportEndpointVersion(InetAddressAndPort inetAddressAndPort, UUID uuid) {
        logger.debug("Reported schema {} at endpoint {}", uuid, inetAddressAndPort);
        if (this.ignoredEndpoints.contains(inetAddressAndPort) || IGNORED_VERSIONS.contains(uuid)) {
            this.endpointVersions.remove(inetAddressAndPort);
            removeEndpointFromVersion(inetAddressAndPort, null);
            logger.debug("Discarding endpoint {} or schema {} because either endpoint or schema version were marked as ignored", inetAddressAndPort, uuid);
            return FINISHED_FUTURE;
        }
        UUID put = this.endpointVersions.put(inetAddressAndPort, uuid);
        if (put != null && put.equals(uuid)) {
            logger.trace("Skipping report of schema {} from {} because we already know that", uuid, inetAddressAndPort);
            return FINISHED_FUTURE;
        }
        VersionInfo computeIfAbsent = this.versionInfo.computeIfAbsent(uuid, VersionInfo::new);
        if (Objects.equals(this.schemaVersion.get(), uuid)) {
            computeIfAbsent.markReceived();
            logger.trace("Schema {} from {} has been marked as recevied because it is equal the local schema", uuid, inetAddressAndPort);
        } else {
            computeIfAbsent.requestQueue.addFirst(inetAddressAndPort);
        }
        computeIfAbsent.endpoints.add(inetAddressAndPort);
        logger.trace("Added endpoint {} to schema {}: {}", new Object[]{inetAddressAndPort, computeIfAbsent.version, computeIfAbsent});
        removeEndpointFromVersion(inetAddressAndPort, put);
        return maybePullSchema(computeIfAbsent);
    }

    private synchronized void removeEndpointFromVersion(InetAddressAndPort inetAddressAndPort, UUID uuid) {
        VersionInfo versionInfo;
        if (uuid == null || (versionInfo = this.versionInfo.get(uuid)) == null) {
            return;
        }
        versionInfo.endpoints.remove(inetAddressAndPort);
        logger.trace("Removed endpoint {} from schema {}: {}", new Object[]{inetAddressAndPort, uuid, versionInfo});
        if (versionInfo.endpoints.isEmpty()) {
            versionInfo.waitQueue.signalAll();
            this.versionInfo.remove(uuid);
            logger.trace("Removed schema info: {}", versionInfo);
        }
    }

    private void clearVersionsInfo() {
        Iterator<Map.Entry<UUID, VersionInfo>> it = this.versionInfo.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<UUID, VersionInfo> next = it.next();
            it.remove();
            next.getValue().waitQueue.signal();
        }
    }

    private void reportCurrentSchemaVersionOnEndpoint(InetAddressAndPort inetAddressAndPort) {
        UUID schemaVersion;
        if (FBUtilities.getBroadcastAddressAndPort().equals(inetAddressAndPort)) {
            reportEndpointVersion(inetAddressAndPort, this.schemaVersion.get());
            return;
        }
        EndpointState endpointStateForEndpoint = this.gossiper.getEndpointStateForEndpoint(inetAddressAndPort);
        if (endpointStateForEndpoint == null || (schemaVersion = endpointStateForEndpoint.getSchemaVersion()) == null) {
            return;
        }
        reportEndpointVersion(inetAddressAndPort, schemaVersion);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void reset() {
        logger.info("Resetting migration coordinator...");
        this.endpointVersions.clear();
        clearVersionsInfo();
        this.gossiper.getLiveMembers().forEach(this::reportCurrentSchemaVersionOnEndpoint);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void removeAndIgnoreEndpoint(InetAddressAndPort inetAddressAndPort) {
        logger.debug("Removing and ignoring endpoint {}", inetAddressAndPort);
        Preconditions.checkArgument(inetAddressAndPort != null);
        this.ignoredEndpoints.add(inetAddressAndPort);
        Iterator it = ImmutableSet.copyOf(this.versionInfo.keySet()).iterator();
        while (it.hasNext()) {
            removeEndpointFromVersion(inetAddressAndPort, (UUID) it.next());
        }
    }

    private Future<Void> scheduleSchemaPull(InetAddressAndPort inetAddressAndPort, VersionInfo versionInfo) {
        FutureTask futureTask = new FutureTask(() -> {
            pullSchema(inetAddressAndPort, new Callback(inetAddressAndPort, versionInfo));
        });
        if (shouldPullImmediately(inetAddressAndPort, versionInfo.version)) {
            logger.debug("Pulling {} immediately from {}", versionInfo, inetAddressAndPort);
            submitToMigrationIfNotShutdown(futureTask);
        } else {
            logger.debug("Postponing pull of {} from {} for {}ms", new Object[]{versionInfo, inetAddressAndPort, Integer.valueOf(MIGRATION_DELAY_IN_MS)});
            ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
                return submitToMigrationIfNotShutdown(futureTask);
            }, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
        }
        return futureTask;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void announce(UUID uuid) {
        if (this.gossiper.isEnabled()) {
            this.gossiper.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(uuid));
        }
        SchemaDiagnostics.versionAnnounced(Schema.instance);
    }

    private Future<?> submitToMigrationIfNotShutdown(Runnable runnable) {
        try {
            try {
                if (this.executor.isShutdown() || this.executor.isTerminated()) {
                    ImmediateFuture success = ImmediateFuture.success(null);
                    if (1 != 0) {
                        logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
                    }
                    return success;
                }
                Future<?> submit = this.executor.submit(runnable);
                if (0 != 0) {
                    logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
                }
                return submit;
            } catch (RejectedExecutionException e) {
                ImmediateFuture success2 = ImmediateFuture.success(null);
                if (1 != 0) {
                    logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
                }
                return success2;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                logger.info("Skipped scheduled pulling schema from other nodes: the MIGRATION executor service has been shutdown.");
            }
            throw th;
        }
    }

    private void pullSchema(InetAddressAndPort inetAddressAndPort, RequestCallback<Collection<Mutation>> requestCallback) {
        if (!this.gossiper.isAlive(inetAddressAndPort)) {
            logger.warn("Can't send schema pull request: node {} is down.", inetAddressAndPort);
            requestCallback.onFailure(inetAddressAndPort, RequestFailureReason.UNKNOWN);
        } else if (shouldPullFromEndpoint(inetAddressAndPort)) {
            logger.debug("Requesting schema from {}", inetAddressAndPort);
            sendMigrationMessage(inetAddressAndPort, requestCallback);
        } else {
            logger.info("Skipped sending a migration request: node {} has a higher major version now.", inetAddressAndPort);
            requestCallback.onFailure(inetAddressAndPort, RequestFailureReason.UNKNOWN);
        }
    }

    private void sendMigrationMessage(InetAddressAndPort inetAddressAndPort, RequestCallback<Collection<Mutation>> requestCallback) {
        Message out = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload);
        logger.info("Sending schema pull request to {}", inetAddressAndPort);
        this.messagingService.sendWithCallback(out, inetAddressAndPort, requestCallback);
    }

    private synchronized Future<Void> pullComplete(InetAddressAndPort inetAddressAndPort, VersionInfo versionInfo, boolean z) {
        if (z) {
            versionInfo.markReceived();
        }
        versionInfo.outstandingRequests.remove(inetAddressAndPort);
        versionInfo.requestQueue.add(inetAddressAndPort);
        return maybePullSchema(versionInfo);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean awaitSchemaRequests(long j) {
        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) {
            Gossiper.waitToSettle();
        }
        if (this.versionInfo.isEmpty()) {
            logger.debug("Nothing in versionInfo - so no schemas to wait for");
        }
        List list = null;
        try {
            synchronized (this) {
                ArrayList arrayList = new ArrayList(this.versionInfo.size());
                for (VersionInfo versionInfo : this.versionInfo.values()) {
                    if (!versionInfo.wasReceived()) {
                        arrayList.add(versionInfo.register());
                    }
                }
                if (arrayList.isEmpty()) {
                    if (arrayList != null) {
                        arrayList.forEach((v0) -> {
                            v0.cancel();
                        });
                    }
                    return true;
                }
                long nanoTime = Clock.Global.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
                boolean allMatch = arrayList.stream().allMatch(signal -> {
                    return signal.awaitUntilUninterruptibly(nanoTime);
                });
                if (arrayList != null) {
                    arrayList.forEach((v0) -> {
                        v0.cancel();
                    });
                }
                return allMatch;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                list.forEach((v0) -> {
                    v0.cancel();
                });
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Pair<Set<InetAddressAndPort>, Set<InetAddressAndPort>> pushSchemaMutations(Collection<Mutation> collection) {
        logger.debug("Pushing schema mutations: {}", collection);
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Message out = Message.out(Verb.SCHEMA_PUSH_REQ, collection);
        for (InetAddressAndPort inetAddressAndPort : this.gossiper.getLiveMembers()) {
            if (shouldPushSchemaTo(inetAddressAndPort)) {
                logger.debug("Pushing schema mutations to {}: {}", inetAddressAndPort, collection);
                this.messagingService.send(out, inetAddressAndPort);
                hashSet.add(inetAddressAndPort);
            } else {
                hashSet2.add(inetAddressAndPort);
            }
        }
        return Pair.create(hashSet, hashSet2);
    }

    private boolean shouldPushSchemaTo(InetAddressAndPort inetAddressAndPort) {
        return !inetAddressAndPort.equals(FBUtilities.getBroadcastAddressAndPort()) && this.messagingService.versions.knows(inetAddressAndPort) && this.messagingService.versions.getRaw(inetAddressAndPort) == 12;
    }
}
