
STAR-1872: Parallelize UCS compactions per output shard #1342

Open · wants to merge 3 commits into base: main

Conversation


@blambov blambov commented Oct 9, 2024

This splits compactions that are to produce more than one
output sstable into tasks that can execute in parallel.
Such tasks share a transaction and have combined progress
and observer. Because we cannot mark parts of an sstable
as unneeded, the transaction is only applied when all
tasks have succeeded. This also means that early open
is not supported for such tasks.

The parallelization also takes into account thread reservations,
reducing the parallelism to the number of available threads
for its level. The new functionality is turned on by default.

Major compactions will apply the same mechanism to
parallelize the operation. They will only split on pre-
existing boundary points if they are also boundary
points for the current UCS configuration. This is done
to ensure that major compactions can re-shard data when
the configuration is changed. If pre-existing boundaries
match the current state, a major compaction will still be
broken into multiple operations to reduce the space
overhead of the operation.

Also:

  • Introduces a parallelism parameter to major compactions
    (nodetool compact -j <threads>, defaulting to half the
    compaction threads) to avoid stopping all other compaction
    for the duration.

  • Changes SSTable expiration to be done in a separate
    getNextBackgroundCompactions round to improve the
    efficiency of expiration (separate task can run quickly
    and remove the relevant sstables without waiting for
    a compaction to end).

  • Applies small-partition-count correction in
    ShardManager.calculateCombinedDensity.
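As an illustration of the thread-reservation cap described above, here is a minimal, hypothetical sketch (illustrative names, not the actual Cassandra API): parallelism is one task per output shard, clamped to the threads available for the operation's level, with shards distributed round-robin across tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of capping per-operation parallelism by the
// threads reserved for the compaction's level, as described above.
public class ShardParallelism
{
    /** Number of parallel tasks to run: one per output shard,
     *  but never more than the threads available for this level. */
    public static int permittedParallelism(int outputShards, int availableThreadsForLevel)
    {
        if (outputShards <= 0 || availableThreadsForLevel <= 0)
            return 1; // always run at least one task
        return Math.min(outputShards, availableThreadsForLevel);
    }

    /** Split shard indexes into groups, one group per parallel task. */
    public static List<List<Integer>> assignShards(int outputShards, int parallelism)
    {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < parallelism; i++)
            groups.add(new ArrayList<>());
        for (int shard = 0; shard < outputShards; shard++)
            groups.get(shard % parallelism).add(shard); // round-robin assignment
        return groups;
    }
}
```

With 8 output shards but only 4 threads reserved for the level, this would run 4 tasks of 2 shards each.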

return tasks;
}

private <T> List<T> splitSSTablesInShards(Collection<SSTableReader> sstables,


what about making this method static and writing specific unit tests to cover all of the cases?
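To illustrate the suggestion: a static, side-effect-free splitting helper is trivial to unit test in isolation. A hypothetical sketch (tokens reduced to plain longs; not the actual `splitSSTablesInShards` signature):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the reviewer's suggestion: a static
// helper with no dependencies on ColumnFamilyStore state can be
// covered directly by unit tests.
public class ShardSplitter
{
    /** Assign each token (modeled as a long) to the shard whose range
     *  contains it; boundaries are sorted, and each boundary is the
     *  exclusive end of the shard before it. */
    public static List<List<Long>> splitByBoundaries(List<Long> tokens, long[] boundaries)
    {
        List<List<Long>> shards = new ArrayList<>();
        for (int i = 0; i <= boundaries.length; i++)
            shards.add(new ArrayList<>());
        for (long token : tokens)
        {
            int shard = 0;
            while (shard < boundaries.length && token >= boundaries[shard])
                shard++; // advance to the shard containing this token
            shards.get(shard).add(token);
        }
        return shards;
    }
}
```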


blambov commented Oct 16, 2024

The PR is not yet ready for review.

@blambov blambov force-pushed the STAR-1872 branch 3 times, most recently from 6cc862f to b6295c0 on October 29, 2024 at 12:38
@blambov
Copy link
Author

blambov commented Oct 29, 2024

The patch is now ready for review.


sonarcloud bot commented Nov 8, 2024

@cassci-bot

❌ Build ds-cassandra-pr-gate/PR-1342 rejected by Butler


13 new test failure(s) in 16 builds
See build details here


Found 13 new test failures

Test Explanation Branch history Upstream history
...47,483,647 Modifier 0.5 Levels 4 Compactors 15] flaky 🔵🔴🔵
...,147,483,647 Modifier 1 Levels 3 Compactors 30] flaky 🔵🔴🔵
...,147,483,647 Modifier 1 Levels 4 Compactors 15] flaky 🔵🔴🔵
...oadCommitLogAndSSTablesWithDroppedColumnTestDSE regression 🔴🔴🔵🔵 🔵🔵🔵🔵🔵🔵🔵
...ToolEnableDisableBinaryTest.testMaybeChangeDocs flaky 🔵🔴🔵🔵🔵🔵🔴 🔵🔵🔵🔵🔵🔵🔵
...positePartitionKeyDataModel{primaryKey=p1, p2}] regression 🔴🔴🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...positePartitionKeyDataModel{primaryKey=p1, p2}] failing 🔴🔴🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...positePartitionKeyDataModel{primaryKey=p1, p2}] regression 🔴🔵🔵🔵 🔵🔵🔵🔵🔵🔵🔵
...positePartitionKeyDataModel{primaryKey=p1, p2}] regression 🔴🔴🔵🔵🔴🔵🔴 🔵🔵🔵🔵🔵🔵🔵
...positePartitionKeyDataModel{primaryKey=p1, p2}] failing 🔴🔴🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...i.s.c.VectorSiftSmallTest.testMultiSegmentBuild failing 🔴🔴🔴🔴🔴🔴🔴 🔵🔵🔵🔵🔵🔵🔵
...st.testTTLOverwriteHasCorrectOnDiskRowCount[dc] regression 🔴🔵🔵🔵 🔵🔵🔵🔵🔵🔵🔵
o.a.c.u.b.BinLogTest.testTruncationReleasesLogS... flaky 🔵🔴🔵🔴🔴🔵🔵 🔵🔵🔵🔵🔵🔵🔵

Found 99 known test failures

At this time the new parallelization mechanism is not taken
into account by the thread allocation scheme, and thus
some levels may take more resources than they should.
Because of this limitation (which should be fixed in the
near future), the new behaviour is off by default.

Also:
- Adds a flag to combine non-overlapping sets in major
  compactions to reshard data, as major compactions can
  now be executed as a parallelized operation.

- Changes SSTable expiration to be done in a separate
  getNextBackgroundCompactions round to improve the
  efficiency of expiration (separate task can run quickly
  and remove the relevant sstables without waiting for
  a compaction to end).

- Applies small-partition-count correction in
  ShardManager.calculateCombinedDensity.

Change parallelize_output_shards default to true.

@eolivelli eolivelli left a comment


I have reviewed the code and left some smaller comments.
I am not very familiar with this code, but I cannot find anything wrong.
All of my previous comments have been addressed.

I am testing the patch on CNDB https://github.com/riptano/cndb/pull/11690 to see if there is something that breaks.

}
if (nonEmptyTasks > 1)
logger.info("Major compaction will not result in a single sstable - repaired and unrepaired data is kept separate and compaction runs per data_file_directory.");
logger.info("Major compaction will not result in a single sstable.");


What about adding a reference to the CFS name? In a live system this log may happen for multiple tables.

Member

Agreed, I could see something from CFS being a valuable addition.

@@ -18,6 +18,8 @@

package org.apache.cassandra.db.compaction;

import java.util.ArrayList;


nit: unused new imports


import static org.mockito.Mockito.*;

/// Tests mostly written by Copilot.


nit: remove this reference to Copilot? (here and in other places)
I am not sure about licensing issues.

@@ -382,6 +385,7 @@ private CompletableFuture<Void> startTask(ColumnFamilyStore cfs, AbstractCompact
{
ongoingCompactions.decrementAndGet();
logger.debug("Background compaction task for {} was rejected", cfs);
task.rejected(ex);
Member

Are we relying on this to call the observer with the error? I see a warning in my IDE because we're ignoring the resulting throwable. Do we want to return a failed future on the next line? It's minor, but a comment would make that clearer.

Comment on lines +54 to +59
public synchronized void addSubtask(CompactionProgress progress)
{
if (!sources.isEmpty())
assert sources.get(0).operationId() == progress.operationId();
sources.add(progress);
}
Member

I see that this method is synchronized, but none of the others are. How are we ensuring safe publication of the sources list after it is modified?
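One way to address the publication concern (a sketch only, with illustrative names rather than the actual `CompactionProgress` API): back the list with a `CopyOnWriteArrayList`, which makes each addition safely visible to unsynchronized readers and gives iterators a consistent snapshot.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: CopyOnWriteArrayList publishes additions safely without
// requiring every reader to synchronize. Names are illustrative.
public class CombinedProgress
{
    private final List<Long> sources = new CopyOnWriteArrayList<>();

    public void addSubtask(long subtaskBytes)
    {
        sources.add(subtaskBytes); // safely published to concurrent readers
    }

    public long totalBytes()
    {
        long total = 0;
        for (long bytes : sources) // iterates a consistent snapshot
            total += bytes;
        return total;
    }
}
```

Copy-on-write is a reasonable fit here because subtask registration is rare compared to progress reads.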

this.endPosition = position.position; // 0 if end is before our first key.
else
{
assert false : "Range " + tokenRange + " end is before last sstable token " + sstable.last.getToken() + " but no position was found";
Member

Why is this only assert false as opposed to always throwing an IllegalArgumentException?

Author

Changed to AssertionError, because this is a case of broken sstable invariants rather than illegal input.
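For context on the distinction: a plain `assert` statement is a no-op unless the JVM runs with `-ea`, while an explicitly thrown `AssertionError` fires unconditionally. A small standalone demonstration (hypothetical method, not code from the patch):

```java
// Demonstrates that an explicitly thrown AssertionError fires
// regardless of whether assertions are enabled on the JVM,
// unlike a plain `assert` statement.
public class AssertDemo
{
    public static String check(boolean invariantHolds)
    {
        try
        {
            if (!invariantHolds)
                throw new AssertionError("broken sstable invariant");
            return "ok";
        }
        catch (AssertionError e)
        {
            return "caught: " + e.getMessage();
        }
    }
}
```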

transaction.finish();
transaction.prepareToCommit();
transaction.commit();
Member

What is the purpose of making this change? It looks like finish() calls these methods by default but could be overridden, so after a quick glance, it's not clear that this is a safe replacement.

Comment on lines +1498 to +1501
if (lowerChunkStart < lastEnd) // if regions include the same chunk, count it only once
lowerChunkStart = lastEnd;
total += upperChunkEnd - lowerChunkStart;
lastEnd = upperChunkEnd;
Member

Looks like Range.normalize ensures the ranges don't overlap. It seems like this code is being defensive in case that implementation changes. In that case, should we also add something like the following:

if (upperChunkEnd <= lastEnd)
    continue;

Member

Perhaps there is something about compression metadata that I'm not familiar with here?

Author

If we have two sections immediately (or close after) one another, they may fall in the same compression chunk. In that case the lower bound is its start and the upper its end. Without this adjustment the chunk would be counted twice.
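The accumulation being discussed can be sketched in isolation (illustrative standalone method, not the patch code): the clamp counts a compression chunk shared by adjacent sections only once, and the reviewer's suggested early skip handles a section that falls entirely inside already-counted chunks.

```java
// Sketch of overlap-free chunk accounting: starts[i]/ends[i] are the
// chunk-aligned byte offsets of section i, in ascending start order.
public class ChunkAccounting
{
    /** Total on-disk bytes, counting any chunk shared by two sections once. */
    public static long totalOnDiskBytes(long[] starts, long[] ends)
    {
        long total = 0;
        long lastEnd = 0;
        for (int i = 0; i < starts.length; i++)
        {
            long lowerChunkStart = starts[i];
            long upperChunkEnd = ends[i];
            if (upperChunkEnd <= lastEnd)
                continue; // section fully inside already-counted chunks
            if (lowerChunkStart < lastEnd) // sections share a chunk
                lowerChunkStart = lastEnd; // count the shared chunk only once
            total += upperChunkEnd - lowerChunkStart;
            lastEnd = upperChunkEnd;
        }
        return total;
    }
}
```

For two sections whose ranges map to [0, 100] and [50, 150], the naive sum would be 200 bytes; the clamp yields 150, counting the shared chunk once.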

Comment on lines +228 to +231
protected Set<SSTableReader> inputSSTables()
{
return transaction.originals();
}
Member

I see that not all transaction.originals() references in this class were replaced by inputSSTables(). Is that intentional? It seems like it might only be in methods that are not overridden in subclasses, but I worry that this might be fragile, assuming I understand it correctly.


public SSTableReader current(SSTableReader reader)
{
return mainTransaction.current(reader);
Member

Is there a reason we don't wrap this one in a synchronized block? In looking at how it is implemented in LifecycleTransaction, it seems like current(reader) might not be thread safe.

Comment on lines +205 to +212
/**
* @return The token range that the operation should compact. This is usually null, but if we have a parallelizable
* multi-task operation (see {@link UnifiedCompactionStrategy#createAndAddTasks}), it will specify a subrange.
*/
protected Range<Token> tokenRange()
{
return null;
}
Member

Does the InclusionMethod enum become ignored when using parallelizable compaction? This comment indicates we only compact sstables within a given range, which seems like it might map to the NONE method, is that right?

Member

I see that the range is generated by the ShardManager, which indicates that maybe this token range is independent of the InclusionMethod.

Author

The range is indeed independent of the InclusionMethod. The range here is a range inside the input sstables, and however we select these we still split the output at predetermined positions.

In other words the InclusionMethod is a feature of the compaction selection, while the range here is a feature of the output sharding. The two are generally independent (and should be, to be able to correctly act in case of changes in sharding or upgrade from a legacy strategy), even though the latter should work in a way that makes the former efficient.

@michaeljmarshall michaeljmarshall (Member) left a comment

I have completed my initial review. It looks generally good to me, though I am not sure I fully comprehend the nuance in the UnifiedCompactionStrategy class. I left several minor comments.

public List<Future<?>> submitMaximal(final ColumnFamilyStore cfStore,
final int gcBefore,
boolean splitOutput,
Integer parallelism,

Here and in callers, I feel an OptionalInt would be clearer/less error prone.

@@ -930,9 +950,15 @@ protected void runMayThrow()
Future<?> fut = executor.submitIfRunning(runnable, "maximal task");
if (!fut.isCancelled())
futures.add(fut);
else
{
Throwable error = task.rejected(null);

My reading of the code is that, because the argument is null, this will call the observers CompactionObserver#onCompleted method with isSuccess == true, which the javadoc says means "compaction finished without any exceptions". Given this case means the task was rejected (probably because of shutdown, but could theoretically be something else), that feels a bit dodgy to me. I'd have passed a RejectedExecutionException or something.
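The observer-side interpretation the comment refers to can be sketched as follows (illustrative method, not the actual `CompactionObserver` API): a null throwable reads as success downstream, so passing an explicit exception on rejection would make the failure visible.

```java
// Sketch of how a null error is interpreted as success by an
// observer-style callback, motivating the suggestion to pass an
// explicit exception on rejection. Names are illustrative.
public class RejectionDemo
{
    public static String onCompleted(Throwable error)
    {
        boolean isSuccess = error == null; // null reads as "finished without exceptions"
        return isSuccess ? "success" : "failed: " + error.getClass().getSimpleName();
    }
}
```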

*
* @param gcBefore throw away tombstones older than this
* @param permittedParallelism
* @param reshard

Nit: the reshard param does not exist (but splitOutput does and isn't listed). Of course, listing parameters without actual documentation is of limited interest in the first place :).

///
/// Subtasks may start and add themselves in any order. There may also be periods of time when all started tasks have
/// completed but there are new ones to still initiate. Because of this all parameters returned by this progress may
/// increase over time, including the total sizes and sstable lists.

This feels like a non-negligible issue at least for correct progress tracking. My understanding is that in practice the tasks that share on such object will execute completely sequentially, so if you wanted to get an idea of progress by looking at completed() / total(), then you will get something fairly off due to total() essentially lying to you.

I guess my question is, can't we register all the tasks first at the beginning (we create all tasks upfront anyway, and it's not like addSubtask triggers any action by itself), instead of adding task only when they start?
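The suggestion amounts to the following shape (a sketch with illustrative names, not the actual progress API): register every subtask's expected size at construction, so `total()` is stable from the start and only the completed amount grows as tasks run.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of registering all subtasks up front so total() never
// changes after construction, keeping completed()/total() meaningful.
public class UpfrontProgress
{
    private final long totalBytes;
    private final AtomicLong completedBytes = new AtomicLong();

    public UpfrontProgress(long[] subtaskSizes) // all tasks known at creation
    {
        long sum = 0;
        for (long size : subtaskSizes)
            sum += size;
        this.totalBytes = sum; // fixed for the lifetime of the operation
    }

    public void onSubtaskProgress(long bytes)
    {
        completedBytes.addAndGet(bytes); // only this side grows over time
    }

    public double ratio()
    {
        return totalBytes == 0 ? 1.0 : (double) completedBytes.get() / totalBytes;
    }
}
```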

else
return getNextBackgroundTasks(getNextCompactionAggregates(gcBefore), gcBefore);

// Always check for expired sstables (not just periodically) as expiration will save us unnecessary work.

I find that comment (the "always check ... (not just periodically)") a tad contradictory with the implementation, since getExpirationTasks starts by giving up if we have checked less than some predefined "period" ago. I'm not sure if the comment means something else and could maybe be rephrased, or ...?

if (committedOrAborted.get())
throw new IllegalStateException("Partial transaction already committed or aborted.");

throwIfAborted();

Nit: maybe rename to throwIfCompositeAborted; it reads oddly otherwise, as it looks like we already checked for aborted on the previous line.


blambov commented Nov 22, 2024

I need to port over some changes that came from the development of the OSS version.

I think it would be easier for all of us if we continue the review on the OSS version (CASSANDRA-18802 and CASSANDRA-20092) -- it is quite a bit simpler. We can then come back to the additions here, with any updates that come from that review.

Labels: none yet
Projects: none yet
5 participants