From 26c25c89ed0a5f47c05f23215a7cd59e5be89910 Mon Sep 17 00:00:00 2001 From: Tal Pressman Date: Sun, 21 Feb 2016 13:03:00 +0200 Subject: [PATCH] +str #19390 Add 'monitor' flow combinator and stage --- akka-docs/rst/java/stream/stages-overview.rst | 12 +++ .../TwitterStreamQuickstartDocSpec.scala | 3 +- .../rst/scala/stream/stages-overview.rst | 12 +++ .../stream/scaladsl/FlowMonitorSpec.scala | 73 +++++++++++++++++++ .../main/scala/akka/stream/FlowMonitor.scala | 63 ++++++++++++++++ .../akka/stream/impl/fusing/GraphStages.scala | 67 +++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 13 +++- .../main/scala/akka/stream/javadsl/Sink.scala | 1 - .../scala/akka/stream/javadsl/Source.scala | 10 ++- .../scala/akka/stream/scaladsl/Flow.scala | 23 +++++- .../scala/akka/stream/scaladsl/Sink.scala | 3 +- project/MiMa.scala | 5 +- 12 files changed, 272 insertions(+), 13 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/FlowMonitor.scala diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 9bb6ce8c0f..753e4ac9ea 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -1084,3 +1084,15 @@ The stage otherwise passes through elements unchanged. **completes** when upstream completes +monitor +^^^^^^^ +Materializes to a ``FlowMonitor`` that monitors messages flowing through or completion of the stage. The stage otherwise +passes through elements unchanged. Note that the ``FlowMonitor`` inserts a memory barrier every time it processes an +event, and may therefore affect performance. + +**emits** when upstream emits an element + +**backpressures** when downstream **backpressures** + +**completes** when upstream completes + diff --git a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 818368a044..edf4828aa8 100644 --- a/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -32,8 +32,7 @@ object TwitterStreamQuickstartDocSpec { //#model //#tweet-source - val tweets: Source[Tweet, NotUsed] - //#tweet-source + val tweets: Source[Tweet, NotUsed] //#tweet-source = Source( Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 249fec4628..7fecd958cf 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -1075,3 +1075,15 @@ The stage otherwise passes through elements unchanged. **completes** when upstream completes +monitor +^^^^^^^ +Materializes to a ``FlowMonitor`` that monitors messages flowing through or completion of the stage. The stage otherwise +passes through elements unchanged. Note that the ``FlowMonitor`` inserts a memory barrier every time it processes an +event, and may therefore affect performance. + +**emits** when upstream emits an element + +**backpressures** when downstream **backpressures** + +**completes** when upstream completes + diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala new file mode 100644 index 0000000000..fd43f798da --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.{ TestSource, TestSink } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.testkit.AkkaSpec +import akka.stream.FlowMonitorState._ + +import scala.concurrent.duration._ + +class FlowMonitorSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + + implicit val materializer = ActorMaterializer(settings) + + "A FlowMonitor" must { + "return Finished when stream is completed" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + source.sendComplete() + awaitAssert(monitor.state == Finished, 3.seconds) + sink.expectSubscriptionAndComplete() + } + + "return Finished when stream is cancelled from downstream" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + sink.cancel() + awaitAssert(monitor.state == Finished, 3.seconds) + } + + "return Failed when stream fails, and propagate the error" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + val ex = new Exception("Source failed") + source.sendError(ex) + awaitAssert(monitor.state == Failed(ex), 3.seconds) + sink.expectSubscriptionAndError(ex) + } + + "return Initialized for an empty stream" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + awaitAssert(monitor.state == Initialized, 3.seconds) + source.expectRequest() + sink.expectSubscription() + } + + "return Received after receiving a message" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + val msg = "message" + source.sendNext(msg) + sink.requestNext(msg) + awaitAssert(monitor.state == Received(msg), 3.seconds) + } + + // Check a stream that processes StreamState messages specifically, to make sure the optimization in FlowMonitorImpl + // (to avoid allocating an object for each message) doesn't introduce a bug + "return Received after receiving a StreamState message" in { + val ((source, monitor), sink) = + TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run() + val msg = Received("message") + source.sendNext(msg) + sink.requestNext(msg) + awaitAssert(monitor.state == Received(msg), 3.seconds) + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala b/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala new file mode 100644 index 0000000000..3e8df4def9 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/FlowMonitor.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream + +import akka.stream.FlowMonitorState.StreamState + +/** + * Used to monitor the state of a stream + * + * @tparam T Type of messages passed by the stream + */ +trait FlowMonitor[+T] { + def state: StreamState[T] +} + +object FlowMonitorState { + sealed trait StreamState[+U] + + /** + * Stream was created, but no events have passed through it + */ + case object Initialized extends StreamState[Nothing] + + /** + * Java API + */ + def initialized[U](): StreamState[U] = Initialized + + /** + * Stream processed a message + * + * @param msg The processed message + */ + final case class Received[+U](msg: U) extends StreamState[U] + + /** + * Java API + */ + def received[U](msg: U): StreamState[U] = Received(msg) + + /** + * Stream failed + * + * @param cause The cause of the failure + */ + final case class Failed(cause: Throwable) extends StreamState[Nothing] + + /** + * Java API + */ + def failed[U](cause: Throwable): StreamState[U] = Failed(cause) + + /** + * Stream completed successfully + */ + case object Finished extends StreamState[Nothing] + + /** + * Java API + */ + def finished[U]() = Finished +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c973bb1fd4..88319b8e83 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.event.Logging +import akka.stream.FlowMonitorState._ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.impl.Stages.DefaultAttributes @@ -153,6 +154,72 @@ object GraphStages { def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] + private class FlowMonitorImpl[T] extends AtomicReference[Any](Initialized) with FlowMonitor[T] { + override def state = get match { + case s: StreamState[_] ⇒ s.asInstanceOf[StreamState[T]] + case msg ⇒ Received(msg.asInstanceOf[T]) + } + } + + private class MonitorFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] { + val in = Inlet[T]("FlowMonitor.in") + val out = Outlet[T]("FlowMonitor.out") + val shape = FlowShape.of(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = { + val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T] + + val logic: GraphStageLogic = new GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = { + val msg = grab(in) + push(out, msg) + monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg) + } + override def onUpstreamFinish(): Unit = { + super.onUpstreamFinish() + monitor.set(Finished) + } + override def onUpstreamFailure(ex: Throwable): Unit = { + super.onUpstreamFailure(ex) + monitor.set(Failed(ex)) + } + }) + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + override def onDownstreamFinish(): Unit = { + super.onDownstreamFinish() + monitor.set(Finished) + } + }) + + override def toString = "MonitorFlowLogic" + } + + (logic, monitor) + } + + override def toString = "MonitorFlow" + } + + def monitor[T]: GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] = + new MonitorFlow[T] + + private object TickSource { + class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { + private val cancelPromise = Promise[Done]() + + def cancelFuture: Future[Done] = cancelPromise.future + + override def cancel(): Boolean = { + if (!isCancelled) cancelPromise.trySuccess(Done) + true + } + + override def isCancelled: Boolean = cancelled.get() + } + } + final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 510366e513..477b7b1225 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -6,8 +6,8 @@ package akka.stream.javadsl import akka.{ NotUsed, Done } import akka.event.LoggingAdapter import akka.japi.{ function, Pair } -import akka.stream.impl.{ ConstantFun, StreamLayout } -import akka.stream.{ scaladsl, _ } +import akka.stream.impl.{ConstantFun, StreamLayout} +import akka.stream._ import akka.stream.stage.Stage import org.reactivestreams.Processor import scala.annotation.unchecked.uncheckedVariance @@ -1686,6 +1686,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = new Flow(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava))) + /** + * Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] = + new Flow(delegate.monitor()(combinerToScala(combine))) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 18335aff7d..5d9891e4ac 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -242,7 +242,6 @@ object Sink { */ def queue[T](): Sink[T, SinkQueue[T]] = new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_))) - } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index ced4097c10..55d6807e1a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -23,7 +23,6 @@ import scala.compat.java8.OptionConverters._ import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture import scala.compat.java8.FutureConverters._ -import akka.stream.impl.SourceQueueAdapter /** Java API */ object Source { @@ -1870,6 +1869,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = new Source(delegate.watchTermination()((left, right) ⇒ matF(left, right.toJava))) + /** + * Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] = + new Source(delegate.monitor()(combinerToScala(combine))) + /** * Delays the initial element by the specified duration. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0a8fd75a60..95696dd994 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -7,7 +7,7 @@ import akka.event.LoggingAdapter import akka.stream._ import akka.Done import akka.stream.impl.Stages.{ DirectProcessor, StageModule } -import akka.stream.impl.StreamLayout.{ Module } +import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } @@ -16,7 +16,7 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future -import scala.concurrent.duration.{ FiniteDuration } +import scala.concurrent.duration.FiniteDuration import scala.language.higherKinds import akka.stream.impl.fusing.FlattenMerge @@ -375,7 +375,10 @@ final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Mo /** * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only. * - * INTERNAL API: extending this trait is not supported under the binary compatibility rules for Akka. + * INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it! + * Do not implement this interface outside the Akka code base! + * + * Binary compatibility is only maintained for callers of this trait’s interface. */ trait FlowOps[+Out, +Mat] { import akka.stream.impl.Stages._ @@ -1803,7 +1806,10 @@ trait FlowOps[+Out, +Mat] { } /** - * INTERNAL API: extending this trait is not supported under the binary compatibility rules for Akka. + * INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it! + * Do not implement this interface outside the Akka code base! + * + * Binary compatibility is only maintained for callers of this trait’s interface. */ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { @@ -1995,6 +2001,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] + /** + * Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated + * by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an + * event, and may therefor affect performance. + * The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value. + */ + def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2] = + viaMat(GraphStages.monitor)(combine) + /** * INTERNAL API. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 1d32708bfd..d54cc724ec 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -194,7 +194,7 @@ object Sink { Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink") /** - * Combine several sinks with fun-out strategy like `Broadcast` or `Balance` and returns `Sink`. + * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. */ def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int ⇒ Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] = @@ -339,5 +339,4 @@ object Sink { */ def queue[T](): Sink[T, SinkQueue[T]] = Sink.fromGraph(new QueueSink()) - } diff --git a/project/MiMa.scala b/project/MiMa.scala index a67a9cbae3..fcfbb05253 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -738,7 +738,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"), - ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail") + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail"), + + // #19390 Add flow monitor + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor") ) ) }