diff --git a/akka-bench-jmh/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/InterpreterBenchmark.scala index 5f38a07d45..4f60c3ba85 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -67,7 +67,7 @@ object InterpreterBenchmark { completeStage() } } - override def onDownstreamFinish(): Unit = completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = cancelStage(cause) }) } diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 61d4473445..437a96c52a 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -379,6 +379,9 @@ down. It is no longer required to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException]. +Additionally, when the downstream of the IO sources cancels with a failure, the materialized value +is failed with that failure rather than completed successfully. + ### Akka now uses Fork Join Pool from JDK Previously, Akka contained a shaded copy of the ForkJoinPool. In benchmarks, we could not find significant benefits of @@ -535,3 +538,20 @@ Scala Java : @@snip [StreamAttributeDocTest.java](/akka-stream-tests/src/test/java/akka/stream/StreamAttributeDocTest.java) { #attributes-on-stream } + +### Stream cancellation available upstream + +Previously, when an Akka Streams stage or operator failed, it was impossible for upstream stages to discern this from +the stage just cancelling. This has been improved so that when a stream stage fails, the cause +will be propagated upstream. + +The following operators have a slight change in behavior because of this: + +* `FileIO.fromPath`, `FileIO.fromFile` and `StreamConverters.fromInputStream` will fail the materialized future with + an `IOOperationIncompleteException` when downstream fails +* `.watchTermination` will fail the materialized `Future` or `CompletionStage` rather than completing it when downstream fails + +This also means that custom `GraphStage` implementations should be changed to pass on the +cancellation cause when downstream cancels, by overriding the `OutHandler.onDownstreamFinish` overload +that takes a `cause` parameter and calling `cancelStage(cause)` to propagate the cause upstream, as illustrated in the sketch below. The old zero-argument +`onDownstreamFinish` method has been deprecated.
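For a custom stage, the migration amounts to overriding the new `onDownstreamFinish(cause: Throwable)` and forwarding the cause with `cancelStage(cause)`. A minimal sketch, assuming a hypothetical identity stage called `PassThrough` (the stage name and shape exist only for illustration; the two commented handler lines are the actual migration):

```scala
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

// Hypothetical identity stage, only here to show where the migration happens.
final class PassThrough[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("PassThrough.in")
  val out: Outlet[T] = Outlet("PassThrough.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      // 2.5.x (now deprecated): the cancellation cause was lost
      // override def onDownstreamFinish(): Unit = completeStage()

      // 2.6.x: pass the cause on to the upstream side of this stage
      override def onDownstreamFinish(cause: Throwable): Unit = cancelStage(cause)

      setHandlers(in, out, this)
    }
}
```

The deprecated zero-argument `onDownstreamFinish()` still compiles but swallows the failure cause, which is why `cancelStage(cause)` is preferred over `completeStage()` here.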
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index 2709c34c9b..f04601da08 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -69,12 +69,12 @@ class ActorGraphInterpreterSpec extends StreamSpec { setHandler(out1, new OutHandler { override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = cancel(in1) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in1, cause) }) setHandler(out2, new OutHandler { override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = cancel(in2) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in2, cause) }) } @@ -115,13 +115,13 @@ class ActorGraphInterpreterSpec extends StreamSpec { setHandler(out1, new OutHandler { override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = cancel(in1) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in1, cause) }) setHandler(out2, new OutHandler { override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = cancel(in2) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in2, cause) }) } @@ -164,13 +164,13 @@ class ActorGraphInterpreterSpec extends StreamSpec { setHandler(out1, new OutHandler { override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = cancel(in1) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in1, cause) }) setHandler(out2, new OutHandler { override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = cancel(in2) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in2, cause) }) } @@ -216,13 +216,13 @@ class ActorGraphInterpreterSpec extends StreamSpec { setHandler(out1, new OutHandler { override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = cancel(in2) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in2, cause) }) setHandler(out2, new OutHandler { override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = cancel(in1) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in1, cause) }) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala index 49b0230daf..29216c880c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterFailureModesSpec.scala @@ -19,7 +19,8 @@ class GraphInterpreterFailureModesSpec extends StreamSpec with GraphInterpreterS failOnNextEvent() stepAll() - lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(insideOutStage))) + lastEvents() should be( + Set(Cancel(upstream, testException), OnError(downstream, testException), PostStop(insideOutStage))) } "handle failure on onPush" in new FailingStageSetup { @@ -32,7 +33,8 @@ class GraphInterpreterFailureModesSpec extends StreamSpec with GraphInterpreterS failOnNextEvent() stepAll() - lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(insideOutStage))) + lastEvents() should be( + 
Set(Cancel(upstream, testException), OnError(downstream, testException), PostStop(insideOutStage))) } "handle failure on onPull while cancel is pending" in new FailingStageSetup { @@ -43,7 +45,7 @@ class GraphInterpreterFailureModesSpec extends StreamSpec with GraphInterpreterS failOnNextEvent() stepAll() - lastEvents() should be(Set(Cancel(upstream), PostStop(insideOutStage))) + lastEvents() should be(Set(Cancel(upstream, testException), PostStop(insideOutStage))) } "handle failure on onPush while complete is pending" in new FailingStageSetup { @@ -87,13 +89,14 @@ class GraphInterpreterFailureModesSpec extends StreamSpec with GraphInterpreterS failOnNextEvent() stepAll() - lastEvents() should be(Set(Cancel(upstream), PostStop(insideOutStage))) + lastEvents() should be(Set(Cancel(upstream, testException), PostStop(insideOutStage))) } "handle failure in preStart" in new FailingStageSetup(initFailOnNextEvent = true) { stepAll() - lastEvents() should be(Set(Cancel(upstream), OnError(downstream, testException), PostStop(insideOutStage))) + lastEvents() should be( + Set(Cancel(upstream, testException), OnError(downstream, testException), PostStop(insideOutStage))) } "handle failure in postStop" in new FailingStageSetup { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala index d9713639d5..ba5876d752 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -4,6 +4,7 @@ package akka.stream.impl.fusing +import akka.stream.SubscriptionWithCancelException import akka.stream.testkit.StreamSpec import akka.stream.testkit.Utils._ @@ -426,7 +427,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) @@ -487,7 +488,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) @@ -548,7 +549,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) @@ -611,7 +612,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) @@ -660,7 +661,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) @@ 
-692,7 +693,7 @@ class GraphInterpreterPortsSpec extends StreamSpec with GraphInterpreterSpecKit an[IllegalArgumentException] should be thrownBy { in.grab() } stepAll() - lastEvents() should be(Set(Cancel(out))) + lastEvents() should be(Set(Cancel(out, SubscriptionWithCancelException.NoMoreElementsNeeded))) out.isAvailable should be(false) out.isClosed should be(true) in.isAvailable should be(false) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index bf8ea811ce..4ca5cd9195 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -335,7 +335,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { } case class OnComplete(source: GraphStageLogic) extends TestEvent - case class Cancel(source: GraphStageLogic) extends TestEvent + case class Cancel(source: GraphStageLogic, cause: Throwable) extends TestEvent case class OnError(source: GraphStageLogic, cause: Throwable) extends TestEvent case class OnNext(source: GraphStageLogic, elem: Any) extends TestEvent case class RequestOne(source: GraphStageLogic) extends TestEvent @@ -359,7 +359,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { setHandler(out, new OutHandler { override def onPull(): Unit = lastEvent += RequestOne(UpstreamProbe.this) - override def onDownstreamFinish(): Unit = lastEvent += Cancel(UpstreamProbe.this) + override def onDownstreamFinish(cause: Throwable): Unit = lastEvent += Cancel(UpstreamProbe.this, cause) override def toString = s"${UpstreamProbe.this.toString}.outHandler" }) @@ -425,7 +425,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { override def onPull(): Unit = pull(in) override def onUpstreamFinish(): Unit = complete(out) override def onUpstreamFailure(ex: Throwable): Unit = fail(out, ex) - override def onDownstreamFinish(): Unit = cancel(in) + override def onDownstreamFinish(cause: Throwable): Unit = cancel(in, cause) setHandlers(in, out, this) } @@ -525,7 +525,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { setHandler(stageout, new OutHandler { override def onPull(): Unit = mayFail(pull(stagein)) - override def onDownstreamFinish(): Unit = mayFail(completeStage()) + override def onDownstreamFinish(cause: Throwable): Unit = mayFail(completeStage()) override def toString = "insideOutStage.stageout" }) @@ -567,7 +567,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { sealed trait TestEvent case object OnComplete extends TestEvent - case object Cancel extends TestEvent + case class Cancel(cause: Throwable) extends TestEvent case class OnError(cause: Throwable) extends TestEvent case class OnNext(elem: Any) extends TestEvent case object RequestOne extends TestEvent @@ -604,7 +604,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { else lastEvent += RequestOne } - override def onDownstreamFinish(): Unit = lastEvent += Cancel + override def onDownstreamFinish(cause: Throwable): Unit = lastEvent += Cancel(cause) }) def onNext(elem: TT): Unit = { @@ -649,7 +649,7 @@ trait GraphInterpreterSpecKit extends StreamSpec { } def cancel(): Unit = { - cancel(in) + cancel(in, SubscriptionWithCancelException.NoMoreElementsNeeded) run() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 
9e03fc6b65..0fe0a82b07 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -64,7 +64,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(5))) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } "work with only boundary ops" in new OneBoundedSetup[Int]() { @@ -127,7 +127,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } "implement take" in new OneBoundedSetup[Int](takeTwo) { @@ -144,7 +144,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(1) - lastEvents() should be(Set(OnNext(1), Cancel, OnComplete)) + lastEvents() should be(Set(OnNext(1), Cancel(SubscriptionWithCancelException.StageWasCompleted), OnComplete)) } "implement take inside a chain" in new OneBoundedSetup[Int]( @@ -167,7 +167,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(2) - lastEvents() should be(Set(Cancel, OnComplete, OnNext(3))) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.StageWasCompleted), OnComplete, OnNext(3))) } "implement fold" in new OneBoundedSetup[Int](Fold(0, (agg: Int, x: Int) => agg + x)) { @@ -206,7 +206,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } "work if fold completes while not in a push position" in new OneBoundedSetup[Int]( @@ -273,7 +273,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(4), RequestOne)) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } "implement expand" in new OneBoundedSetup[Int](new Expand(Iterator.continually(_: Int))) { @@ -330,7 +330,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(4), RequestOne)) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } @@ -395,7 +395,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(2))) downstream.cancel() - lastEvents() should be(Set(Cancel)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.NoMoreElementsNeeded))) } "implement doubler-conflate (doubler-batch)" in new OneBoundedSetup[Int]( @@ -518,7 +518,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(1) - lastEvents() should be(Set(OnNext(1), OnComplete, Cancel)) + lastEvents() should be(Set(OnNext(1), OnComplete, Cancel(SubscriptionWithCancelException.StageWasCompleted))) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala 
b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index 83c04e7f9c..099a78b25f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -4,6 +4,7 @@ package akka.stream.impl.fusing +import akka.stream.SubscriptionWithCancelException import akka.stream.testkit.StreamSpec import akka.testkit.LongRunningTest import akka.util.ConstantFun @@ -68,7 +69,8 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(0) - lastEvents() should be(Set(Cancel, OnComplete, OnNext(0 + chainLength))) + lastEvents() should be( + Set(Cancel(SubscriptionWithCancelException.StageWasCompleted), OnComplete, OnNext(0 + chainLength))) val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0) // Not a real benchmark, just for sanity check @@ -82,7 +84,7 @@ class InterpreterStressSpec extends StreamSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(RequestOne)) upstream.onNext(0) - lastEvents() should be(Set(Cancel, OnNext(0), OnComplete)) + lastEvents() should be(Set(Cancel(SubscriptionWithCancelException.StageWasCompleted), OnNext(0), OnComplete)) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index b2f963d05a..028f5d562c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -33,7 +33,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(0) // boom - lastEvents() should be(Set(Cancel, OnError(TE))) + lastEvents() should be(Set(Cancel(TE), OnError(TE))) } "emit failure when op throws in middle of the chain" in new OneBoundedSetup[Int]( @@ -49,7 +49,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit downstream.requestOne() lastEvents() should be(Set(RequestOne)) upstream.onNext(-1) // boom - lastEvents() should be(Set(Cancel, OnError(TE))) + lastEvents() should be(Set(Cancel(TE), OnError(TE))) } "resume when Map throws" in new OneBoundedSetupWithDecider[Int]( @@ -158,7 +158,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnNext(-1))) upstream.onNext(2) // boom - lastEvents() should be(Set(OnError(TE), Cancel)) + lastEvents() should be(Set(OnError(TE), Cancel(TE))) } "fail when Expand `expander` throws" in new OneBoundedSetup[Int](new Expand((in: Int) => @@ -179,7 +179,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set.empty) downstream.requestOne() - lastEvents() should be(Set(OnError(TE), Cancel)) + lastEvents() should be(Set(OnError(TE), Cancel(TE))) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala index 03094774a4..719ea34dbd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/LifecycleInterpreterSpec.scala @@ -14,6 +14,8 @@ import 
scala.concurrent.duration._ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { + val boom = TE("Boom!") + "Interpreter" must { "call preStart in order on stages" in new OneBoundedSetup[String]( @@ -78,8 +80,8 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { expectNoMessage(300.millis) } - "onError when preStart fails" in new OneBoundedSetup[String](PreStartFailer(() => throw TE("Boom!"))) { - lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) + "onError when preStart fails" in new OneBoundedSetup[String](PreStartFailer(() => throw boom)) { + lastEvents() should ===(Set(Cancel(boom), OnError(boom))) } "not blow up when postStop fails" in new OneBoundedSetup[String](PostStopFailer(() => throw TE("Boom!"))) { @@ -89,9 +91,9 @@ class LifecycleInterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { "onError when preStart fails with stages after" in new OneBoundedSetup[String]( Map((x: Int) => x), - PreStartFailer(() => throw TE("Boom!")), + PreStartFailer(() => throw boom), Map((x: Int) => x)) { - lastEvents() should ===(Set(Cancel, OnError(TE("Boom!")))) + lastEvents() should ===(Set(Cancel(boom), OnError(boom))) } "continue with stream shutdown when postStop fails" in new OneBoundedSetup[String](PostStopFailer(() => diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index cf7b39aaee..5b1c51c3da 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -25,6 +25,8 @@ import com.google.common.jimfs.{ Configuration, Jimfs } import scala.concurrent.duration._ import com.github.ghik.silencer.silent +import scala.concurrent.Future + object FileSourceSpec { final case class Settings(chunkSize: Int, readAhead: Int) } @@ -242,6 +244,13 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } } + "complete materialized future with a failure if upstream fails" in { + val matVal: Future[IOResult] = + FileIO.fromPath(manyLines, chunkSize = 4).map(_ => throw new RuntimeException).to(Sink.ignore).run() + + matVal.failed.futureValue shouldBe a[IOOperationIncompleteException] + } + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 7e549cac86..6fe4a9de5b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -377,8 +377,8 @@ class TlsSpec extends StreamSpec(TlsSpec.configOverrides) with WithLogCapturing override def onPush() = push(out, grab(in)) override def onPull() = pull(in) - override def onDownstreamFinish() = { - system.log.debug("me cancelled") + override def onDownstreamFinish(cause: Throwable) = { + system.log.debug(s"me cancelled, cause {}", cause) completeStage() } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala index 210637f7bc..5c39d0b154 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala +++ 
b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphStageTimersSpec.scala @@ -160,7 +160,7 @@ class GraphStageTimersSpec extends StreamSpec { setHandler(out, new OutHandler { override def onPull() = () // Do nothing - override def onDownstreamFinish() = completeStage() + override def onDownstreamFinish(cause: Throwable) = completeStage() }) setHandler(in, new InHandler { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala index 25a77dfdd8..3c9e80600d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphUnzipWithSpec.scala @@ -6,12 +6,17 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.testkit.Utils.TE import akka.stream.testkit._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.testkit.EventFilter +import akka.testkit.TestProbe import akka.util.unused +import akka.Done +import akka.NotUsed import org.reactivestreams.Publisher +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -191,6 +196,40 @@ class GraphUnzipWithSpec extends StreamSpec(""" rightProbe.expectNoMessage(100.millis) } + "propagate last downstream cancellation cause once all downstreams have cancelled" in { + val probe = TestProbe() + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b => + val source = Source + .maybe[Int] + .watchTermination()(Keep.right) + .mapMaterializedValue(termination => + // side effecting our way out of this + probe.ref ! termination) + + val unzip = b.add(UnzipWith[Int, Int, String]((b: Int) => (1 / b, s"1 / $b"))) + + source ~> unzip.in + + def killSwitchFlow[T] = Flow[T].viaMat(KillSwitches.single)(Keep.right).mapMaterializedValue { killSwitch => + probe.ref !
killSwitch + NotUsed + } + unzip.out0 ~> killSwitchFlow[Int] ~> Sink.ignore + unzip.out1 ~> killSwitchFlow[String] ~> Sink.ignore + + ClosedShape + }) + .run() + val termination = probe.expectMsgType[Future[_]].asInstanceOf[Future[Done]] + val killSwitch1 = probe.expectMsgType[UniqueKillSwitch] + val killSwitch2 = probe.expectMsgType[UniqueKillSwitch] + val boom = TE("Boom") + killSwitch1.abort(boom) + killSwitch2.abort(boom) + termination.failed.futureValue should ===(boom) + } + "unzipWith expanded Person.unapply (3 outputs)" in { val probe0 = TestSubscriber.manualProbe[String]() val probe1 = TestSubscriber.manualProbe[String]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala index e50454a8ab..ae9c30af10 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySourceSpec.scala @@ -15,10 +15,8 @@ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.StreamSpec import akka.stream.testkit.TestPublisher import akka.stream.testkit.TestSubscriber -import akka.stream.Attributes -import akka.stream.Outlet -import akka.stream.SourceShape -import akka.testkit.DefaultTimeout +import akka.stream.{ Attributes, KillSwitches, Outlet, SourceShape } +import akka.testkit.{ DefaultTimeout, TestProbe } import org.scalatest.concurrent.ScalaFutures import scala.collection.immutable.Seq @@ -86,6 +84,25 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { probe.cancel() } + "propagate downstream cancellation cause when inner source has been materialized" in { + val probe = TestProbe() + val (doneF, killswitch) = + Source + .lazily(() => + Source.maybe[Int].watchTermination()(Keep.right).mapMaterializedValue { done => + probe.ref ! 
Done + done + }) + .mapMaterializedValue(_.flatten) + .viaMat(KillSwitches.single)(Keep.both) + .to(Sink.ignore) + .run() + val boom = TE("boom") + probe.expectMsg(Done) + killswitch.abort(boom) + doneF.failed.futureValue should ===(boom) + } + "fail stage when upstream fails" in assertAllStagesStopped { val outProbe = TestSubscriber.probe[Int]() val inProbe = TestPublisher.probe[Int]() @@ -115,6 +132,7 @@ class LazySourceSpec extends StreamSpec with DefaultTimeout with ScalaFutures { result.failed.futureValue should ===(matFail) } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala index 90bef35106..11f09f3bba 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/MaybeSourceSpec.scala @@ -4,11 +4,11 @@ package akka.stream.scaladsl -import akka.stream.AbruptStageTerminationException -import akka.stream.Materializer +import akka.stream.{ AbruptStageTerminationException, KillSwitches, Materializer } import akka.stream.testkit.StreamSpec import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.stream.testkit.Utils.TE import akka.testkit.DefaultTimeout import scala.concurrent.duration._ @@ -18,7 +18,7 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { "The Maybe Source" must { - "complete materialized future with None when stream cancels" in assertAllStagesStopped { + "complete materialized promise with None when stream cancels" in assertAllStagesStopped { val neverSource = Source.maybe[Int] val pubSink = Sink.asPublisher[Int](false) @@ -35,6 +35,15 @@ class MaybeSourceSpec extends StreamSpec with DefaultTimeout { f.future.futureValue shouldEqual None } + "complete materialized promise with None when stream cancels with a failure cause" in assertAllStagesStopped { + val (promise, killswitch) = Source.maybe[Int].viaMat(KillSwitches.single)(Keep.both).to(Sink.ignore).run() + val boom = TE("Boom") + killswitch.abort(boom) + // Could make sense to fail it with the propagated exception instead but that breaks + // the assumptions in the CoupledTerminationFlowSpec + promise.future.futureValue should ===(None) + } + "allow external triggering of empty completion" in assertAllStagesStopped { val neverSource = Source.maybe[Int].filter(_ => false) val counterSink = Sink.fold[Int, Int](0) { (acc, _) => diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template index f242d5e8c9..a38fa20edb 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/UnzipWithApply.scala.template @@ -75,9 +75,9 @@ class UnzipWith1[In, [#A1#]](val unzipper: In => ([#A1#])) extends GraphStage[Fa if (pendingCount == ##0) pull(in) } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { downstreamRunning -= ##1 - if (downstreamRunning == ##0) completeStage() + if (downstreamRunning == ##0) cancelStage(cause) else { if (pending0) pendingCount -= ##1 if (pendingCount == ##0 && !hasBeenPulled(in)) pull(in) diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index c611811bfd..c5e5eedf87 100644 --- 
a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -224,4 +224,7 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.s ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Materializer.shutdown") # StageActor was never meant to be constructed by users ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic#StageActor.this") -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.stage.GraphStageLogic#StageActor.this") \ No newline at end of file +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.stage.GraphStageLogic#StageActor.this") + +# Internal class made final #27472 +ProblemFilters.exclude[FinalClassProblem]("akka.stream.scaladsl.CoupledTerminationBidi") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/IOResult.scala b/akka-stream/src/main/scala/akka/stream/IOResult.scala index 5afcc32a7f..bd48636e60 100644 --- a/akka-stream/src/main/scala/akka/stream/IOResult.scala +++ b/akka-stream/src/main/scala/akka/stream/IOResult.scala @@ -79,4 +79,10 @@ final case class AbruptIOTerminationException(ioResult: IOResult, cause: Throwab * @param count The number of bytes read/written up until the error * @param cause cause */ -final class IOOperationIncompleteException(val count: Long, cause: Throwable) extends RuntimeException(cause) +final class IOOperationIncompleteException(message: String, val count: Long, cause: Throwable) + extends RuntimeException(message, cause) { + + def this(count: Long, cause: Throwable) = + this(s"IO operation was stopped unexpectedly after $count bytes because of $cause", count, cause) + +} diff --git a/akka-stream/src/main/scala/akka/stream/SubscriptionWithCancelException.scala b/akka-stream/src/main/scala/akka/stream/SubscriptionWithCancelException.scala index 29500561b7..24ec4dcb89 100644 --- a/akka-stream/src/main/scala/akka/stream/SubscriptionWithCancelException.scala +++ b/akka-stream/src/main/scala/akka/stream/SubscriptionWithCancelException.scala @@ -4,6 +4,7 @@ package akka.stream +import akka.annotation.DoNotInherit import org.reactivestreams.Subscription import scala.util.control.NoStackTrace @@ -19,6 +20,12 @@ trait SubscriptionWithCancelException extends Subscription { def cancel(cause: Throwable): Unit } object SubscriptionWithCancelException { - case object NoMoreElementsNeeded extends RuntimeException with NoStackTrace - case object StageWasCompleted extends RuntimeException with NoStackTrace + + /** + * Not for user extension + */ + @DoNotInherit + sealed abstract class NonFailureCancellation extends RuntimeException with NoStackTrace + case object NoMoreElementsNeeded extends NonFailureCancellation + case object StageWasCompleted extends NonFailureCancellation } diff --git a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala index aa542e4923..e30fc91225 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala @@ -34,8 +34,9 @@ import scala.util.control.NonFatal val matPromise = Promise[M]() val logic = new GraphStageLogic(shape) with OutHandler { - override def onDownstreamFinish(): Unit = { - matPromise.failure(new RuntimeException("Downstream canceled without triggering lazy source materialization")) + override def onDownstreamFinish(cause: Throwable): Unit = { + matPromise.failure( + new 
RuntimeException("Downstream canceled without triggering lazy source materialization", cause)) completeStage() } @@ -49,8 +50,8 @@ import scala.util.control.NonFatal subSink.pull() } - override def onDownstreamFinish(): Unit = { - subSink.cancel() + override def onDownstreamFinish(cause: Throwable): Unit = { + subSink.cancel(cause) completeStage() } }) diff --git a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala index 05ab3f5bcf..50c479f9c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala @@ -6,9 +6,9 @@ package akka.stream.impl import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts -import akka.stream.{ AbruptStageTerminationException, Attributes, Outlet, SourceShape } import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler } +import akka.stream._ import akka.util.OptionVal import scala.concurrent.Promise @@ -26,7 +26,7 @@ import scala.util.Try override def createLogicAndMaterializedValue( inheritedAttributes: Attributes): (GraphStageLogic, Promise[Option[AnyRef]]) = { - import scala.util.{ Success => ScalaSuccess, Failure => ScalaFailure } + import scala.util.{ Failure => ScalaFailure, Success => ScalaSuccess } val promise = Promise[Option[AnyRef]]() val logic = new GraphStageLogic(shape) with OutHandler { @@ -67,7 +67,7 @@ import scala.util.Try } } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { promise.tryComplete(ScalaSuccess(None)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index da0a00dfe6..73466a84cf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -152,7 +152,7 @@ import scala.concurrent.{ Future, Promise } setHandler(out, this) - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { pendingOffer match { case Some(Offer(_, promise)) => promise.success(QueueOfferResult.QueueClosed) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala index 8cb585832a..92932064f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SetupStage.scala @@ -35,7 +35,7 @@ import scala.util.control.NonFatal import SetupStage._ val subOutlet = new SubSourceOutlet[T]("SetupSinkStage") - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) + subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause))) setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) override def preStart(): Unit = { @@ -74,7 +74,7 @@ import scala.util.control.NonFatal val subOutlet = new SubSourceOutlet[T]("SetupFlowStage") subInlet.setHandler(delegateToOutlet(push(out, _: U), () => complete(out), fail(out, _), subInlet)) - subOutlet.setHandler(delegateToInlet(() => pull(in), () => cancel(in))) + subOutlet.setHandler(delegateToInlet(() => pull(in), cause => cancel(in, cause))) setHandler(in, delegateToSubOutlet(() => grab(in), subOutlet)) setHandler(out, delegateToSubInlet(subInlet)) @@ -158,14 +158,14 @@ private object SetupStage { def delegateToSubInlet[T](subInlet: 
GraphStageLogic#SubSinkInlet[T]) = new OutHandler { override def onPull(): Unit = subInlet.pull() - override def onDownstreamFinish(): Unit = - subInlet.cancel() + override def onDownstreamFinish(cause: Throwable): Unit = + subInlet.cancel(cause) } - def delegateToInlet(pull: () => Unit, cancel: () => Unit) = new OutHandler { + def delegateToInlet(pull: () => Unit, cancel: (Throwable) => Unit) = new OutHandler { override def onPull(): Unit = pull() - override def onDownstreamFinish(): Unit = - cancel() + override def onDownstreamFinish(cause: Throwable): Unit = + cancel(cause) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala index e3974ac951..d1360f048e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala @@ -61,7 +61,7 @@ import scala.util.control.NonFatal if (resumingMode) onPull() } - override def onDownstreamFinish(): Unit = closeStage() + override def onDownstreamFinish(cause: Throwable): Unit = closeStage() private def restartState(): Unit = { open = false diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 014d8653b0..4cec235074 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -137,9 +137,14 @@ import scala.concurrent.{ Future, Promise } def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = { - finishPromise.success(Done) - completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = { + cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + finishPromise.success(Done) + case ex => + finishPromise.failure(ex) + } + cancelStage(cause) } override def postStop(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 747e31d981..4e3cf951fb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1996,7 +1996,7 @@ private[stream] object Collect { val outHandler = new OutHandler { override def onPull(): Unit = sinkIn.pull() - override def onDownstreamFinish(): Unit = sinkIn.cancel() + override def onDownstreamFinish(cause: Throwable): Unit = sinkIn.cancel(cause) } Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer) @@ -2218,8 +2218,8 @@ private[stream] object Collect { override def onPull(): Unit = { subInlet.pull() } - override def onDownstreamFinish(): Unit = { - subInlet.cancel() + override def onDownstreamFinish(cause: Throwable): Unit = { + subInlet.cancel(cause) maybeCompleteStage() } }) @@ -2240,9 +2240,9 @@ private[stream] object Collect { } } } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { if (!isClosed(in)) { - cancel(in) + cancel(in, cause) } maybeCompleteStage() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index aae124ff85..070b9159e3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala 
@@ -235,8 +235,8 @@ import scala.util.control.NonFatal } else failStage(ex) } - override def onDownstreamFinish(): Unit = { - if (!prefixComplete) completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = { + if (!prefixComplete) cancelStage(cause) // Otherwise substream is open, ignore } @@ -298,10 +298,10 @@ import scala.util.control.NonFatal true } else false - private def tryCancel(): Boolean = + private def tryCancel(cause: Throwable): Boolean = // if there's no active substreams or there's only one but it's not been pushed yet if (activeSubstreamsMap.isEmpty || (activeSubstreamsMap.size == 1 && substreamWaitingToBePushed.isDefined)) { - completeStage() + cancelStage(cause) true } else false @@ -334,7 +334,7 @@ import scala.util.control.NonFatal override def onUpstreamFinish(): Unit = if (!tryCompleteAll()) setKeepGoing(true) - override def onDownstreamFinish(): Unit = if (!tryCancel()) setKeepGoing(true) + override def onDownstreamFinish(cause: Throwable): Unit = if (!tryCancel(cause)) setKeepGoing(true) override def onPush(): Unit = try { @@ -427,11 +427,11 @@ import scala.util.control.NonFatal tryCompleteHandler() } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { if (hasNextElement && nextElementKey == key) clearNextElement() if (firstPush()) firstPushCounter -= 1 completeSubStream() - if (parent.isClosed(out)) tryCancel() + if (parent.isClosed(out)) tryCancel(cause) if (parent.isClosed(in)) tryCompleteAll() else if (needToPull) pull(in) } @@ -504,9 +504,9 @@ import scala.util.control.NonFatal } else if (substreamWaitingToBePushed) pushSubstreamSource() } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { // If the substream is already cancelled or it has not been handed out, we can go away - if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) completeStage() + if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause) } }) @@ -588,10 +588,10 @@ import scala.util.control.NonFatal } else pull(in) } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { substreamCancelled = true if (isClosed(in) || propagateSubstreamCancel) { - completeStage() + cancelStage(cause) } else { // Start draining if (!hasBeenPulled(in)) pull(in) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 582697a5c0..2c5babd8f4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -121,7 +121,16 @@ private[akka] final class FileSource(path: Path, chunkSize: Int, startPosition: } } else chunks - override def onDownstreamFinish(): Unit = success() + override def onDownstreamFinish(cause: Throwable): Unit = { + cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + success() + case ex => + ioResultPromise.tryFailure( + new IOOperationIncompleteException("Downstream failed before reaching file end", position, ex)) + completeStage() + } + } override def postStop(): Unit = { ioResultPromise.trySuccess(IOResult(position, Success(Done))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala index 8dc46f08dd..2efad288a1 100644 --- 
a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala @@ -14,7 +14,8 @@ import akka.stream.{ IOOperationIncompleteException, IOResult, Outlet, - SourceShape + SourceShape, + SubscriptionWithCancelException } import akka.stream.stage.{ GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, OutHandler } import akka.util.ByteString @@ -70,10 +71,19 @@ private[akka] final class InputStreamSource(factory: () => InputStream, chunkSiz failStage(t) } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { if (!isClosed) { closeInputStream() - mat.trySuccess(IOResult(readBytesTotal)) + cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + mat.trySuccess(IOResult(readBytesTotal)) + case ex: Throwable => + mat.tryFailure( + new IOOperationIncompleteException( + "Downstream failed before input stream reached end", + readBytesTotal, + ex)) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index acd3e9276d..bc0190917a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -55,7 +55,7 @@ import scala.concurrent.{ Future, Promise } override def createLogicAndMaterializedValue(inheritedAttributes: Attributes, eagerMaterialzer: Materializer) = { val bindingPromise = Promise[ServerBinding] - val logic = new TimerGraphStageLogic(shape) { + val logic = new TimerGraphStageLogic(shape) with StageLogging { implicit def self: ActorRef = stageActor.ref val connectionFlowsAwaitingInitialization = new AtomicLong() @@ -115,14 +115,33 @@ import scala.concurrent.{ Future, Promise } } } - setHandler(out, new OutHandler { - override def onPull(): Unit = { - // Ignore if still binding - if (listener ne null) listener ! ResumeAccepting(1) - } + setHandler( + out, + new OutHandler { + override def onPull(): Unit = { + // Ignore if still binding + if (listener ne null) listener ! ResumeAccepting(1) + } - override def onDownstreamFinish(): Unit = tryUnbind() - }) + override def onDownstreamFinish(cause: Throwable): Unit = { + if (log.isDebugEnabled) { + cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + log.debug( + "Unbinding from {}:{} because downstream cancelled stream", + endpoint.getHostString, + endpoint.getPort) + case ex => + log.debug( + "Unbinding from {}:{} because of downstream failure: {}", + endpoint.getHostString, + endpoint.getPort, + ex) + } + } + tryUnbind() + } + }) private def connectionFor(connected: Connected, connection: ActorRef): StreamTcp.IncomingConnection = { connectionFlowsAwaitingInitialization.incrementAndGet() @@ -215,7 +234,8 @@ private[stream] object ConnectionSourceStage { inheritedAttributes: Attributes, remoteAddress: InetSocketAddress, eagerMaterializer: Materializer) - extends GraphStageLogic(shape) { + extends GraphStageLogic(shape) + with StageLogging { implicit def self: ActorRef = stageActor.ref private def bytesIn = shape.in @@ -334,9 +354,24 @@ private[stream] object ConnectionSourceStage { connection ! ResumeReading } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { if (!isClosed(bytesIn)) connection ! 
ResumeReading else { + if (log.isDebugEnabled) { + cause match { + case _: SubscriptionWithCancelException.NonFailureCancellation => + log.debug( + "Aborting connection from {}:{} because downstream cancelled stream", + remoteAddress.getHostString, + remoteAddress.getPort) + case ex => + log.debug( + "Aborting connection from {}:{} because of downstream failure: {}", + remoteAddress.getHostString, + remoteAddress.getPort, + ex) + } + } connection ! Abort completeStage() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala index 79df76eaed..db4903d8fd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala @@ -128,7 +128,8 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file to read from */ @@ -144,7 +145,8 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file path to read from */ @@ -159,7 +161,9 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. + * * @param f the file to read from * @param chunkSize the size of each read operation */ @@ -176,7 +180,8 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file path to read from * @param chunkSize the size of each read operation @@ -193,7 +198,8 @@ object FileIO { * set it for a given Source by using [[ActorAttributes]]. * * It materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. 
Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file path to read from * @param chunkSize the size of each read operation diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala index de47c40787..878840413e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala @@ -128,7 +128,9 @@ object StreamConverters { * You can configure the default dispatcher for this Source by changing the `akka.stream.materializer.blocking-io-dispatcher` or * set it for a given Source by using [[akka.stream.ActorAttributes]]. * - * It materializes a [[CompletionStage]] containing the number of bytes read from the source file upon completion. + * It materializes a [[CompletionStage]] of [[IOResult]] containing the number of bytes read from the source file upon completion, + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * The created [[InputStream]] will be closed when the [[Source]] is cancelled. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala index 0d551dd34e..fa12471d3f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala @@ -66,7 +66,7 @@ object CoupledTerminationFlow { } /** INTERNAL API */ -private[stream] class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[I, I, O, O]] { +private[stream] final class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[I, I, O, O]] { val in1: Inlet[I] = Inlet("CoupledCompletion.in1") val out1: Outlet[I] = Outlet("CoupledCompletion.out1") val in2: Inlet[O] = Inlet("CoupledCompletion.in2") @@ -79,7 +79,7 @@ private[stream] class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[ override def onPush(): Unit = push(out1, grab(in1)) override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = cancelStage(cause) override def onUpstreamFinish(): Unit = completeStage() override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) } @@ -88,7 +88,7 @@ private[stream] class CoupledTerminationBidi[I, O] extends GraphStage[BidiShape[ override def onPush(): Unit = push(out2, grab(in2)) override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = completeStage() + override def onDownstreamFinish(cause: Throwable): Unit = cancelStage(cause) override def onUpstreamFinish(): Unit = completeStage() override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala index d6d312e0bb..31db18dfef 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala @@ -29,7 +29,8 @@ object FileIO { * set it for a given Source by using [[akka.stream.ActorAttributes]]. 
* * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file to read from * @param chunkSize the size of each read operation, defaults to 8192 @@ -47,7 +48,8 @@ object FileIO { * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file path to read from * @param chunkSize the size of each read operation, defaults to 8192 @@ -64,7 +66,8 @@ object FileIO { * set it for a given Source by using [[akka.stream.ActorAttributes]]. * * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion, - * and a possible exception if IO operation was not completed successfully. + * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does + * not give any guarantee that the bytes were seen by downstream stages. * * @param f the file path to read from * @param chunkSize the size of each read operation, defaults to 8192 diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ad61589989..b6bbc59779 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3308,8 +3308,8 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { /** * Materializes to `Future[Done]` that completes on getting termination message. * The Future completes with success when received complete message from upstream or cancel - * from downstream. It fails with the same error when received error message from - * downstream. + * from downstream. It fails with the propagated error when received error message from + * upstream or downstream. * * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. 
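The updated `watchTermination` scaladoc above matches the behaviour exercised by the kill-switch tests in this diff. A minimal sketch of observing it, assuming Akka 2.6 where an implicit `ActorSystem` provides the materializer; the `WatchTerminationExample` object and the `DownstreamCancelled` exception are made up for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.util.control.NoStackTrace

object WatchTerminationExample extends App {
  implicit val system: ActorSystem = ActorSystem("watch-termination-example")
  import system.dispatcher

  // The failure we will use as the downstream cancellation cause.
  object DownstreamCancelled extends RuntimeException("downstream gave up") with NoStackTrace

  // Materializes a (Future[Done], UniqueKillSwitch) pair
  val (terminated, killSwitch) =
    Source
      .maybe[Int]                             // never emits, keeps the stream running
      .watchTermination()(Keep.right)         // termination future of the upstream part
      .viaMat(KillSwitches.single)(Keep.both) // kill switch sits downstream of watchTermination
      .to(Sink.ignore)
      .run()

  // Aborting cancels upstream with the given cause; since 2.6 that cause
  // fails the watchTermination future instead of completing it with Done.
  killSwitch.abort(DownstreamCancelled)

  terminated.failed.foreach { cause =>
    println(s"stream terminated with failure: $cause")
    system.terminate()
  }
}
```

Before this change the future completed with `Done` on any downstream cancellation; now only non-failure cancellations (such as `SubscriptionWithCancelException.NoMoreElementsNeeded`) complete it successfully, as implemented in the `GraphStages` change in this diff.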
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
index f64683c846..ee4b175494 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
@@ -644,11 +644,11 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
             tryPull()
           }
-          override def onDownstreamFinish() = {
-            if (eagerCancel) completeStage()
+          override def onDownstreamFinish(cause: Throwable) = {
+            if (eagerCancel) cancelStage(cause)
             else {
               downstreamsRunning -= 1
-              if (downstreamsRunning == 0) completeStage()
+              if (downstreamsRunning == 0) cancelStage(cause)
               else if (pending(i)) {
                 pending(i) = false
                 pendingCount -= 1
@@ -719,8 +719,8 @@ private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]]
         pull(in)
       }
-      override def onDownstreamFinish(): Unit = {
-        completeStage()
+      override def onDownstreamFinish(cause: Throwable): Unit = {
+        cancelStage(cause)
       }
     })
@@ -737,7 +737,7 @@ private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]]
         }
       }
-      override def onDownstreamFinish(): Unit = {
+      override def onDownstreamFinish(cause: Throwable): Unit = {
         setHandler(in, new InHandler {
           override def onPush() = {
             push(outMain, grab(in))
@@ -869,12 +869,12 @@ final class Partition[T](val outputPorts: Int, val partitioner: T => Int, val ea
             pull(in)
           }
-          override def onDownstreamFinish(): Unit =
-            if (eagerCancel) completeStage()
+          override def onDownstreamFinish(cause: Throwable) =
+            if (eagerCancel) cancelStage(cause)
             else {
               downstreamRunning -= 1
               if (downstreamRunning == 0)
-                completeStage()
+                cancelStage(cause)
               else if (outPendingElem != null) {
                 if (idx == outPendingIdx) {
                   outPendingElem = null
@@ -987,11 +987,11 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean,
           } else pendingQueue.enqueue(o)
         }
-        override def onDownstreamFinish() = {
-          if (eagerCancel) completeStage()
+        override def onDownstreamFinish(cause: Throwable) = {
+          if (eagerCancel) cancelStage(cause)
           else {
            downstreamsRunning -= 1
-           if (downstreamsRunning == 0) completeStage()
+           if (downstreamsRunning == 0) cancelStage(cause)
           else if (!hasPulled && needDownstreamPulls > 0) {
             needDownstreamPulls -= 1
             if (needDownstreamPulls == 0 && !hasBeenPulled(in)) pull(in)
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
index 8e874eab59..a55fc87935 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala
@@ -12,9 +12,10 @@ import akka.stream._
 import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
 import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
 import akka.stream.stage._
-import scala.concurrent.duration._
+import scala.concurrent.duration._
 import akka.stream.Attributes.LogLevels
+import akka.util.OptionVal
 /**
  * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
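The fan-out hunks above change what happens when downstreams cancel: with `eagerCancel = true` the first cancellation is passed upstream together with its cause, and with the default `eagerCancel = false` the cause of the last remaining downstream is used once all of them have cancelled. A sketch of how this can be wired up and observed; the graph, object name and sinks are invented for illustration and are not part of the patch.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{ Broadcast, GraphDSL, RunnableGraph, Sink, Source }

object BroadcastCancellationExample extends App {
  implicit val system: ActorSystem = ActorSystem("broadcast-example") // assumed system name
  import system.dispatcher

  // One branch fails, the other folds; the failure becomes the cancellation cause.
  val failing = Sink.foreach[Int](_ => throw new RuntimeException("downstream boom"))
  val counting = Sink.fold[Int, Int](0)(_ + _)

  val graph = RunnableGraph.fromGraph(GraphDSL.create(counting) { implicit b => count =>
    import GraphDSL.Implicits._
    // eagerCancel = true: the first cancelling output cancels the stage, and the
    // cause (here the failing sink's exception) is now propagated to the source.
    val bcast = b.add(Broadcast[Int](outputPorts = 2, eagerCancel = true))
    Source(1 to 1000) ~> bcast.in
    bcast ~> failing
    bcast ~> count.in
    ClosedShape
  })

  // The point of the example is that the cause is no longer lost on its way
  // upstream; how exactly the surviving branch terminates is left hedged here.
  graph.run().onComplete { result =>
    println(s"counting sink finished with: $result")
    system.terminate()
  }
}
```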
@@ -272,9 +273,9 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
     setHandler(out, new OutHandler {
       override def onPull() = sinkIn.pull()
-      override def onDownstreamFinish() = {
+      override def onDownstreamFinish(cause: Throwable) = {
         finishing = true
-        sinkIn.cancel()
+        sinkIn.cancel(cause)
       }
     })
     sinkIn
@@ -385,12 +386,15 @@ object RestartWithBackoffFlow {
     override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
       new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
+        private var cause: OptionVal[Throwable] = OptionVal.None
+
         setHandlers(in, out, this)
         def onPush(): Unit = push(out, grab(in))
         def onPull(): Unit = pull(in)
-        override def onDownstreamFinish(): Unit = {
+        override def onDownstreamFinish(cause: Throwable): Unit = {
+          this.cause = OptionVal.Some(cause)
           scheduleOnce("CompleteState", delay)
           setHandler(in, new InHandler {
             def onPush(): Unit = {}
@@ -399,7 +403,13 @@ object RestartWithBackoffFlow {
         override protected def onTimer(timerKey: Any): Unit = {
           log.debug(s"Stage was canceled after delay of $delay")
-          completeStage()
+          cause match {
+            case OptionVal.Some(ex) =>
+              cancelStage(ex)
+            case OptionVal.None =>
+              throw new IllegalStateException("Timer hitting without first getting a cancel cannot happen")
+          }
+
         }
       }
     }
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
index 66251416f0..3dbcee2c1a 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamConverters.scala
@@ -35,7 +35,8 @@ object StreamConverters {
    * set it for a given Source by using [[akka.stream.ActorAttributes]].
    *
    * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
-   * and a possible exception if IO operation was not completed successfully.
+   * and a possible exception if IO operation was not completed successfully. Note that bytes having been read by the source does
+   * not give any guarantee that the bytes were seen by downstream stages.
    *
    * The created [[InputStream]] will be closed when the [[Source]] is cancelled.
    *
diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
index b63993f3da..6c28fe9bef 100644
--- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
+++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala
@@ -1800,8 +1800,7 @@ trait OutHandler {
    * be called for this port.
    */
  @throws(classOf[Exception])
-  // FIXME: add this after fixing our own usages, https://github.com/akka/akka/issues/27472
-  // @deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "2.6.0") // warns when overriding
+  @deprecatedOverriding("Override `def onDownstreamFinish(cause: Throwable)`, instead.", since = "2.6.0") // warns when overriding
  @deprecated("Call onDownstreamFinish with a cancellation cause.", since = "2.6.0") // warns when calling
  def onDownstreamFinish(): Unit = {
    val thisStage = GraphInterpreter.currentInterpreter.activeStage
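Since overriding the zero-argument `onDownstreamFinish()` is deprecated by the GraphStage.scala hunk above, custom stages that postpone cancellation can follow the same "remember the cause, cancel later" pattern as the delay stage in the RestartFlow hunks. This is a hypothetical stage, not part of the patch: the name `DelayedCancel` is invented, and a plain `Option` stands in for the `OptionVal` used internally (which is Akka-internal API).

```scala
import scala.concurrent.duration.FiniteDuration

import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }

// Hypothetical pass-through stage that delays propagating a downstream
// cancellation upstream by `delay`, preserving the original cause.
final class DelayedCancel[T](delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("DelayedCancel.in")
  val out: Outlet[T] = Outlet("DelayedCancel.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var cause: Option[Throwable] = None

      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      // 2.6 style: keep the cause and act on it when the timer fires,
      // instead of the deprecated zero-argument onDownstreamFinish().
      override def onDownstreamFinish(cause: Throwable): Unit = {
        this.cause = Some(cause)
        scheduleOnce("delayed-cancel", delay)
        // The outlet is already closed, so swallow any in-flight element
        // instead of pushing it (mirrors the InHandler swap in the diff above).
        setHandler(in, new InHandler {
          override def onPush(): Unit = ()
        })
      }

      override protected def onTimer(timerKey: Any): Unit =
        cause match {
          case Some(ex) => cancelStage(ex) // pass the original cause upstream
          case None     => completeStage() // defensive; should not happen
        }

      setHandlers(in, out, this)
    }
}
```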