diff --git a/akka-docs/rst/general/stream/stages-overview.rst b/akka-docs/rst/general/stream/stages-overview.rst index a918ea2acc..bc1474d2f5 100644 --- a/akka-docs/rst/general/stream/stages-overview.rst +++ b/akka-docs/rst/general/stream/stages-overview.rst @@ -149,6 +149,19 @@ broadcast all of the outputs stops backpressuring and there is an i balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== +Watching status stages +^^^^^^^^^^^^^^^^^^^^^^ + +Materializes to a Future that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed. +The stage otherwise passes through elements unchanged. + +===================== ======================================================================== ========================================================== ===================================================================================== +Stage Emits when Backpressures when Completes when +===================== ======================================================================== ========================================================== ===================================================================================== +watchTermination input has an element available output backpressures upstream completes +===================== ======================================================================== ========================================================== ===================================================================================== + + .. [1] If a Future fails, the stream also fails (unless a different supervision strategy is applied) .. [2] Except if the encapsulated computation is not fast enough .. [3] Until the end of stream it is not possible to know whether new substreams will be needed or not diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 324caee47d..353d3a3507 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -508,6 +508,16 @@ public class FlowTest extends StreamTest { assertEquals(input, result); } + @Test + public void mustBeAbleToUseWatchTermination() throws Exception { + final List input = Arrays.asList("A", "B", "C"); + Future future = Source.from(input) + .watchTermination(Keep.>right()) + .to(Sink.ignore()).run(materializer); + + assertEquals(Done.getInstance(), Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS))); + } + @Test public void mustBeAbleToUseConflate() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala new file mode 100644 index 0000000000..56423e017f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWatchTerminationSpec.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.Status.Failure +import akka.Done +import akka.pattern.pipe +import akka.stream._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import org.scalactic.ConversionCheckedTripleEquals +import org.scalatest.concurrent.ScalaFutures + +import scala.util.control.NoStackTrace +import scala.concurrent.duration._ + +class FlowWatchTerminationSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals { + + val settings = ActorMaterializerSettings(system) + + implicit val materializer = ActorMaterializer(settings) + implicit val patience = PatienceConfig(3.seconds) + + "A WatchTermination" must { + + "complete future when stream is completed" in assertAllStagesStopped { + val (future, p) = Source(1 to 4).watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run() + p.request(4).expectNext(1, 2, 3, 4) + future.futureValue should ===(Done) + p.expectComplete() + } + + "complete future when stream is cancelled from downstream" in assertAllStagesStopped { + val (future, p) = Source(1 to 4).watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run() + p.request(3).expectNext(1, 2, 3).cancel() + future.futureValue should ===(Done) + } + + "fail future when stream is failed" in assertAllStagesStopped { + val ex = new RuntimeException("Stream failed.") with NoStackTrace + val (p, future) = TestSource.probe[Int].watchTermination()(Keep.both).to(Sink.ignore).run() + p.sendNext(1) + p.sendError(ex) + whenReady(future.failed) { _ shouldBe (ex) } + } + + "complete the future for an empty stream" in assertAllStagesStopped { + val (future, p) = Source.empty[Int].watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run() + p.request(1) + future.futureValue should ===(Done) + } + + "complete future for graph" in assertAllStagesStopped { + implicit val ec = system.dispatcher + + val ((sourceProbe, future), sinkProbe) = TestSource.probe[Int].watchTermination()(Keep.both).concat(Source(2 to 5)).toMat(TestSink.probe[Int])(Keep.both).run() + future.pipeTo(testActor) + sinkProbe.request(5) + sourceProbe.sendNext(1) + sinkProbe.expectNext(1) + expectNoMsg(300.millis) + + sourceProbe.sendComplete() + expectMsg(Done) + + sinkProbe.expectNextN(2 to 5) + .expectComplete() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala index abffbb5b05..6eb8930258 100644 --- a/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala +++ b/akka-stream/src/main/scala/akka/stream/StreamLimitReachedException.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ package akka.stream class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index cb6688deab..572eb60c5d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -75,6 +75,8 @@ private[stream] object Stages { val unfoldAsync = name("unfoldAsync") val delay = name("delay") and inputBuffer(16, 16) + val terminationWatcher = name("terminationWatcher") + val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") val futureSource = name("futureSource") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index ab360ad898..c9b0d266f6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -3,8 +3,8 @@ */ package akka.stream.impl.fusing -import java.util.concurrent.atomic.AtomicBoolean import akka.Done +import java.util.concurrent.atomic.AtomicBoolean import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.event.Logging @@ -225,6 +225,45 @@ object GraphStages { } } + private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] { + val in = Inlet[Any]("terminationWatcher.in") + val out = Outlet[Any]("terminationWatcher.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.terminationWatcher + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { + val finishPromise = Promise[Done]() + + (new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + + override def onUpstreamFinish(): Unit = { + finishPromise.success(Done) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + finishPromise.failure(ex) + failStage(ex) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(): Unit = { + finishPromise.success(Done) + completeStage() + } + }) + }, finishPromise.future) + } + + override def toString = "TerminationWatcher" + } + + def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = + TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] + final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 739094c194..5efe0b0f86 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -3,7 +3,7 @@ */ package akka.stream.javadsl -import akka.NotUsed +import akka.{ NotUsed, Done } import akka.event.LoggingAdapter import akka.japi.{ function, Pair } import akka.stream.impl.Timers.{ DelayInitial, IdleInject } @@ -1556,6 +1556,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends */ def detach: javadsl.Flow[In, Out, Mat] = new Flow(delegate.detach) + /** + * Materializes to `Future[Done]` that completes on getting termination message. + * The Future completes with success when received complete message from upstream or cancel + * from downstream. It fails with the same error when received error message from + * downstream. + */ + def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Flow[In, Out, M] = + new Flow(delegate.watchTermination()(combinerToScala(matF))) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c19ee74240..9b4d1372d2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -13,7 +13,7 @@ import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream.Attributes._ import akka.stream._ -import akka.stream.impl.fusing.Delay +import akka.stream.impl.fusing.{ GraphStages, Delay } import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.stage.Stage import akka.util.ByteString @@ -1721,6 +1721,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap */ def detach: javadsl.Source[Out, Mat] = new Source(delegate.detach) + /** + * Materializes to `Future[Done]` that completes on getting termination message. + * The Future completes with success when received complete message from upstream or cancel + * from downstream. It fails with the same error when received error message from + * downstream. + */ + def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Source[Out, M] = + new Source(delegate.watchTermination()(combinerToScala(matF))) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index efa431e473..f03c282d97 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -6,9 +6,11 @@ package akka.stream.scaladsl import akka.event.LoggingAdapter import akka.stream.Attributes._ import akka.stream._ +import akka.Done import akka.stream.impl.Stages.{ DirectProcessor, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl._ +import akka.stream.impl.fusing.GraphStages.TerminationWatcher import akka.stream.impl.fusing._ import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ @@ -1851,6 +1853,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[Out, Mat3] = viaMat(alsoToGraph(that))(matF) + /** + * Materializes to `Future[Done]` that completes on getting termination message. + * The Future completes with success when received complete message from upstream or cancel + * from downstream. It fails with the same error when received error message from + * downstream. + */ + def watchTermination[Mat2]()(matF: (Mat, Future[Done]) ⇒ Mat2): ReprMat[Out, Mat2] = + viaMat(GraphStages.terminationWatcher)(matF) + /** * INTERNAL API. */