Skip to content

Commit

Permalink
Cleanup mongo store
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 18, 2022
1 parent a7f94fa commit 4adbae2
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,7 @@ public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitChe
_subject
.Buffer(TimeSpan.FromSeconds(5), batchSize)
.Where(x => x.Count > 0)
.Select(
x => {
foreach (var position in x) {
_positions.Add(position);
}
var next = GetCommitPosition(false);
return next;
}
)
.Select(AddBatchAndGetLast)
.Where(x => x.Valid)
.Select(x => Observable.FromAsync(ct => CommitInternal1(x, ct)))
.Concat()
Expand All @@ -56,10 +47,16 @@ public CheckpointCommitHandler(string subscriptionId, CommitCheckpoint commitChe
ValueTask Process(CommitPosition position, CancellationToken cancellationToken) {
_subject.OnNext(position);
return default;
// _positions.Add(position);
// if (_positions.Count < batchSize) return;
//
// await CommitInternal(false, cancellationToken).NoContext();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
CommitPosition AddBatchAndGetLast(IList<CommitPosition> list) {
foreach (var position in list) {
_positions.Add(position);
}

var next = GetCommitPosition(false);
return next;
}
}

Expand Down Expand Up @@ -112,37 +109,6 @@ await _commitCheckpoint(
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
async ValueTask CommitInternal(bool force, CancellationToken cancellationToken) {
try {
switch (_lastCommit.Valid) {
// There's a gap between the last committed position and the list head
case true when _lastCommit.Sequence + 1 != _positions.Min.Sequence && !force:
// The list head is not at the very beginning
case false when _positions.Min.Sequence != 0:
return;
}

var commitPosition = _positions.FirstBeforeGap();
if (!commitPosition.Valid) return;

await _commitCheckpoint(
new Checkpoint(_subscriptionId, commitPosition.Position),
force,
cancellationToken
)
.NoContext();

_lastCommit = commitPosition;

// Removing positions before and including the committed one
_positions.RemoveWhere(x => x.Sequence <= commitPosition.Sequence);
}
catch (Exception e) {
EventuousEventSource.Log.Warn("Error committing", e.ToString());
}
}

public async ValueTask DisposeAsync() {
Log.Stopping(nameof(CheckpointCommitHandler), "worker", "");

Expand Down
60 changes: 31 additions & 29 deletions src/Mongo/src/Eventuous.Projections.MongoDB/MongoCheckpointStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,35 @@ public class MongoCheckpointStore : ICheckpointStore {

MongoCheckpointStore(IMongoDatabase database, MongoCheckpointStoreOptions options) {
Checkpoints = Ensure.NotNull(database).GetCollection<Checkpoint>(options.CollectionName);
_batchSize = options.BatchSize;
_getSubject = GetSubject;

Subject<Checkpoint> GetSubject() {
var subject = new Subject<Checkpoint>();

var observable = options switch {
{ BatchSize: > 0, BatchIntervalSec: > 0 } => subject.Buffer(
TimeSpan.FromSeconds(options.BatchIntervalSec),
options.BatchSize
),
{ BatchSize: > 0, BatchIntervalSec: 0 } => subject.Buffer(options.BatchSize),
{ BatchSize: 0, BatchIntervalSec: > 0 } => subject.Buffer(
TimeSpan.FromSeconds(options.BatchIntervalSec)
),
_ => subject.Select(x => new List<Checkpoint> { x })
};

observable
.Where(x => x.Count > 0)
.Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct)))
.Concat()
.Subscribe();

return subject;
}
}

readonly Func<Subject<Checkpoint>> _getSubject;

public MongoCheckpointStore(IMongoDatabase database) : this(database, new MongoCheckpointStoreOptions()) { }

public MongoCheckpointStore(IMongoDatabase database, IOptions<MongoCheckpointStoreOptions> options)
Expand All @@ -35,48 +61,23 @@ public async ValueTask<Checkpoint> GetLastCheckpoint(

Log.CheckpointLoaded(this, checkpoint);

// _counters[checkpointId] = 0;

var subject = new Subject<Checkpoint>();

subject
.Buffer(TimeSpan.FromSeconds(5), _batchSize > 0 ? _batchSize : 1)
.Where(x => x.Count > 0)
.Select(x => Observable.FromAsync(ct => StoreInternal(x.Last(), ct)))
.Concat()
.Subscribe();
_subjects[checkpointId] = subject;
_subjects[checkpointId] = _getSubject();

return checkpoint;
}

// readonly ConcurrentDictionary<string, int> _counters = new();

readonly Dictionary<string, Subject<Checkpoint>> _subjects = new();

public async ValueTask<Checkpoint> StoreCheckpoint(
Checkpoint checkpoint,
bool force,
CancellationToken cancellationToken = default
) {
// _counters[checkpoint.Id]++;
// if (!force && _counters[checkpoint.Id] < _batchSize) return checkpoint;
if (force) {
await StoreInternal(checkpoint, cancellationToken).NoContext();
return checkpoint;
}

// await Checkpoints.ReplaceOneAsync(
// x => x.Id == checkpoint.Id,
// checkpoint,
// MongoDefaults.DefaultReplaceOptions,
// cancellationToken
// )
// .NoContext();
//
// _counters[checkpoint.Id] = 0;

// Log.CheckpointStored(this, checkpoint);
_subjects[checkpoint.Id].OnNext(checkpoint);

return checkpoint;
Expand All @@ -97,6 +98,7 @@ await Checkpoints.ReplaceOneAsync(

[PublicAPI]
public record MongoCheckpointStoreOptions {
public string CollectionName { get; init; } = "checkpoint";
public int BatchSize { get; init; }
public string CollectionName { get; init; } = "checkpoint";
public int BatchSize { get; init; } = 1;
public int BatchIntervalSec { get; init; } = 5;
}

0 comments on commit 4adbae2

Please sign in to comment.