+str #19390 Add 'monitor' flow combinator and stage

This commit is contained in:
Tal Pressman 2016-02-21 13:03:00 +02:00 committed by Patrik Nordwall
parent bd8fcc9d9a
commit 26c25c89ed
12 changed files with 272 additions and 13 deletions

View file

@ -1084,3 +1084,15 @@ The stage otherwise passes through elements unchanged.
**completes** when upstream completes **completes** when upstream completes
monitor
^^^^^^^
Materializes to a ``FlowMonitor`` that monitors messages flowing through or completion of the stage. The stage otherwise
passes through elements unchanged. Note that the ``FlowMonitor`` inserts a memory barrier every time it processes an
event, and may therefore affect performance.
**emits** when upstream emits an element
**backpressures** when downstream **backpressures**
**completes** when upstream completes

View file

@ -32,8 +32,7 @@ object TwitterStreamQuickstartDocSpec {
//#model //#model
//#tweet-source //#tweet-source
val tweets: Source[Tweet, NotUsed] val tweets: Source[Tweet, NotUsed] //#tweet-source
//#tweet-source
= Source( = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") :: Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") :: Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::

View file

@ -1075,3 +1075,15 @@ The stage otherwise passes through elements unchanged.
**completes** when upstream completes **completes** when upstream completes
monitor
^^^^^^^
Materializes to a ``FlowMonitor`` that monitors messages flowing through or completion of the stage. The stage otherwise
passes through elements unchanged. Note that the ``FlowMonitor`` inserts a memory barrier every time it processes an
event, and may therefore affect performance.
**emits** when upstream emits an element
**backpressures** when downstream **backpressures**
**completes** when upstream completes

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.{ TestSource, TestSink }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.testkit.AkkaSpec
import akka.stream.FlowMonitorState._
import scala.concurrent.duration._
class FlowMonitorSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
"A FlowMonitor" must {
"return Finished when stream is completed" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
source.sendComplete()
awaitAssert(monitor.state == Finished, 3.seconds)
sink.expectSubscriptionAndComplete()
}
"return Finished when stream is cancelled from downstream" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
sink.cancel()
awaitAssert(monitor.state == Finished, 3.seconds)
}
"return Failed when stream fails, and propagate the error" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
val ex = new Exception("Source failed")
source.sendError(ex)
awaitAssert(monitor.state == Failed(ex), 3.seconds)
sink.expectSubscriptionAndError(ex)
}
"return Initialized for an empty stream" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
awaitAssert(monitor.state == Initialized, 3.seconds)
source.expectRequest()
sink.expectSubscription()
}
"return Received after receiving a message" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
val msg = "message"
source.sendNext(msg)
sink.requestNext(msg)
awaitAssert(monitor.state == Received(msg), 3.seconds)
}
// Check a stream that processes StreamState messages specifically, to make sure the optimization in FlowMonitorImpl
// (to avoid allocating an object for each message) doesn't introduce a bug
"return Received after receiving a StreamState message" in {
val ((source, monitor), sink) =
TestSource.probe[Any].monitor()(Keep.both).toMat(TestSink.probe[Any])(Keep.both).run()
val msg = Received("message")
source.sendNext(msg)
sink.requestNext(msg)
awaitAssert(monitor.state == Received(msg), 3.seconds)
}
}
}

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream
import akka.stream.FlowMonitorState.StreamState
/**
* Used to monitor the state of a stream
*
* @tparam T Type of messages passed by the stream
*/
trait FlowMonitor[+T] {
def state: StreamState[T]
}
object FlowMonitorState {
sealed trait StreamState[+U]
/**
* Stream was created, but no events have passed through it
*/
case object Initialized extends StreamState[Nothing]
/**
* Java API
*/
def initialized[U](): StreamState[U] = Initialized
/**
* Stream processed a message
*
* @param msg The processed message
*/
final case class Received[+U](msg: U) extends StreamState[U]
/**
* Java API
*/
def received[U](msg: U): StreamState[U] = Received(msg)
/**
* Stream failed
*
* @param cause The cause of the failure
*/
final case class Failed(cause: Throwable) extends StreamState[Nothing]
/**
* Java API
*/
def failed[U](cause: Throwable): StreamState[U] = Failed(cause)
/**
* Stream completed successfully
*/
case object Finished extends StreamState[Nothing]
/**
* Java API
*/
def finished[U]() = Finished
}

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.dispatch.ExecutionContexts import akka.dispatch.ExecutionContexts
import akka.event.Logging import akka.event.Logging
import akka.stream.FlowMonitorState._
import akka.stream._ import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
@ -153,6 +154,72 @@ object GraphStages {
def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] =
TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]]
private class FlowMonitorImpl[T] extends AtomicReference[Any](Initialized) with FlowMonitor[T] {
override def state = get match {
case s: StreamState[_] s.asInstanceOf[StreamState[T]]
case msg Received(msg.asInstanceOf[T])
}
}
private class MonitorFlow[T] extends GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] {
val in = Inlet[T]("FlowMonitor.in")
val out = Outlet[T]("FlowMonitor.out")
val shape = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = {
val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T]
val logic: GraphStageLogic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val msg = grab(in)
push(out, msg)
monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg)
}
override def onUpstreamFinish(): Unit = {
super.onUpstreamFinish()
monitor.set(Finished)
}
override def onUpstreamFailure(ex: Throwable): Unit = {
super.onUpstreamFailure(ex)
monitor.set(Failed(ex))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = {
super.onDownstreamFinish()
monitor.set(Finished)
}
})
override def toString = "MonitorFlowLogic"
}
(logic, monitor)
}
override def toString = "MonitorFlow"
}
def monitor[T]: GraphStageWithMaterializedValue[FlowShape[T, T], FlowMonitor[T]] =
new MonitorFlow[T]
private object TickSource {
class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable {
private val cancelPromise = Promise[Done]()
def cancelFuture: Future[Done] = cancelPromise.future
override def cancel(): Boolean = {
if (!isCancelled) cancelPromise.trySuccess(Done)
true
}
override def isCancelled: Boolean = cancelled.get()
}
}
final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
override val shape = SourceShape(Outlet[T]("TickSource.out")) override val shape = SourceShape(Outlet[T]("TickSource.out"))

