We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Building on top of #220, here's an approach I've used to battle increasing aggregate creation time with each event.
services.Decorate<IAggregateStore, AggregateStoreCachingDecorator>(); // provided by Scrutor nuget
public class AggregateStoreCachingDecorator : IAggregateStore { readonly IEventReader _eventReader; readonly AggregateFactoryRegistry _factoryRegistry; readonly IAggregateStore _implementation; readonly IMemoryCache _memoryCache; AggregateStoreCachingDecorator( IAggregateStore implementation, IMemoryCache memoryCache, IEventReader eventReader, AggregateFactoryRegistry factoryRegistry ) { _implementation = implementation; _memoryCache = memoryCache; _eventReader = eventReader; _factoryRegistry = factoryRegistry; } public AggregateStoreCachingDecorator( IAggregateStore implementation, IMemoryCache memoryCache, IEventStore eventStore, AggregateFactoryRegistry factoryRegistry ) : this(implementation, memoryCache, (IEventReader)eventStore, factoryRegistry) {} public Task<AppendEventsResult> Store<T>(StreamName streamName, T aggregate, CancellationToken cancellationToken) where T : Aggregate { Cache(streamName, aggregate); return _implementation.Store(streamName, aggregate, cancellationToken); } public Task<T> Load<T>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate => LoadInternal<T>(streamName, true, cancellationToken); public Task<T> LoadOrNew<T>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate => LoadInternal<T>(streamName, false, cancellationToken); async Task<T> LoadInternal<T>(StreamName streamName, bool failIfNotFound, CancellationToken cancellationToken) where T : Aggregate { var aggregate = _factoryRegistry.CreateInstance<T>(); var cachedEvents = GetCachedEvents(); try { var events = await _eventReader.ReadStream(streamName, new StreamReadPosition(cachedEvents.LongLength), failIfNotFound, cancellationToken) .ConfigureAwait(false); aggregate.Load(cachedEvents.Concat(events.Select(x => x.Payload))); Cache(streamName, aggregate); } catch (StreamNotFound) when (!failIfNotFound) { return aggregate; } catch (Exception e) { Log.UnableToLoadAggregate<T>(streamName, e); throw e is StreamNotFound ? new AggregateNotFoundException<T>(streamName, e) : e; } return aggregate; object[] GetCachedEvents() => _memoryCache.Get<IEnumerable<object>>(streamName)?.ToArray() ?? Array.Empty<object>(); } void Cache<T>(StreamName streamName, T aggregate) where T : Aggregate => _memoryCache.Set(streamName, aggregate.Current.AsEnumerable()); }
The text was updated successfully, but these errors were encountered:
Isn't it related to #157?
Sorry, something went wrong.
It also is. But I based this work on the one mentioned.
No branches or pull requests
Building on top of #220, here's an approach I've used to battle increasing aggregate creation time with each event.
Setup
The text was updated successfully, but these errors were encountered: