stream: ensure async boundaries propagate cancellation causes (#29213)
Before, a cancellation cause might be lost if cancellation happened before subscription was completely handled.
This commit is contained in:
parent
a80624bd66
commit
d6fd8c30e0
2 changed files with 76 additions and 4 deletions
|
|
@ -4,9 +4,16 @@
|
||||||
|
|
||||||
package akka.stream
|
package akka.stream
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.stream.impl.UnfoldResourceSource
|
||||||
import akka.stream.impl.fusing.GraphInterpreter
|
import akka.stream.impl.fusing.GraphInterpreter
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
|
import akka.stream.stage.GraphStage
|
||||||
import akka.stream.testkit.StreamSpec
|
import akka.stream.testkit.StreamSpec
|
||||||
|
import akka.stream.testkit.Utils.TE
|
||||||
|
|
||||||
|
import scala.concurrent.{ duration, Await, Promise }
|
||||||
|
import duration._
|
||||||
|
|
||||||
class FusingSpec extends StreamSpec {
|
class FusingSpec extends StreamSpec {
|
||||||
|
|
||||||
|
|
@ -84,6 +91,71 @@ class FusingSpec extends StreamSpec {
|
||||||
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
refs.toSet should have size (in.size + 1) // outer/main actor + 1 actor per subflow
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//an UnfoldResourceSource equivalent without an async boundary
|
||||||
|
case class UnfoldResourceNoAsyncBoundry[T, S](create: () => S, readData: (S) => Option[T], close: (S) => Unit)
|
||||||
|
extends GraphStage[SourceShape[T]] {
|
||||||
|
val stage_ = new UnfoldResourceSource(create, readData, close)
|
||||||
|
override def initialAttributes: Attributes = Attributes.none
|
||||||
|
override val shape = stage_.shape
|
||||||
|
def createLogic(inheritedAttributes: Attributes) = stage_.createLogic(inheritedAttributes)
|
||||||
|
def asSource = Source.fromGraph(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
"propagate downstream errors through async boundary" in {
|
||||||
|
val promise = Promise[Done]
|
||||||
|
val slowInitSrc = UnfoldResourceNoAsyncBoundry(
|
||||||
|
() => { Await.result(promise.future, 1.minute); () },
|
||||||
|
(_: Unit) => Some(1),
|
||||||
|
(_: Unit) => ()).asSource.watchTermination()(Keep.right).async //commenting this out, makes the test pass
|
||||||
|
val downstream = Flow[Int]
|
||||||
|
.prepend(Source.single(1))
|
||||||
|
.flatMapPrefix(0) {
|
||||||
|
case Nil => throw TE("I hate mondays")
|
||||||
|
}
|
||||||
|
.watchTermination()(Keep.right)
|
||||||
|
.to(Sink.ignore)
|
||||||
|
|
||||||
|
val g = slowInitSrc.toMat(downstream)(Keep.both)
|
||||||
|
|
||||||
|
val (f1, f2) = g.run()
|
||||||
|
f2.failed.futureValue shouldEqual TE("I hate mondays")
|
||||||
|
f1.value should be(empty)
|
||||||
|
//by now downstream managed to fail, hence it already processed the message from Flow.single,
|
||||||
|
//hence we know for sure that all graph stage locics in the downstream interpreter were initialized(=preStart)
|
||||||
|
//hence upstream subscription was initiated.
|
||||||
|
//since we're still blocking upstream's preStart we know for sure it didn't respond to the subscription request
|
||||||
|
//since a blocked actor can not process additional messages from its inbox.
|
||||||
|
//so long story short: downstream was able to initialize, subscribe and fail before upstream responded to the subscription request.
|
||||||
|
//prior to akka#29194, this scenario resulted with cancellation signal rather than the expected error signal.
|
||||||
|
promise.success(Done)
|
||||||
|
f1.failed.futureValue shouldEqual TE("I hate mondays")
|
||||||
|
}
|
||||||
|
|
||||||
|
"propagate 'parallel' errors through async boundary via a common downstream" in {
|
||||||
|
val promise = Promise[Done]
|
||||||
|
val slowInitSrc = UnfoldResourceNoAsyncBoundry(
|
||||||
|
() => { Await.result(promise.future, 1.minute); () },
|
||||||
|
(_: Unit) => Some(1),
|
||||||
|
(_: Unit) => ()).asSource.watchTermination()(Keep.right).async //commenting this out, makes the test pass
|
||||||
|
|
||||||
|
val failingSrc = Source.failed(TE("I hate mondays")).watchTermination()(Keep.right)
|
||||||
|
|
||||||
|
val g = slowInitSrc.zipMat(failingSrc)(Keep.both).to(Sink.ignore)
|
||||||
|
|
||||||
|
val (f1, f2) = g.run()
|
||||||
|
f2.failed.futureValue shouldEqual TE("I hate mondays")
|
||||||
|
f1.value should be(empty)
|
||||||
|
//by now downstream managed to fail, hence it already processed the message from Flow.single,
|
||||||
|
//hence we know for sure that all graph stage locics in the downstream interpreter were initialized(=preStart)
|
||||||
|
//hence upstream subscription was initiated.
|
||||||
|
//since we're still blocking upstream's preStart we know for sure it didn't respond to the subscription request
|
||||||
|
//since a blocked actor can not process additional messages from its inbox.
|
||||||
|
//so long story short: downstream was able to initialize, subscribe and fail before upstream responded to the subscription request.
|
||||||
|
//prior to akka#29194, this scenario resulted with cancellation signal rather than the expected error signal.
|
||||||
|
promise.success(Done)
|
||||||
|
f1.failed.futureValue shouldEqual TE("I hate mondays")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -220,12 +220,12 @@ import akka.util.OptionVal
|
||||||
|
|
||||||
def onSubscribe(subscription: Subscription): Unit = {
|
def onSubscribe(subscription: Subscription): Unit = {
|
||||||
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
||||||
if (upstreamCompleted) {
|
if (downstreamCanceled.isDefined) {
|
||||||
// onComplete or onError has been called before OnSubscribe
|
|
||||||
tryCancel(subscription, SubscriptionWithCancelException.NoMoreElementsNeeded)
|
|
||||||
} else if (downstreamCanceled.isDefined) {
|
|
||||||
upstreamCompleted = true
|
upstreamCompleted = true
|
||||||
tryCancel(subscription, downstreamCanceled.get)
|
tryCancel(subscription, downstreamCanceled.get)
|
||||||
|
} else if (upstreamCompleted) {
|
||||||
|
// onComplete or onError has been called before OnSubscribe
|
||||||
|
tryCancel(subscription, SubscriptionWithCancelException.NoMoreElementsNeeded)
|
||||||
} else if (upstream != null) { // reactive streams spec 2.5
|
} else if (upstream != null) { // reactive streams spec 2.5
|
||||||
tryCancel(subscription, new IllegalStateException("Publisher can only be subscribed once."))
|
tryCancel(subscription, new IllegalStateException("Publisher can only be subscribed once."))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue