package org.apache.cassandra.tcm.log;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Startup;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.listeners.ChangeListener;
import org.apache.cassandra.tcm.listeners.ClientNotificationListener;
import org.apache.cassandra.tcm.listeners.InitializationListener;
import org.apache.cassandra.tcm.listeners.LegacyStateListener;
import org.apache.cassandra.tcm.listeners.LogListener;
import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener;
import org.apache.cassandra.tcm.listeners.PlacementsChangeListener;
import org.apache.cassandra.tcm.listeners.SchemaListener;
import org.apache.cassandra.tcm.listeners.UpgradeMigrationListener;
import org.apache.cassandra.tcm.log.Entry;
import org.apache.cassandra.tcm.transformations.ForceSnapshot;
import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
import org.apache.cassandra.utils.Closeable;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.Condition;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog.class */
public abstract class LocalLog implements Closeable {
    /** Class-wide logger; assigned in the static initializer at the bottom of this class. */
    private static final Logger logger;
    /** The most recently committed metadata; replaced via compare-and-set as pending entries are enacted. */
    protected final AtomicReference<ClusterMetadata> committed;
    /** Set to true once by ready(); while false, enacted entries are not appended to {@link #storage}. */
    private final AtomicBoolean replayComplete = new AtomicBoolean();
    /**
     * Buffer of entries that have been appended but not yet enacted.
     *
     * Ordering imposed by the comparator:
     *  - FORCE_SNAPSHOT entries sort before every other kind of entry;
     *  - two FORCE_SNAPSHOT entries are ordered by descending epoch (highest epoch first);
     *  - all other entries are ordered by ascending epoch.
     * Because this is a sorted set, two entries that compare as equal are treated as duplicates.
     */
    protected final ConcurrentSkipListSet<Entry> pending = new ConcurrentSkipListSet<>((entry, entry2) -> {
        if (entry.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT && entry2.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT) {
            // both are snapshots: reversed comparison, so the most recent snapshot comes first
            return entry2.epoch.compareTo(entry.epoch);
        }
        if (entry.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT) {
            return -1;
        }
        if (entry2.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT) {
            return 1;
        }
        return entry.epoch.compareTo(entry2.epoch);
    });
    /** Backing storage taken from the spec; used to persist enacted entries and to read back log state. */
    protected final LogStorage storage;
    /** Listeners notified with the entry and its result after each successful commit. */
    protected final Set<LogListener> listeners;
    /** Listeners notified synchronously before and after each commit. */
    protected final Set<ChangeListener> changeListeners;
    /** Listeners notified before and after each commit via tasks submitted to ScheduledExecutors.optionalTasks. */
    protected final Set<ChangeListener.Async> asyncChangeListeners;
    /** The spec this log was created from; the constructor fills in its initial/prev metadata when absent. */
    protected final LogSpec spec;
    /** Assertion switch (decompiler artifact): true when assertions are disabled for this class; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$Async.class */
    public static class Async extends LocalLog {
        /** The follower task that drains the pending buffer; also holds the notifier and subscriber used by runOnce. */
        private final AsyncRunnable runnable;
        /** Infinite-loop executor (named "GlobalLogFollower" in the constructor) that repeatedly runs {@link #runnable}. */
        private final Interruptible executor;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$Async$AsyncRunnable.class */
        /**
         * The task executed in a loop by the follower executor. Each invocation processes the pending
         * buffer once, wakes whichever caller had subscribed before the pass started, and then parks on
         * {@link #logNotifier} unless a new subscriber has shown up in the meantime.
         */
        public class AsyncRunnable implements Interruptible.Task {
            /** Wait queue the task parks on between passes; signalled by processPending(), runOnce() and close(). */
            private final WaitQueue logNotifier = WaitQueue.newWaitQueue();
            /** Condition installed by a runOnce() caller that wants to be told when a full pass has completed. */
            private final AtomicReference<Condition> subscriber = new AtomicReference<>();

            private AsyncRunnable() {
            }

            /**
             * Runs one pass of the follower loop.
             *
             * A StopProcessingException is logged and shuts the executor down; an InterruptedException is
             * swallowed; any other Throwable raised by the pass is logged as a warning. In every case a
             * signal that was registered but not consumed by await is cancelled before returning.
             *
             * @param state the executor state; nothing is processed when it is SHUTTING_DOWN
             */
            @Override // org.apache.cassandra.concurrent.Interruptible.Task
            public void run(Interruptible.State state) throws InterruptedException {
                WaitQueue.Signal signal = null;
                try {
                    try {
                        try {
                            if (state != Interruptible.State.SHUTTING_DOWN) {
                                // take (and clear) the current subscriber before doing any work
                                Condition andSet = this.subscriber.getAndSet(null);
                                // register with the notifier before processing, so a signal sent while processing is not missed
                                signal = this.logNotifier.register();
                                Async.this.processPendingInternal();
                                if (andSet != null) {
                                    andSet.signalAll();
                                }
                                // only park if nobody subscribed while we were processing; otherwise return so the loop runs again
                                if (this.subscriber.get() == null) {
                                    signal.m1362await();
                                    // the signal was consumed by await, so there is nothing left to cancel
                                    signal = null;
                                }
                            }
                            if (signal != null) {
                                signal.cancel();
                            }
                        } catch (StopProcessingException e) {
                            LocalLog.logger.warn("Stopping log processing on the node... All subsequent epochs will be ignored.", e);
                            Async.this.executor.shutdown();
                            if (signal != null) {
                                signal.cancel();
                            }
                        }
                    } catch (InterruptedException e2) {
                        // deliberately ignored apart from releasing the signal
                        if (signal != null) {
                            signal.cancel();
                        }
                    } catch (Throwable th) {
                        LocalLog.logger.warn("Error in log follower", th);
                        if (signal != null) {
                            signal.cancel();
                        }
                    }
                } catch (Throwable th2) {
                    // reached only when one of the handlers above itself throws; release the signal and propagate
                    if (signal != null) {
                        signal.cancel();
                    }
                    throw th2;
                }
            }
        }

        /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$Async$AwaitCommit.class */
        /**
         * Helper that blocks until the enclosing log's committed metadata has reached a target epoch,
         * driving the follower through runOnce between checks.
         */
        private class AwaitCommit {
            /** The epoch the caller needs the log to reach. */
            private final Epoch waitingFor;

            private AwaitCommit(Epoch epoch) {
                waitingFor = epoch;
            }

            /** Waits using the timeout returned by DatabaseDescriptor.getCmsAwaitTimeout(). */
            public ClusterMetadata get() throws InterruptedException, TimeoutException {
                return get(DatabaseDescriptor.getCmsAwaitTimeout());
            }

            /**
             * Repeatedly triggers a follower pass until the target epoch is committed.
             *
             * @param durationSpec per-pass timeout handed to runOnce
             * @return the first observed metadata at or after the target epoch
             * @throws Interruptible.TerminateException if the executor has terminated before the epoch was reached
             */
            public ClusterMetadata get(DurationSpec durationSpec) throws InterruptedException, TimeoutException {
                ClusterMetadata current = metadata();
                while (true) {
                    if (isCommitted(current)) {
                        return current;
                    }
                    runOnce(durationSpec);
                    current = metadata();
                    if (!executor.isTerminated()) {
                        continue;
                    }
                    // the follower is gone: give up unless the last pass got us there
                    if (!isCommitted(current)) {
                        throw new Interruptible.TerminateException();
                    }
                }
            }

            private boolean isCommitted(ClusterMetadata clusterMetadata) {
                return clusterMetadata.epoch.isEqualOrAfter(waitingFor);
            }
        }

        private Async(LogSpec logSpec) {
            super(logSpec);
            this.runnable = new AsyncRunnable();
            this.executor = ExecutorFactory.Global.executorFactory().infiniteLoop("GlobalLogFollower", this.runnable, InfiniteLoopExecutor.SimulatorSafe.SAFE, InfiniteLoopExecutor.Daemon.NON_DAEMON, InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED);
        }

        /**
         * Returns the committed metadata straight away when it is already at or past {@code epoch};
         * otherwise waits for the follower to catch up, using the default CMS await timeout.
         */
        @Override // org.apache.cassandra.tcm.log.LocalLog
        public ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException {
            ClusterMetadata current = committed.get();
            if (current.epoch.compareTo(epoch) >= 0) {
                return current;
            }
            return new AwaitCommit(epoch).get();
        }

