Skip to content

Commit

Permalink
Command service improvements (#187)
Browse files Browse the repository at this point in the history
* Rename to command 
* Use fast type map
* Pass over a pre-made generic context instead of converting it in each handler
* Added command mapping to the controller base
* First version of the service without aggregates
* Make the test more like a proper sample
* Improved Postgres Projector
* Made schema create-drop per test. Slower but can run in parallel.
* Added Postgres projector tests
* Add sync version to get the command
* Registrations and HTTP for functional services
  • Loading branch information
alexeyzimarev authored Feb 10, 2023
1 parent a29bf4a commit 0b9e335
Show file tree
Hide file tree
Showing 102 changed files with 2,276 additions and 896 deletions.
9 changes: 6 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ resharper_csharp_blank_lines_around_single_line_field = 0
resharper_csharp_blank_lines_around_single_line_invocable = 1
resharper_csharp_empty_block_style = together
resharper_csharp_int_align_comments = true
resharper_csharp_max_line_length = 120
resharper_csharp_max_line_length = 140
resharper_csharp_other_braces = end_of_line
resharper_csharp_wrap_after_declaration_lpar = true
resharper_csharp_wrap_arguments_style = chop_if_long
resharper_csharp_wrap_before_binary_opsign = true
resharper_csharp_wrap_before_declaration_rpar = true
resharper_csharp_wrap_parameters_style = chop_if_long
resharper_csharp_wrap_ternary_expr_style = wrap_if_long
resharper_csharp_wrap_ternary_expr_style = chop_always
resharper_instance_members_qualify_declared_in = base_class
resharper_int_align_assignments = true
resharper_int_align_fields = true
Expand All @@ -38,11 +38,14 @@ resharper_int_align_properties = true
resharper_max_attribute_length_for_same_line = 80
resharper_nested_ternary_style = expanded
resharper_outdent_binary_ops = true
resharper_place_expr_method_on_single_line = true
resharper_place_expr_method_on_single_line = false
resharper_place_expr_property_on_single_line = true
resharper_place_field_attribute_on_same_line = false
resharper_place_simple_embedded_statement_on_same_line = true
resharper_space_within_single_line_array_initializer_braces = true
resharper_use_heuristics_for_body_style = true
resharper_wrap_object_and_collection_initializer_style = chop_if_long
resharper_wrap_parameters_style = wrap_if_long
resharper_wrap_ternary_expr_style = chop_if_long

file_header_template = Copyright (C) Ubiquitous AS. All rights reserved\nLicensed under the Apache License, Version 2.0.
23 changes: 11 additions & 12 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
</PropertyGroup>
<PropertyGroup Label="Package versions for .NET 6" Condition="$(TargetFramework) == 'net7.0'">
<MicrosoftExtensionsVer>7.0</MicrosoftExtensionsVer>
<MicrosoftTestHostVer>7.0</MicrosoftTestHostVer>
<MicrosoftTestHostVer>7.0.2</MicrosoftTestHostVer>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="BenchmarkDotNet" Version="0.13.2" />
<PackageVersion Include="BenchmarkDotNet" Version="0.13.4" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MicrosoftExtensionsVer)" />
Expand All @@ -23,26 +23,25 @@
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.4" />
<PackageVersion Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="22.0.0" />
<PackageVersion Include="EventStore.Client.Grpc.Streams" Version="22.0.0" />
<PackageVersion Include="MongoDB.Driver" Version="2.16.0" />
<PackageVersion Include="Google.Cloud.Monitoring.V3" Version="3.0.0" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.2.0" />
<PackageVersion Include="Confluent.Kafka" Version="1.9.3" />
<PackageVersion Include="MongoDB.Driver" Version="2.19.0" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.3.0" />
<PackageVersion Include="Confluent.Kafka" Version="2.0.2" />
<PackageVersion Include="Npgsql" Version="6.0.4" />
<PackageVersion Include="RabbitMQ.Client" Version="6.3.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.0.0" />
<PackageVersion Include="RabbitMQ.Client" Version="6.4.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.1.0" />
<PackageVersion Include="NEST" Version="7.17.2" />
<PackageVersion Include="Polly" Version="7.2.3" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup>
<ItemGroup Label="References for packable projects">
<PackageVersion Include="MinVer" Version="4.2.0" PrivateAssets="All" />
<PackageVersion Include="MinVer" Version="4.3.0" PrivateAssets="All" />
<PackageVersion Include="JetBrains.Annotations" Version="2022.3.1" PrivateAssets="All" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup Label="References for test projects">
<PackageVersion Include="AutoFixture" Version="4.17.0" />
<PackageVersion Include="Bogus" Version="34.0.2" />
<PackageVersion Include="FluentAssertions" Version="6.8.0" />
<PackageVersion Include="FluentAssertions" Version="6.9.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
<PackageVersion Include="xunit" Version="2.4.2" />
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
Expand All @@ -52,8 +51,8 @@
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="$(MicrosoftTestHostVer)" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="$(MicrosoftTestHostVer)" />
<PackageVersion Include="RestSharp" Version="107.3.0" />
<PackageVersion Include="Hypothesist" Version="2.0.30" />
<PackageVersion Include="NodaTime" Version="3.1.5" />
<PackageVersion Include="Hypothesist" Version="2.1.50" />
<PackageVersion Include="NodaTime" Version="3.1.6" />
<PackageVersion Include="NodaTime.Serialization.SystemTextJson" Version="1.0.0" />
<PackageVersion Include="MicroElements.AutoFixture.NodaTime" Version="1.0.0" />
<PackageVersion Include="MongoDb.Bson.NodaTime" Version="3.0.0" />
Expand Down
5 changes: 5 additions & 0 deletions Eventuous.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/CodeFormatting/CSharpFormat/PLACE_FIELD_ATTRIBUTE_ON_SAME_LINE_EX/@EntryValue">NEVER</s:String>
<s:String x:Key="/Default/CodeStyle/FileHeader/FileHeaderRegionName/@EntryValue"></s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateStaticFields/@EntryIndexedValue">&lt;Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /&gt;</s:String>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpKeepExistingMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpPlaceEmbeddedOnSameLineMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ECSharpUseContinuousIndentInsideBracesMigration/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/SettingsMigration/IsMigratorApplied/=JetBrains_002EReSharper_002EPsi_002ECSharp_002ECodeStyle_002ESettingsUpgrade_002EMigrateBlankLinesAroundFieldToBlankLinesAroundProperty/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Esdb/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Eventuous/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=pubsub/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
24 changes: 24 additions & 0 deletions src/Core/src/Eventuous.Application/CommandMap.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous;

