Skip to content

Commit

Permalink
Improve Streams SelectAsync. Log errors and improve test (#6884)
Browse files Browse the repository at this point in the history
* Improve Streams SelectAsync. Log errors and improve test

* Fix unit test to fit the test intent better

---------

Co-authored-by: Aaron Stannard <[email protected]>
  • Loading branch information
Arkatufus and Aaronontheweb authored Aug 16, 2023
1 parent cad7180 commit 982d96d
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 71 deletions.
190 changes: 126 additions & 64 deletions src/core/Akka.Streams.Tests/Dsl/FlowSelectAsyncSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
using Xunit;
using Xunit.Abstractions;
using FluentAssertions.Extensions;
using static FluentAssertions.FluentActions;
using Directive = Akka.Streams.Supervision.Directive;

// ReSharper disable InvokeAsExtensionMethod
#pragma warning disable 162
Expand Down Expand Up @@ -87,63 +89,96 @@ public async void A_Flow_with_SelectAsync_must_produce_task_elements_in_order()
await c.ExpectCompleteAsync();
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
// Turning this on in CI/CD for now
[Fact]
public async Task A_Flow_with_SelectAsync_must_not_run_more_futures_than_requested_parallelism()
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 20))
.SelectAsync(8, n => Task.Run(() =>
{
probe.Ref.Tell(n);
return n;
}))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(1);
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(1, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(2);
probe.ReceiveN(2).Should().BeEquivalentTo(Enumerable.Range(10, 2));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(500));
sub.Request(10);
probe.ReceiveN(9).Should().BeEquivalentTo(Enumerable.Range(12, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));

