Skip to content

Commit

Permalink
[issue-717] Let Pravega internal thread pool deal with checkpoint (#718)
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Zhou <[email protected]>
  • Loading branch information
crazyzhou authored Oct 11, 2023
1 parent 1157b7b commit a55bfe1
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public CompletableFuture<Checkpoint> triggerCheckpoint(
final String checkpointName = createCheckpointName(checkpointId);

final CompletableFuture<Checkpoint> checkpointResult =
this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService)
this.readerGroup.initiateCheckpoint(checkpointName)
.exceptionally(e -> {
if (e instanceof MaxNumberOfCheckpointsExceededException) {
readerGroup.cancelOutstandingCheckpoints();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ public void testTriggerCheckpoint() throws Exception {
CompletableFuture<Checkpoint> checkpointPromise = new CompletableFuture<>();
TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);

when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);
CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString(), any());
verify(hook.readerGroup).initiateCheckpoint(anyString());

// complete the checkpoint promise
Checkpoint expectedCheckpoint = mock(Checkpoint.class);
Expand All @@ -85,11 +85,11 @@ public void testTriggerCheckpointTimeout() throws Exception {
CompletableFuture<Checkpoint> checkpointPromise = new CompletableFuture<>();

TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);
when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);

CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString(), any());
verify(hook.readerGroup).initiateCheckpoint(anyString());

// invoke the timeout callback
hook.invokeScheduledCallables();
Expand All @@ -104,11 +104,11 @@ public void testCancelWhenExceedingMaxOutstandingCheckpoints() throws Exception
checkpointPromise.completeExceptionally(new MaxNumberOfCheckpointsExceededException("test"));

TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig);
when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise);
when(hook.readerGroup.initiateCheckpoint(anyString())).thenReturn(checkpointPromise);

CompletableFuture<Checkpoint> checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor());
assertThat(checkpointFuture).isNotNull();
verify(hook.readerGroup).initiateCheckpoint(anyString(), any());
verify(hook.readerGroup).initiateCheckpoint(anyString());

// invoke the cancelOutstandingCheckpoints
verify(hook.readerGroup).cancelOutstandingCheckpoints();
Expand Down

0 comments on commit a55bfe1

Please sign in to comment.