From a72ee4d5b8bb151e4aeecb98bee93ed3afed8ce4 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Mon, 8 Jun 2020 20:06:32 +0200 Subject: [PATCH 1/4] Adds throttle example --- .../operators/Source-or-Flow/throttle.md | 16 ++++++ .../operators/sourceorflow/Throttle.java | 49 +++++++++++++++++++ .../operators/sourceorflow/Throttle.scala | 45 +++++++++++++++++ 3 files changed, 110 insertions(+) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md index 8dc1dbc961..2d972c10fa 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md @@ -14,6 +14,22 @@ Limit the throughput to a specific number of elements per time unit, or a specif Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. +See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. + +## Example + +Imagine the server end of a streaming platform. When a client connects and requesta a video content, the server +should return the content. Instead of serving a complete video as soon as bandwith allows, `throttle` can be used +to burst the first 30 seconds and then send a constant of 24 frames per second (let's imagine this streaming +platform stores frames, not bytes). This allows the browser to buffer 30 seconds in case there's a network +outage but prevents sending too much video content at once using all bandwitdh of the user's bandwitdh. + +Scala +: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle } + +Java +: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle } + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java new file mode 100644 index 0000000000..be0a75af17 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java @@ -0,0 +1,49 @@ +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.Materializer; +import akka.stream.ThrottleMode; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; + +import java.time.Duration; +import java.util.stream.Stream; +import docs.stream.operators.sourceorflow.ThrottleCommon.Frame; + +/** + * + */ +public class Throttle { + + public static void main(String[] args) { + ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream"); + Materializer mat = Materializer.matFromSystem(actorSystem); + + Source frameSource = + Source + .fromIterator(() -> Stream.iterate(0 , i -> i+1).iterator()) + .map(i -> new Frame(i.intValue())) ; + + // #throttle + int framesPerSecond = 24; + + Source videoThrottling = frameSource + .throttle( + framesPerSecond, + Duration.ofSeconds(1), + framesPerSecond * 30, + ThrottleMode.shaping() + ); + // serialize `Frame` and send over the network. + // #throttle + + videoThrottling + .map(f -> f.i()) + .to(Sink.foreach(System.out::println)) + .run(mat); + + } + + +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala new file mode 100644 index 0000000000..bb913dc923 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala @@ -0,0 +1,45 @@ +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ThrottleMode +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +import scala.concurrent.duration._ + +/** + * + */ +object Throttle extends App { + + implicit val sys = ActorSystem("25fps-stream") + + val frameSource: Source[Int, NotUsed] = + Source.fromIterator(() => Iterator.from(0)) + + // #throttle + val framesPerSecond = 24 + + // val frameSource: Source[Frame,_] + val videoThrottling = frameSource + .throttle( + framesPerSecond, + 1.second, + framesPerSecond * 30, // maximumBurst + ThrottleMode.shaping + ) + // serialize `Frame` and send over the network. + // #throttle + + videoThrottling + .to(Sink.foreach(println)) + .run() +} + +object ThrottleCommon{ + + // used in ThrottleJava + case class Frame(i: Int) + +} From 506cbc5bf4a7273624508144767dd6b7d78389e8 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Mon, 8 Jun 2020 20:13:29 +0200 Subject: [PATCH 2/4] x-ref to queue.md --- .../operators/Source-or-Flow/throttle.md | 3 ++ .../operators/sourceorflow/Throttle.java | 49 ++++++++----------- .../operators/sourceorflow/Throttle.scala | 24 ++++----- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md index 2d972c10fa..f4b8e4aafc 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md @@ -14,6 +14,9 @@ Limit the throughput to a specific number of elements per time unit, or a specif Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. +The throttle operator combines will with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair. + + See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. ## Example diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java index be0a75af17..75d2b4c164 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package jdocs.stream.operators.sourceorflow; import akka.NotUsed; @@ -11,39 +15,26 @@ import java.time.Duration; import java.util.stream.Stream; import docs.stream.operators.sourceorflow.ThrottleCommon.Frame; -/** - * - */ +/** */ public class Throttle { - public static void main(String[] args) { - ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream"); - Materializer mat = Materializer.matFromSystem(actorSystem); + public static void main(String[] args) { + ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream"); + Materializer mat = Materializer.matFromSystem(actorSystem); - Source frameSource = - Source - .fromIterator(() -> Stream.iterate(0 , i -> i+1).iterator()) - .map(i -> new Frame(i.intValue())) ; + Source frameSource = + Source.fromIterator(() -> Stream.iterate(0, i -> i + 1).iterator()) + .map(i -> new Frame(i.intValue())); - // #throttle - int framesPerSecond = 24; - - Source videoThrottling = frameSource - .throttle( - framesPerSecond, - Duration.ofSeconds(1), - framesPerSecond * 30, - ThrottleMode.shaping() - ); - // serialize `Frame` and send over the network. - // #throttle - - videoThrottling - .map(f -> f.i()) - .to(Sink.foreach(System.out::println)) - .run(mat); - - } + // #throttle + int framesPerSecond = 24; + Source videoThrottling = + frameSource.throttle( + framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping()); + // serialize `Frame` and send over the network. + // #throttle + videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat); + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala index bb913dc923..301341b9c3 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala @@ -1,3 +1,7 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + package docs.stream.operators.sourceorflow import akka.NotUsed @@ -9,7 +13,7 @@ import akka.stream.scaladsl.Source import scala.concurrent.duration._ /** - * + * */ object Throttle extends App { @@ -22,22 +26,18 @@ object Throttle extends App { val framesPerSecond = 24 // val frameSource: Source[Frame,_] - val videoThrottling = frameSource - .throttle( - framesPerSecond, - 1.second, - framesPerSecond * 30, // maximumBurst - ThrottleMode.shaping - ) + val videoThrottling = frameSource.throttle( + framesPerSecond, + 1.second, + framesPerSecond * 30, // maximumBurst + ThrottleMode.shaping) // serialize `Frame` and send over the network. // #throttle - videoThrottling - .to(Sink.foreach(println)) - .run() + videoThrottling.to(Sink.foreach(println)).run() } -object ThrottleCommon{ +object ThrottleCommon { // used in ThrottleJava case class Frame(i: Int) From 6eb1a9e1efa317d2232b088a23a0e590b323b55c Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 9 Jun 2020 15:53:04 +0200 Subject: [PATCH 3/4] Split example to ease into the details. PR Comments --- .../operators/Source-or-Flow/throttle.md | 30 ++++++++++++++----- .../operators/sourceorflow/Throttle.java | 10 ++++++- .../operators/sourceorflow/Throttle.scala | 17 ++++++++--- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md index f4b8e4aafc..cd0a48d78b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/throttle.md @@ -14,18 +14,15 @@ Limit the throughput to a specific number of elements per time unit, or a specif Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where a function has to be provided to calculate the individual cost of each element. -The throttle operator combines will with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair. - +The throttle operator combines well with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair. See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. ## Example -Imagine the server end of a streaming platform. When a client connects and requesta a video content, the server -should return the content. Instead of serving a complete video as soon as bandwith allows, `throttle` can be used -to burst the first 30 seconds and then send a constant of 24 frames per second (let's imagine this streaming -platform stores frames, not bytes). This allows the browser to buffer 30 seconds in case there's a network -outage but prevents sending too much video content at once using all bandwitdh of the user's bandwitdh. +Imagine the server end of a streaming platform. When a client connects and request a video content, the server +should return the content. Instead of serving a complete video as fast as bandwith allows, `throttle` can be used +to limit the network usage to 24 frames per second (let's imagine this streaming platform stores frames, not bytes). Scala : @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle } @@ -33,6 +30,25 @@ Scala Java : @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle } +The problem in the example above is that when there's a network hiccup, the video playback will interrupt. It can be +improved by sending more content than the necessary ahead of time and let the client buffer that. So, `throttle` can be used +to burst the first 30 seconds and then send a constant of 24 frames per second. This way, when a request comes in +a good chunk of content will be downloaded and after that the server will activate the throttling. + +Scala +: @@snip [Throttle.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala) { #throttle-with-burst } + +Java +: @@snip [Throttle.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java) { #throttle-with-burst } + +The extra argument to set the `ThrottleMode` to `shapping` tells `throttle` to make pauses to avoid exceeding +the maximum rate. Alternatively we could set the throttling mode to cause a stream failure when upstream is faster +than the throttle rate. + +The examples above don't cover all the parameters supported by `throttle` (e.g `cost`-based throttling). See the +@apidoc[api documentation](Flow) { scala="#throttle(cost:Int,per:scala.concurrent.duration.FiniteDuration,maximumBurst:Int,costCalculation:Out=>Int,mode:akka.stream.ThrottleMode):FlowOps.this.Repr[Out]" java="#throttle(int,java.time.Duration,int,akka.japi.function.Function,akka.stream.ThrottleMode)" } +for all the details. + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java index 75d2b4c164..d80f337491 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java @@ -31,10 +31,18 @@ public class Throttle { Source videoThrottling = frameSource.throttle( - framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping()); + framesPerSecond, Duration.ofSeconds(1)); // serialize `Frame` and send over the network. // #throttle + // #throttle-with-burst + Source throttlingWithBurst = + frameSource.throttle( + framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping()); + // serialize `Frame` and send over the network. + // #throttle-with-burst + videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat); + throttlingWithBurst.take(1000L).map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat); } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala index 301341b9c3..d07d9d992a 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala @@ -28,13 +28,22 @@ object Throttle extends App { // val frameSource: Source[Frame,_] val videoThrottling = frameSource.throttle( framesPerSecond, - 1.second, - framesPerSecond * 30, // maximumBurst - ThrottleMode.shaping) + 1.second) // serialize `Frame` and send over the network. // #throttle - videoThrottling.to(Sink.foreach(println)).run() + // #throttle-with-burst + // val frameSource: Source[Frame,_] + val videoThrottlingWithBurst = frameSource.throttle( + framesPerSecond, + 1.second, + framesPerSecond * 30, // maximumBurst + ThrottleMode.Shaping) + // serialize `Frame` and send over the network. + // #throttle-with-burst + + videoThrottling.take(1000).to(Sink.foreach(println)).run() + videoThrottlingWithBurst.take(1000).to(Sink.foreach(println)).run() } object ThrottleCommon { From 411a11dd9d1d739673cd38ec9238f31a3a084062 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 9 Jun 2020 15:55:43 +0200 Subject: [PATCH 4/4] No commit without a formatting follwup --- .../java/jdocs/stream/operators/sourceorflow/Throttle.java | 3 +-- .../scala/docs/stream/operators/sourceorflow/Throttle.scala | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java index d80f337491..184d256eca 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Throttle.java @@ -30,8 +30,7 @@ public class Throttle { int framesPerSecond = 24; Source videoThrottling = - frameSource.throttle( - framesPerSecond, Duration.ofSeconds(1)); + frameSource.throttle(framesPerSecond, Duration.ofSeconds(1)); // serialize `Frame` and send over the network. // #throttle diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala index d07d9d992a..b4d7bf201a 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Throttle.scala @@ -26,9 +26,7 @@ object Throttle extends App { val framesPerSecond = 24 // val frameSource: Source[Frame,_] - val videoThrottling = frameSource.throttle( - framesPerSecond, - 1.second) + val videoThrottling = frameSource.throttle(framesPerSecond, 1.second) // serialize `Frame` and send over the network. // #throttle