From a7f94faf44307481c6af44a349f573c93de0a7aa Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Wed, 18 May 2022 21:36:04 +0200 Subject: [PATCH] Experiment: convert checkpoint handlers to observables --- .../Checkpoints/CheckpointCommitHandler.cs | 79 +++++++++++++++++-- .../Diagnostics/SubscriptionsEventSource.cs | 7 ++ .../Eventuous.Tests.EventStore.csproj | 1 + .../Fixtures/SubscriptionFixture.cs | 3 +- ...PublishAndSubscribeManyPartitionedTests.cs | 4 +- .../MongoCheckpointStore.cs | 5 +- 6 files changed, 88 insertions(+), 11 deletions(-) diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs index de5de35a..18a4b148 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs @@ -1,4 +1,6 @@ using System.Diagnostics; +using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Runtime.CompilerServices; using System.Threading.Channels; using Eventuous.Diagnostics; @@ -20,20 +22,44 @@ public sealed class CheckpointCommitHandler : IAsyncDisposable { static readonly DiagnosticSource Diagnostic = new DiagnosticListener(DiagnosticName); + readonly Subject _subject; + internal record CommitEvent(string Id, CommitPosition CommitPosition, CommitPosition? FirstPending); public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitCheckpoint, int batchSize = 1) { _subscriptionId = subscriptionId; _commitCheckpoint = commitCheckpoint; var channel = Channel.CreateBounded(batchSize * 1000); + _subject = new Subject(); + + _subject + .Buffer(TimeSpan.FromSeconds(5), batchSize) + .Where(x => x.Count > 0) + .Select( + x => { + foreach (var position in x) { + _positions.Add(position); + } + + var next = GetCommitPosition(false); + return next; + } + ) + .Where(x => x.Valid) + .Select(x => Observable.FromAsync(ct => CommitInternal1(x, ct))) + .Concat() + .Subscribe(); + _worker = new ChannelWorker(channel, Process, true); [MethodImpl(MethodImplOptions.AggressiveInlining)] - async ValueTask Process(CommitPosition position, CancellationToken cancellationToken) { - _positions.Add(position); - if (_positions.Count < batchSize) return; - - await CommitInternal(false, cancellationToken).NoContext(); + ValueTask Process(CommitPosition position, CancellationToken cancellationToken) { + _subject.OnNext(position); + return default; + // _positions.Add(position); + // if (_positions.Count < batchSize) return; + // + // await CommitInternal(false, cancellationToken).NoContext(); } } @@ -54,6 +80,38 @@ public ValueTask Commit(CommitPosition position, CancellationToken cancellationT return _worker.Write(position, cancellationToken); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + CommitPosition GetCommitPosition(bool force) { + switch (_lastCommit.Valid) { + // There's a gap between the last committed position and the list head + case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force: + // The list head is not at the very beginning + case false when _positions.Min.Sequence != 0: + return CommitPosition.None; + } + + return _positions.FirstBeforeGap(); + } + + async Task CommitInternal1(CommitPosition position, CancellationToken cancellationToken) { + try { + await _commitCheckpoint( + new Checkpoint(_subscriptionId, position.Position), + false, + cancellationToken + ) + .NoContext(); + + _lastCommit = position; + + // Removing positions before and including the committed one + _positions.RemoveWhere(x => x.Sequence <= position.Sequence); + } + catch (Exception e) { + EventuousEventSource.Log.Warn("Error committing", e.ToString()); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] async ValueTask CommitInternal(bool force, CancellationToken cancellationToken) { try { @@ -87,7 +145,16 @@ await _commitCheckpoint( public async ValueTask DisposeAsync() { Log.Stopping(nameof(CheckpointCommitHandler), "worker", ""); - await _worker.Stop(ct => CommitInternal(true, ct)).NoContext(); + + // await _worker.Stop(ct => CommitInternal(true, ct)).NoContext(); + await _worker.Stop( + _ => { + _subject.Dispose(); + return default; + } + ) + .NoContext(); + _positions.Clear(); } } diff --git a/src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionsEventSource.cs b/src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionsEventSource.cs index 2ef33dbf..576b288e 100644 --- a/src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionsEventSource.cs +++ b/src/Core/src/Eventuous.Subscriptions/Diagnostics/SubscriptionsEventSource.cs @@ -33,6 +33,8 @@ public class SubscriptionsEventSource : EventSource { const int PartitionedFilterId = 26; const int StoppingSomethingId = 27; const int MetricCollectionFailedId = 28; + + const int InfoId = 100; [NonEvent] public void MessageHandlingFailed(string handlerType, IBaseConsumeContext context, Exception? exception) { @@ -252,4 +254,9 @@ public void SendingMessageToPartition(string subscriptionId, string messageType, public void Stopping(string id, string what, string name) { if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) WriteEvent(StoppingSomethingId, id, what, name); } + + [Event(InfoId, Message = "{0} {1} {2}", Level = EventLevel.Informational)] + public void Info(string message, string? arg1 = null, string? arg2 = null) { + if (IsEnabled(EventLevel.Informational, EventKeywords.All)) WriteEvent(InfoId, message, arg1, arg2); + } } diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Eventuous.Tests.EventStore.csproj b/src/EventStore/test/Eventuous.Tests.EventStore/Eventuous.Tests.EventStore.csproj index 5aee9081..851da243 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Eventuous.Tests.EventStore.csproj +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Eventuous.Tests.EventStore.csproj @@ -4,6 +4,7 @@ true true true + net6.0 diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs index cec8b4ec..f90d6b56 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/Fixtures/SubscriptionFixture.cs @@ -38,8 +38,9 @@ protected SubscriptionFixture( CheckpointStore = new TestCheckpointStore(); _listener = new LoggingEventListener(loggerFactory); - var pipe = new ConsumePipe().AddDefaultConsumer(Handler); + var pipe = new ConsumePipe(); configurePipe?.Invoke(pipe); + pipe.AddDefaultConsumer(Handler); Subscription = new StreamSubscription( IntegrationFixture.Instance.Client, diff --git a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs index 417bcc52..29b66369 100644 --- a/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs +++ b/src/EventStore/test/Eventuous.Tests.EventStore/PublishAndSubscribeManyPartitionedTests.cs @@ -11,12 +11,12 @@ public PublishAndSubscribeManyPartitionedTests(ITestOutputHelper output) output, new TestEventHandler(TimeSpan.FromMilliseconds(5)), false, - pipe => pipe.AddFilterFirst(new PartitioningFilter(10, x => (x.Message as TestEvent)!.Data)) + pipe => pipe.AddFilterLast(new PartitioningFilter(10, x => (x.Message as TestEvent)!.Data)) ) { } [Fact] public async Task SubscribeAndProduceMany() { - const int count = 1000; + const int count = 10; var testEvents = Enumerable.Range(1, count) .Select(i => new TestEvent(Auto.Create(), i)) diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs index 932f7b73..3649f1cf 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs @@ -40,8 +40,9 @@ public async ValueTask GetLastCheckpoint( var subject = new Subject(); subject - .Buffer(TimeSpan.FromSeconds(5), _batchSize) - .Select(x => Observable.FromAsync(() => StoreInternal(x.Last(), default))) + .Buffer(TimeSpan.FromSeconds(5), _batchSize > 0 ? _batchSize : 1) + .Where(x => x.Count > 0) + .Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct))) .Concat() .Subscribe(); _subjects[checkpointId] = subject;