From 1fbd1d338f8b9303f5d88351e5a67c63e6416aea Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 4 Feb 2020 11:04:31 +0100 Subject: [PATCH] stream: fail eagerly during SubSink/SubSource materialization (#28492) This way the stack trace will be more helpful because it contains the stage that actually triggered the materialization. Otherwise, we will only fail during `preStart` in the interpreter where the stage will be failed and the error be propagated through the stream where it can be hard to figure out what happened. Also improve the message itself to contain the user provided name of the sink/source. --- .../scaladsl/FlowPrefixAndTailSpec.scala | 7 +++-- .../stream/scaladsl/FlowSplitWhenSpec.scala | 29 +++++++++++-------- .../stream/impl/fusing/StreamOfStreams.scala | 28 +++++++++++++++--- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 51b99a9775..3bc76cf449 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -100,8 +100,9 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" val subscriber2 = TestSubscriber.probe[Int]() tail.to(Sink.fromSubscriber(subscriber2)).run() - subscriber2.expectSubscriptionAndError().getMessage should ===( - "Substream Source cannot be materialized more than once") + val ex = subscriber2.expectSubscriptionAndError() + ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once") + ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true subscriber1.requestNext(2).expectComplete() @@ -126,7 +127,7 @@ class FlowPrefixAndTailSpec extends StreamSpec(""" tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) subscriber.expectSubscriptionAndError().getMessage should ===( - s"Substream Source has not been materialized in ${ms} milliseconds") + s"Substream Source(TailSource) has not been materialized in ${ms} milliseconds") } "not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped { @silent("deprecated") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index ddc17b2da0..69955a0114 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -9,6 +9,7 @@ import akka.NotUsed import akka.stream.Supervision.resumingDecider import akka.stream._ import akka.stream.impl.SubscriptionTimeoutException +import akka.stream.impl.fusing.Split import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ @@ -263,21 +264,25 @@ class FlowSplitWhenSpec extends StreamSpec(""" } "fail substream if materialized twice" in assertAllStagesStopped { - import system.dispatcher - val stream = Source(1 to 5) - .splitWhen(_ => true) - .lift - .map { src => - src.runWith(Sink.ignore).flatMap(_ => src.runWith(Sink.ignore)) - } - .toMat(TestSink.probe[Future[Done]])(Keep.right) + val stream = + Source(1 to 5) + // Need to drop to internal API to get a plain Source[Source[Int]] instead of a SubFlow. + // `lift` doesn't cut here because it will prevent the behavior we'd like to see. + // In fact, this test is somewhat useless, as a user cannot trigger double materialization using + // the public splitWhen => SubFlow API. + .via(Split.when(_ => true, SubstreamCancelStrategy.drain)) + .map { source => + // run twice, but make sure we return the result of the materialization that ran second + source.runWith(Sink.ignore).flatMap(_ => source.runWith(Sink.ignore)) + } + .toMat(TestSink.probe[Future[Done]])(Keep.right) val probe = stream.withAttributes(Attributes.inputBuffer(1, 1)).run() - probe.request(1) val future = probe.requestNext() - an[IllegalStateException] mustBe thrownBy { - Await.result(future, 3.seconds) - } + val ex = the[IllegalStateException] thrownBy Await.result(future, 3.seconds) + ex.getMessage should ===("Substream Source(SplitSource) cannot be materialized more than once") + ex.printStackTrace + ex.getStackTrace.exists(_.getClassName contains "FlowSplitWhenSpec") shouldBe true probe.cancel() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index a11393c567..c15f8819b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -700,10 +700,17 @@ import scala.util.control.NonFatal case cmd: CommandScheduledBeforeMaterialization => throw new IllegalStateException( - s"${newState.command} on subsink is illegal when ${cmd.command} is still pending") + s"${newState.command} on subsink($name) is illegal when ${cmd.command} is still pending") } override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler { + // check for previous materialization eagerly so we fail with a more useful stacktrace + private[this] val materializationException: OptionVal[IllegalStateException] = + if (status.get.isInstanceOf[AsyncCallback[_]]) + OptionVal.Some(createMaterializedTwiceException()) + else + OptionVal.None + setHandler(in, this) override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in))) @@ -726,7 +733,7 @@ import scala.util.control.NonFatal setCallback(callback) case _: /* Materialized */ AsyncCallback[Command @unchecked] => - failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) + failStage(materializationException.getOrElse(createMaterializedTwiceException())) } override def preStart(): Unit = @@ -734,6 +741,9 @@ import scala.util.control.NonFatal case RequestOne => tryPull(in) case Cancel(cause) => cancelStage(cause) } + + def createMaterializedTwiceException(): IllegalStateException = + new IllegalStateException(s"Substream Sink($name) cannot be materialized more than once") } override def toString: String = name @@ -778,9 +788,16 @@ import scala.util.control.NonFatal status.compareAndSet( null, ActorSubscriberMessage.OnError( - new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d"))) + new SubscriptionTimeoutException(s"Substream Source($name) has not been materialized in $d"))) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + // check for previous materialization eagerly so we fail with a more useful stacktrace + private[this] val materializationException: OptionVal[IllegalStateException] = + if (status.get.isInstanceOf[AsyncCallback[_]]) + OptionVal.Some(createMaterializedTwiceException()) + else + OptionVal.None + setHandler(out, this) @tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = { @@ -789,7 +806,7 @@ import scala.util.control.NonFatal case ActorSubscriberMessage.OnComplete => completeStage() case ActorSubscriberMessage.OnError(ex) => failStage(ex) case _: AsyncCallback[_] => - failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) + failStage(materializationException.getOrElse(createMaterializedTwiceException())) } } @@ -804,6 +821,9 @@ import scala.util.control.NonFatal override def onPull(): Unit = externalCallback.invoke(RequestOne) override def onDownstreamFinish(cause: Throwable): Unit = externalCallback.invoke(Cancel(cause)) + + def createMaterializedTwiceException(): IllegalStateException = + new IllegalStateException(s"Substream Source($name) cannot be materialized more than once") } override def toString: String = name