diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 51b99a9775..3bc76cf449 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -100,8 +100,9 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" val subscriber2 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber2)).run() - subscriber2.expectSubscriptionAndError().getMessage should ===( - "Substream Source cannot be materialized more than once") + val ex = subscriber2.expectSubscriptionAndError() + ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once") + ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true subscriber1.requestNext(2).expectComplete() @@ -126,7 +127,7 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) subscriber.expectSubscriptionAndError().getMessage should ===( - s"Substream Source has not been materialized in ${ms} milliseconds") + s"Substream Source(TailSource) has not been materialized in ${ms} milliseconds") } "not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped { @silent("deprecated") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index ddc17b2da0..69955a0114 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -9,6 +9,7 @@ import akka.NotUsed import akka.stream.Supervision.resumingDecider import akka.stream._ import akka.stream.impl.SubscriptionTimeoutException +import akka.stream.impl.fusing.Split import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ @@ -263,21 +264,25 @@ class FlowSplitWhenSpec extends StreamSpec(""" } "fail substream if materialized twice" in assertAllStagesStopped { - import system.dispatcher - val stream = Source(1 to 5) - .splitWhen(_ => true) - .lift - .map { src => - src.runWith(Sink.ignore).flatMap(_ => src.runWith(Sink.ignore)) - } - .toMat(TestSink.probe[Future[Done]])(Keep.right) + val stream = + Source(1 to 5) + // Need to drop to internal API to get a plain Source[Source[Int]] instead of a SubFlow. + // `lift` doesn't cut here because it will prevent the behavior we'd like to see. + // In fact, this test is somewhat useless, as a user cannot trigger double materialization using + // the public splitWhen => SubFlow API. + .via(Split.when(_ => true, SubstreamCancelStrategy.drain)) + .map { source => + // run twice, but make sure we return the result of the materialization that ran second + source.runWith(Sink.ignore).flatMap(_ => source.runWith(Sink.ignore)) + } + .toMat(TestSink.probe[Future[Done]])(Keep.right) val probe = stream.withAttributes(Attributes.inputBuffer(1, 1)).run() - probe.request(1) val future = probe.requestNext() - an[IllegalStateException] mustBe thrownBy { - Await.result(future, 3.seconds) - } + val ex = the[IllegalStateException] thrownBy Await.result(future, 3.seconds) + ex.getMessage should ===("Substream Source(SplitSource) cannot be materialized more than once") + ex.printStackTrace + ex.getStackTrace.exists(_.getClassName contains "FlowSplitWhenSpec") shouldBe true probe.cancel() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index a11393c567..c15f8819b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -700,10 +700,17 @@ import scala.util.control.NonFatal case cmd: CommandScheduledBeforeMaterialization => throw new IllegalStateException( - s"${newState.command} on subsink is illegal when ${cmd.command} is still pending") + s"${newState.command} on subsink($name) is illegal when ${cmd.command} is still pending") } override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler { + // check for previous materialization eagerly so we fail with a more useful stacktrace + private[this] val materializationException: OptionVal[IllegalStateException] = + if (status.get.isInstanceOf[AsyncCallback[_]]) + OptionVal.Some(createMaterializedTwiceException()) + else + OptionVal.None + setHandler(in, this) override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in))) @@ -726,7 +733,7 @@ import scala.util.control.NonFatal setCallback(callback) case _: /* Materialized */ AsyncCallback[Command @unchecked] => - failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) + failStage(materializationException.getOrElse(createMaterializedTwiceException())) } override def preStart(): Unit = @@ -734,6 +741,9 @@ import scala.util.control.NonFatal case RequestOne => tryPull(in) case Cancel(cause) => cancelStage(cause) } + + def createMaterializedTwiceException(): IllegalStateException = + new IllegalStateException(s"Substream Sink($name) cannot be materialized more than once") } override def toString: String = name @@ -778,9 +788,16 @@ import scala.util.control.NonFatal status.compareAndSet( null, ActorSubscriberMessage.OnError( - new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d"))) + new SubscriptionTimeoutException(s"Substream Source($name) has not been materialized in $d"))) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + // check for previous materialization eagerly so we fail with a more useful stacktrace + private[this] val materializationException: OptionVal[IllegalStateException] = + if (status.get.isInstanceOf[AsyncCallback[_]]) + OptionVal.Some(createMaterializedTwiceException()) + else + OptionVal.None + setHandler(out, this) @tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = { @@ -789,7 +806,7 @@ import scala.util.control.NonFatal case ActorSubscriberMessage.OnComplete => completeStage() case ActorSubscriberMessage.OnError(ex) => failStage(ex) case _: AsyncCallback[_] => - failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) + failStage(materializationException.getOrElse(createMaterializedTwiceException())) } } @@ -804,6 +821,9 @@ import scala.util.control.NonFatal override def onPull(): Unit = externalCallback.invoke(RequestOne) override def onDownstreamFinish(cause: Throwable): Unit = externalCallback.invoke(Cancel(cause)) + + def createMaterializedTwiceException(): IllegalStateException = + new IllegalStateException(s"Substream Source($name) cannot be materialized more than once") } override def toString: String = name