diff --git a/akka-docs-dev/rst/java/stream-customize.rst b/akka-docs-dev/rst/java/stream-customize.rst index cf6ccd8e8c..f734d24cdf 100644 --- a/akka-docs-dev/rst/java/stream-customize.rst +++ b/akka-docs-dev/rst/java/stream-customize.rst @@ -169,6 +169,10 @@ In the second scenario the "event token" is somewhere upstream when the terminat Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is whether it is the downstream or the ``absorbTermination()`` call that calls the event handler. +.. warning:: + It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway, + it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active + downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed. Using PushStage --------------- diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index 313603fc05..928aaefa75 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -170,6 +170,11 @@ In the second scenario the "event token" is somewhere upstream when the terminat Observe, that in both scenarios ``onPull()`` kicks off the continuation of the processing logic, the only difference is whether it is the downstream or the ``absorbTermination()`` call that calls the event handler. +.. warning:: + It is not allowed to call ``absorbTermination()`` from ``onDownstreamFinish()``. If the method is called anyway, + it will be logged at ``ERROR`` level, but no further action will be taken as at that point there is no active + downstream to propagate the error to. Cancellation in the upstream direction will continue undisturbed. + Using PushStage --------------- diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 852a6f3e8e..4d0b32decd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -4,6 +4,7 @@ package akka.stream.impl.fusing import akka.stream.stage._ +import akka.testkit.EventFilter import scala.util.control.NoStackTrace import akka.stream.Supervision @@ -518,16 +519,6 @@ class InterpreterSpec extends InterpreterSpecKit { } should be(true) } - "implement expand-filter" in pending - - "implement take-conflate" in pending - - "implement conflate-take" in pending - - "implement take-expand" in pending - - "implement expand-take" in pending - "implement take-take" in new TestSetup(Seq( Take(1), Take(1))) { @@ -554,11 +545,22 @@ class InterpreterSpec extends InterpreterSpecKit { } - "implement take-drop" in pending + class InvalidAbsorbTermination extends PushPullStage[Int, Int] { + override def onPull(ctx: Context[Int]): SyncDirective = ctx.pull() + override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = ctx.push(elem) + override def onDownstreamFinish(ctx: Context[Int]): TerminationDirective = ctx.absorbTermination() + } - "implement drop-take" in pending + "not allow absorbTermination from onDownstreamFinish()" in new TestSetup(Seq( + new InvalidAbsorbTermination)) { + lastEvents() should be(Set.empty) - "work with keep-going ops" in pending + EventFilter.error("It is not allowed to call absorbTermination() from onDownstreamFinish.").intercept { + downstream.cancel() + lastEvents() should be(Set(Cancel)) + } + + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala index bdb6efa411..031b2980d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala @@ -530,6 +530,15 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], override def incomingBall = DownstreamBall override def toString = "Cancelling" + + override def absorbTermination(): TerminationDirective = { + val ex = new UnsupportedOperationException("It is not allowed to call absorbTermination() from onDownstreamFinish.") + // This MUST be logged here, since the downstream has cancelled, i.e. there is noone to send onError to, the + // stage is just about to finish so noone will catch it anyway just the interpreter + log.error(ex.getMessage) + throw ex // We still throw for correctness (although a finish() would also work here) + } + } private final case class Failing(cause: Throwable) extends State {