+str #15223 timed extra combinators for Duct

Note: not too happy about duplication of impl, but is required due to
the Flow and Duct not sharing any types (could use structural, not going there though).

Resolves #15223
This commit is contained in:
Konrad 'ktoso' Malawski 2014-05-19 15:10:13 +02:00
parent a23700969b
commit c3ed5902af
3 changed files with 108 additions and 11 deletions

View file

@ -51,4 +51,24 @@ object Implicits {
Timed.timedIntervalBetween[I](flow, matching, onInterval)
}
/**
* Provides time measurement utilities on Stream elements.
*
* See [[Timed]]
*/
implicit class TimedDuctDsl[I, O](val duct: Duct[I, O]) extends AnyVal {
/**
* Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`.
*/
def timed[Out](measuredOps: Duct[I, O] Duct[O, Out], onComplete: FiniteDuration Unit): Duct[O, Out] =
Timed.timed[I, O, Out](duct, measuredOps, onComplete)
/**
* Measures rolling interval between immediatly subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween(matching: O Boolean, onInterval: FiniteDuration Unit): Duct[I, O] =
Timed.timedIntervalBetween[I, O](duct, matching, onInterval)
}
}

View file

@ -3,7 +3,7 @@
*/
package akka.stream.extra
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{ Duct, Flow }
import scala.collection.immutable
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
@ -32,6 +32,20 @@ private[akka] trait TimedOps {
userFlow.transform(new StopTimed(ctx, onComplete))
}
/**
* INTERNAL API
*
* Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`.
*/
def timed[I, O, Out](duct: Duct[I, O], measuredOps: Duct[I, O] Duct[O, Out], onComplete: FiniteDuration Unit): Duct[O, Out] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type)
val ctx = new TimedFlowContext
val startWithTime: Duct[I, O] = duct.transform(new StartTimedFlow(ctx))
val userFlow: Duct[O, Out] = measuredOps(startWithTime)
userFlow.transform(new StopTimed(ctx, onComplete))
}
}
/**
@ -49,6 +63,14 @@ private[akka] trait TimedIntervalBetweenOps {
def timedIntervalBetween[O](flow: Flow[O], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[O] = {
flow.transform(new TimedIntervalTransformer[O](matching, onInterval))
}
/**
* Measures rolling interval between immediatly subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween[I, O](duct: Duct[I, O], matching: O Boolean, onInterval: FiniteDuration Unit): Duct[I, O] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type)
duct.transform(new TimedIntervalTransformer[O](matching, onInterval))
}
}
object Timed extends TimedOps with TimedIntervalBetweenOps {
@ -95,11 +117,9 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
final class StopTimed[T](ctx: TimedFlowContext, _onComplete: FiniteDuration Unit) extends Transformer[T, T] {
override def name = "stopTimed"
override def onComplete(): immutable.Seq[T] = {
override def cleanup() {
val d = ctx.stop()
_onComplete(d)
Nil
}
override def onNext(element: T) = immutable.Seq(element)

View file

@ -3,9 +3,11 @@
*/
package akka.stream.extra
import akka.stream.testkit.{ ScriptedTest, AkkaSpec }
import akka.stream.testkit.{ AkkaConsumerProbe, StreamTestKit, ScriptedTest, AkkaSpec }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.testkit.TestProbe
import akka.stream.scaladsl.{ Flow, Duct }
import org.reactivestreams.api.{ Producer, Consumer }
class FlowTimedSpec extends AkkaSpec with ScriptedTest {
@ -20,22 +22,22 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
lazy val metricsConfig = system.settings.config
val gen = FlowMaterializer(settings)
val materializer = FlowMaterializer(settings)
"Timed" must {
"Timed Flow" must {
import akka.stream.extra.Implicits.TimedFlowDsl
"measure time it takes to go through intermediate" in {
"measure time it between elements matching a predicate" in {
val testActor = TestProbe()
val measureBetweenEvery = 100
val measureBetweenEvery = 5
val printInfo = (interval: Duration) {
testActor.ref ! interval
info(s"Measured interval between $measureBetweenEvery elements was: $interval")
}
val n = 500
val n = 20
val testRuns = 1 to 2
def script = Script((1 to n) map { x Seq(x) -> Seq(x) }: _*)
@ -52,7 +54,7 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
"measure time it takes from start to complete, by wrapping operations" in {
val testActor = TestProbe()
val n = 500
val n = 50
val printInfo = (d: FiniteDuration) {
testActor.ref ! d
info(s"Processing $n elements took $d")
@ -72,5 +74,60 @@ class FlowTimedSpec extends AkkaSpec with ScriptedTest {
"have a Java API" in pending
}
"Timed Duct" must {
import akka.stream.extra.Implicits.TimedDuctDsl
"measure time it between elements matching a predicate" in {
val probe = TestProbe()
val duct: Duct[Int, Long] = Duct[Int].map(_.toLong).timedIntervalBetween(in in % 2 == 1, d probe.ref ! d)
val c1 = StreamTestKit.consumerProbe[Long]()
val c2: Consumer[Int] = duct.produceTo(materializer, c1)
val p = Flow(List(1, 2, 3)).toProducer(materializer)
p.produceTo(c2)
val s = c1.expectSubscription()
s.requestMore(100)
c1.expectNext(1L)
c1.expectNext(2L)
c1.expectNext(3L)
c1.expectComplete()
val duration = probe.expectMsgType[Duration]
info(s"Got duration (first): $duration")
}
"measure time from start to complete, by wrapping operations" in {
val probe = TestProbe()
// making sure the types come out as expected
val duct: Duct[Int, String] =
Duct[Int].
timed(_.
map(_.toDouble).
map(_.toInt).
map(_.toString), duration probe.ref ! duration).
map { s: String s + "!" }
val (ductIn: Consumer[Int], ductOut: Producer[String]) = duct.build(materializer)
val c1 = StreamTestKit.consumerProbe[String]()
val c2 = ductOut.produceTo(c1)
val p = Flow(0 to 100).toProducer(materializer)
p.produceTo(ductIn)
val s = c1.expectSubscription()
s.requestMore(200)
0 to 100 foreach { i c1.expectNext(i.toString + "!") }
c1.expectComplete()
val duration = probe.expectMsgType[Duration]
info(s"Took: $duration")
}
}
}