package org.apache.cassandra.db.view;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/cassandra/db/view/ViewBuilder.class */
public class ViewBuilder {
    private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
    private static final int NUM_TASKS = Runtime.getRuntime().availableProcessors() * 4;
    private final ColumnFamilyStore baseCfs;
    private final View view;
    private final String ksName;
    private final Set<Range<Token>> builtRanges = Sets.newConcurrentHashSet();
    private final Map<Range<Token>, Pair<Token, Long>> pendingRanges = Maps.newConcurrentMap();
    private final Set<ViewBuilderTask> tasks = Sets.newConcurrentHashSet();
    private volatile long keysBuilt = 0;
    private volatile boolean isStopped = false;
    private volatile Future<?> future = ImmediateFuture.success(null);
    private final UUID localHostId = ClusterMetadata.current().myNodeId().toUUID();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ViewBuilder(ColumnFamilyStore columnFamilyStore, View view) {
        this.baseCfs = columnFamilyStore;
        this.view = view;
        this.ksName = columnFamilyStore.metadata.keyspace;
    }

    public void start() {
        if (SystemKeyspace.isViewBuilt(this.ksName, this.view.name)) {
            logger.debug("View already marked built for {}.{}", this.ksName, this.view.name);
            if (SystemKeyspace.isViewStatusReplicated(this.ksName, this.view.name)) {
                return;
            }
            updateDistributed();
            return;
        }
        SystemDistributedKeyspace.startViewBuild(this.ksName, this.view.name, this.localHostId);
        logger.debug("Starting build of view({}.{}). Flushing base table {}.{}", new Object[]{this.ksName, this.view.name, this.ksName, this.baseCfs.name});
        this.baseCfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.VIEW_BUILD_STARTED);
        loadStatusAndBuild();
    }

    private void loadStatusAndBuild() {
        loadStatus();
        build();
    }

    private void loadStatus() {
        this.builtRanges.clear();
        this.pendingRanges.clear();
        SystemKeyspace.getViewBuildStatus(this.ksName, this.view.name).forEach((range, pair) -> {
            Token token = (Token) pair.left;
            if (token == null || !token.equals(range.right)) {
                this.pendingRanges.put(range, pair);
            } else {
                this.builtRanges.add(range);
                this.keysBuilt += ((Long) pair.right).longValue();
            }
        });
    }

    private synchronized void build() {
        if (this.isStopped) {
            logger.debug("Stopped build for view({}.{}) after covering {} keys", new Object[]{this.ksName, this.view.name, Long.valueOf(this.keysBuilt)});
            return;
        }
        RangesAtEndpoint localReplicas = StorageService.instance.getLocalReplicas(this.ksName);
        Replicas.temporaryAssertFull(localReplicas);
        Set set = (Set) localReplicas.ranges().stream().map(range -> {
            return range.subtractAll(this.builtRanges);
        }).flatMap((v0) -> {
            return v0.stream();
        }).map(range2 -> {
            return range2.subtractAll(this.pendingRanges.keySet());
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet());
        if (set.isEmpty() && this.pendingRanges.isEmpty()) {
            finish();
            return;
        }
        ((Set) DatabaseDescriptor.getPartitioner().splitter().map(splitter -> {
            return splitter.split(set, NUM_TASKS);
        }).orElse(set)).forEach(range3 -> {
            this.pendingRanges.put(range3, Pair.create(null, 0L));
        });
        Stream<R> map = this.pendingRanges.entrySet().stream().map(entry -> {
            return new ViewBuilderTask(this.baseCfs, this.view, (Range) entry.getKey(), (Token) ((Pair) entry.getValue()).left, ((Long) ((Pair) entry.getValue()).right).longValue());
        });
        Set<ViewBuilderTask> set2 = this.tasks;
        Objects.requireNonNull(set2);
        Stream peek = map.peek((v1) -> {
            r1.add(v1);
        });
        CompactionManager compactionManager = CompactionManager.instance;
        Objects.requireNonNull(compactionManager);
        Future<?> allOf = FutureCombiner.allOf((List) peek.map(compactionManager::submitViewBuilder).collect(Collectors.toList()));
        allOf.addCallback(new FutureCallback<List<Long>>() { // from class: org.apache.cassandra.db.view.ViewBuilder.1
            public void onSuccess(List<Long> list) {
                ViewBuilder.this.keysBuilt += list.stream().mapToLong(l -> {
                    return l.longValue();
                }).sum();
                ViewBuilder.this.builtRanges.addAll(ViewBuilder.this.pendingRanges.keySet());
                ViewBuilder.this.pendingRanges.clear();
                ViewBuilder.this.build();
            }

            public void onFailure(Throwable th) {
                if (!(th instanceof CompactionInterruptedException)) {
                    ScheduledExecutors.nonPeriodicTasks.schedule(() -> {
                        ViewBuilder.this.loadStatusAndBuild();
                    }, 5L, TimeUnit.MINUTES);
                    ViewBuilder.logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", th);
                } else {
                    ViewBuilder.this.internalStop(true);
                    ViewBuilder.this.keysBuilt = ViewBuilder.this.tasks.stream().mapToLong((v0) -> {
                        return v0.keysBuilt();
                    }).sum();
                    ViewBuilder.logger.info("Interrupted build for view({}.{}) after covering {} keys", new Object[]{ViewBuilder.this.ksName, ViewBuilder.this.view.name, Long.valueOf(ViewBuilder.this.keysBuilt)});
                }
            }
        });
        this.future = allOf;
    }

    private void finish() {
        logger.debug("Marking view({}.{}) as built after covering {} keys ", new Object[]{this.ksName, this.view.name, Long.valueOf(this.keysBuilt)});
        SystemKeyspace.finishViewBuildStatus(this.ksName, this.view.name);
        updateDistributed();
    }

    private void updateDistributed() {
        try {
            SystemDistributedKeyspace.successfulViewBuild(this.ksName, this.view.name, this.localHostId);
            SystemKeyspace.setViewBuiltReplicated(this.ksName, this.view.name);
        } catch (Exception e) {
            ScheduledExecutors.nonPeriodicTasks.schedule(this::updateDistributed, 5L, TimeUnit.MINUTES);
            logger.warn("Failed to update the distributed status of view, sleeping 5 minutes before retrying", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        boolean z;
        synchronized (this) {
            z = this.isStopped;
            internalStop(false);
        }
        if (z) {
            return;
        }
        FBUtilities.waitOnFuture(this.future);
    }

    private void internalStop(boolean z) {
        this.isStopped = true;
        this.tasks.forEach(viewBuilderTask -> {
            viewBuilderTask.stop(z);
        });
    }
}
