package org.apache.cassandra.db.streaming;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/streaming/CassandraStreamReceiver.class */
public class CassandraStreamReceiver implements StreamReceiver {
    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReceiver.class);
    private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100).intValue();
    private final ColumnFamilyStore cfs;
    private final StreamSession session;
    private final LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
    protected Collection<SSTableReader> sstables;
    private final boolean requiresWritePath;

    public CassandraStreamReceiver(ColumnFamilyStore columnFamilyStore, StreamSession streamSession, int i) {
        this.cfs = columnFamilyStore;
        this.session = streamSession;
        this.sstables = new ArrayList(i);
        this.requiresWritePath = requiresWritePath(columnFamilyStore);
    }

    public static CassandraStreamReceiver fromReceiver(StreamReceiver streamReceiver) {
        Preconditions.checkArgument(streamReceiver instanceof CassandraStreamReceiver);
        return (CassandraStreamReceiver) streamReceiver;
    }

    private static CassandraIncomingFile getFile(IncomingStream incomingStream) {
        Preconditions.checkArgument(incomingStream instanceof CassandraIncomingFile, "Wrong stream type: {}", incomingStream);
        return (CassandraIncomingFile) incomingStream;
    }

    @Override // org.apache.cassandra.streaming.StreamReceiver
    public synchronized void received(IncomingStream incomingStream) {
        Collection<SSTableReader> collection = null;
        SSTableMultiWriter sSTable = getFile(incomingStream).getSSTable();
        try {
            collection = sSTable.finish(true);
        } catch (Throwable th) {
            Throwables.maybeFail(sSTable.abort(th));
        }
        this.txn.update(collection, false);
        this.sstables.addAll(collection);
    }

    @Override // org.apache.cassandra.streaming.StreamReceiver
    public void discardStream(IncomingStream incomingStream) {
        Throwables.maybeFail(getFile(incomingStream).getSSTable().abort(null));
    }

    public synchronized LifecycleNewTracker createLifecycleNewTracker() {
        return new LifecycleNewTracker() { // from class: org.apache.cassandra.db.streaming.CassandraStreamReceiver.1
            @Override // org.apache.cassandra.db.lifecycle.LifecycleNewTracker
            public void trackNew(SSTable sSTable) {
                synchronized (CassandraStreamReceiver.this) {
                    CassandraStreamReceiver.this.txn.trackNew(sSTable);
                }
            }

            @Override // org.apache.cassandra.db.lifecycle.LifecycleNewTracker
            public void untrackNew(SSTable sSTable) {
                synchronized (CassandraStreamReceiver.this) {
                    CassandraStreamReceiver.this.txn.untrackNew(sSTable);
                }
            }

            @Override // org.apache.cassandra.db.lifecycle.LifecycleNewTracker
            public OperationType opType() {
                return CassandraStreamReceiver.this.txn.opType();
            }
        };
    }

    @Override // org.apache.cassandra.streaming.StreamReceiver
    public synchronized void abort() {
        this.sstables.clear();
        this.txn.abort();
    }

    private boolean hasViews(ColumnFamilyStore columnFamilyStore) {
        return !Iterables.isEmpty(View.findAll(columnFamilyStore.metadata.keyspace, columnFamilyStore.getTableName()));
    }

    private boolean hasCDC(ColumnFamilyStore columnFamilyStore) {
        return columnFamilyStore.metadata().params.cdc;
    }

    private boolean cdcRequiresWriteCommitLog(ColumnFamilyStore columnFamilyStore) {
        return DatabaseDescriptor.isCDCOnRepairEnabled() && hasCDC(columnFamilyStore);
    }

    private boolean requiresWritePath(ColumnFamilyStore columnFamilyStore) {
        return cdcRequiresWriteCommitLog(columnFamilyStore) || columnFamilyStore.streamToMemtable() || (this.session.streamOperation().requiresViewBuild() && hasViews(columnFamilyStore));
    }

    private void sendThroughWritePath(ColumnFamilyStore columnFamilyStore, Collection<SSTableReader> collection) {
        boolean cdcRequiresWriteCommitLog = cdcRequiresWriteCommitLog(columnFamilyStore);
        ColumnFilter all = ColumnFilter.all(columnFamilyStore.metadata());
        for (SSTableReader sSTableReader : collection) {
            Keyspace open = Keyspace.open(sSTableReader.getKeyspaceName());
            ISSTableScanner scanner = sSTableReader.getScanner();
            try {
                CloseableIterator<UnfilteredRowIterator> throttle = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH);
                while (throttle.hasNext()) {
                    try {
                        open.apply(new Mutation(PartitionUpdate.fromIterator(throttle.next(), all)), cdcRequiresWriteCommitLog, true, false);
                    } finally {
                    }
                }
                if (throttle != null) {
                    throttle.close();
                }
                if (scanner != null) {
                    scanner.close();
                }
            } catch (Throwable th) {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    public synchronized void finishTransaction() {
        this.txn.finish();
    }

    @Override // org.apache.cassandra.streaming.StreamReceiver
    public void finished() {
        int invalidateCounterCache;
        int invalidateRowCache;
        boolean requiresWritePath = requiresWritePath(this.cfs);
        Collection<SSTableReader> collection = this.sstables;
        Refs ref = Refs.ref(collection);
        try {
            if (requiresWritePath) {
                sendThroughWritePath(this.cfs, collection);
            } else {
                finishTransaction();
                logger.debug("[Stream #{}] Received {} sstables from {} ({})", new Object[]{this.session.planId(), Integer.valueOf(collection.size()), this.session.peer, collection});
                this.cfs.addSSTables(collection);
                if (this.cfs.isRowCacheEnabled() || this.cfs.metadata().isCounter()) {
                    ArrayList arrayList = new ArrayList(collection.size());
                    collection.forEach(sSTableReader -> {
                        arrayList.add(new Bounds(sSTableReader.first.getToken(), sSTableReader.last.getToken()));
                    });
                    Set nonOverlappingBounds = Bounds.getNonOverlappingBounds(arrayList);
                    if (this.cfs.isRowCacheEnabled() && (invalidateRowCache = this.cfs.invalidateRowCache(nonOverlappingBounds)) > 0) {
                        logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream receive task completed.", new Object[]{this.session.planId(), Integer.valueOf(invalidateRowCache), this.cfs.keyspace.getName(), this.cfs.getTableName()});
                    }
                    if (this.cfs.metadata().isCounter() && (invalidateCounterCache = this.cfs.invalidateCounterCache(nonOverlappingBounds)) > 0) {
                        logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream receive task completed.", new Object[]{this.session.planId(), Integer.valueOf(invalidateCounterCache), this.cfs.keyspace.getName(), this.cfs.getTableName()});
                    }
                }
            }
            if (ref != null) {
                ref.close();
            }
        } catch (Throwable th) {
            if (ref != null) {
                try {
                    ref.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // org.apache.cassandra.streaming.StreamReceiver
    public void cleanup() {
        if (this.requiresWritePath) {
            this.cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.STREAMS_RECEIVED);
            abort();
        }
    }
}