foreach (var n in Enumerable.Range(1, 13))
await c.ExpectNextAsync(n);
//Enumerable.Range(1, 13).ForEach(n => c.ExpectNext(n));
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
await this.AssertAllStagesStoppedAsync(async () =>
{
var probe = CreateTestProbe();
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 20))
.SelectAsync(8, async n =>
{
await Task.Yield();
probe.Ref.Tell(n);
return n;
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(1);
(await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(1, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(2);
(await probe.ReceiveNAsync(2).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(10, 2));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
sub.Request(10);
(await probe.ReceiveNAsync(9).ToListAsync()).Should().BeEquivalentTo(Enumerable.Range(12, 9));
await probe.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
foreach (var n in Enumerable.Range(1, 13))
await c.ExpectNextAsync(n);
await c.ExpectNoMsgAsync(TimeSpan.FromMilliseconds(200));
}, Materializer).ShouldCompleteWithin(RemainingOrDefault);
}

[LocalFact(SkipLocal = "Racy on Azure DevOps")]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure()
// Turning this on in CI/CD for now
[Fact]
public async Task A_Flow_with_parallel_execution_SelectAsync_must_signal_task_failure()
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
.SelectAsync(4, async n =>
{
if (n == 3)
if (n == 4)
throw new TestException("err1");
await Task.Delay(10.Seconds());
latch.Ready(TimeSpan.FromSeconds(10));
return n;
}))
})
.To(Sink.FromSubscriber(c)).Run(Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
latch.CountDown();
}, Materializer);
var exception = await c.ExpectErrorAsync();
exception.InnerException!.Message.Should().Be("err1");
}, Materializer).ShouldCompleteWithin(RemainingOrDefault);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure()
{
await this.AssertAllStagesStoppedAsync(async() => {
var probe = Source.From(Enumerable.Range(1, 5))
.SelectAsync(1, async n =>
{
await Task.Delay(10);
if (n == 3)
throw new TestException("err1");
return n;
})
.RunWith(this.SinkProbe<int>(), Materializer);
var exception = await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[]{1, 2})
.ExpectErrorAsync()
.ShouldCompleteWithin(RemainingOrDefault);
exception.InnerException!.Message.Should().Be("err1");
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_task_failure_asap()
{
await this.AssertAllStagesStoppedAsync(() => {
await this.AssertAllStagesStoppedAsync(async () => {
var latch = CreateTestLatch();
var done = Source.From(Enumerable.Range(1, 5))
.Select(n =>
Expand All @@ -165,60 +200,87 @@ await this.AssertAllStagesStoppedAsync(() => {
return Task.FromResult(n);
}).RunWith(Sink.Ignore<int>(), Materializer);
done.Invoking(d => d.Wait(RemainingOrDefault)).Should().Throw<Exception>().WithMessage("err1");
await Awaiting(async () => await done).Should()
.ThrowAsync<Exception>()
.WithMessage("err1")
.ShouldCompleteWithin(RemainingOrDefault);
latch.CountDown();
return Task.CompletedTask;
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_signal_error_from_SelectAsync()
{
await this.AssertAllStagesStoppedAsync(async() => {
var latch = new TestLatch(1);
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n =>
{
if (n == 3)
throw new TestException("err2");
return Task.Run(() =>
return Task.Run(async () =>
{
latch.Ready(TimeSpan.FromSeconds(10));
await Task.Delay(10.Seconds());
return n;
});
})
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
latch.CountDown();
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure()
public async Task A_Flow_with_SelectAsync_must_invoke_supervision_strategy_on_task_failure()
{
await this.AssertAllStagesStoppedAsync(async() =>
await this.AssertAllStagesStoppedAsync(async () =>
{
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
foreach (var i in new[] { 1, 2, 4, 5 })
await c.ExpectNextAsync(i);
await c.ExpectCompleteAsync();
}, Materializer);
var invoked = false;
var probe = Source.From(Enumerable.Range(1, 5))
.SelectAsync(1, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(_ =>
{
invoked = true;
return Directive.Stop;
}))
.RunWith(this.SinkProbe<int>(), Materializer);
await probe.AsyncBuilder()
.Request(10)
.ExpectNextN(new[] { 1, 2 })
.ExpectErrorAsync();
invoked.Should().BeTrue();
}, Materializer);
}

[Fact]
public async Task A_Flow_with_SelectAsync_must_resume_after_task_failure()
{
await this.AssertAllStagesStoppedAsync(async () => {
var c = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 5))
.SelectAsync(4, n => Task.Run(() =>
{
if (n == 3)
throw new TestException("err3");
return n;
}))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.ResumingDecider))
.RunWith(Sink.FromSubscriber(c), Materializer);
var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
foreach (var i in new[] { 1, 2, 4, 5 })
await c.ExpectNextAsync(i);
await c.ExpectCompleteAsync();
}, Materializer);
}

Expand All @@ -227,7 +289,7 @@ public async Task A_Flow_with_SelectAsync_must_resume_after_multiple_failures()
{
await this.AssertAllStagesStoppedAsync(() => {
var futures = new[]
{
{
Task.Run(() => { throw new TestException("failure1"); return "";}),
Task.Run(() => { throw new TestException("failure2"); return "";}),
Task.Run(() => { throw new TestException("failure3"); return "";}),
Expand Down
49 changes: 42 additions & 7 deletions src/core/Akka.Streams/Implementation/Fusing/Ops.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2571,9 +2571,10 @@ public Logic(Attributes inheritedAttributes, SelectAsync<TIn, TOut> stage) : bas

public override void OnPush()
{
var message = Grab(_stage.In);
try
{
var task = _stage._mapFunc(Grab(_stage.In));
var task = _stage._mapFunc(message);
var holder = new Holder<TOut>(NotYetThere, _taskCallback);
_buffer.Enqueue(holder);

Expand All @@ -2590,8 +2591,21 @@ public override void OnPush()
}
catch (Exception e)
{
if (_decider(e) == Directive.Stop)
FailStage(e);
var strategy = _decider(e);
Log.Error(e, "An exception occured inside SelectAsync while processing message [{0}]. Supervision strategy: {1}", message, strategy);
switch (strategy)
{
case Directive.Stop:
FailStage(e);
break;

case Directive.Resume:
case Directive.Restart:
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", e);
}
}
if (Todo < _stage._parallelism && !HasBeenPulled(_stage.In))
TryPull(_stage.In);
Expand Down Expand Up @@ -2644,10 +2658,31 @@ private void PushOne()
private void HolderCompleted(Holder<TOut> holder)
{
var element = holder.Element;
if (!element.IsSuccess && _decider(element.Exception) == Directive.Stop)
FailStage(element.Exception);
else if (IsAvailable(_stage.Out))
PushOne();
if (element.IsSuccess)
{
if (IsAvailable(_stage.Out))
PushOne();
return;
}

var exception = element.Exception;
var strategy = _decider(exception);
Log.Error(exception, "An exception occured inside SelectAsync while executing Task. Supervision strategy: {0}", strategy);
switch (strategy)
{
case Directive.Stop:
FailStage(exception);
break;

case Directive.Resume:
case Directive.Restart:
if (IsAvailable(_stage.Out))
PushOne();
break;

default:
throw new AggregateException($"Unknown SupervisionStrategy directive: {strategy}", exception);
}
}

public override string ToString() => $"SelectAsync.Logic(buffer={_buffer})";
Expand Down

0 comments on commit 982d96d

Please sign in to comment.