Akka.Streams.TestKit: testing that a stage shall not pull from an inlet #7372

marcotod1410 opened this issue Oct 31, 2024 · 2 comments

marcotod1410 commented Oct 31, 2024

I'm implementing a custom stage for Akka.NET Streams. The stage backpressures in certain situations and, because it contains an inner queue, I need to verify that it does not pull new items under those circumstances.

To better understand how to do this, I took the following test of the Buffer stage with backpressure from the Akka.Streams test sources (FlowBufferSpec.cs in the Akka.Streams.Tests project):

[Fact]
public void Buffer_must_accept_elements_that_fit_in_the_buffer_while_downstream_is_silent()
{
    var publisher = this.CreatePublisherProbe<int>();
    var subscriber = this.CreateManualSubscriberProbe<int>();

    Source.FromPublisher(publisher)
        .Buffer(100, OverflowStrategy.Backpressure)
        .To(Sink.FromSubscriber(subscriber))
        .Run(Materializer);

    var sub = subscriber.ExpectSubscription();

    // Fill up buffer
    Enumerable.Range(1, 100).ForEach(i => publisher.SendNext(i));

    // * At this point the Buffer stage doesn't pull from the inlet. How can I test that?

    // drain
    Enumerable.Range(1, 100).ForEach(i =>
    {
        sub.Request(1);
        subscriber.ExpectNext(i);
    });

    sub.Cancel();
}

As the comment marked with * shows, I want to test that the stage does not pull from the inlet, i.e. to assert that the stage is actually backpressuring. The test above only shows that it works when 100 elements are pushed, which is exactly the size of the inner buffer. In fact, if I increase the number of pushed elements to 113 or more, SendNext throws an exception.

Pushing fewer elements does not make SendNext throw, so it would also be great to know why the error appears specifically at 13 elements beyond the buffer size.

I've tried to write publisher.ExpectNoMsg() in place of the comment, but it fails due to the max-input-buffer-size setting in the materializer, which is 16 by default.

My questions are:

  • Is there a way to check that a stage should not pull an inlet, regardless of the materializer settings?
  • Is there a reason why, specifically, the 113th element pushed into the stage makes the test fail?

Arkatufus commented Nov 1, 2024

@marcotod1410
How did you determine that the 113th element threw? From my experiment with the code, it is the 102nd message that causes the publisher to throw.

To test for buffer backpressure, you need to change the test to this:

[Fact]
public async Task Buffer_must_accept_elements_that_fit_in_the_buffer_and_then_backpressure()
{
    var counter = new AtomicCounter(0);
    var subscriber = this.CreateManualSubscriberProbe<int>();

    Source.From(Enumerable.Range(1, 150))
        .Select(i =>
        {
            counter.IncrementAndGet();
            return i;
        })
        .Buffer(100, OverflowStrategy.Backpressure)
        .To(Sink.FromSubscriber(subscriber))
        .Run(Materializer);

    var sub = await subscriber.ExpectSubscriptionAsync();

    // Testing for backpressure, enumerable source should not push more than 100 elements into buffer
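    // The assertion must throw until the condition holds; returning a bool task would pass immediately
    await AwaitAssertAsync(() =>
    {
        counter.Current.Should().Be(100);
        return Task.CompletedTask;
    });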
    await Task.Delay(200);
    counter.Current.Should().Be(100);

    // drain
    Enumerable.Range(1, 150).ForEach(i =>
    {
        sub.Request(1);
        subscriber.ExpectNext(i);
    });
    counter.Current.Should().Be(150);

    subscriber.ExpectComplete();
}

This modification would make it more explicit and maybe easier to understand:

[Fact]
public async Task Buffer_must_accept_elements_that_fit_in_the_buffer_and_then_backpressure()
{
    var subscriber = this.CreateManualSubscriberProbe<int>();

    Source.From(Enumerable.Range(1, 150))
        .WireTap(i => TestActor.Tell(i))
        .Buffer(100, OverflowStrategy.Backpressure)
        .To(Sink.FromSubscriber(subscriber))
        .Run(Materializer);

    var sub = await subscriber.ExpectSubscriptionAsync();

    // Testing for backpressure, enumerable source should not push more than 100 elements into buffer
    foreach (var i in Enumerable.Range(1, 100))
    {
        await ExpectMsgAsync(i);
    }
    await ExpectNoMsgAsync();
    
    // drain
    Enumerable.Range(1, 150).ForEach(i =>
    {
        sub.Request(1);
        subscriber.ExpectNext(i);
    });

    subscriber.ExpectComplete();
}

@marcotod1410 (Author)

> How did you determine that the 113th element threw? From my experiment with the code, it is the 102nd message that causes the publisher to throw.

I just increased the count parameter in the first Enumerable.Range until it threw. Actually, I would expect it to throw as soon as I try to send the 101st element, since the size of the buffer is 100.
I'm not able to reproduce your case: pushing 102 elements does not make it throw, and nothing throws until the 113th element (found by trial and error). I'm editing the test directly in the FlowBufferSpec.cs file in the Akka.Streams.Tests project.

Thank you for your suggestions anyway. It looks like a good way to test this scenario, even though it avoids the publisher probe. To be fair, the probe would have been more convenient, since it gives finer-grained control over the flow of elements. At this stage, it looks like this is specifically a publisher issue.
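
One possible reading of the 113 figure, offered as an assumption rather than something confirmed in this thread: if the input boundary behind Source.FromPublisher requests max-input-buffer-size (16) elements on subscription and then tops up demand in batches of half that size (8) each time 8 elements are consumed, the total demand signalled to the probe after Buffer has taken 100 elements would be 16 + 12 * 8 = 112. The sketch below only models that arithmetic. `TotalRequested` and the batching rule are hypothetical names and assumptions, not Akka.NET API, and the model does not account for the 102 figure reported above.

```csharp
using System;

// Hypothetical model, NOT Akka.NET source: an input boundary that requests
// `bufferSize` elements on subscription, then requests another batch of
// `bufferSize / 2` each time that many elements have been consumed.
static long TotalRequested(int consumed, int bufferSize)
{
    int batchSize = Math.Max(1, bufferSize / 2);
    long total = bufferSize;          // initial request on subscription
    int batchRemaining = batchSize;   // elements left before the next top-up

    for (int i = 0; i < consumed; i++)
    {
        batchRemaining--;
        if (batchRemaining == 0)
        {
            total += batchSize;       // top up demand by one batch
            batchRemaining = batchSize;
        }
    }
    return total;
}

// Buffer(100) consumes 100 elements and then stops pulling.
long demand = TotalRequested(consumed: 100, bufferSize: 16);
Console.WriteLine($"Total demand signalled upstream: {demand}");
Console.WriteLine($"First SendNext without demand:   {demand + 1}");
```

Under these assumptions the probe sees demand for 112 elements, so the 113th SendNext is the first one without outstanding demand. If that is what happens, the count at which SendNext throws depends on the materializer's input buffer settings rather than on the Buffer stage alone, which would also explain why publisher.ExpectNoMsg() is sensitive to max-input-buffer-size.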

Aaronontheweb modified the milestones: 1.5.31, 1.5.32 on Nov 14, 2024