STAR-1872: Parallelize UCS compactions per output shard #1342

Open · wants to merge 12 commits into base: main
1 change: 1 addition & 0 deletions build.xml
@@ -1637,6 +1637,7 @@
         <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/>
         <jvmarg value="-Dcassandra.skip_sync=true" />
         <jvmarg value="-Dcassandra.allow_cursor_compaction=false" />
+        <jvmarg value="-Dunified_compaction.parallelize_output_shards=false" />
         <jvmarg value="-Dlogback.configurationFile=file://${test.logback.configurationFile}"/>
       </testmacrohelper>
     </sequential>
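Note: this build.xml change pins the new unified_compaction.parallelize_output_shards property to false for the test macro. As a rough sketch of what consuming such a switch could look like (the helper below and its default value are assumptions, not the patch's actual wiring, which presumably goes through CassandraRelevantProperties):

// Illustrative only: reading the flag via a raw system-property lookup.
// Defaulting to "true" is an assumption; the test macro above forces it off.
static boolean parallelizeOutputShards()
{
    return Boolean.parseBoolean(
        System.getProperty("unified_compaction.parallelize_output_shards", "true"));
}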
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1444,6 +1444,7 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
     }

     Throwable accumulate = null;
+
     for (SSTableMultiWriter writer : flushResults)
     {
         accumulate = writer.commit(accumulate);
@@ -2454,6 +2455,11 @@ public void forceMajorCompaction(boolean splitOutput)
         CompactionManager.instance.performMaximal(this, splitOutput);
     }

+    public void forceMajorCompaction(boolean splitOutput, Integer parallelism)
+    {
+        CompactionManager.instance.performMaximal(this, splitOutput, parallelism);
+    }
+
     public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException
     {
         CompactionManager.instance.forceCompactionForTokenRange(this, tokenRanges);
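Note: the new overload simply threads an optional parallelism cap through to CompactionManager.performMaximal. A hedged caller-side sketch (keyspace and table names are placeholders):

// Hypothetical call site: split-output major compaction, capped at four
// concurrent tasks; a null parallelism would mean "use the default".
ColumnFamilyStore cfs = Keyspace.open("ks").getColumnFamilyStore("tbl");
cfs.forceMajorCompaction(true, 4);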
@@ -166,16 +166,16 @@ public void shutdown()
     }

     /**
-     * @param gcBefore throw away tombstones older than this
-     *
+     * @param gcBefore throw away tombstones older than this
+     * @param permittedParallelism
      * @return a compaction task that should be run to compact this columnfamilystore
      * as much as possible. Null if nothing to do.
-     *
+     * <p>
      * Is responsible for marking its sstables as compaction-pending.
      */
     @Override
     @SuppressWarnings("resource")
-    public synchronized CompactionTasks getMaximalTasks(int gcBefore, boolean splitOutput)
+    public synchronized CompactionTasks getMaximalTasks(int gcBefore, boolean splitOutput, int permittedParallelism)
     {
         Iterable<? extends CompactionSSTable> filteredSSTables = Iterables.filter(getSSTables(), sstable -> !sstable.isMarkedSuspect());
         if (Iterables.isEmpty(filteredSSTables))
src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -27,13 +27,11 @@
 import com.google.common.base.Preconditions;

 import org.apache.cassandra.config.CassandraRelevantProperties;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
 import org.apache.cassandra.io.FSDiskFullWriteError;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;

 import static com.google.common.base.Throwables.propagate;
@@ -45,7 +43,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
         CassandraRelevantProperties.COMPACTION_SKIP_REPAIR_STATE_CHECKING.getBoolean();

     protected final CompactionRealm realm;
-    protected LifecycleTransaction transaction;
+    protected ILifecycleTransaction transaction;
     protected boolean isUserDefined;
     protected OperationType compactionType;
     protected TableOperationObserver opObserver;
@@ -55,7 +53,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
      * @param realm
      * @param transaction the modifying managing the status of the sstables we're replacing
      */
-    protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction transaction)
+    protected AbstractCompactionTask(CompactionRealm realm, ILifecycleTransaction transaction)
     {
         this.realm = realm;
         this.transaction = transaction;
@@ -66,10 +64,13 @@ protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction tra

         try
         {
-            // enforce contract that caller should mark sstables compacting
-            Set<SSTableReader> compacting = transaction.getCompacting();
-            for (SSTableReader sstable : transaction.originals())
-                assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
+            if (!transaction.isOffline())
+            {
+                // enforce contract that caller should mark sstables compacting
+                var compacting = realm.getCompactingSSTables();
+                for (SSTableReader sstable : transaction.originals())
+                    assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
+            }

             validateSSTables(transaction.originals());
         }
@@ -120,18 +121,18 @@ private void validateSSTables(Set<SSTableReader> sstables)
     /**
      * Executes the task after setting a new observer, normally the observer is the
      * compaction manager metrics.
      */
-    public int execute(TableOperationObserver observer)
+    public void execute(TableOperationObserver observer)
     {
-        return setOpObserver(observer).execute();
+        setOpObserver(observer).execute();
     }

     /** Executes the task */
-    public int execute()
+    public void execute()
     {
         Throwable t = null;
         try
         {
-            return executeInternal();
+            executeInternal();
         }
         catch (FSDiskFullWriteError e)
         {
@@ -151,7 +152,12 @@ public int execute()
         }
     }

-    public Throwable cleanup(Throwable err)
+    public Throwable rejected(Throwable t)
+    {
+        return cleanup(t);
+    }
+
+    protected Throwable cleanup(Throwable err)
     {
         final boolean isSuccess = err == null;
         for (CompactionObserver compObserver : compObservers)
@@ -160,22 +166,11 @@ public Throwable cleanup(Throwable err)
         return Throwables.perform(err, () -> transaction.close());
     }

-    public abstract CompactionAwareWriter getCompactionAwareWriter(CompactionRealm realm, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
-
-    @VisibleForTesting
-    public LifecycleTransaction getTransaction()
+    protected void executeInternal()
     {
-        return transaction;
-    }
-
-    @VisibleForTesting
-    public OperationType getCompactionType()
-    {
-        return compactionType;
+        run();
     }

-    protected abstract int executeInternal();
-
     // TODO Eventually these three setters should be passed in to the constructor.

     public AbstractCompactionTask setUserDefined(boolean isUserDefined)
@@ -205,13 +200,13 @@ public void addObserver(CompactionObserver compObserver)
     }

     @VisibleForTesting
-    public List<CompactionObserver> getCompObservers()
+    List<CompactionObserver> getCompObservers()
     {
         return compObservers;
     }

     @VisibleForTesting
-    public LifecycleTransaction transaction()
+    ILifecycleTransaction getTransaction()
     {
         return transaction;
     }
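Note: taken together, this file's changes make the task operate on the ILifecycleTransaction interface, turn execute() into a void method, and add a rejected() hook that funnels into cleanup(). A minimal usage sketch; the driver method is hypothetical, and the chained setter follows the TODO note above rather than code shown in this diff:

// Hypothetical driver: setters chain (each returns this), then the task runs
// under a caller-supplied metrics observer. execute() no longer returns a value.
static void runObserved(AbstractCompactionTask task, TableOperationObserver metrics)
{
    task.setUserDefined(false)
        .setCompactionType(OperationType.COMPACTION); // illustrative operation type
    task.execute(metrics);
}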
@@ -173,7 +173,7 @@ public boolean managesSSTable(CompactionSSTable sstable)

     public abstract Collection<TasksSupplier> getBackgroundTaskSuppliers(int gcBefore);

-    public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput);
+    public abstract Collection<AbstractCompactionTask> getMaximalTasks(int gcBefore, boolean splitOutput, int permittedParallelism);

     public abstract Collection<AbstractCompactionTask> getUserDefinedTasks(GroupedSSTableContainer<CompactionSSTable> sstables, int gcBefore);
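Note: the added permittedParallelism argument lets callers bound how many of the resulting tasks run concurrently. How a strategy interprets the bound is implementation-specific; one plausible clamp, sketched with illustrative names:

// Illustrative policy: treat a non-positive request as "no explicit limit",
// otherwise never exceed the number of output shards available.
static int effectiveParallelism(int permittedParallelism, int outputShards)
{
    return permittedParallelism <= 0
           ? outputShards
           : Math.min(permittedParallelism, outputShards);
}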
@@ -345,9 +345,12 @@ CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collection<Ab
         if (!compactionTasks.isEmpty())
         {
             logger.debug("Running compaction tasks: {}", compactionTasks);
-            return compactionTasks.stream()
-                                  .map(task -> startTask(cfs, task))
-                                  .toArray(CompletableFuture<?>[]::new);
+            CompletableFuture<Void>[] arr = new CompletableFuture[compactionTasks.size()];
+            int index = 0;
+            for (AbstractCompactionTask task : compactionTasks)
+                arr[index++] = startTask(cfs, task);
+
+            return arr;
         }
         else
         {
@@ -381,8 +384,9 @@ private CompletableFuture<Void> startTask(ColumnFamilyStore cfs, AbstractCompact
         catch (RejectedExecutionException ex)
         {
             ongoingCompactions.decrementAndGet();
-            logger.debug("Background compaction task for {} was rejected", cfs);
-            return CompletableFuture.completedFuture(null);
+            logger.debug("Background compaction task for {} was rejected", cfs, ex);
+            Throwable t = task.rejected(null);
+            return t == null ? CompletableFuture.completedFuture(null) : CompletableFuture.failedFuture(t);
         }
     }

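Note: the rejection path now lets the task release its resources: rejected(null) funnels into cleanup(), which closes the transaction and notifies observers, and any throwable from that cleanup surfaces as a failed future. A condensed sketch of the pattern (the helper and executor are hypothetical):

// Condensed sketch of the submit-or-clean-up pattern used above.
static CompletableFuture<Void> submitOrAbandon(AbstractCompactionTask task, Executor executor)
{
    try
    {
        return CompletableFuture.runAsync(task::execute, executor);
    }
    catch (RejectedExecutionException ex)
    {
        Throwable t = task.rejected(null); // still closes the transaction
        return t == null ? CompletableFuture.completedFuture(null)
                         : CompletableFuture.failedFuture(t);
    }
}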
@@ -683,9 +683,13 @@ public static class UnifiedAggregate extends CompactionAggregate
         /** The level generated by the compaction strategy */
         private final UnifiedCompactionStrategy.Level level;

+        private UnifiedCompactionStrategy.ShardingStats shardingStats;
+
         /** The maximum number of overlapping sstables in the level. */
         private final int maxOverlap;

+        private int permittedParallelism;
+
         UnifiedAggregate(Iterable<? extends CompactionSSTable> sstables,
                          int maxOverlap,
                          CompactionPick selected,
@@ -704,6 +708,17 @@ public UnifiedCompactionStrategy.Arena getArena()
             return arena;
         }

+        public void setShardingStats(UnifiedCompactionStrategy.ShardingStats shardingStats)
+        {
+            assert this.shardingStats == null;
+            this.shardingStats = shardingStats;
+        }
+
+        public UnifiedCompactionStrategy.ShardingStats getShardingStats()
+        {
+            return shardingStats;
+        }
+
         @Override
         public CompactionAggregateStatistics getStatistics()
         {
@@ -792,6 +807,16 @@ public int hashCode()
         {
             return Objects.hash(sstables, selected, compactions, level, arena);
         }
+
+        public void setPermittedParallelism(int parallelism)
+        {
+            this.permittedParallelism = parallelism;
+        }
+
+        public int getPermittedParallelism()
+        {
+            return permittedParallelism;
+        }
     }

     public static UnifiedAggregate createUnified(Collection<? extends CompactionSSTable> sstables,
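Note: UnifiedAggregate now carries two extra pieces of state for the parallelized path: the sharding stats computed by the strategy and the parallelism granted to this compaction. A hedged wiring sketch (the helper and the clamping policy are assumptions; the two setters come from this diff):

// Hypothetical wiring: attach shard information before the aggregate is
// turned into tasks. setShardingStats asserts that it is set only once.
static void attachShardInfo(CompactionAggregate.UnifiedAggregate aggregate,
                            UnifiedCompactionStrategy.ShardingStats stats,
                            int requestedParallelism,
                            int outputShards)
{
    aggregate.setShardingStats(stats);
    aggregate.setPermittedParallelism(Math.min(requestedParallelism, outputShards));
}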
@@ -19,6 +19,7 @@

 import java.util.*;
 import java.util.function.LongPredicate;
+import java.util.function.UnaryOperator;

 import javax.annotation.Nullable;

Expand Down Expand Up @@ -120,7 +121,7 @@ public Set<CompactionSSTable> getFullyExpiredSSTables()
{
if (overlapTracker == null)
return Collections.emptySet();
return getFullyExpiredSSTables(realm, compacting, overlapTracker.overlaps(), gcBefore, ignoreOverlaps());
return getFullyExpiredSSTables(realm, compacting, c -> overlapTracker.overlaps(), gcBefore, ignoreOverlaps());
}

/**
@@ -135,15 +136,15 @@ public Set<CompactionSSTable> getFullyExpiredSSTables()
      *
      * @param realm
      * @param compacting we take the drop-candidates from this set, it is usually the sstables included in the compaction
-     * @param overlapping the sstables that overlap the ones in compacting.
+     * @param overlappingSupplier function used to get the sstables that overlap the ones in compacting.
      * @param gcBefore
      * @param ignoreOverlaps don't check if data shadows/overlaps any data in other sstables
      * @return
      */
     public static
     Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
                                                    Iterable<? extends CompactionSSTable> compacting,
-                                                   Iterable<? extends CompactionSSTable> overlapping,
+                                                   UnaryOperator<Iterable<? extends CompactionSSTable>> overlappingSupplier,
                                                    int gcBefore,
                                                    boolean ignoreOverlaps)
     {
@@ -158,6 +159,7 @@ Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
         long minTimestamp;
         if (!ignoreOverlaps)
         {
+            var overlapping = overlappingSupplier.apply(compacting);
             minTimestamp = Math.min(Math.min(minSurvivingTimestamp(overlapping, gcBefore),
                                              minSurvivingTimestamp(compacting, gcBefore)),
                                     minTimestamp(realm.getAllMemtables()));
@@ -215,10 +217,10 @@ private static long minSurvivingTimestamp(Iterable<? extends CompactionSSTable>
     public static
     Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
                                                    Iterable<? extends CompactionSSTable> compacting,
-                                                   Iterable<? extends CompactionSSTable> overlapping,
+                                                   UnaryOperator<Iterable<? extends CompactionSSTable>> overlappingSupplier,
                                                    int gcBefore)
     {
-        return getFullyExpiredSSTables(realm, compacting, overlapping, gcBefore, false);
+        return getFullyExpiredSSTables(realm, compacting, overlappingSupplier, gcBefore, false);
     }

     /**
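Note: switching from a pre-computed iterable to a UnaryOperator makes the overlap lookup lazy; it is evaluated only on the !ignoreOverlaps branch. A sketch of adapting an eagerly computed overlap set to the new shape (assumed to live alongside the overloads above):

// Illustrative adapter: wrap an already-computed overlap set in operator form;
// the lambda runs only when tombstone shadowing actually has to be checked.
static Set<CompactionSSTable> expiredWithPrecomputedOverlaps(
        CompactionRealm realm,
        Iterable<? extends CompactionSSTable> compacting,
        Iterable<? extends CompactionSSTable> precomputedOverlaps,
        int gcBefore)
{
    return getFullyExpiredSSTables(realm, compacting, ignored -> precomputedOverlaps, gcBefore);
}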
18 changes: 6 additions & 12 deletions src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
@@ -32,6 +32,8 @@
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.compaction.SortedStringTableCursor;
 import org.apache.cassandra.io.sstable.compaction.IteratorFromCursor;
 import org.apache.cassandra.io.sstable.compaction.PurgeCursor;
@@ -69,26 +71,23 @@ public class CompactionCursor implements SSTableCursorMerger.MergeListener, Auto
     private final long[] mergedPartitionsHistogram;
     private final long[] mergedRowsHistogram;

-    private final long totalCompressedSize;
-
     @SuppressWarnings("resource")
-    public CompactionCursor(OperationType type, Collection<SSTableReader> readers, CompactionController controller, RateLimiter limiter, int nowInSec, UUID compactionId)
+    public CompactionCursor(OperationType type, Collection<SSTableReader> readers, Range<Token> tokenRange, CompactionController controller, RateLimiter limiter, int nowInSec, UUID compactionId)
     {
         this.controller = controller;
         this.type = type;
         this.compactionId = compactionId;
-        this.totalCompressedSize = readers.stream().mapToLong(SSTableReader::onDiskLength).sum();
         this.mergedPartitionsHistogram = new long[readers.size()];
         this.mergedRowsHistogram = new long[readers.size()];
         this.rowBuilder = BTreeRow.sortedBuilder();
         this.sstables = ImmutableSet.copyOf(readers);
-        this.cursor = makeMergedAndPurgedCursor(readers, controller, limiter, nowInSec);
+        this.cursor = makeMergedAndPurgedCursor(readers, tokenRange, controller, limiter, nowInSec);
         this.totalBytes = cursor.bytesTotal();
         this.currentBytes = 0;
         this.currentProgressMillisSinceStartup = System.currentTimeMillis();
     }

     private SSTableCursor makeMergedAndPurgedCursor(Collection<SSTableReader> readers,
+                                                    Range<Token> tokenRange,
                                                     CompactionController controller,
                                                     RateLimiter limiter,
                                                     int nowInSec)
@@ -97,7 +96,7 @@ private SSTableCursor makeMergedAndPurgedCursor(Collection<SSTableReader> reader
             return SSTableCursor.empty();

         SSTableCursor merged = new SSTableCursorMerger(readers.stream()
-                                                              .map(r -> new SortedStringTableCursor(r, limiter))
+                                                              .map(r -> new SortedStringTableCursor(r, tokenRange, limiter))
                                                               .collect(Collectors.toList()),
                                                        metadata(),
                                                        this);
@@ -247,11 +246,6 @@ long totalSourceRows()
         return Arrays.stream(mergedRowsHistogram).reduce(0L, Long::sum);
     }

-    public long getTotalCompressedSize()
-    {
-        return totalCompressedSize;
-    }
-
     long[] mergedPartitionsHistogram()
     {
         return mergedPartitionsHistogram;
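Note: the threaded-through Range<Token> is what enables per-shard parallelism: each output shard can be compacted by its own cursor over the same inputs, restricted to that shard's token range. An illustrative per-shard factory (the constructor call matches this diff; the factory itself is hypothetical):

// Illustrative: one cursor per output shard, bounded to the shard's token range
// so concurrently running shard compactions do not produce overlapping output.
static CompactionCursor cursorForShard(OperationType type,
                                       Collection<SSTableReader> inputs,
                                       Range<Token> shardRange,
                                       CompactionController controller,
                                       RateLimiter limiter,
                                       int nowInSec)
{
    return new CompactionCursor(type, inputs, shardRange, controller, limiter,
                                nowInSec, UUID.randomUUID());
}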