        /**
         * Blocks until the follower has completed at least one pass.
         *
         * On each of at most two attempts: if another caller's condition is already installed, wait for it
         * (bounded by {@code durationSpec} when non-null). After the second such wait the method returns.
         * On the first attempt, if no condition is installed, this caller installs its own, signals the
         * follower and waits for it uninterruptibly.
         *
         * @param durationSpec maximum time to wait on another caller's condition; null waits uninterruptibly
         * @throws TimeoutException if the timed wait on another caller's condition elapses
         */
        @Override // org.apache.cassandra.tcm.log.LocalLog
        public void runOnce(DurationSpec durationSpec) throws InterruptedException, TimeoutException {
            final Condition ours = Condition.newOneTimeCondition();
            for (int attempt = 0; attempt < 2; attempt++) {
                final Condition inFlight = runnable.subscriber.get();
                if (inFlight != null) {
                    if (durationSpec == null) {
                        inFlight.m1361awaitUninterruptibly();
                    } else {
                        boolean completed = inFlight.await(durationSpec.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                        if (!completed) {
                            List<Epoch> pendingEpochs = pending.stream().map(entry -> entry.epoch).collect(Collectors.toList());
                            throw new TimeoutException(String.format("Timed out waiting for follower to run at least once. Pending is %s and current is now at epoch %s.", pendingEpochs, metadata().epoch));
                        }
                    }
                }
                if (attempt == 1) {
                    return;
                }
                boolean installed = runnable.subscriber.compareAndSet(null, ours);
                if (installed) {
                    runnable.logNotifier.signalAll();
                    ours.m1361awaitUninterruptibly();
                    return;
                }
            }
        }

        /** Wakes the follower task so that it processes the pending buffer; does not wait for it to finish. */
        @Override // org.apache.cassandra.tcm.log.LocalLog
        void processPending() {
            runnable.logNotifier.signalAll();
        }

        /**
         * Stops the follower executor, releases any caller currently waiting on a subscribed condition,
         * wakes the follower task, and then waits up to 30 seconds for the executor to terminate.
         *
         * If the calling thread is interrupted during that wait, the interruption is logged and the
         * thread's interrupt status is restored so that callers further up the stack can observe it.
         */
        @Override // org.apache.cassandra.utils.Closeable, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.executor.shutdownNow();
            // release whoever is parked in runOnce() waiting for a pass that will now never happen
            Condition condition = this.runnable.subscriber.get();
            if (condition != null) {
                condition.signalAll();
            }
            this.runnable.logNotifier.signalAll();
            try {
                this.executor.awaitTermination(30L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LocalLog.logger.error(e.getMessage(), e);
                // catching InterruptedException clears the flag; put it back rather than swallowing the interrupt
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$LogSpec.class */
    /**
     * Mutable, fluent description of how a {@link LocalLog} should be built: initial and previous
     * metadata, storage, sync vs. async processing, and which listeners to register once the log is ready.
     *
     * A spec either requests the default listener set or carries explicitly supplied listeners, never
     * both; mixing the two raises {@link IllegalStateException}.
     */
    public static class LogSpec {
        /** Message used whenever default listeners and explicit listeners are combined. */
        private static final String LISTENER_CONFLICT = "LogSpec can only require all listeners OR specific listeners";

        private ClusterMetadata initial;
        private ClusterMetadata prev;
        private List<Startup.AfterReplay> afterReplay = Collections.emptyList();
        private LogStorage storage = LogStorage.None;
        private boolean async = true;
        private boolean defaultListeners = false;
        private boolean isReset = false;
        private boolean loadSSTables = true;
        private final Set<LogListener> listeners = new HashSet<>();
        private final Set<ChangeListener> changeListeners = new HashSet<>();
        private final Set<ChangeListener.Async> asyncChangeListeners = new HashSet<>();

        private LogSpec() {
        }

        /** Selects the synchronous log implementation. */
        public LogSpec sync() {
            this.async = false;
            return this;
        }

        /** Selects the asynchronous log implementation (the default). */
        public LogSpec async() {
            this.async = true;
            return this;
        }

        /** Shorthand for {@code withDefaultListeners(true)}. */
        public LogSpec withDefaultListeners() {
            return withDefaultListeners(true);
        }

        /** Sets the flag that is passed to the SchemaListener when default listeners are installed. */
        public LogSpec loadSSTables(boolean z) {
            this.loadSSTables = z;
            return this;
        }

        /**
         * Enables or disables the default listener set.
         *
         * @throws IllegalStateException if enabling while explicit listeners have already been supplied
         */
        public LogSpec withDefaultListeners(boolean z) {
            boolean hasExplicitListeners = !this.listeners.isEmpty()
                                           || !this.changeListeners.isEmpty()
                                           || !this.asyncChangeListeners.isEmpty();
            if (z && hasExplicitListeners) {
                throw new IllegalStateException(LISTENER_CONFLICT);
            }
            this.defaultListeners = z;
            return this;
        }

        /**
         * Adds an explicit log listener.
         *
         * @throws IllegalStateException if default listeners have been requested
         */
        public LogSpec withLogListener(LogListener logListener) {
            if (this.defaultListeners) {
                throw new IllegalStateException(LISTENER_CONFLICT);
            }
            this.listeners.add(logListener);
            return this;
        }

        /**
         * Adds an explicit change listener; instances of ChangeListener.Async are kept in a separate set.
         *
         * @throws IllegalStateException if default listeners have been requested
         */
        public LogSpec withListener(ChangeListener changeListener) {
            if (this.defaultListeners) {
                throw new IllegalStateException(LISTENER_CONFLICT);
            }
            if (changeListener instanceof ChangeListener.Async) {
                this.asyncChangeListeners.add((ChangeListener.Async) changeListener);
            } else {
                this.changeListeners.add(changeListener);
            }
            return this;
        }

        /** Marks the log as being created in reset mode, which relaxes the empty-initial-epoch assertion. */
        public LogSpec isReset(boolean z) {
            this.isReset = z;
            return this;
        }

        public boolean isReset() {
            return this.isReset;
        }

        public LogStorage storage() {
            return this.storage;
        }

        public LogSpec withStorage(LogStorage logStorage) {
            this.storage = logStorage;
            return this;
        }

        /** Replaces the hooks that ready() invokes, in order, with the replayed metadata. */
        public LogSpec afterReplay(Startup.AfterReplay... afterReplayArr) {
            this.afterReplay = Lists.newArrayList(afterReplayArr);
            return this;
        }

        /** Sets the metadata the log starts from; when left unset the LocalLog constructor creates one. */
        public LogSpec withInitialState(ClusterMetadata clusterMetadata) {
            this.initial = clusterMetadata;
            return this;
        }

        /** Sets the "previous" metadata handed to listeners when the log becomes ready. */
        public LogSpec withPreviousState(ClusterMetadata clusterMetadata) {
            this.prev = clusterMetadata;
            return this;
        }

        /** Builds the log: an Async instance when async processing is selected, otherwise a Sync one. */
        public final LocalLog createLog() {
            return this.async ? new Async(this) : new Sync(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$StopProcessingException.class */
    /**
     * Thrown by processPendingInternal() when an entry is rejected or fails to apply. The async
     * follower reacts to it by shutting its executor down, so later epochs are no longer processed.
     */
    public static class StopProcessingException extends RuntimeException {
        /** Used when the transformation returned a non-success result (no underlying exception). */
        private StopProcessingException() {
        }

        /** Used when processing the entry threw; {@code th} is kept as the cause. */
        private StopProcessingException(Throwable th) {
            super(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/tcm/log/LocalLog$Sync.class */
    /**
     * Log implementation that processes the pending buffer on the calling thread instead of on a
     * background follower.
     */
    public static class Sync extends LocalLog {
        private Sync(LogSpec logSpec) {
            super(logSpec);
        }

        /** Processes the pending buffer inline; the duration argument is not used. */
        @Override
        void runOnce(DurationSpec durationSpec) {
            processPendingInternal();
        }

        /** Processes the pending buffer inline while holding this instance's monitor. */
        @Override
        synchronized void processPending() {
            processPendingInternal();
        }

        /**
         * Processes whatever is pending and returns the resulting metadata.
         *
         * @throws IllegalStateException if the committed epoch is still before {@code epoch} afterwards
         */
        @Override
        public ClusterMetadata awaitAtLeast(Epoch epoch) {
            processPending();
            if (!metadata().epoch.isBefore(epoch)) {
                return metadata();
            }
            String message = String.format("Could not reach %s after replay. Highest epoch after replay: %s.", epoch, metadata().epoch);
            throw new IllegalStateException(message);
        }

        /** Nothing to release: there is no background executor. */
        @Override
        public void close() {
        }
    }

    /** Entry point for building a log: returns a fresh LogSpec with its default settings. */
    public static LogSpec logSpec() {
        return new LogSpec();
    }

    /**
     * Initialises the log from a spec.
     *
     * Note that this fills in defaults on the spec itself: a missing initial state becomes an empty
     * ClusterMetadata for the configured partitioner, and a missing previous state becomes an empty
     * ClusterMetadata for the initial state's partitioner.
     *
     * With assertions enabled, the initial epoch must be EMPTY or UPGRADE_STARTUP unless the spec is
     * in reset mode.
     */
    private LocalLog(LogSpec logSpec) {
        this.spec = logSpec;
        if (logSpec.initial == null) {
            logSpec.initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner());
        }
        if (logSpec.prev == null) {
            logSpec.prev = new ClusterMetadata(logSpec.initial.partitioner);
        }
        if (!$assertionsDisabled && !logSpec.initial.epoch.is(Epoch.EMPTY) && !logSpec.initial.epoch.is(Epoch.UPGRADE_STARTUP) && !logSpec.isReset) {
            // Format exactly once: feeding the formatted text back through String.format would treat any
            // '%' in the metadata's string form as a conversion and throw instead of reporting the assertion.
            throw new AssertionError(String.format("Should start with empty epoch, unless we're in upgrade or reset mode: %s (isReset: %s)", logSpec.initial, logSpec.isReset));
        }
        this.committed = new AtomicReference<>(logSpec.initial);
        this.storage = logSpec.storage;
        // Listener sets start empty; ready() populates them from the spec or with the defaults.
        this.listeners = Sets.newConcurrentHashSet();
        this.changeListeners = Sets.newConcurrentHashSet();
        this.asyncChangeListeners = Sets.newConcurrentHashSet();
    }

    /**
     * Appends the PreInitialize entry for {@code inetAddressAndPort} at Epoch.FIRST and waits for the
     * pending buffer to be processed. With assertions enabled, verifies that the log was before
     * Epoch.FIRST beforehand and is exactly at Epoch.FIRST afterwards.
     */
    public void bootstrap(InetAddressAndPort inetAddressAndPort) {
        ClusterMetadata before = metadata();
        if (!$assertionsDisabled) {
            if (!before.epoch.isBefore(Epoch.FIRST)) {
                throw new AssertionError(String.format("Metadata epoch %s should be before first", before.epoch));
            }
        }
        Entry preInitialize = new Entry(Entry.Id.NONE, Epoch.FIRST, PreInitialize.withFirstCMS(inetAddressAndPort));
        append(preInitialize);
        waitForHighestConsecutive();
        ClusterMetadata after = metadata();
        if (!$assertionsDisabled) {
            if (!after.epoch.is(Epoch.FIRST)) {
                throw new AssertionError(String.format("Epoch: %s. CMS: %s", after.epoch, after.fullCMSMembers()));
            }
        }
    }

    /** Returns the currently committed cluster metadata. */
    public ClusterMetadata metadata() {
        return committed.get();
    }

    /**
     * Conditionally swaps the committed metadata from {@code clusterMetadata} (expected) to
     * {@code clusterMetadata2} (updated).
     *
     * @return whether the compare-and-set succeeded
     * @throws IllegalStateException unless the expected epoch is at or before UPGRADE_GOSSIP and the
     *                               updated epoch is exactly UPGRADE_GOSSIP
     */
    public boolean unsafeSetCommittedFromGossip(ClusterMetadata clusterMetadata, ClusterMetadata clusterMetadata2) {
        if (!clusterMetadata.epoch.isEqualOrBefore(Epoch.UPGRADE_GOSSIP) || !clusterMetadata2.epoch.is(Epoch.UPGRADE_GOSSIP)) {
            throw new IllegalStateException(String.format("Illegal epochs for setting from gossip; expected: %s, updated: %s", clusterMetadata.epoch, clusterMetadata2.epoch));
        }
        return committed.compareAndSet(clusterMetadata, clusterMetadata2);
    }

    /**
     * Unconditionally replaces the committed metadata.
     *
     * @throws IllegalStateException if the supplied metadata's epoch is not UPGRADE_GOSSIP
     */
    public void unsafeSetCommittedFromGossip(ClusterMetadata clusterMetadata) {
        if (clusterMetadata.epoch.is(Epoch.UPGRADE_GOSSIP)) {
            committed.set(clusterMetadata);
            return;
        }
        throw new IllegalStateException(String.format("Illegal epoch for setting from gossip; updated: %s", clusterMetadata.epoch));
    }

    /** Returns the number of entries currently held in the pending buffer. */
    public int pendingBufferSize() {
        return pending.size();
    }

    /**
     * Walks the pending buffer in its sort order, starting from the committed epoch, and reports
     * whether any entry fails to directly follow its predecessor.
     */
    public boolean hasGaps() {
        Epoch previous = committed.get().epoch;
        for (Entry candidate : pending) {
            if (!candidate.epoch.isDirectlyAfter(previous)) {
                return true;
            }
            previous = candidate.epoch;
        }
        return false;
    }

    /**
     * Returns the epoch of the last entry in the pending buffer's sort order, or an empty Optional
     * when the buffer is empty.
     */
    public Optional<Epoch> highestPending() {
        final Epoch lastEpoch;
        try {
            lastEpoch = pending.last().epoch;
        } catch (NoSuchElementException ignored) {
            // last() signals an empty set by throwing
            return Optional.empty();
        }
        return Optional.of(lastEpoch);
    }

    /** Delegates to the storage's getLogState for the given epoch. */
    public LogState getCommittedEntries(Epoch epoch) {
        return storage.getLogState(epoch);
    }

    /** Runs one processing pass over the pending buffer and returns the metadata committed afterwards. */
    public ClusterMetadata waitForHighestConsecutive() {
        runOnce();
        ClusterMetadata afterPass = metadata();
        return afterPass;
    }

    /**
     * Adds all given entries to the pending buffer and triggers processing.
     * An empty collection is a no-op (nothing is logged and processing is not triggered).
     */
    public void append(Collection<Entry> collection) {
        if (collection.isEmpty()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            List<Epoch> epochs = collection.stream().map(e -> e.epoch).collect(Collectors.toList());
            logger.debug("Appending entries to the pending buffer: {}", epochs);
        }
        pending.addAll(collection);
        processPending();
    }

    /** Adds a single entry to the pending buffer and triggers processing. */
    public void append(Entry entry) {
        Epoch entryEpoch = entry.epoch;
        logger.debug("Appending entry to the pending buffer: {}", entryEpoch);
        pending.add(entry);
        processPending();
    }

    /**
     * Adds a log state to the pending buffer and triggers processing. When the state carries a base
     * snapshot, it is wrapped in a ForceSnapshot entry at the snapshot's epoch and added first.
     */
    public void append(LogState logState) {
        logger.debug("Appending log state with snapshot to the pending buffer: {}", logState);
        if (logState.baseState != null) {
            Entry snapshotEntry = new Entry(Entry.Id.NONE, logState.baseState.epoch, new ForceSnapshot(logState.baseState));
            pending.add(snapshotEntry);
        }
        pending.addAll(logState.entries);
        processPending();
    }

    /**
     * Returns committed metadata whose epoch is at or after {@code epoch}. How an implementation gets
     * there (and how it fails when it cannot) is implementation-specific.
     */
    public abstract ClusterMetadata awaitAtLeast(Epoch epoch) throws InterruptedException, TimeoutException;

    /**
     * Runs one processing pass with no timeout. The checked exceptions declared by the timed variant
     * are not expected on this path and are rethrown wrapped in a RuntimeException.
     */
    void runOnce() {
        try {
            runOnce((DurationSpec) null);
        } catch (InterruptedException | TimeoutException unexpected) {
            throw new RuntimeException("Should not have happened, since we await uninterruptibly", unexpected);
        }
    }

    /** Runs (or waits for) one processing pass over the pending buffer; a null duration is passed by runOnce() to mean "no timeout". */
    abstract void runOnce(DurationSpec durationSpec) throws InterruptedException, TimeoutException;

    /** Called after entries are added to the pending buffer so that the implementation can process them. */
    abstract void processPending();

    /** Returns the first entry in the pending buffer's sort order, or null when the buffer is empty. */
    private Entry peek() {
        Entry head;
        try {
            head = pending.first();
        } catch (NoSuchElementException ignored) {
            // first() signals an empty set by throwing
            head = null;
        }
        return head;
    }

    /**
     * Drains the pending buffer, enacting every entry that can be applied on top of the committed
     * metadata.
     *
     * For the head of the buffer on each iteration:
     *  - if it directly follows the committed epoch, or it is a PRE_INITIALIZE_CMS / FORCE_SNAPSHOT
     *    entry with a later epoch, it is executed, optionally persisted, committed via compare-and-set,
     *    and listeners are notified; the entry is removed from the buffer whether or not this succeeds;
     *  - if its epoch is not after the committed epoch it is a duplicate and is dropped;
     *  - otherwise there is a gap, and the method returns provided the buffer head still has the same epoch.
     *
     * @throws StopProcessingException if the transformation is rejected or anything fails while
     *                                 enacting an entry
     */
    void processPendingInternal() {
        while (true) {
            Entry head = peek();
            if (head == null) {
                return;
            }
            ClusterMetadata prev = this.committed.get();
            boolean isPreInit = head.transform.kind() == Transformation.Kind.PRE_INITIALIZE_CMS;
            boolean isSnapshot = head.transform.kind() == Transformation.Kind.FORCE_SNAPSHOT;
            if (head.epoch.isDirectlyAfter(prev.epoch) || ((isPreInit || isSnapshot) && head.epoch.isAfter(prev.epoch))) {
                try {
                    try {
                        try {
                            Transformation.Result result = head.transform.execute(prev);
                            if (!result.isSuccess()) {
                                // report the entry being processed (not the previous metadata), as the message says
                                logger.error("Error while processing entry {}. Transformation returned result of {}. This can mean that this node is configured differently from CMS.", head, result.rejected());
                                throw new StopProcessingException();
                            }
                            ClusterMetadata next = result.success().metadata;
                            if (!$assertionsDisabled && !head.epoch.is(next.epoch)) {
                                throw new AssertionError(String.format("Entry epoch %s does not match metadata epoch %s", head.epoch, next.epoch));
                            }
                            if (!$assertionsDisabled && !next.epoch.isDirectlyAfter(prev.epoch) && !isSnapshot && !isPreInit) {
                                throw new AssertionError(String.format("Epoch %s for %s can either force snapshot, or immediately follow %s", next.epoch, head.transform, prev.epoch));
                            }
                            // entries are only written back once replay has completed, and snapshots are never written
                            if (this.replayComplete.get() && !isSnapshot) {
                                this.storage.append(head.maybeUnwrapExecuted());
                            }
                            notifyPreCommit(prev, next, isSnapshot);
                            if (!this.committed.compareAndSet(prev, next)) {
                                throw new IllegalStateException(String.format("CAS conflict while trying to commit entry with seq %s, old version tail: %s current version tail: %s", next.epoch, prev.epoch, metadata().epoch));
                            }
                            logger.info("Enacted {}. New tail is {}", head.transform, next.epoch);
                            maybeNotifyListeners(head, result);
                            notifyPostCommit(prev, next, isSnapshot);
                        } finally {
                            // success or failure, the entry must not be retried
                            this.pending.remove(head);
                        }
                    } catch (Throwable failure) {
                        // report the entry being processed (not the previous metadata), as the message says
                        logger.error(String.format("Caught an exception while processing entry %s. This can mean that this node is configured differently from CMS.", head), failure);
                        throw new StopProcessingException(failure);
                    }
                } catch (StopProcessingException stop) {
                    throw stop;
                } catch (Throwable unexpected) {
                    // only reachable if the failure handler above itself throws
                    JVMStabilityInspector.inspectThrowable(unexpected);
                    logger.error("Could not process the entry", unexpected);
                    this.pending.remove(head);
                }
            } else if (!head.epoch.isAfter(metadata().epoch)) {
                logger.debug(String.format("An already appended entry %s discovered in the pending buffer, ignoring. Max consecutive: %s", head.epoch, prev.epoch));
                this.pending.remove(head);
            } else if (this.pending.first().epoch.is(head.epoch)) {
                // the smallest pending entry does not follow the committed epoch, so nothing else can be applied yet
                logger.debug("Smallest entry is non-consecutive {} to {}", head.epoch, prev.epoch);
                return;
            }
        }
    }

    /**
     * Loads the persisted log state from storage, appends it (flattened) to the pending buffer and
     * waits for it to be processed.
     *
     * @throws IllegalStateException if replay has already been marked complete
     */
    private ClusterMetadata replayPersisted() {
        boolean alreadyReplayed = replayComplete.get();
        if (alreadyReplayed) {
            throw new IllegalStateException("Can only replay persisted once.");
        }
        append(storage.getPersistedLogState().flatten());
        return waitForHighestConsecutive();
    }

    /** Passes the enacted entry and its result to every registered log listener. */
    private void maybeNotifyListeners(Entry entry, Transformation.Result result) {
        for (LogListener listener : listeners) {
            listener.notify(entry, result);
        }
    }

    /** Registers a log listener. */
    public void addListener(LogListener logListener) {
        listeners.add(logListener);
    }

    /** Registers a change listener; ChangeListener.Async instances go to the asynchronous set. */
    public void addListener(ChangeListener changeListener) {
        if (!(changeListener instanceof ChangeListener.Async)) {
            changeListeners.add(changeListener);
            return;
        }
        asyncChangeListeners.add((ChangeListener.Async) changeListener);
    }

    /**
     * Unregisters a change listener.
     *
     * addListener(ChangeListener) stores ChangeListener.Async instances in a separate set, so the
     * listener is removed from both the synchronous and the asynchronous set; removing a listener
     * that was never registered is a no-op.
     */
    public void removeListener(ChangeListener changeListener) {
        this.changeListeners.remove(changeListener);
        this.asyncChangeListeners.remove(changeListener);
    }

    /**
     * Sends a pre-commit followed by a post-commit notification to all change listeners, describing a
     * transition from {@code clusterMetadata} to the currently committed metadata. The boolean flag
     * passed to the listeners is always true here.
     */
    public void notifyListeners(ClusterMetadata clusterMetadata) {
        ClusterMetadata current = committed.get();
        logger.info("Notifying listeners, prev epoch = {}, current epoch = {}", clusterMetadata.epoch, current.epoch);
        notifyPreCommit(clusterMetadata, current, true);
        notifyPostCommit(clusterMetadata, current, true);
    }

    /**
     * Delivers the pre-commit notification: synchronous listeners are invoked inline, asynchronous
     * ones through tasks submitted to ScheduledExecutors.optionalTasks.
     */
    private void notifyPreCommit(ClusterMetadata clusterMetadata, ClusterMetadata clusterMetadata2, boolean z) {
        for (ChangeListener listener : changeListeners) {
            listener.notifyPreCommit(clusterMetadata, clusterMetadata2, z);
        }
        for (ChangeListener.Async asyncListener : asyncChangeListeners) {
            ScheduledExecutors.optionalTasks.submit(() -> {
                asyncListener.notifyPreCommit(clusterMetadata, clusterMetadata2, z);
            });
        }
    }

    /**
     * Delivers the post-commit notification: synchronous listeners are invoked inline, asynchronous
     * ones through tasks submitted to ScheduledExecutors.optionalTasks.
     */
    private void notifyPostCommit(ClusterMetadata clusterMetadata, ClusterMetadata clusterMetadata2, boolean z) {
        for (ChangeListener listener : changeListeners) {
            listener.notifyPostCommit(clusterMetadata, clusterMetadata2, z);
        }
        for (ChangeListener.Async asyncListener : asyncChangeListeners) {
            ScheduledExecutors.optionalTasks.submit(() -> {
                asyncListener.notifyPostCommit(clusterMetadata, clusterMetadata2, z);
            });
        }
    }

    /** Test convenience: same as ready(), with StartupException rethrown wrapped in a RuntimeException. */
    @VisibleForTesting
    public final ClusterMetadata readyUnchecked() {
        ClusterMetadata result;
        try {
            result = ready();
        } catch (StartupException e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    /**
     * Brings the log into its ready state: replays the persisted log, runs the spec's after-replay
     * hooks against the replayed metadata, marks replay as complete, registers listeners (the default
     * set or the ones supplied in the spec) and finally notifies them of the transition from the
     * spec's previous state.
     *
     * @return the metadata obtained from replay
     * @throws IllegalStateException if the log has already been marked ready
     */
    public ClusterMetadata ready() throws StartupException {
        ClusterMetadata replayed = replayPersisted();
        for (Startup.AfterReplay hook : spec.afterReplay) {
            hook.accept(replayed);
        }
        logger.info("Marking LocalLog ready at epoch {}", replayed.epoch);
        boolean firstTime = replayComplete.compareAndSet(false, true);
        if (!firstTime) {
            throw new IllegalStateException("Log is already fully initialised");
        }
        logger.debug("Marking LocalLog ready at epoch {}", committed.get().epoch);
        if (!spec.defaultListeners) {
            logger.info("Adding specified listeners to LocalLog");
            for (LogListener logListener : spec.listeners) {
                addListener(logListener);
            }
            for (ChangeListener changeListener : spec.changeListeners) {
                addListener(changeListener);
            }
            for (ChangeListener.Async asyncListener : spec.asyncChangeListeners) {
                addListener(asyncListener);
            }
        } else {
            logger.info("Adding default listeners to LocalLog");
            addListeners();
        }
        logger.info("Notifying all registered listeners of both pre and post commit event");
        notifyListeners(spec.prev);
        return replayed;
    }

    /** Discards every registered listener and installs the default set, in the order listed below. */
    protected void addListeners() {
        // start from a clean slate
        listeners.clear();
        changeListeners.clear();
        asyncChangeListeners.clear();

        addListener(snapshotListener());
        addListener(new InitializationListener());
        addListener(new SchemaListener(spec.loadSSTables));
        addListener(new LegacyStateListener());
        addListener(new PlacementsChangeListener());
        addListener(new MetadataSnapshotListener());
        addListener(new ClientNotificationListener());
        addListener(new UpgradeMigrationListener());
    }

    /**
     * Builds the listener that periodically requests a metadata snapshot.
     *
     * It acts only when the metadata service state is LOCAL and the entry's epoch is a multiple of the
     * configured snapshot frequency. The full CMS members are then sorted by the last byte of their
     * address, and the snapshot is submitted only if the first member equals this node's broadcast address.
     */
    private LogListener snapshotListener() {
        return (entry, result) -> {
            if (ClusterMetadataService.state() != ClusterMetadataService.State.LOCAL) {
                return;
            }
            if (entry.epoch.getEpoch() % DatabaseDescriptor.getMetadataSnapshotFrequency() != 0) {
                return;
            }
            List<InetAddressAndPort> members = new ArrayList<>(ClusterMetadata.current().fullCMSMembers());
            members.sort(Comparator.comparing(member -> Byte.valueOf(member.addressBytes[member.addressBytes.length - 1])));
            InetAddressAndPort chosen = members.get(0);
            if (chosen.equals(FBUtilities.getBroadcastAddressAndPort())) {
                ScheduledExecutors.nonPeriodicTasks.submit(() -> {
                    return ClusterMetadataService.instance().triggerSnapshot();
                });
            }
        };
    }

    // Static initialisation: record whether assertions are disabled for this class, then create the class logger.
    static {
        $assertionsDisabled = !LocalLog.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(LocalLog.class);
    }
}
