-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Akka.Streams.TestKit: testing that a stage shall not pull from an inlet #7372
Comments
@marcotod1410 To test for buffer backpressure, you need to change the test to this: [Fact]
public async Task Buffer_must_accept_elements_that_fit_in_the_buffer_and_then_backpressure()
{
var counter = new AtomicCounter(0);
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 150))
.Select(i =>
{
counter.IncrementAndGet();
return i;
})
.Buffer(100, OverflowStrategy.Backpressure)
.To(Sink.FromSubscriber(subscriber))
.Run(Materializer);
var sub = await subscriber.ExpectSubscriptionAsync();
// Testing for backpressure, enumerable source should not push more than 100 elements into buffer
await AwaitAssertAsync(() => Task.FromResult(counter.Current == 100));
await Task.Delay(200);
counter.Current.Should().Be(100);
// drain
Enumerable.Range(1, 150).ForEach(i =>
{
sub.Request(1);
subscriber.ExpectNext(i);
});
counter.Current.Should().Be(150);
subscriber.ExpectComplete();
} This modification would make it more explicit and maybe easier to understand: [Fact]
public async Task Buffer_must_accept_elements_that_fit_in_the_buffer_and_then_backpressure()
{
var subscriber = this.CreateManualSubscriberProbe<int>();
Source.From(Enumerable.Range(1, 150))
.WireTap(i => TestActor.Tell(i))
.Buffer(100, OverflowStrategy.Backpressure)
.To(Sink.FromSubscriber(subscriber))
.Run(Materializer);
var sub = await subscriber.ExpectSubscriptionAsync();
// Testing for backpressure, enumerable source should not push more than 100 elements into buffer
foreach (var i in Enumerable.Range(1, 100))
{
await ExpectMsgAsync(i);
}
await ExpectNoMsgAsync();
// drain
Enumerable.Range(1, 150).ForEach(i =>
{
sub.Request(1);
subscriber.ExpectNext(i);
});
subscriber.ExpectComplete();
} |
I just increased the Thank you for your suggestions anyway, it looks like a good way to test this scenario even if it avoids the publisher probe and, to be fair, it would have been more comfortable to use it, since it allows you to control the elements flow in a finer grained way. At this stage, it looks like it is specifically a publisher issue. |
I'm implementing a custom stream stage on Akka.NET Streams. In some sense, it backpressures in certain situations and, since it contains an inner queue, I need to check that the stage shouldn't pull new items under some circumstances.
In order to understand better how to do it, I've taken the following test on Buffer feature with backpressure from Akka Streams tests source code (FlowBufferSpec.cs in project Akka.Streams.Tests):
As you can see from the comment with the *, I'm trying to test that the stage shall not pull from the inlet, so actually asserting that the stage is backpressuring. The test above just states that it works if 100 elements are pushed, which is the actual size of the inner buffer. In fact, if I increase the number of elements to be pushed to 113 elements or more, the
SendNext
raises an exception.Pushing less elements does not make the
SendNext
throw, so it would be great also to know why it raises an error specifically at 13 elements more than the buffer size.I've tried to write
publisher.ExpectNoMsg()
in place of the comment, but it fails due to themax-input-buffer-size
setting in the materializer, which is 16 by default.My questions are:
The text was updated successfully, but these errors were encountered: