diff --git a/akka-docs/rst/java/stream/stream-customize.rst b/akka-docs/rst/java/stream/stream-customize.rst index 3fb422796e..9ec4c44764 100644 --- a/akka-docs/rst/java/stream/stream-customize.rst +++ b/akka-docs/rst/java/stream/stream-customize.rst @@ -469,3 +469,16 @@ stage as state of an actor, and the callbacks as the ``receive`` block of the ac is unsafe to access the state of an actor from the outside. This means that Future callbacks should **not close over** internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined behavior. + + +Resources and the stage lifecycle +================================= + +If a stage manages a resource with a lifecycle, for example objects that need to be shutdown when they are not +used anymore it is important to make sure this will happen in all circumstances when the stage shuts down. + +Cleaning up resources should be done in ``GraphStageLogic.postStop`` and not in the ``InHandler`` and ``OutHandler`` +callbacks. The reason for this is that when the stage itself completes or is failed there is no signal from the upstreams +or the downstreams. Even for stages that do not complete or fail in this manner, this can happen when the +``Materializer`` is shutdown or the ``ActorSystem`` is terminated while a stream is still running, what is called an +"abrupt termination". \ No newline at end of file diff --git a/akka-docs/rst/scala/stream/stream-customize.rst b/akka-docs/rst/scala/stream/stream-customize.rst index 000750739e..8e0839c5bf 100644 --- a/akka-docs/rst/scala/stream/stream-customize.rst +++ b/akka-docs/rst/scala/stream/stream-customize.rst @@ -480,6 +480,18 @@ stage as state of an actor, and the callbacks as the ``receive`` block of the ac internal state of custom stages because such access can be concurrent with the provided callbacks, leading to undefined behavior. +Resources and the stage lifecycle +================================= + +If a stage manages a resource with a lifecycle, for example objects that need to be shutdown when they are not +used anymore it is important to make sure this will happen in all circumstances when the stage shuts down. + +Cleaning up resources should be done in ``GraphStageLogic.postStop`` and not in the ``InHandler`` and ``OutHandler`` +callbacks. The reason for this is that when the stage itself completes or is failed there is no signal from the upstreams +or the downstreams. Even for stages that do not complete or fail in this manner, this can happen when the +``Materializer`` is shutdown or the ``ActorSystem`` is terminated while a stream is still running, what is called an +"abrupt termination". + Extending Flow Combinators with Custom Operators ================================================ diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala index fafdf269fd..1d496aba0e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -5,14 +5,16 @@ package akka.stream.impl.fusing import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicInteger import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance.SpecViolation +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.scaladsl._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.stream.testkit.Utils._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.testkit.EventFilter +import akka.testkit.{ EventFilter, TestLatch } import scala.concurrent.Await import scala.concurrent.duration._ @@ -390,6 +392,41 @@ class ActorGraphInterpreterSpec extends StreamSpec { upstream.expectCancellation() } + "trigger postStop in all stages when abruptly terminated (and no upstream boundaries)" in { + val mat = ActorMaterializer() + val gotStop = TestLatch(1) + + object PostStopSnitchFlow extends SimpleLinearGraphStage[String] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + + override def postStop(): Unit = { + gotStop.countDown() + } + } + } + + val downstream = TestSubscriber.probe[String]() + + Source.repeat("whatever") + .via(PostStopSnitchFlow) + .to(Sink.fromSubscriber(downstream)) + .run()(mat) + + downstream.requestNext() + + mat.shutdown() + Await.ready(gotStop, remainingOrDefault) + + val propagatedError = downstream.expectError() + propagatedError shouldBe an[AbruptTerminationException] + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 9548bfc846..43b7824c9b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -15,7 +15,7 @@ import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Keep, Source, StreamConverters } import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSource -import akka.stream.testkit.{ GraphStageMessages, StreamSpec, TestSinkStage } +import akka.stream.testkit._ import akka.testkit.TestProbe import akka.util.ByteString @@ -227,18 +227,30 @@ class InputStreamSinkSpec extends StreamSpec(UnboundedMailboxConfig) { List.fill(5)(inputStream.read()) should ===(List(0, 100, 200, 255, -1)) inputStream.close() } - } - "fail to materialize with zero sized input buffer" in { - an[IllegalArgumentException] shouldBe thrownBy { - Source.single(byteString) - .runWith(StreamConverters.asInputStream(timeout).withAttributes(inputBuffer(0, 0))) - /* - With Source.single we test the code path in which the sink - itself throws an exception when being materialized. If - Source.empty is used, the same exception is thrown by - Materializer. - */ + "fail to materialize with zero sized input buffer" in { + an[IllegalArgumentException] shouldBe thrownBy { + Source.single(byteString) + .runWith(StreamConverters.asInputStream(timeout).withAttributes(inputBuffer(0, 0))) + /* + With Source.single we test the code path in which the sink + itself throws an exception when being materialized. If + Source.empty is used, the same exception is thrown by + Materializer. + */ + } + } + + "throw from inputstream read if terminated abruptly" in { + val mat = ActorMaterializer() + val probe = TestPublisher.probe[ByteString]() + val inputStream = Source.fromPublisher(probe).runWith(StreamConverters.asInputStream())(mat) + mat.shutdown() + + intercept[IOException] { + inputStream.read() + } } } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 646bc19252..36f054acac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -3,30 +3,27 @@ */ package akka.stream.io -import akka.{ Done, NotUsed } -import akka.actor.{ ActorSystem, Address, Kill } -import akka.io.Tcp._ -import akka.stream.scaladsl.Tcp.{ IncomingConnection, ServerBinding } -import akka.stream.scaladsl.{ Flow, _ } -import akka.stream.testkit.TestUtils.temporaryServerAddress - -import scala.util.control.NonFatal -import akka.stream.testkit.Utils._ -import akka.stream.testkit._ -import akka.stream._ -import akka.util.{ ByteString, Helpers } - -import scala.collection.immutable -import scala.concurrent.{ Await, Future, Promise } -import scala.concurrent.duration._ import java.net._ import java.util.concurrent.atomic.AtomicInteger +import akka.actor.{ ActorSystem, Kill } +import akka.io.Tcp._ +import akka.stream._ +import akka.stream.scaladsl.Tcp.{ IncomingConnection, ServerBinding } +import akka.stream.scaladsl.{ Flow, _ } +import akka.stream.testkit.TestUtils.temporaryServerAddress +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe } +import akka.util.ByteString +import akka.{ Done, NotUsed } import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.PatienceConfiguration.Timeout -import scala.util.Try +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future, Promise } +import scala.util.control.NonFatal class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.timeout = 2s") with TcpHelper { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index ac81bcd7bd..09367e1b97 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -4,12 +4,13 @@ package akka.stream.scaladsl import scala.concurrent.duration._ -import akka.actor.{ Actor, ActorRef, Props } +import akka.actor.{ Actor, ActorRef, Props, Status } import akka.stream.ActorMaterializer import akka.stream.Attributes.inputBuffer import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ +import akka.testkit.TestProbe object ActorRefBackpressureSinkSpec { val initMessage = "start" @@ -171,6 +172,21 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { } } + "signal failure on abrupt termination" in { + val mat = ActorMaterializer() + val probe = TestProbe() + + val sink = Sink + .actorRefWithAck[String](probe.ref, initMessage, ackMessage, completeMessage) + .withAttributes(inputBuffer(1, 1)) + + val maybe = Source.maybe[String].to(sink).run()(mat) + + probe.expectMsg(initMessage) + mat.shutdown() + probe.expectMsgType[Status.Failure] + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala index 1e018c1d13..25f892069c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala @@ -4,8 +4,8 @@ package akka.stream.scaladsl import akka.stream.testkit.StreamSpec -import akka.stream.testkit.scaladsl.{ TestSource, TestSink } -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, FlowMonitorState } import akka.stream.FlowMonitorState._ import scala.concurrent.duration._ @@ -69,5 +69,15 @@ class FlowMonitorSpec extends StreamSpec { awaitAssert(monitor.state == Received(msg), 3.seconds) } + "return Failed when stream is abruptly terminated" in { + val mat = ActorMaterializer() + val (source, monitor) = + TestSource.probe[Any].monitor()(Keep.both).to(Sink.ignore).run()(mat) + mat.shutdown() + + awaitAssert( + monitor.state shouldBe a[FlowMonitorState.Failed], remainingOrDefault) + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala index 163c2ee435..00bd179057 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOnCompleteSpec.scala @@ -78,6 +78,18 @@ class FlowOnCompleteSpec extends StreamSpec with ScriptedTest { onCompleteProbe.expectMsg(Success(Done)) } + "yield error on abrupt termination" in { + val mat = ActorMaterializer() + val onCompleteProbe = TestProbe() + val p = TestPublisher.manualProbe[Int]() + Source.fromPublisher(p).to(Sink.onComplete[Int](onCompleteProbe.ref ! _)).run()(mat) + val proc = p.expectSubscription() + proc.expectRequest() + mat.shutdown() + + onCompleteProbe.expectMsgType[Failure[_]] + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala index 26d8c4e500..e1e7bf5c17 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala @@ -65,6 +65,15 @@ class FlowWatchTerminationSpec extends StreamSpec { .expectComplete() } + "fail future when stream abruptly terminated" in { + val mat = ActorMaterializer() + + val (p, future) = TestSource.probe[Int].watchTermination()(Keep.both).to(Sink.ignore).run()(mat) + mat.shutdown() + + future.failed.futureValue shouldBe an[AbruptTerminationException] + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 0286e2d203..d33fd20d7b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -6,9 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ - -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream.{ AbruptStageTerminationException, AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -82,6 +80,18 @@ class HeadSinkSpec extends StreamSpec with ScriptedTest { Await.result(Source.empty[Int].runWith(Sink.headOption), 1.second) should be(None) } + "fail on abrupt termination" in { + val mat = ActorMaterializer() + val source = TestPublisher.probe() + val f = Source.fromPublisher(source) + .runWith(Sink.headOption)(mat) + mat.shutdown() + + // this one always fails with the AbruptTerminationException rather than the + // AbruptStageTerminationException for some reason + f.failed.futureValue shouldBe an[AbruptTerminationException] + } + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala index a552cc6fae..e4132b4cd1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SeqSinkSpec.scala @@ -3,10 +3,11 @@ */ package akka.stream.scaladsl -import akka.stream.testkit.StreamSpec -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.{ StreamSpec, TestPublisher } +import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } + import scala.collection.immutable -import scala.concurrent.{ Future, Await } +import scala.concurrent.{ Await, Future } class SeqSinkSpec extends StreamSpec { @@ -29,5 +30,14 @@ class SeqSinkSpec extends StreamSpec { val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) result should be(input) } + + "fail the future on abrupt termination" in { + val mat = ActorMaterializer() + val probe = TestPublisher.probe() + val future: Future[immutable.Seq[Int]] = + Source.fromPublisher(probe).runWith(Sink.seq)(mat) + mat.shutdown() + future.failed.futureValue shouldBe an[AbruptTerminationException] + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala index fa9456ae8f..0a207bca00 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala @@ -15,6 +15,7 @@ import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.testkit.{ StreamSpec, TestSubscriber } import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.TestLatch import akka.util.ByteString import scala.concurrent.{ Await, Future, Promise } @@ -247,6 +248,25 @@ class UnfoldResourceAsyncSourceSpec extends StreamSpec(UnboundedMailboxConfig) { c.expectNextN(60) c.expectError() } + + "close resource when stream is abruptly terminated" in { + val closeLatch = TestLatch(1) + val mat = ActorMaterializer() + val p = Source.unfoldResourceAsync[String, BufferedReader]( + open, + read, + reader ⇒ Future.successful { + closeLatch.countDown() + Done + }) + .runWith(Sink.asPublisher(false))(mat) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + + mat.shutdown() + + Await.ready(closeLatch, remainingOrDefault) + } } override def afterTermination(): Unit = { manyLinesFile.delete() diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index af96073c52..c6a837eafd 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -17,6 +17,7 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import akka.japi.function import akka.stream.impl.fusing.GraphInterpreterShell +import akka.stream.stage.GraphStageLogic import scala.util.control.NoStackTrace @@ -221,6 +222,15 @@ class MaterializationException(msg: String, cause: Throwable = null) extends Run final case class AbruptTerminationException(actor: ActorRef) extends RuntimeException(s"Processor actor [$actor] terminated abruptly") with NoStackTrace +/** + * Signal that the stage was abruptly terminated, usually seen as a call to `postStop` of the `GraphStageLogic` without + * any of the handler callbacks seeing completion or failure from upstream or cancellation from downstream. This can happen when + * the actor running the graph is killed, which happens when the materializer or actor system is terminated. + */ +final class AbruptStageTerminationException(logic: GraphStageLogic) + extends RuntimeException(s"GraphStage [$logic] terminated abruptly, caused by for example materializer or actor system termination.") + with NoStackTrace + object ActorMaterializerSettings { /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index bdb8ba9cd2..227fd93cdf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -8,7 +8,7 @@ import java.util import akka.actor._ import akka.annotation.InternalApi import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.{ Attributes, Inlet, SinkShape } +import akka.stream._ import akka.stream.Attributes.InputBuffer import akka.stream.stage._ @@ -34,6 +34,7 @@ import akka.stream.stage._ val buffer: util.Deque[In] = new util.ArrayDeque[In]() var acknowledgementReceived = false var completeReceived = false + var completionSignalled = false private def receive(evt: (ActorRef, Any)): Unit = { evt._2 match { @@ -65,6 +66,7 @@ import akka.stream.stage._ private def finish(): Unit = { ref ! onCompleteMessage + completionSignalled = true completeStage() } @@ -84,9 +86,16 @@ import akka.stream.stage._ override def onUpstreamFailure(ex: Throwable): Unit = { ref ! onFailureMessage(ex) + completionSignalled = true failStage(ex) } + override def postStop(): Unit = { + if (!completionSignalled) { + ref ! onFailureMessage(new AbruptStageTerminationException(this)) + } + } + setHandler(in, this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index eaf71962d5..2cf8de2a6b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -255,6 +255,10 @@ import akka.util.OptionVal failStage(ex) } + override def postStop(): Unit = { + if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this)) + } + setHandler(in, this) }, p.future) } @@ -297,6 +301,10 @@ import akka.util.OptionVal failStage(ex) } + override def postStop(): Unit = { + if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this)) + } + setHandler(in, this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 95e0894ba0..30b8ff475b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -17,6 +17,7 @@ import akka.Done import java.util.concurrent.CompletionStage import akka.annotation.InternalApi +import akka.util.OptionVal import scala.compat.java8.FutureConverters._ import scala.util.Try @@ -212,10 +213,14 @@ import scala.util.control.NonFatal def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var open = false var blockingStream: S = _ setHandler(out, this) - override def preStart(): Unit = blockingStream = create() + override def preStart(): Unit = { + blockingStream = create() + open = true + } @tailrec final override def onPull(): Unit = { @@ -245,16 +250,22 @@ import scala.util.control.NonFatal private def restartState(): Unit = { close(blockingStream) blockingStream = create() + open = true } private def closeStage(): Unit = try { close(blockingStream) + open = false completeStage() } catch { case NonFatal(ex) ⇒ failStage(ex) } + override def postStop(): Unit = { + if (open) close(blockingStream) + } + } override def toString = "UnfoldResourceSource" } @@ -273,6 +284,7 @@ import scala.util.control.NonFatal def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) var resource = Promise[S]() + var open = false implicit val context = ExecutionContexts.sameThreadExecutionContext setHandler(out, this) @@ -280,22 +292,21 @@ import scala.util.control.NonFatal override def preStart(): Unit = createStream(false) private def createStream(withPull: Boolean): Unit = { - val cb = getAsyncCallback[Try[S]] { + val createdCallback = getAsyncCallback[Try[S]] { case scala.util.Success(res) ⇒ + open = true resource.success(res) if (withPull) onPull() case scala.util.Failure(t) ⇒ failStage(t) } try { - create().onComplete(cb.invoke) + create().onComplete(createdCallback.invoke) } catch { case NonFatal(ex) ⇒ failStage(ex) } } - private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach { - resource ⇒ f(resource) - } + private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach(f) val errorHandler: PartialFunction[Throwable, Unit] = { case NonFatal(ex) ⇒ decider(ex) match { @@ -306,7 +317,8 @@ import scala.util.control.NonFatal case Supervision.Resume ⇒ onPull() } } - val callback = getAsyncCallback[Try[Option[T]]] { + + val readCallback = getAsyncCallback[Try[Option[T]]] { case scala.util.Success(data) ⇒ data match { case Some(d) ⇒ push(out, d) case None ⇒ closeStage() @@ -314,22 +326,26 @@ import scala.util.control.NonFatal case scala.util.Failure(t) ⇒ errorHandler(t) }.invoke _ - final override def onPull(): Unit = onResourceReady { - case resource ⇒ - try { readData(resource).onComplete(callback) } catch errorHandler - } + final override def onPull(): Unit = + onResourceReady { resource ⇒ + try { readData(resource).onComplete(readCallback) } catch errorHandler + } override def onDownstreamFinish(): Unit = closeStage() private def closeAndThen(f: () ⇒ Unit): Unit = { setKeepGoing(true) - val cb = getAsyncCallback[Try[Done]] { - case scala.util.Success(_) ⇒ f() - case scala.util.Failure(t) ⇒ failStage(t) + val closedCallback = getAsyncCallback[Try[Done]] { + case scala.util.Success(_) ⇒ + open = false + f() + case scala.util.Failure(t) ⇒ + open = false + failStage(t) } onResourceReady(res ⇒ - try { close(res).onComplete(cb.invoke) } catch { + try { close(res).onComplete(closedCallback.invoke) } catch { case NonFatal(ex) ⇒ failStage(ex) }) } @@ -337,7 +353,11 @@ import scala.util.control.NonFatal resource = Promise[S]() createStream(true) }) - private def closeStage(): Unit = closeAndThen(completeStage _) + private def closeStage(): Unit = closeAndThen(completeStage) + + override def postStop(): Unit = { + if (open) closeStage() + } } override def toString = "UnfoldResourceSourceAsync" diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 066503a027..0216e90cc1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -546,7 +546,7 @@ import scala.util.control.NonFatal if (enabled) shutdownCounter(logic.stageId) |= KeepGoingFlag else shutdownCounter(logic.stageId) &= KeepGoingMask - private def finalizeStage(logic: GraphStageLogic): Unit = { + private[stream] def finalizeStage(logic: GraphStageLogic): Unit = { try { logic.postStop() logic.afterPostStop() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c26c137d73..30848abc29 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -138,6 +138,10 @@ import scala.concurrent.{ Future, Promise } completeStage() } + override def postStop(): Unit = { + if (!finishPromise.isCompleted) finishPromise.failure(new AbruptStageTerminationException(this)) + } + setHandlers(in, out, this) }, finishPromise.future) } @@ -188,6 +192,13 @@ import scala.concurrent.{ Future, Promise } monitor.set(Finished) } + override def postStop(): Unit = { + monitor.state match { + case Finished | _: Failed ⇒ + case _ ⇒ monitor.set(Failed(new AbruptStageTerminationException(this))) + } + } + setHandler(in, this) setHandler(out, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index e0752e6f7b..f3347e3f61 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -11,7 +11,7 @@ import akka.stream.Attributes.InputBuffer import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.InputStreamSinkStage._ import akka.stream.stage._ -import akka.stream.{ Attributes, Inlet, SinkShape } +import akka.stream.{ AbruptStageTerminationException, Attributes, Inlet, SinkShape } import akka.util.ByteString import scala.annotation.tailrec @@ -51,6 +51,8 @@ private[stream] object InputStreamSinkStage { val logic = new GraphStageLogic(shape) with StageWithCallback with InHandler { + var completionSignalled = false + private val callback: AsyncCallback[AdapterToStageMessage] = getAsyncCallback { case ReadElementAcknowledgement ⇒ sendPullIfAllowed() @@ -77,15 +79,22 @@ private[stream] object InputStreamSinkStage { override def onUpstreamFinish(): Unit = { dataQueue.add(Finished) + completionSignalled = true completeStage() } override def onUpstreamFailure(ex: Throwable): Unit = { dataQueue.add(Failed(ex)) + completionSignalled = true failStage(ex) } + override def postStop(): Unit = { + if (!completionSignalled) dataQueue.add(Failed(new AbruptStageTerminationException(this))) + } + setHandler(in, this) + } (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index aa81c21dca..977f56b3c0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -116,14 +116,6 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration } setHandler(out, new OutHandler { - override def onDownstreamFinish(): Unit = { - //assuming there can be no further in messages - downstreamStatus.set(Canceled) - dataQueue.clear() - // if blocked reading, make sure the take() completes - dataQueue.put(ByteString.empty) - completeStage() - } override def onPull(): Unit = { implicit val ec = dispatcher Future { @@ -143,6 +135,11 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration }) override def postStop(): Unit = { + //assuming there can be no further in messages + downstreamStatus.set(Canceled) + dataQueue.clear() + // if blocked reading, make sure the take() completes + dataQueue.put(ByteString.empty) // interrupt any pending blocking take if (blockingThread != null) blockingThread.interrupt() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 63799abcb7..a202c77587 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -54,7 +54,7 @@ import scala.util.Try val connectionFlowsAwaitingInitialization = new AtomicLong() var listener: ActorRef = _ - var unbindPromise = Promise[Unit]() + val unbindPromise = Promise[Unit]() var unbindStarted = false override def preStart(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index f61ffa605d..c5ee665b90 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -296,21 +296,30 @@ object Sink { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + var completionSignalled = false + override def onPush(): Unit = pull(in) override def onPull(): Unit = pull(in) override def onUpstreamFailure(cause: Throwable): Unit = { callback(Failure(cause)) + completionSignalled = true failStage(cause) } override def onUpstreamFinish(): Unit = { callback(Success(Done)) + completionSignalled = true completeStage() } + override def postStop(): Unit = { + if (!completionSignalled) callback(Failure(new AbruptStageTerminationException(this))) + } + setHandlers(in, out, this) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 4659917dc8..4abebefb03 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -217,9 +217,13 @@ object GraphStageLogic { * * The lifecycle hooks [[preStart()]] and [[postStop()]] * * Methods for performing stream processing actions, like pulling or pushing elements * - * The stage logic is always once all its input and output ports have been closed, i.e. it is not possible to - * keep the stage alive for further processing once it does not have any open ports. This can be changed by - * overriding `keepGoingAfterAllPortsClosed` to return true. + * The stage logic is completed once all its input and output ports have been closed. This can be changed by + * setting `setKeepGoing` to true. + * + * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down + * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never + * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource + * cleanup should always be done in `postStop`. */ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) { import GraphInterpreter._ @@ -538,7 +542,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Automatically invokes [[cancel()]] or [[complete()]] on all the input or output ports that have been called, - * then stops the stage, then [[postStop()]] is called. + * then marks the stage as stopped. */ final def completeStage(): Unit = { var i = 0 @@ -556,7 +560,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: /** * Automatically invokes [[cancel()]] or [[fail()]] on all the input or output ports that have been called, - * then stops the stage, then [[postStop()]] is called. + * then marks the stage as stopped. */ final def failStage(ex: Throwable): Unit = { var i = 0