public class MessageMap {
readonly TypeMap<Func<object, object>> _typeMap = new();

public MessageMap Add<TIn, TOut>(Func<TIn, TOut> map) where TIn : class where TOut : class {
_typeMap.Add<TIn>(Map);
return this;

object Map(object inCmd)
=> map((TIn)inCmd);
}

public TOut Convert<TIn, TOut>(TIn command) where TIn : class {
if (!_typeMap.TryGetValue<TIn>(out var mapper)) {
throw new Exceptions.CommandMappingException<TIn, TOut>();
}

return (TOut)mapper(command);
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Tools;
using static Eventuous.Diagnostics.ApplicationEventSource;

// ReSharper disable MemberCanBePrivate.Global

namespace Eventuous;

using Tools;
using static Diagnostics.ApplicationEventSource;

/// <summary>
/// Application service base class. A derived class should be scoped to handle commands for one aggregate type only.
/// Command service base class. A derived class should be scoped to handle commands for one aggregate type only.
/// </summary>
/// <typeparam name="TAggregate">The aggregate type</typeparam>
/// <typeparam name="TState">The aggregate state type</typeparam>
/// <typeparam name="TId">The aggregate identity type</typeparam>
// [PublicAPI]
public abstract class ApplicationService<TAggregate, TState, TId>
: IApplicationService<TAggregate, TState, TId>, IApplicationService<TAggregate>
public abstract class CommandService<TAggregate, TState, TId>
: ICommandService<TAggregate, TState, TId>, ICommandService<TAggregate>
where TAggregate : Aggregate<TState>, new()
where TState : State<TState>, new()
where TId : AggregateId {
[PublicAPI]
protected IAggregateStore Store { get; }

readonly HandlersMap<TAggregate> _handlers = new();
Expand All @@ -28,7 +27,7 @@ public abstract class ApplicationService<TAggregate, TState, TId>
readonly StreamNameMap _streamNameMap;
readonly TypeMapper _typeMap;

protected ApplicationService(
protected CommandService(
IAggregateStore store,
AggregateFactoryRegistry? factoryRegistry = null,
StreamNameMap? streamNameMap = null,
Expand Down Expand Up @@ -196,19 +195,17 @@ protected void OnAsync<TCommand>(ArbitraryActAsync<TCommand> action)
/// <exception cref="Exceptions.CommandHandlerNotFound{TCommand}"></exception>
public async Task<Result<TState>> Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
where TCommand : class {
var commandType = Ensure.NotNull(command).GetType();

if (!_handlers.TryGetValue(commandType, out var registeredHandler)) {
Log.CommandHandlerNotFound(commandType);
var exception = new Exceptions.CommandHandlerNotFound(commandType);
if (!_handlers.TryGet<TCommand>(out var registeredHandler)) {
Log.CommandHandlerNotFound<TCommand>();
var exception = new Exceptions.CommandHandlerNotFound<TCommand>();
return new ErrorResult<TState>(exception);
}

var hasGetIdFunction = _idMap.TryGetValue(commandType, out var getId);
var hasGetIdFunction = _idMap.TryGet<TCommand>(out var getId);

if (!hasGetIdFunction || getId == null) {
Log.CannotCalculateAggregateId(commandType);
var exception = new Exceptions.CommandHandlerNotFound(commandType);
Log.CannotCalculateAggregateId<TCommand>();
var exception = new Exceptions.CommandHandlerNotFound<TCommand>();
return new ErrorResult<TState>(exception);
}

Expand Down Expand Up @@ -238,37 +235,41 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
if (result.Changes.Count == 0) return new OkResult<TState>(result.State, Array.Empty<Change>(), 0);

var storeResult = await Store.Store(
streamName != default ? streamName : GetAggregateStreamName(),
streamName != default
? streamName
: GetAggregateStreamName(),
result,
cancellationToken
)
.NoContext();

var changes = result.Changes.Select(x => new Change(x, _typeMap.GetTypeName(x)));

Log.CommandHandled(commandType);
Log.CommandHandled<TCommand>();

return new OkResult<TState>(result.State, changes, storeResult.GlobalPosition);
}
catch (Exception e) {
Log.ErrorHandlingCommand(commandType, e);
Log.ErrorHandlingCommand<TCommand>(e);

return new ErrorResult<TState>($"Error handling command {commandType.Name}", e);
return new ErrorResult<TState>($"Error handling command {typeof(TCommand).Name}", e);
}

TAggregate Create() => _factoryRegistry.CreateInstance<TAggregate, TState>();
TAggregate Create()
=> _factoryRegistry.CreateInstance<TAggregate, TState>();

StreamName GetAggregateStreamName() => _streamNameMap.GetStreamName<TAggregate, TId>(aggregateId);
StreamName GetAggregateStreamName()
=> _streamNameMap.GetStreamName<TAggregate, TId>(aggregateId);
}

async Task<Result> IApplicationService.Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
async Task<Result> ICommandService.Handle<TCommand>(TCommand command, CancellationToken cancellationToken)
where TCommand : class {
var result = await Handle(command, cancellationToken).NoContext();

return result switch {
OkResult<TState>(var aggregateState, var enumerable, _) => new OkResult(aggregateState, enumerable),
ErrorResult<TState> error => new ErrorResult(error.Message, error.Exception),
_ => throw new ApplicationException("Unknown result type")
OkResult<TState>(var state, var enumerable, _) => new OkResult(state, enumerable),
ErrorResult<TState> error => new ErrorResult(error.Message, error.Exception),
_ => throw new ApplicationException("Unknown result type")
};
}

Expand Down
27 changes: 27 additions & 0 deletions src/Core/src/Eventuous.Application/CommandToIdMap.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Diagnostics.CodeAnalysis;

namespace Eventuous;

public delegate Task<TId> GetIdFromCommandAsync<TId, in TCommand>(TCommand command, CancellationToken cancellationToken)
where TId : AggregateId where TCommand : class;

public delegate TId GetIdFromCommand<out TId, in TCommand>(TCommand command) where TId : AggregateId where TCommand : class;

delegate ValueTask<TId> GetIdFromUntypedCommand<TId>(object command, CancellationToken cancellationToken)
where TId : AggregateId;

class IdMap<TId> where TId : AggregateId {
readonly TypeMap<GetIdFromUntypedCommand<TId>> _typeMap = new();

public void AddCommand<TCommand>(GetIdFromCommand<TId, TCommand> getId) where TCommand : class
=> _typeMap.Add<TCommand>((cmd, _) => new ValueTask<TId>(getId((TCommand)cmd)));

public void AddCommand<TCommand>(GetIdFromCommandAsync<TId, TCommand> getId) where TCommand : class
=> _typeMap.Add<TCommand>(async (cmd, ct) => await getId((TCommand)cmd, ct));

internal bool TryGet<TCommand>([NotNullWhen(true)] out GetIdFromUntypedCommand<TId>? getId) where TCommand : class
=> _typeMap.TryGetValue<TCommand>(out getId);
}
20 changes: 20 additions & 0 deletions src/Core/src/Eventuous.Application/CommandToStreamMap.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (C) Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Diagnostics.CodeAnalysis;

namespace Eventuous;

public delegate StreamName GetStreamNameFromCommand<in TCommand>(TCommand command) where TCommand : class;

delegate ValueTask<StreamName> GetStreamNameFromUntypedCommand(object command, CancellationToken cancellationToken);

public class CommandToStreamMap {
readonly TypeMap<GetStreamNameFromUntypedCommand> _typeMap = new();

public void AddCommand<TCommand>(GetStreamNameFromCommand<TCommand> getId) where TCommand : class
=> _typeMap.Add<TCommand>((cmd, _) => new ValueTask<StreamName>(getId((TCommand)cmd)));

internal bool TryGet<TCommand>([NotNullWhen(true)] out GetStreamNameFromUntypedCommand? getId) where TCommand : class
=> _typeMap.TryGetValue<TCommand>(out getId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,44 @@
// Licensed under the Apache License, Version 2.0.

using System.Diagnostics.Tracing;

// ReSharper disable MemberCanBePrivate.Global

namespace Eventuous.Diagnostics;
namespace Eventuous.Diagnostics;

[EventSource(Name = $"{DiagnosticName.BaseName}.application")]
class ApplicationEventSource : EventSource {
public static ApplicationEventSource Log { get; } = new();

const int CommandHandlerNotFoundId = 1;
const int ErrorHandlingCommandId = 2;
const int CommandHandledId = 3;
const int CommandHandlerAlreadyRegisteredId = 4;
const int CommandHandlerRegisteredId = 5;
const int CannotGetAggregateIdFromCommandId = 11;

[NonEvent]
public void CommandHandlerNotFound(Type type) => CommandHandlerNotFound(type.Name);
public void CommandHandlerNotFound<TCommand>() => CommandHandlerNotFound(typeof(TCommand).Name);

[NonEvent]
public void CannotCalculateAggregateId(Type type) => CannotCalculateAggregateId(type.Name);
public void CannotCalculateAggregateId<TCommand>() => CannotCalculateAggregateId(typeof(TCommand).Name);

[NonEvent]
public void ErrorHandlingCommand(Type type, Exception e) => ErrorHandlingCommand(type.Name, e.ToString());
public void ErrorHandlingCommand<TCommand>(Exception e) => ErrorHandlingCommand(typeof(TCommand).Name, e.ToString());

[NonEvent]
public void CommandHandled(Type commandType) {
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandled(commandType.Name);
public void CommandHandled<TCommand>() {
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandled(typeof(TCommand).Name);
}

[NonEvent]
public void CommandHandlerAlreadyRegistered<T>() => CommandHandlerAlreadyRegistered(typeof(T).Name);

[NonEvent]
public void CommandHandlerRegistered<T>() {
if (IsEnabled(EventLevel.Verbose, EventKeywords.All)) CommandHandlerRegistered(typeof(T).Name);
}

[Event(CommandHandlerNotFoundId, Message = "Handler not found for command: '{0}'", Level = EventLevel.Error)]
void CommandHandlerNotFound(string commandType) => WriteEvent(CommandHandlerNotFoundId, commandType);

Expand All @@ -41,8 +48,7 @@ public void CommandHandled(Type commandType) {
Message = "Cannot get aggregate id from command: '{0}'",
Level = EventLevel.Error
)]
void CannotCalculateAggregateId(string commandType)
=> WriteEvent(CannotGetAggregateIdFromCommandId, commandType);
void CannotCalculateAggregateId(string commandType) => WriteEvent(CannotGetAggregateIdFromCommandId, commandType);

[Event(ErrorHandlingCommandId, Message = "Error handling command: '{0}' {1}", Level = EventLevel.Error)]
void ErrorHandlingCommand(string commandType, string exception)
Expand All @@ -57,4 +63,7 @@ void ErrorHandlingCommand(string commandType, string exception)
Level = EventLevel.Critical
)]
void CommandHandlerAlreadyRegistered(string type) => WriteEvent(CommandHandlerAlreadyRegisteredId, type);
}

[Event(CommandHandlerRegisteredId, Message = "Command handler registered for {0}", Level = EventLevel.Verbose)]
void CommandHandlerRegistered(string type) => WriteEvent(CommandHandlerRegisteredId, type);
}
Loading

0 comments on commit 0b9e335

Please sign in to comment.