Skip to content

Commit

Permalink
Allow using persistent subscription client (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev authored Jun 27, 2024
1 parent c475ab2 commit 45ef2cb
Show file tree
Hide file tree
Showing 30 changed files with 270 additions and 166 deletions.
2 changes: 2 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ dotnet_style_parentheses_in_relational_binary_operators = never_if_unnecessary:n
dotnet_style_predefined_type_for_locals_parameters_members = true:suggestion
dotnet_style_predefined_type_for_member_access = true:suggestion
dotnet_style_require_accessibility_modifiers = never:suggestion
object_creation_when_type_evident = target_typed
object_creation_when_type_not_evident = target_typed

resharper_max_attribute_length_for_same_line = 80
resharper_nested_ternary_style = expanded
Expand Down
9 changes: 6 additions & 3 deletions src/Core/src/Eventuous.Subscriptions/EventSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class EventSubscription<T> : IMessageSubscription, IAsyncDisposa
internal ConsumePipe Pipe { get; }
protected ILoggerFactory? LoggerFactory { get; }
protected LogContext Log { get; }
protected CancellationTokenSource Stopping { get; } = new();
protected CancellationTokenSource Stopping { get; set; } = new();

protected ulong Sequence;

Expand All @@ -47,11 +47,13 @@ protected EventSubscription(T options, ConsumePipe consumePipe, ILoggerFactory?
public string SubscriptionId => Options.SubscriptionId;

public async ValueTask Subscribe(OnSubscribed onSubscribed, OnDropped onDropped, CancellationToken cancellationToken) {
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, Stopping.Token);
if (IsRunning) return;

Stopping = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

_onSubscribed = onSubscribed;
_onDropped = onDropped;
await Subscribe(cts.Token).NoContext();
await Subscribe(Stopping.Token).NoContext();
IsRunning = true;
Log.SubscriptionStarted();
onSubscribed(Options.SubscriptionId);
Expand All @@ -64,6 +66,7 @@ public async ValueTask Unsubscribe(OnUnsubscribed onUnsubscribed, CancellationTo
onUnsubscribed(Options.SubscriptionId);
await Finalize(cancellationToken);
Sequence = 0;
Stopping.Dispose();
}

protected virtual ValueTask Finalize(CancellationToken cancellationToken) => default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ Action<SubscriptionBuilder<T, TOptions>> configureSubscription
.AddSubscriptionBuilder(builder)
.AddSingleton(sp => GetBuilder(sp).ResolveSubscription(sp))
.AddSingleton<IHostedService, SubscriptionHostedService>(
sp =>
new SubscriptionHostedService(
GetBuilder(sp).ResolveSubscription(sp),
sp.GetService<ISubscriptionHealth>(),
sp.GetService<ILoggerFactory>()
)
sp => new(GetBuilder(sp).ResolveSubscription(sp), sp.GetService<ISubscriptionHealth>(), sp.GetService<ILoggerFactory>())
);

SubscriptionBuilder<T, TOptions> GetBuilder(IServiceProvider sp) => sp.GetSubscriptionBuilder<T, TOptions>(subscriptionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
<IncludeSutApp>true</IncludeSutApp>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\Testing\src\Eventuous.Testing\Eventuous.Testing.csproj" />
<ProjectReference Include="$(SrcRoot)\Testing\src\Eventuous.Testing\Eventuous.Testing.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@
<PackageReference Include="NodaTime.Serialization.SystemTextJson" />
<PackageReference Include="Testcontainers" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(RepoRoot)\test\Eventuous.TestHelpers\Eventuous.TestHelpers.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
// Copyright (C) Ubiquitous AS.All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Text.Json;
using System.Text.RegularExpressions;
using Bogus;
using DotNet.Testcontainers.Containers;
using Eventuous.TestHelpers;
using MicroElements.AutoFixture.NodaTime;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;

namespace Eventuous.Tests.Persistence.Base.Fixtures;

Expand All @@ -24,8 +22,7 @@ public abstract class StoreFixtureBase {
}

public abstract partial class StoreFixtureBase<TContainer> : StoreFixtureBase, IAsyncLifetime where TContainer : DockerContainer {
IEventSerializer Serializer { get; } =
new DefaultEventSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web).ConfigureForNodaTime(DateTimeZoneProviders.Tzdb));
IEventSerializer Serializer { get; } = new DefaultEventSerializer(TestPrimitives.DefaultOptions);

public virtual async Task InitializeAsync() {
Container = CreateContainer();
Expand Down Expand Up @@ -80,7 +77,7 @@ public virtual async Task DisposeAsync() {

protected virtual void GetDependencies(IServiceProvider provider) { }

protected TContainer Container { get; private set; } = null!;
public TContainer Container { get; private set; } = null!;

bool _disposed;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task ShouldUseExistingCheckpoint() {

await fixture.CheckpointStore.GetLastCheckpoint(fixture.SubscriptionId, default);
Logger.ConfigureIfNull(fixture.SubscriptionId, fixture.LoggerFactory);
await fixture.CheckpointStore.StoreCheckpoint(new Checkpoint(fixture.SubscriptionId, 9), true, default);
await fixture.CheckpointStore.StoreCheckpoint(new(fixture.SubscriptionId, 9), true, default);

await fixture.StartSubscription();
await Task.Delay(TimeSpan.FromSeconds(1));
Expand All @@ -88,7 +88,7 @@ async Task<List<BookingImported>> GenerateAndProduceEvents(int count) {
.ToList();

var events = commands.Select(ToEvent).ToList();
var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new Metadata(), "", 0));
var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new(), "", 0));
await fixture.EventStore.AppendEvents(streamName, ExpectedStreamVersion.Any, streamEvents.ToArray(), default);

return events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ public AllPersistentSubscription(
)
: base(eventStoreClient, options, consumePipe, loggerFactory) { }

/// <summary>
/// Persistent subscription for EventStoreDB, for $all stream
/// </summary>
/// <param name="eventStoreClient">EventStoreDB persistent subscription client instance</param>
/// <param name="options">Persistent subscription options</param>
/// <param name="consumePipe">Consume pipe, usually provided by the builder</param>
/// <param name="loggerFactory">Optional logger factory</param>
public AllPersistentSubscription(
EventStorePersistentSubscriptionsClient eventStoreClient,
AllPersistentSubscriptionOptions options,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory
)
: base(eventStoreClient, options, consumePipe, loggerFactory) { }

/// <summary>
/// Creates EventStoreDB persistent subscription service for a given stream
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ protected EventStoreCatchUpSubscriptionBase(
/// <param name="cancellationToken"></param>
protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
try {
Subscription?.Dispose();
Stopping.Cancel(false);
await Task.Delay(100, cancellationToken);
Subscription?.Dispose();
} catch (Exception) {
// Nothing to see here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace Eventuous.EventStore.Subscriptions;

static class EventStoreExtensions {
public static EventStoreClientSettings GetSettings(this EventStoreClient client) {
var prop = typeof(EventStoreClient).GetProperty("Settings", BindingFlags.NonPublic | BindingFlags.Instance);
public static EventStoreClientSettings GetSettings(this EventStoreClientBase client) {
var prop = typeof(EventStoreClientBase).GetProperty("Settings", BindingFlags.NonPublic | BindingFlags.Instance);

var getter = prop!.GetGetMethod(true);
return (EventStoreClientSettings) getter!.Invoke(client, null)!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ Exception exception
/// <typeparam name="T"></typeparam>
public abstract class PersistentSubscriptionBase<T> : EventSubscription<T> where T : PersistentSubscriptionOptions {
/// <summary>
/// EventStoreDB persistent subscription client instance
/// EventStoreDB persistent subscription client instance.
/// </summary>
protected EventStorePersistentSubscriptionsClient SubscriptionClient { get; }
/// <summary>
/// EventStoreDB client instance. It's used for custom NACK behavior as well as for measuring the subscription gap.
/// </summary>
protected EventStoreClient EventStoreClient { get; }

readonly HandleEventProcessingFailure _handleEventProcessingFailure;

Expand All @@ -47,19 +51,34 @@ protected PersistentSubscriptionBase(EventStoreClient eventStoreClient, T option
EventStoreClient = eventStoreClient;
var settings = eventStoreClient.GetSettings().Copy();
var opSettings = settings.OperationOptions.Clone();
settings.OperationOptions = opSettings;

SubscriptionClient = new EventStorePersistentSubscriptionsClient(settings);

settings.OperationOptions = opSettings;
SubscriptionClient = new(settings);
_handleEventProcessingFailure = options.FailureHandler ?? DefaultEventProcessingFailureHandler;

if (options is { FailureHandler: not null, ThrowOnError: false }) Log.ThrowOnErrorIncompatible();
}

/// <summary>
/// EventStoreDB client instance
/// EventStoreDB persistent subscription base class constructor
/// </summary>
protected EventStoreClient EventStoreClient { get; }
/// <param name="eventStoreClient">EventStoreDB persistent subscription client instance</param>
/// <param name="options">Subscription options</param>
/// <param name="consumePipe">Consume pipe instance, provided automatically</param>
/// <param name="loggerFactory">Optional logger factory</param>
protected PersistentSubscriptionBase(
EventStorePersistentSubscriptionsClient eventStoreClient,
T options,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory
)
: base(options, consumePipe, loggerFactory) {
SubscriptionClient = eventStoreClient;
var settings = eventStoreClient.GetSettings().Copy();
var opSettings = settings.OperationOptions.Clone();
settings.OperationOptions = opSettings;
EventStoreClient = new(settings);
_handleEventProcessingFailure = options.FailureHandler ?? DefaultEventProcessingFailureHandler;
if (options is { FailureHandler: not null, ThrowOnError: false }) Log.ThrowOnErrorIncompatible();
}

const string ResolvedEventKey = "resolvedEvent";
const string SubscriptionKey = "subscription";
Expand All @@ -80,23 +99,11 @@ protected override async ValueTask Subscribe(CancellationToken cancellationToken
var settings = Options.SubscriptionSettings ?? new PersistentSubscriptionSettings(Options.ResolveLinkTos);

try {
_subscription = await LocalSubscribe(
// ReSharper disable once ConvertClosureToMethodGroup
(subscription, @event, retryCount, ct) => HandleEvent(subscription, @event, retryCount, ct),
HandleDrop,
cancellationToken
)
.NoContext();
_subscription = await LocalSubscribe(HandleEvent, HandleDrop, cancellationToken).NoContext();
} catch (PersistentSubscriptionNotFoundException) {
await CreatePersistentSubscription(settings, cancellationToken);

_subscription = await LocalSubscribe(
// ReSharper disable once ConvertClosureToMethodGroup
(subscription, @event, retryCount, ct) => HandleEvent(subscription, @event, retryCount, ct),
HandleDrop,
cancellationToken
)
.NoContext();
_subscription = await LocalSubscribe(HandleEvent, HandleDrop, cancellationToken).NoContext();
}

return;
Expand Down Expand Up @@ -174,7 +181,7 @@ async ValueTask Nack(IMessageConsumeContext ctx, Exception exception) {
await _handleEventProcessingFailure(EventStoreClient, subscription, re, exception).NoContext();
}

IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancellationToken) {
MessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancellationToken) {
var evt = DeserializeData(
re.Event.ContentType,
re.Event.EventType,
Expand All @@ -183,7 +190,7 @@ IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancell
re.Event.Position.CommitPosition
);

return new MessageConsumeContext(
return new(
re.Event.EventId.ToString(),
re.Event.EventType,
re.Event.ContentType,
Expand Down Expand Up @@ -213,9 +220,9 @@ IMessageConsumeContext CreateContext(ResolvedEvent re, CancellationToken cancell
/// <param name="cancellationToken"></param>
protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
try {
_subscription?.Dispose();
Stopping.Cancel(false);
await Task.Delay(100, cancellationToken);
_subscription?.Dispose();
} catch (Exception) {
// It might throw
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ public StreamPersistentSubscription(
) : base(eventStoreClient, options, consumePipe, loggerFactory)
=> Ensure.NotEmptyString(options.StreamName);

/// <summary>
/// EventStoreDB persistent subscription service for a given stream
/// </summary>
/// <param name="eventStoreClient">EventStoreDB persistent subscription client instance</param>
/// <param name="options">Persistent subscription options <see cref="StreamPersistentSubscriptionOptions"/></param>
/// <param name="consumePipe">Consume pipe, provided automatically</param>
/// <param name="loggerFactory">Optional logger factory</param>
public StreamPersistentSubscription(
EventStorePersistentSubscriptionsClient eventStoreClient,
StreamPersistentSubscriptionOptions options,
ConsumePipe consumePipe,
ILoggerFactory? loggerFactory
) : base(eventStoreClient, options, consumePipe, loggerFactory)
=> Ensure.NotEmptyString(options.StreamName);

/// <summary>
/// Creates EventStoreDB persistent subscription service for a given stream without using the options object
/// </summary>
Expand Down Expand Up @@ -57,10 +72,7 @@ public StreamPersistentSubscription(
) { }

/// <inheritdoc/>
protected override Task CreatePersistentSubscription(
PersistentSubscriptionSettings settings,
CancellationToken cancellationToken
)
protected override Task CreatePersistentSubscription(PersistentSubscriptionSettings settings, CancellationToken cancellationToken)
=> SubscriptionClient.CreateToStreamAsync(
Options.StreamName,
Options.SubscriptionId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
using System.Diagnostics;
using System.Text.Json;
using EventStore.Client;
using Eventuous.Diagnostics;
using Eventuous.EventStore;
using Eventuous.TestHelpers;
using Eventuous.Tests.Persistence.Base.Fixtures;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NodaTime.Serialization.SystemTextJson;
using Testcontainers.EventStoreDb;

namespace Eventuous.Tests.EventStore.Fixtures;
Expand All @@ -20,9 +19,7 @@ static StoreFixture() {
AppContext.SetSwitch("System.Net.SocketsHttpHandler.Http2FlowControl.DisableDynamicWindowSizing", true);
}

IEventSerializer Serializer { get; } = new DefaultEventSerializer(
new JsonSerializerOptions(JsonSerializerDefaults.Web).ConfigureForNodaTime(DateTimeZoneProviders.Tzdb)
);
IEventSerializer Serializer { get; } = new DefaultEventSerializer(TestPrimitives.DefaultOptions);

public StoreFixture() {
DefaultEventSerializer.SetDefaultSerializer(Serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Eventuous.Diagnostics;
using Eventuous.Producers;
using Eventuous.TestHelpers;
using Eventuous.Tests.EventStore.Subscriptions;
using Eventuous.Tests.EventStore.Subscriptions.Fixtures;
using Eventuous.Tests.Subscriptions.Base;

namespace Eventuous.Tests.EventStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Microsoft.Extensions.DependencyInjection;
using Testcontainers.EventStoreDb;

namespace Eventuous.Tests.EventStore.Subscriptions;
namespace Eventuous.Tests.EventStore.Subscriptions.Fixtures;

public class CatchUpSubscriptionFixture<TSubscription, TSubscriptionOptions, TEventHandler>(
Action<TSubscriptionOptions> configureOptions,
Expand Down Expand Up @@ -65,7 +65,7 @@ async Task<ulong> GetLastFromAll() {
// ReSharper disable once StaticMemberInGenericType
static readonly string[] Categories = [
// "EventStore.Client.SharingProvider",
// "Grpc.Net.Client.Internal.GrpcCall"
"Grpc.Net.Client.Internal.GrpcCall"
];

readonly ITestOutputHelper _outputHelper = outputHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using Eventuous.Subscriptions.Filters;
using Eventuous.Tests.Subscriptions.Base;

namespace Eventuous.Tests.EventStore.Subscriptions;
namespace Eventuous.Tests.EventStore.Subscriptions.Fixtures;

public abstract class LegacySubscriptionFixture<T> : IAsyncLifetime where T : class, IEventHandler {
static LegacySubscriptionFixture() => TypeMap.Instance.RegisterKnownEventTypes(typeof(TestEvent).Assembly);
Expand Down Expand Up @@ -42,7 +42,7 @@ protected LegacySubscriptionFixture(

public async Task InitializeAsync() {
await StoreFixture.InitializeAsync();
Producer = new EventStoreProducer(StoreFixture.Client);
Producer = new(StoreFixture.Client);

var subscriptionId = $"test-{Guid.NewGuid():N}";
var pipe = new ConsumePipe();
Expand Down
Loading

0 comments on commit 45ef2cb

Please sign in to comment.