diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 066b1eba75..a42ac62eb4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -92,21 +92,25 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" "throw if tail is attempted to be materialized twice" in assertAllStagesStopped { val futureSink = newHeadSink - val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink) - val (takes, tail) = Await.result(fut, 3.seconds) - takes should be(Seq(1)) + val fut = Source(1 to 3).prefixAndTail(1).runWith(futureSink) + val (prefix, tail) = Await.result(fut, 3.seconds) + prefix should be(Seq(1)) val subscriber1 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber1)).run() + // make sure it was materialized once before ... + subscriber1.ensureSubscription() + subscriber1.request(1) + subscriber1.expectNext(2) + // ... verifying what happens on a second materialization val subscriber2 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber2)).run() val ex = subscriber2.expectSubscriptionAndError() ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once") ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true - subscriber1.requestNext(2).expectComplete() - + subscriber1.requestNext(3).expectComplete() } "signal error if substream has been not subscribed in time" in assertAllStagesStopped {