stream: fail eagerly during SubSink/SubSource materialization (#28492)

This way the stack trace will be more helpful because it contains the stage
that actually triggered the materialization.

Otherwise, we will only fail during `preStart` in the interpreter where the
stage will be failed and the error be propagated through the stream where
it can be hard to figure out what happened.

Also improve the message itself to contain the user provided name of the
sink/source.
This commit is contained in:
Johannes Rudolph 2020-02-04 11:04:31 +01:00 committed by GitHub
parent 918b556b0a
commit 1fbd1d338f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 19 deletions

View file

@ -100,8 +100,9 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
val subscriber2 = TestSubscriber.probe[Int]() val subscriber2 = TestSubscriber.probe[Int]()
tail.to(Sink.fromSubscriber(subscriber2)).run() tail.to(Sink.fromSubscriber(subscriber2)).run()
subscriber2.expectSubscriptionAndError().getMessage should ===( val ex = subscriber2.expectSubscriptionAndError()
"Substream Source cannot be materialized more than once") ex.getMessage should ===("Substream Source(TailSource) cannot be materialized more than once")
ex.getStackTrace.exists(_.getClassName contains "FlowPrefixAndTailSpec") shouldBe true
subscriber1.requestNext(2).expectComplete() subscriber1.requestNext(2).expectComplete()
@ -126,7 +127,7 @@ class FlowPrefixAndTailSpec extends StreamSpec("""
tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer)
subscriber.expectSubscriptionAndError().getMessage should ===( subscriber.expectSubscriptionAndError().getMessage should ===(
s"Substream Source has not been materialized in ${ms} milliseconds") s"Substream Source(TailSource) has not been materialized in ${ms} milliseconds")
} }
"not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped { "not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped {
@silent("deprecated") @silent("deprecated")

View file

@ -9,6 +9,7 @@ import akka.NotUsed
import akka.stream.Supervision.resumingDecider import akka.stream.Supervision.resumingDecider
import akka.stream._ import akka.stream._
import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.fusing.Split
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.StreamTestKit._
@ -263,21 +264,25 @@ class FlowSplitWhenSpec extends StreamSpec("""
} }
"fail substream if materialized twice" in assertAllStagesStopped { "fail substream if materialized twice" in assertAllStagesStopped {
import system.dispatcher import system.dispatcher
val stream = Source(1 to 5) val stream =
.splitWhen(_ => true) Source(1 to 5)
.lift // Need to drop to internal API to get a plain Source[Source[Int]] instead of a SubFlow.
.map { src => // `lift` doesn't cut here because it will prevent the behavior we'd like to see.
src.runWith(Sink.ignore).flatMap(_ => src.runWith(Sink.ignore)) // In fact, this test is somewhat useless, as a user cannot trigger double materialization using
} // the public splitWhen => SubFlow API.
.toMat(TestSink.probe[Future[Done]])(Keep.right) .via(Split.when(_ => true, SubstreamCancelStrategy.drain))
.map { source =>
// run twice, but make sure we return the result of the materialization that ran second
source.runWith(Sink.ignore).flatMap(_ => source.runWith(Sink.ignore))
}
.toMat(TestSink.probe[Future[Done]])(Keep.right)
val probe = stream.withAttributes(Attributes.inputBuffer(1, 1)).run() val probe = stream.withAttributes(Attributes.inputBuffer(1, 1)).run()
probe.request(1)
val future = probe.requestNext() val future = probe.requestNext()
an[IllegalStateException] mustBe thrownBy { val ex = the[IllegalStateException] thrownBy Await.result(future, 3.seconds)
Await.result(future, 3.seconds) ex.getMessage should ===("Substream Source(SplitSource) cannot be materialized more than once")
} ex.printStackTrace
ex.getStackTrace.exists(_.getClassName contains "FlowSplitWhenSpec") shouldBe true
probe.cancel() probe.cancel()
} }

View file

@ -700,10 +700,17 @@ import scala.util.control.NonFatal
case cmd: CommandScheduledBeforeMaterialization => case cmd: CommandScheduledBeforeMaterialization =>
throw new IllegalStateException( throw new IllegalStateException(
s"${newState.command} on subsink is illegal when ${cmd.command} is still pending") s"${newState.command} on subsink($name) is illegal when ${cmd.command} is still pending")
} }
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler { override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler {
// check for previous materialization eagerly so we fail with a more useful stacktrace
private[this] val materializationException: OptionVal[IllegalStateException] =
if (status.get.isInstanceOf[AsyncCallback[_]])
OptionVal.Some(createMaterializedTwiceException())
else
OptionVal.None
setHandler(in, this) setHandler(in, this)
override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in))) override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in)))
@ -726,7 +733,7 @@ import scala.util.control.NonFatal
setCallback(callback) setCallback(callback)
case _: /* Materialized */ AsyncCallback[Command @unchecked] => case _: /* Materialized */ AsyncCallback[Command @unchecked] =>
failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) failStage(materializationException.getOrElse(createMaterializedTwiceException()))
} }
override def preStart(): Unit = override def preStart(): Unit =
@ -734,6 +741,9 @@ import scala.util.control.NonFatal
case RequestOne => tryPull(in) case RequestOne => tryPull(in)
case Cancel(cause) => cancelStage(cause) case Cancel(cause) => cancelStage(cause)
} }
def createMaterializedTwiceException(): IllegalStateException =
new IllegalStateException(s"Substream Sink($name) cannot be materialized more than once")
} }
override def toString: String = name override def toString: String = name
@ -778,9 +788,16 @@ import scala.util.control.NonFatal
status.compareAndSet( status.compareAndSet(
null, null,
ActorSubscriberMessage.OnError( ActorSubscriberMessage.OnError(
new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d"))) new SubscriptionTimeoutException(s"Substream Source($name) has not been materialized in $d")))
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
// check for previous materialization eagerly so we fail with a more useful stacktrace
private[this] val materializationException: OptionVal[IllegalStateException] =
if (status.get.isInstanceOf[AsyncCallback[_]])
OptionVal.Some(createMaterializedTwiceException())
else
OptionVal.None
setHandler(out, this) setHandler(out, this)
@tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = { @tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = {
@ -789,7 +806,7 @@ import scala.util.control.NonFatal
case ActorSubscriberMessage.OnComplete => completeStage() case ActorSubscriberMessage.OnComplete => completeStage()
case ActorSubscriberMessage.OnError(ex) => failStage(ex) case ActorSubscriberMessage.OnError(ex) => failStage(ex)
case _: AsyncCallback[_] => case _: AsyncCallback[_] =>
failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) failStage(materializationException.getOrElse(createMaterializedTwiceException()))
} }
} }
@ -804,6 +821,9 @@ import scala.util.control.NonFatal
override def onPull(): Unit = externalCallback.invoke(RequestOne) override def onPull(): Unit = externalCallback.invoke(RequestOne)
override def onDownstreamFinish(cause: Throwable): Unit = externalCallback.invoke(Cancel(cause)) override def onDownstreamFinish(cause: Throwable): Unit = externalCallback.invoke(Cancel(cause))
def createMaterializedTwiceException(): IllegalStateException =
new IllegalStateException(s"Substream Source($name) cannot be materialized more than once")
} }
override def toString: String = name override def toString: String = name