JustSaying 4.0 Release Notes
This release introduces `IHandlerAsync` as an async version of `IHandler`, for handlers that perform one or more `await` statements. Async handlers will usually have a `Handle` method like this: `public async Task<bool> Handle(T message)`.
You can use both interfaces at present. On the fluent interface, there are two overloads of `WithMessageHandler`:
- `WithMessageHandler<T>(IHandler<T> handler)`
- `WithMessageHandler<T>(IHandlerAsync<T> handler)`
However, the first overload just forwards to the second, after the `IHandler` instance is converted into `IHandlerAsync` using `BlockingHandler`:
```csharp
public IHaveFulfilledSubscriptionRequirements WithMessageHandler<T>(IHandler<T> handler) where T : Message
{
    return WithMessageHandler(new BlockingHandler<T>(handler));
}
```
Internal to JustSaying, only `IHandlerAsync` is used since this is the more general case. In a later release, `IHandler` will likely be obsoleted and then removed.
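The sync-to-async conversion described above can be sketched as a small adapter. This is an illustration only, not the library's `BlockingHandler` source: the two interfaces are stand-in declarations so the snippet compiles on its own, and `SyncToAsyncAdapter` and `EvenNumberHandler` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in declarations so the sketch is self-contained.
// In a real project these come from JustSaying; the shapes here are assumptions.
public interface IHandler<in T> { bool Handle(T message); }
public interface IHandlerAsync<in T> { Task<bool> Handle(T message); }

// Hypothetical adapter: wraps a synchronous handler behind the async interface.
public sealed class SyncToAsyncAdapter<T> : IHandlerAsync<T>
{
    private readonly IHandler<T> _inner;

    public SyncToAsyncAdapter(IHandler<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    // Runs the synchronous handler on the calling thread and
    // returns its result as an already-completed task.
    public Task<bool> Handle(T message) => Task.FromResult(_inner.Handle(message));
}

// Hypothetical synchronous handler used only for the demonstration.
public sealed class EvenNumberHandler : IHandler<int>
{
    public bool Handle(int message) => message % 2 == 0;
}

public static class Program
{
    public static async Task Main()
    {
        IHandlerAsync<int> handler = new SyncToAsyncAdapter<int>(new EvenNumberHandler());
        Console.WriteLine(await handler.Handle(4)); // prints "True"
        Console.WriteLine(await handler.Handle(3)); // prints "False"
    }
}
```

An adapter of this shape adds no real asynchrony: the wrapped handler still blocks its caller. That is why moving a handler to `IHandlerAsync` directly is the better long-term option when it has I/O to await.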
Suppose you have a handler like this:
```csharp
public class OrderCompletedMessageHandler : IHandler<OrderCompleted>
{
    public bool Handle(OrderCompleted message)
    {
        // do some work in response to the message, but cannot "await"
        var order = orderApi.Get(message.OrderId).Result;
        localOrderRepo.SetOrderStatus(order, OrderStatus.Completed);
        return true;
    }
}
```
We know that `.Result` and unawaited tasks are bad practice, but until now we had no other option. To take advantage of `async`, the handler becomes:
```csharp
public class OrderCompletedMessageHandler : IHandlerAsync<OrderCompleted>
{
    public async Task<bool> Handle(OrderCompleted message)
    {
        // do some work in response to the message, and now we can await
        var order = await orderApi.Get(message.OrderId);
        await localOrderRepo.SetOrderStatus(order, OrderStatus.Completed);
        return true;
    }
}
```
If you have no async work to do at all, but want to use `IHandlerAsync`, you can do something similar to `BlockingHandler`:
```csharp
public class OrderCompletedMessageHandler : IHandlerAsync<OrderCompleted>
{
    public Task<bool> Handle(OrderCompleted message)
    {
        // do some sync work in response to the message
        var order = orderApi.Get(message.OrderId);
        localOrderRepo.SetOrderStatus(order, OrderStatus.Completed);
        return Task.FromResult(true);
    }
}
```
### Handler resolvers
[A handler resolver is used to get handlers from an IoC container](https://github.com/justeat/JustSaying/blob/master/JustSaying/IHandlerResolver.cs). It needs to return instances of `IHandlerAsync` only. If you have both `IHandlerAsync` and `IHandler` then you return both by wrapping the `IHandler` in `BlockingHandler`, e.g.
```csharp
public IEnumerable<IHandlerAsync<T>> ResolveHandlers<T>()
{
    var proposedHandlers = _container.GetAllInstances<IHandlerAsync<T>>();
    var proposedBlockingHandlers = _container.GetAllInstances<IHandler<T>>()
        .Select(h => new BlockingHandler<T>(h));
    return proposedHandlers.Concat(proposedBlockingHandlers);
}
```
See this example.
The constructor of the `Throttled` message processing strategy has been simplified. It is now:
```csharp
public Throttled(int maxWorkers, IMessageMonitor messageMonitor)
```
or
```csharp
public Throttled(Func<int> maxWorkersProducer, IMessageMonitor messageMonitor)
```
It used to be:
```csharp
public Throttled(
    Func<int> maximumAllowedMesagesInFlightProducer,
    int maximumBatchSize,
    IMessageMonitor messageMonitor)
```
For example:
```csharp
int maxConcurrentMessages = ReadConcurrencySetting();
IMessageMonitor monitor = GetMessageMonitor();
IMessageProcessingStrategy messageProcessingStrategy = new Throttled(() => maxConcurrentMessages, 10, monitor);
```
The problems with this are that:
- In most cases, `maxConcurrentMessages` does not change after startup, so asking the caller to wrap it in a function is overhead.
- `maximumBatchSize` is not relevant to this class: it is to do with getting messages from Amazon SQS, not processing them. It seems to be always set to 10 anyway.
Here are the suggested replacements, from simplest to most complex.
- Use the `DefaultThrottledThroughput`, which will give you 8 message handlers per processor core. This is usually sufficient, and requires no further configuration even when changing target instance type, e.g.
  ```csharp
  IMessageProcessingStrategy messageProcessingStrategy = new DefaultThrottledThroughput(monitor);
  ```
- Use `Throttled`, but simply specify a fixed concurrency level at startup.
  ```csharp
  int maxConcurrentMessages = 16;
  IMessageProcessingStrategy messageProcessingStrategy = new Throttled(maxConcurrentMessages, monitor);
  ```
- Use `Throttled`, but calculate a fixed concurrency level at startup.
  ```csharp
  int maxConcurrentMessages = Environment.ProcessorCount * 5;
  IMessageProcessingStrategy messageProcessingStrategy = new Throttled(maxConcurrentMessages, monitor);
  ```
- Use `Throttled`, in the rare case where the concurrency level varies after startup.
  ```csharp
  Func<int> calcConcurrency = () => someComplexMovingConcurrencyCalc();
  IMessageProcessingStrategy messageProcessingStrategy = new Throttled(calcConcurrency, monitor);
  ```
The `MaximumThroughput` message processing strategy has been removed as it was not suitable for production use. If you want to do this, an equivalent strategy would be a `Throttled` message processing strategy with a very large `maxWorkers` value, i.e.
```csharp
IMessageProcessingStrategy dangerouslyUnthrottled = new Throttled(1000, monitor);
```
But this is not recommended, as you may exhaust the thread pool when large numbers of messages are received at once.
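
To make the idea of a capped number of workers concrete, here is a minimal sketch of bounded concurrency using `SemaphoreSlim`. It is not how `Throttled` is implemented inside JustSaying; `BoundedConcurrencyDemo` and `ProcessAll` are hypothetical names, and the "handler" is just a short delay standing in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedConcurrencyDemo
{
    // Runs `handle` once per message while allowing at most `maxWorkers`
    // calls in flight. Returns the highest concurrency actually observed.
    public static async Task<int> ProcessAll(int messageCount, int maxWorkers, Func<int, Task> handle)
    {
        int inFlight = 0;
        int peak = 0;
        var counterLock = new object();

        using (var gate = new SemaphoreSlim(maxWorkers, maxWorkers))
        {
            var tasks = Enumerable.Range(0, messageCount).Select(async i =>
            {
                // Wait for a free worker slot before starting the handler.
                await gate.WaitAsync();
                try
                {
                    lock (counterLock)
                    {
                        inFlight++;
                        peak = Math.Max(peak, inFlight);
                    }
                    await handle(i);
                }
                finally
                {
                    lock (counterLock) { inFlight--; }
                    gate.Release(); // free the slot for the next message
                }
            }).ToArray();

            // All handlers finish before the semaphore is disposed.
            await Task.WhenAll(tasks);
        }

        return peak;
    }

    public static async Task Main()
    {
        // 50 simulated messages, at most 4 handled at once.
        int peak = await ProcessAll(50, 4, _ => Task.Delay(10));
        Console.WriteLine(peak <= 4); // prints "True"
    }
}
```

With a very large limit such as 1000, the gate almost never blocks, so a burst of messages starts nearly all handlers at once. That is the situation the warning above describes.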