diff --git a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs index 18a4b148..d25ab4c5 100644 --- a/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs +++ b/src/Core/src/Eventuous.Subscriptions/Checkpoints/CheckpointCommitHandler.cs @@ -35,16 +35,7 @@ public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitChe _subject .Buffer(TimeSpan.FromSeconds(5), batchSize) .Where(x => x.Count > 0) - .Select( - x => { - foreach (var position in x) { - _positions.Add(position); - } - - var next = GetCommitPosition(false); - return next; - } - ) + .Select(AddBatchAndGetLast) .Where(x => x.Valid) .Select(x => Observable.FromAsync(ct => CommitInternal1(x, ct))) .Concat() @@ -56,10 +47,16 @@ public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitChe ValueTask Process(CommitPosition position, CancellationToken cancellationToken) { _subject.OnNext(position); return default; - // _positions.Add(position); - // if (_positions.Count < batchSize) return; - // - // await CommitInternal(false, cancellationToken).NoContext(); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + CommitPosition AddBatchAndGetLast(IList list) { + foreach (var position in list) { + _positions.Add(position); + } + + var next = GetCommitPosition(false); + return next; } } @@ -112,37 +109,6 @@ await _commitCheckpoint( } } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - async ValueTask CommitInternal(bool force, CancellationToken cancellationToken) { - try { - switch (_lastCommit.Valid) { - // There's a gap between the last committed position and the list head - case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force: - // The list head is not at the very beginning - case false when _positions.Min.Sequence != 0: - return; - } - - var commitPosition = _positions.FirstBeforeGap(); - if (!commitPosition.Valid) return; - - await _commitCheckpoint( - new Checkpoint(_subscriptionId, commitPosition.Position), - force, - cancellationToken - ) - .NoContext(); - - _lastCommit = commitPosition; - - // Removing positions before and including the committed one - _positions.RemoveWhere(x => x.Sequence <= commitPosition.Sequence); - } - catch (Exception e) { - EventuousEventSource.Log.Warn("Error committing", e.ToString()); - } - } - public async ValueTask DisposeAsync() { Log.Stopping(nameof(CheckpointCommitHandler), "worker", ""); diff --git a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs index 3649f1cf..b7739a7c 100644 --- a/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs +++ b/src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs @@ -14,9 +14,35 @@ public class MongoCheckpointStore : ICheckpointStore { MongoCheckpointStore(IMongoDatabase database, MongoCheckpointStoreOptions options) { Checkpoints = Ensure.NotNull(database).GetCollection(options.CollectionName); - _batchSize = options.BatchSize; + _getSubject = GetSubject; + + Subject GetSubject() { + var subject = new Subject(); + + var observable = options switch { + { BatchSize: > 0, BatchIntervalSec: > 0 } => subject.Buffer( + TimeSpan.FromSeconds(options.BatchIntervalSec), + options.BatchSize + ), + { BatchSize: > 0, BatchIntervalSec: 0 } => subject.Buffer(options.BatchSize), + { BatchSize: 0, BatchIntervalSec: > 0 } => subject.Buffer( + TimeSpan.FromSeconds(options.BatchIntervalSec) + ), + _ => subject.Select(x => new List { x }) + }; + + observable + .Where(x => x.Count > 0) + .Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct))) + .Concat() + .Subscribe(); + + return subject; + } } + readonly Func> _getSubject; + public MongoCheckpointStore(IMongoDatabase database) : this(database, new MongoCheckpointStoreOptions()) { } public MongoCheckpointStore(IMongoDatabase database, IOptions options) @@ -35,23 +61,11 @@ public async ValueTask GetLastCheckpoint( Log.CheckpointLoaded(this, checkpoint); - // _counters[checkpointId] = 0; - - var subject = new Subject(); - - subject - .Buffer(TimeSpan.FromSeconds(5), _batchSize > 0 ? _batchSize : 1) - .Where(x => x.Count > 0) - .Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct))) - .Concat() - .Subscribe(); - _subjects[checkpointId] = subject; + _subjects[checkpointId] = _getSubject(); return checkpoint; } - // readonly ConcurrentDictionary _counters = new(); - readonly Dictionary> _subjects = new(); public async ValueTask StoreCheckpoint( @@ -59,24 +73,11 @@ public async ValueTask StoreCheckpoint( bool force, CancellationToken cancellationToken = default ) { - // _counters[checkpoint.Id]++; - // if (!force && _counters[checkpoint.Id] < _batchSize) return checkpoint; if (force) { await StoreInternal(checkpoint, cancellationToken).NoContext(); return checkpoint; } - // await Checkpoints.ReplaceOneAsync( - // x => x.Id == checkpoint.Id, - // checkpoint, - // MongoDefaults.DefaultReplaceOptions, - // cancellationToken - // ) - // .NoContext(); - // - // _counters[checkpoint.Id] = 0; - - // Log.CheckpointStored(this, checkpoint); _subjects[checkpoint.Id].OnNext(checkpoint); return checkpoint; @@ -97,6 +98,7 @@ await Checkpoints.ReplaceOneAsync( [PublicAPI] public record MongoCheckpointStoreOptions { - public string CollectionName { get; init; } = "checkpoint"; - public int BatchSize { get; init; } + public string CollectionName { get; init; } = "checkpoint"; + public int BatchSize { get; init; } = 1; + public int BatchIntervalSec { get; init; } = 5; }