Merge pull request #19498 from agolubev/agolubev-#18565-TerminationNotification-combinator

+str #18565 termination notification combinator
This commit is contained in:
drewhk 2016-01-25 10:52:48 +01:00
commit 41d8b1fadd
9 changed files with 173 additions and 3 deletions

View file

@ -149,6 +149,19 @@ broadcast all of the outputs stops backpressuring and there is an i
balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes
===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== ===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Watching status stages
^^^^^^^^^^^^^^^^^^^^^^
Materializes to a Future that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed.
The stage otherwise passes through elements unchanged.
===================== ======================================================================== ========================================================== =====================================================================================
Stage Emits when Backpressures when Completes when
===================== ======================================================================== ========================================================== =====================================================================================
watchTermination input has an element available output backpressures upstream completes
===================== ======================================================================== ========================================================== =====================================================================================
.. [1] If a Future fails, the stream also fails (unless a different supervision strategy is applied) .. [1] If a Future fails, the stream also fails (unless a different supervision strategy is applied)
.. [2] Except if the encapsulated computation is not fast enough .. [2] Except if the encapsulated computation is not fast enough
.. [3] Until the end of stream it is not possible to know whether new substreams will be needed or not .. [3] Until the end of stream it is not possible to know whether new substreams will be needed or not

View file

@ -487,6 +487,16 @@ public class FlowTest extends StreamTest {
assertEquals(input, result); assertEquals(input, result);
} }
@Test
public void mustBeAbleToUseWatchTermination() throws Exception {
final List<String> input = Arrays.asList("A", "B", "C");
Future<Done> future = Source.from(input)
.watchTermination(Keep.<NotUsed, Future<Done>>right())
.to(Sink.ignore()).run(materializer);
assertEquals(Done.getInstance(), Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS)));
}
@Test @Test
public void mustBeAbleToUseConflate() throws Exception { public void mustBeAbleToUseConflate() throws Exception {
final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe = new JavaTestKit(system);

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor.Status.Failure
import akka.Done
import akka.pattern.pipe
import akka.stream._
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
import scala.util.control.NoStackTrace
import scala.concurrent.duration._
class FlowWatchTerminationSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
implicit val patience = PatienceConfig(3.seconds)
"A WatchTermination" must {
"complete future when stream is completed" in assertAllStagesStopped {
val (future, p) = Source(1 to 4).watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run()
p.request(4).expectNext(1, 2, 3, 4)
future.futureValue should ===(Done)
p.expectComplete()
}
"complete future when stream is cancelled from downstream" in assertAllStagesStopped {
val (future, p) = Source(1 to 4).watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run()
p.request(3).expectNext(1, 2, 3).cancel()
future.futureValue should ===(Done)
}
"fail future when stream is failed" in assertAllStagesStopped {
val ex = new RuntimeException("Stream failed.") with NoStackTrace
val (p, future) = TestSource.probe[Int].watchTermination()(Keep.both).to(Sink.ignore).run()
p.sendNext(1)
p.sendError(ex)
whenReady(future.failed) { _ shouldBe (ex) }
}
"complete the future for an empty stream" in assertAllStagesStopped {
val (future, p) = Source.empty[Int].watchTermination()(Keep.right).toMat(TestSink.probe[Int])(Keep.both).run()
p.request(1)
future.futureValue should ===(Done)
}
"complete future for graph" in assertAllStagesStopped {
implicit val ec = system.dispatcher
val ((sourceProbe, future), sinkProbe) = TestSource.probe[Int].watchTermination()(Keep.both).concat(Source(2 to 5)).toMat(TestSink.probe[Int])(Keep.both).run()
future.pipeTo(testActor)
sinkProbe.request(5)
sourceProbe.sendNext(1)
sinkProbe.expectNext(1)
expectNoMsg(300.millis)
sourceProbe.sendComplete()
expectMsg(Done)
sinkProbe.expectNextN(2 to 5)
.expectComplete()
}
}
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream package akka.stream
class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached") class StreamLimitReachedException(val n: Long) extends RuntimeException(s"limit of $n reached")

View file

@ -75,6 +75,8 @@ private[stream] object Stages {
val unfoldAsync = name("unfoldAsync") val unfoldAsync = name("unfoldAsync")
val delay = name("delay") and inputBuffer(16, 16) val delay = name("delay") and inputBuffer(16, 16)
val terminationWatcher = name("terminationWatcher")
val publisherSource = name("publisherSource") val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource") val iterableSource = name("iterableSource")
val futureSource = name("futureSource") val futureSource = name("futureSource")

View file

@ -3,8 +3,8 @@
*/ */
package akka.stream.impl.fusing package akka.stream.impl.fusing
import java.util.concurrent.atomic.AtomicBoolean
import akka.Done import akka.Done
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.event.Logging import akka.event.Logging
@ -225,6 +225,45 @@ object GraphStages {
} }
} }
private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] {
val in = Inlet[Any]("terminationWatcher.in")
val out = Outlet[Any]("terminationWatcher.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.terminationWatcher
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val finishPromise = Promise[Done]()
(new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = {
finishPromise.success(Done)
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = {
finishPromise.failure(ex)
failStage(ex)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = {
finishPromise.success(Done)
completeStage()
}
})
}, finishPromise.future)
}
override def toString = "TerminationWatcher"
}
def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] =
TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]]
final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
override val shape = SourceShape(Outlet[T]("TickSource.out")) override val shape = SourceShape(Outlet[T]("TickSource.out"))

