Skip to content

Commit

Permalink
Experiment: convert checkpoint handlers to observables
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 18, 2022
1 parent b475a11 commit a7f94fa
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Eventuous.Diagnostics;
Expand All @@ -20,20 +22,44 @@ public sealed class CheckpointCommitHandler : IAsyncDisposable {

static readonly DiagnosticSource Diagnostic = new DiagnosticListener(DiagnosticName);

readonly Subject<CommitPosition> _subject;

internal record CommitEvent(string Id, CommitPosition CommitPosition, CommitPosition? FirstPending);

public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitCheckpoint, int batchSize = 1) {
_subscriptionId = subscriptionId;
_commitCheckpoint = commitCheckpoint;
var channel = Channel.CreateBounded<CommitPosition>(batchSize * 1000);
_subject = new Subject<CommitPosition>();

_subject
.Buffer(TimeSpan.FromSeconds(5), batchSize)
.Where(x => x.Count > 0)
.Select(
x => {
foreach (var position in x) {
_positions.Add(position);
}
var next = GetCommitPosition(false);
return next;
}
)
.Where(x => x.Valid)
.Select(x => Observable.FromAsync(ct => CommitInternal1(x, ct)))
.Concat()
.Subscribe();

_worker = new ChannelWorker<CommitPosition>(channel, Process, true);

[MethodImpl(MethodImplOptions.AggressiveInlining)]
async ValueTask Process(CommitPosition position, CancellationToken cancellationToken) {
_positions.Add(position);
if (_positions.Count < batchSize) return;

await CommitInternal(false, cancellationToken).NoContext();
ValueTask Process(CommitPosition position, CancellationToken cancellationToken) {
_subject.OnNext(position);
return default;
// _positions.Add(position);
// if (_positions.Count < batchSize) return;
//
// await CommitInternal(false, cancellationToken).NoContext();
}
}

Expand All @@ -54,6 +80,38 @@ public ValueTask Commit(CommitPosition position, CancellationToken cancellationT
return _worker.Write(position, cancellationToken);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
CommitPosition GetCommitPosition(bool force) {
switch (_lastCommit.Valid) {
// There's a gap between the last committed position and the list head
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force:
// The list head is not at the very beginning
case false when _positions.Min.Sequence != 0:
return CommitPosition.None;
}

return _positions.FirstBeforeGap();
}

async Task CommitInternal1(CommitPosition position, CancellationToken cancellationToken) {
try {
await _commitCheckpoint(
new Checkpoint(_subscriptionId, position.Position),
false,
cancellationToken
)
.NoContext();

_lastCommit = position;

// Removing positions before and including the committed one
_positions.RemoveWhere(x => x.Sequence <= position.Sequence);
}
catch (Exception e) {
EventuousEventSource.Log.Warn("Error committing", e.ToString());
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
async ValueTask CommitInternal(bool force, CancellationToken cancellationToken) {
try {
Expand Down Expand Up @@ -87,7 +145,16 @@ await _commitCheckpoint(

public async ValueTask DisposeAsync() {
Log.Stopping(nameof(CheckpointCommitHandler), "worker", "");
await _worker.Stop(ct => CommitInternal(true, ct)).NoContext();

// await _worker.Stop(ct => CommitInternal(true, ct)).NoContext();
await _worker.Stop(
_ => {
_subject.Dispose();
return default;
}
)
.NoContext();

_positions.Clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class SubscriptionsEventSource : EventSource {
const int PartitionedFilterId = 26;
const int StoppingSomethingId = 27;
const int MetricCollectionFailedId = 28;

const int InfoId = 100;

[NonEvent]
public void MessageHandlingFailed(string handlerType, IBaseConsumeContext context, Exception? exception) {
Expand Down Expand Up @@ -252,4 +254,9 @@ public void SendingMessageToPartition(string subscriptionId, string messageType,
public void Stopping(string id, string what, string name) {
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) WriteEvent(StoppingSomethingId, id, what, name);
}

[Event(InfoId, Message = "{0} {1} {2}", Level = EventLevel.Informational)]
public void Info(string message, string? arg1 = null, string? arg2 = null) {
if (IsEnabled(EventLevel.Informational, EventKeywords.All)) WriteEvent(InfoId, message, arg1, arg2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<IncludeTestHost>true</IncludeTestHost>
<IncludeSutApp>true</IncludeSutApp>
<IncludeSutSubs>true</IncludeSutSubs>
<TargetFrameworks>net6.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="$(DiagRoot)\Eventuous.Diagnostics.OpenTelemetry\Eventuous.Diagnostics.OpenTelemetry.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ protected SubscriptionFixture(
CheckpointStore = new TestCheckpointStore();

_listener = new LoggingEventListener(loggerFactory);
var pipe = new ConsumePipe().AddDefaultConsumer(Handler);
var pipe = new ConsumePipe();
configurePipe?.Invoke(pipe);
pipe.AddDefaultConsumer(Handler);

Subscription = new StreamSubscription(
IntegrationFixture.Instance.Client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ public PublishAndSubscribeManyPartitionedTests(ITestOutputHelper output)
output,
new TestEventHandler(TimeSpan.FromMilliseconds(5)),
false,
pipe => pipe.AddFilterFirst(new PartitioningFilter(10, x => (x.Message as TestEvent)!.Data))
pipe => pipe.AddFilterLast(new PartitioningFilter(10, x => (x.Message as TestEvent)!.Data))
) { }

[Fact]
public async Task SubscribeAndProduceMany() {
const int count = 1000;
const int count = 10;

var testEvents = Enumerable.Range(1, count)
.Select(i => new TestEvent(Auto.Create<string>(), i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public async ValueTask<Checkpoint> GetLastCheckpoint(
var subject = new Subject<Checkpoint>();

subject
.Buffer(TimeSpan.FromSeconds(5), _batchSize)
.Select(x => Observable.FromAsync(() => StoreInternal(x.Last(), default)))
.Buffer(TimeSpan.FromSeconds(5), _batchSize > 0 ? _batchSize : 1)
.Where(x => x.Count > 0)
.Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct)))
.Concat()
.Subscribe();
_subjects[checkpointId] = subject;
Expand Down

0 comments on commit a7f94fa

Please sign in to comment.