View file

@ -6,8 +6,8 @@ package akka.stream.javadsl
import akka.{ NotUsed, Done } import akka.{ NotUsed, Done }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair } import akka.japi.{ function, Pair }
import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.impl.{ConstantFun, StreamLayout}
import akka.stream.{ scaladsl, _ } import akka.stream._
import akka.stream.stage.Stage import akka.stream.stage.Stage
import org.reactivestreams.Processor import org.reactivestreams.Processor
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
@ -1686,6 +1686,15 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.watchTermination()((left, right) matF(left, right.toJava))) new Flow(delegate.watchTermination()((left, right) matF(left, right.toJava)))
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefor affect performance.
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Flow[In, Out, M] =
new Flow(delegate.monitor()(combinerToScala(combine)))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -242,7 +242,6 @@ object Sink {
*/ */
def queue[T](): Sink[T, SinkQueue[T]] = def queue[T](): Sink[T, SinkQueue[T]] =
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_))) new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_)))
} }
/** /**

View file

@ -23,7 +23,6 @@ import scala.compat.java8.OptionConverters._
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import akka.stream.impl.SourceQueueAdapter
/** Java API */ /** Java API */
object Source { object Source {
@ -1870,6 +1869,15 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] =
new Source(delegate.watchTermination()((left, right) matF(left, right.toJava))) new Source(delegate.watchTermination()((left, right) matF(left, right.toJava)))
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefor affect performance.
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
def monitor[M]()(combine: function.Function2[Mat, FlowMonitor[Out], M]): javadsl.Source[Out, M] =
new Source(delegate.monitor()(combinerToScala(combine)))
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *

View file

@ -7,7 +7,7 @@ import akka.event.LoggingAdapter
import akka.stream._ import akka.stream._
import akka.Done import akka.Done
import akka.stream.impl.Stages.{ DirectProcessor, StageModule } import akka.stream.impl.Stages.{ DirectProcessor, StageModule }
import akka.stream.impl.StreamLayout.{ Module } import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.fusing._ import akka.stream.impl.fusing._
import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
@ -16,7 +16,7 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.{ FiniteDuration } import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge import akka.stream.impl.fusing.FlattenMerge
@ -375,7 +375,10 @@ final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Mo
/** /**
* Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only. * Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only.
* *
* INTERNAL API: extending this trait is not supported under the binary compatibility rules for Akka. * INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it!
* Do not implement this interface outside the Akka code base!
*
* Binary compatibility is only maintained for callers of this traits interface.
*/ */
trait FlowOps[+Out, +Mat] { trait FlowOps[+Out, +Mat] {
import akka.stream.impl.Stages._ import akka.stream.impl.Stages._
@ -1803,7 +1806,10 @@ trait FlowOps[+Out, +Mat] {
} }
/** /**
* INTERNAL API: extending this trait is not supported under the binary compatibility rules for Akka. * INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it!
* Do not implement this interface outside the Akka code base!
*
* Binary compatibility is only maintained for callers of this traits interface.
*/ */
trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
@ -1995,6 +2001,15 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/ */
def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2]
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an
* event, and may therefor affect performance.
* The `combine` function is used to combine the `FlowMonitor` with this flow's materialized value.
*/
def monitor[Mat2]()(combine: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2] =
viaMat(GraphStages.monitor)(combine)
/** /**
* INTERNAL API. * INTERNAL API.
*/ */

View file

@ -194,7 +194,7 @@ object Sink {
Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink") Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")
/** /**
* Combine several sinks with fun-out strategy like `Broadcast` or `Balance` and returns `Sink`. * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
*/ */
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] = def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
@ -339,5 +339,4 @@ object Sink {
*/ */
def queue[T](): Sink[T, SinkQueue[T]] = def queue[T](): Sink[T, SinkQueue[T]] =
Sink.fromGraph(new QueueSink()) Sink.fromGraph(new QueueSink())
} }

View file

@ -738,7 +738,10 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail"),
// #19390 Add flow monitor
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.monitor")
) )
) )
} }