View file

@ -3,7 +3,7 @@
*/ */
package akka.stream.javadsl package akka.stream.javadsl
import akka.NotUsed import akka.{ NotUsed, Done }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair } import akka.japi.{ function, Pair }
import akka.stream.impl.Timers.{ DelayInitial, IdleInject } import akka.stream.impl.Timers.{ DelayInitial, IdleInject }
@ -1557,6 +1557,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*/ */
def detach: javadsl.Flow[In, Out, Mat] = new Flow(delegate.detach) def detach: javadsl.Flow[In, Out, Mat] = new Flow(delegate.detach)
/**
* Materializes to `Future[Done]` that completes on getting termination message.
* The Future completes with success when received complete message from upstream or cancel
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.watchTermination()(combinerToScala(matF)))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -12,7 +12,7 @@ import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function } import akka.japi.{ Pair, Util, function }
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream._ import akka.stream._
import akka.stream.impl.fusing.Delay import akka.stream.impl.fusing.{ GraphStages, Delay }
import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.stage.Stage import akka.stream.stage.Stage
import akka.util.ByteString import akka.util.ByteString
@ -1732,6 +1732,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*/ */
def detach: javadsl.Source[Out, Mat] = new Source(delegate.detach) def detach: javadsl.Source[Out, Mat] = new Source(delegate.detach)
/**
* Materializes to `Future[Done]` that completes on getting termination message.
* The Future completes with success when received complete message from upstream or cancel
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()(combinerToScala(matF)))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -6,9 +6,11 @@ package akka.stream.scaladsl
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream._ import akka.stream._
import akka.Done
import akka.stream.impl.Stages.{ DirectProcessor, StageModule } import akka.stream.impl.Stages.{ DirectProcessor, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages.TerminationWatcher
import akka.stream.impl.fusing._ import akka.stream.impl.fusing._
import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
import akka.stream.stage._ import akka.stream.stage._
@ -1851,6 +1853,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] = def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) Mat3): ReprMat[Out, Mat3] =
viaMat(alsoToGraph(that))(matF) viaMat(alsoToGraph(that))(matF)
/**
* Materializes to `Future[Done]` that completes on getting termination message.
* The Future completes with success when received complete message from upstream or cancel
* from downstream. It fails with the same error when received error message from
* downstream.
*/
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.terminationWatcher)(matF)
/** /**
* INTERNAL API. * INTERNAL API.
*/ */