diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala index 259ae58ba2..232c5c5b7d 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala @@ -51,4 +51,24 @@ object Implicits { Timed.timedIntervalBetween[I](flow, matching, onInterval) } + /** + * Provides time measurement utilities on Stream elements. + * + * See [[Timed]] + */ + implicit class TimedDuctDsl[I, O](val duct: Duct[I, O]) extends AnyVal { + + /** + * Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`. + */ + def timed[Out](measuredOps: Duct[I, O] ⇒ Duct[O, Out], onComplete: FiniteDuration ⇒ Unit): Duct[O, Out] = + Timed.timed[I, O, Out](duct, measuredOps, onComplete) + + /** + * Measures rolling interval between immediatly subsequent `matching(o: O)` elements. + */ + def timedIntervalBetween(matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Duct[I, O] = + Timed.timedIntervalBetween[I, O](duct, matching, onInterval) + } + } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index bb73f11a30..bb6ca9343e 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -3,7 +3,7 @@ */ package akka.stream.extra -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{ Duct, Flow } import scala.collection.immutable import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ @@ -32,6 +32,20 @@ private[akka] trait TimedOps { userFlow.transform(new StopTimed(ctx, onComplete)) } + /** + * INTERNAL API + * + * Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`. + */ + def timed[I, O, Out](duct: Duct[I, O], measuredOps: Duct[I, O] ⇒ Duct[O, Out], onComplete: FiniteDuration ⇒ Unit): Duct[O, Out] = { + // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) + val ctx = new TimedFlowContext + + val startWithTime: Duct[I, O] = duct.transform(new StartTimedFlow(ctx)) + val userFlow: Duct[O, Out] = measuredOps(startWithTime) + userFlow.transform(new StopTimed(ctx, onComplete)) + } + } /** @@ -49,6 +63,14 @@ private[akka] trait TimedIntervalBetweenOps { def timedIntervalBetween[O](flow: Flow[O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Flow[O] = { flow.transform(new TimedIntervalTransformer[O](matching, onInterval)) } + + /** + * Measures rolling interval between immediatly subsequent `matching(o: O)` elements. + */ + def timedIntervalBetween[I, O](duct: Duct[I, O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Duct[I, O] = { + // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) + duct.transform(new TimedIntervalTransformer[O](matching, onInterval)) + } } object Timed extends TimedOps with TimedIntervalBetweenOps { @@ -95,11 +117,9 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { final class StopTimed[T](ctx: TimedFlowContext, _onComplete: FiniteDuration ⇒ Unit) extends Transformer[T, T] { override def name = "stopTimed" - override def onComplete(): immutable.Seq[T] = { + override def cleanup() { val d = ctx.stop() _onComplete(d) - - Nil } override def onNext(element: T) = immutable.Seq(element) diff --git a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala index db22e461ef..3d6df4d730 100644 --- a/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/extra/FlowTimedSpec.scala @@ -3,9 +3,11 @@ */ package akka.stream.extra -import akka.stream.testkit.{ ScriptedTest, AkkaSpec } +import akka.stream.testkit.{ AkkaConsumerProbe, StreamTestKit, ScriptedTest, AkkaSpec } import akka.stream.{ FlowMaterializer, MaterializerSettings } import akka.testkit.TestProbe +import akka.stream.scaladsl.{ Flow, Duct } +import org.reactivestreams.api.{ Producer, Consumer } class FlowTimedSpec extends AkkaSpec with ScriptedTest { @@ -20,22 +22,22 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { lazy val metricsConfig = system.settings.config - val gen = FlowMaterializer(settings) + val materializer = FlowMaterializer(settings) - "Timed" must { + "Timed Flow" must { import akka.stream.extra.Implicits.TimedFlowDsl - "measure time it takes to go through intermediate" in { + "measure time it between elements matching a predicate" in { val testActor = TestProbe() - val measureBetweenEvery = 100 + val measureBetweenEvery = 5 val printInfo = (interval: Duration) ⇒ { testActor.ref ! interval info(s"Measured interval between $measureBetweenEvery elements was: $interval") } - val n = 500 + val n = 20 val testRuns = 1 to 2 def script = Script((1 to n) map { x ⇒ Seq(x) -> Seq(x) }: _*) @@ -52,7 +54,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { "measure time it takes from start to complete, by wrapping operations" in { val testActor = TestProbe() - val n = 500 + val n = 50 val printInfo = (d: FiniteDuration) ⇒ { testActor.ref ! d info(s"Processing $n elements took $d") @@ -72,5 +74,60 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest { "have a Java API" in pending } + "Timed Duct" must { + import akka.stream.extra.Implicits.TimedDuctDsl + + "measure time it between elements matching a predicate" in { + val probe = TestProbe() + + val duct: Duct[Int, Long] = Duct[Int].map(_.toLong).timedIntervalBetween(in ⇒ in % 2 == 1, d ⇒ probe.ref ! d) + + val c1 = StreamTestKit.consumerProbe[Long]() + val c2: Consumer[Int] = duct.produceTo(materializer, c1) + + val p = Flow(List(1, 2, 3)).toProducer(materializer) + p.produceTo(c2) + + val s = c1.expectSubscription() + s.requestMore(100) + c1.expectNext(1L) + c1.expectNext(2L) + c1.expectNext(3L) + c1.expectComplete() + + val duration = probe.expectMsgType[Duration] + info(s"Got duration (first): $duration") + } + + "measure time from start to complete, by wrapping operations" in { + val probe = TestProbe() + + // making sure the types come out as expected + val duct: Duct[Int, String] = + Duct[Int]. + timed(_. + map(_.toDouble). + map(_.toInt). + map(_.toString), duration ⇒ probe.ref ! duration). + map { s: String ⇒ s + "!" } + + val (ductIn: Consumer[Int], ductOut: Producer[String]) = duct.build(materializer) + + val c1 = StreamTestKit.consumerProbe[String]() + val c2 = ductOut.produceTo(c1) + + val p = Flow(0 to 100).toProducer(materializer) + p.produceTo(ductIn) + + val s = c1.expectSubscription() + s.requestMore(200) + 0 to 100 foreach { i ⇒ c1.expectNext(i.toString + "!") } + c1.expectComplete() + + val duration = probe.expectMsgType[Duration] + info(s"Took: $duration") + } + } + }