From ada0d31ec7e2cb1454699d23b8faa8d28da7e8dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Fri, 17 Jul 2015 17:03:48 +0300 Subject: [PATCH] =doc #18012 rate transformation docs --- akka-docs-dev/rst/java/stream-rate.rst | 24 +++- .../stream/RateTransformationDocSpec.scala | 104 ++++++++++++++++++ akka-docs-dev/rst/scala/stream-rate.rst | 24 +++- 3 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 akka-docs-dev/rst/scala/code/docs/stream/RateTransformationDocSpec.scala diff --git a/akka-docs-dev/rst/java/stream-rate.rst b/akka-docs-dev/rst/java/stream-rate.rst index 48289d5447..2ce8a1accc 100644 --- a/akka-docs-dev/rst/java/stream-rate.rst +++ b/akka-docs-dev/rst/java/stream-rate.rst @@ -141,9 +141,29 @@ Rate transformation Understanding conflate ---------------------- -*TODO* +When a fast producer can not be informed to slow down by backpressure or some other signal, conflate might be useful to combine elements from a producer until a demand signal comes from a consumer. + +Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements that have arrived while the stats have been calculated. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/RateTransformationDocTest.java#conflate-summarize + +This example demonstrates that such flow's rate is decoupled. Element rate at the start of the flow can be much higher that the element rate at the end of the flow. + +Another possible use of conflate is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how conflate can be used to implement random drop of elements when consumer is not able to keep up with the producer. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/RateTransformationDocTest.java#conflate-sample Understanding expand -------------------- -*TODO* +Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer. + +As a simple use of expand here is a flow that sends the same element to consumer when producer does not send any new elements. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/RateTransformationDocTest.java#expand-last + +Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow that tracks and reports a drift between fast consumer and slow producer. + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/RateTransformationDocTest.java#expand-drift + +Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer if fast enough, of a larger drift otherwise. diff --git a/akka-docs-dev/rst/scala/code/docs/stream/RateTransformationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/RateTransformationDocSpec.scala new file mode 100644 index 0000000000..d10b287755 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/RateTransformationDocSpec.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package docs.stream + +import akka.stream._ +import akka.stream.scaladsl._ +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ +import scala.util.Random +import scala.math._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.collection.immutable + +class RateTransformationDocSpec extends AkkaSpec { + + type Seq[+A] = immutable.Seq[A] + val Seq = immutable.Seq + + implicit val mat = ActorMaterializer() + + "conflate should summarize" in { + //#conflate-summarize + val statsFlow = Flow[Double] + .conflate(Seq(_))(_ :+ _) + .map { s => + val μ = s.sum / s.size + val se = s.map(x => pow(x - μ, 2)) + val σ = sqrt(se.sum / se.size) + (σ, μ, s.size) + } + //#conflate-summarize + + val fut = Source(() => Iterator.continually(Random.nextGaussian)) + .via(statsFlow) + .grouped(10) + .runWith(Sink.head) + + Await.result(fut, 100.millis) + } + + "conflate should sample" in { + //#conflate-sample + val p = 0.01 + val sampleFlow = Flow[Double] + .conflate(Seq(_)) { + case (acc, elem) if Random.nextDouble < p => acc :+ elem + case (acc, _) => acc + } + .mapConcat(identity) + //#conflate-sample + + val fut = Source(1 to 10000) + .map(_.toDouble) + .via(sampleFlow) + .runWith(Sink.fold(Seq.empty[Double])(_ :+ _)) + + val count = Await.result(fut, 100.millis).size + } + + "expand should repeat last" in { + //#expand-last + val lastFlow = Flow[Double] + .expand(identity)(s => (s, s)) + //#expand-last + + val (probe, fut) = TestSource.probe[Double] + .via(lastFlow) + .grouped(10) + .toMat(Sink.head)(Keep.both) + .run + + probe.sendNext(1.0) + val expanded = Await.result(fut, 100.millis) + expanded.size shouldBe 10 + expanded.sum shouldBe 10 + } + + "expand should track drift" in { + //#expand-drift + val driftFlow = Flow[Double] + .expand((_, 0)) { + case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1)) + } + //#expand-drift + + val (pub, sub) = TestSource.probe[Double] + .via(driftFlow) + .toMat(TestSink.probe[(Double, Int)])(Keep.both) + .run + + sub.request(1) + pub.sendNext(1.0) + sub.expectNext((1.0, 0)) + + sub.requestNext((1.0, 1)) + sub.requestNext((1.0, 2)) + + pub.sendNext(2.0) + sub.requestNext((2.0, 0)) + } + +} diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst index b71b12b592..618b681a64 100644 --- a/akka-docs-dev/rst/scala/stream-rate.rst +++ b/akka-docs-dev/rst/scala/stream-rate.rst @@ -141,9 +141,29 @@ Rate transformation Understanding conflate ---------------------- -*TODO* +When a fast producer can not be informed to slow down by backpressure or some other signal, conflate might be useful to combine elements from a producer until a demand signal comes from a consumer. + +Below is an example snippet that summarizes fast stream of elements to a standart deviation, mean and count of elements that have arrived while the stats have been calculated. + +.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#conflate-summarize + +This example demonstrates that such flow's rate is decoupled. Element rate at the start of the flow can be much higher that the element rate at the end of the flow. + +Another possible use of conflate is to not consider all elements for summary when producer starts getting too fast. Example below demonstrates how conflate can be used to implement random drop of elements when consumer is not able to keep up with the producer. + +.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#conflate-sample Understanding expand -------------------- -*TODO* +Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. Expand allows to extrapolate a value to be sent as an element to a consumer. + +As a simple use of expand here is a flow that sends the same element to consumer when producer does not send any new elements. + +.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#expand-last + +Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow that tracks and reports a drift between fast consumer and slow producer. + +.. includecode:: code/docs/stream/RateTransformationDocSpec.scala#expand-drift + +Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer if fast enough, of a larger drift otherwise.