x-ref to queue.md

This commit is contained in:
Ignasi Marimon-Clos 2020-06-08 20:13:29 +02:00
parent a72ee4d5b8
commit 506cbc5bf4
3 changed files with 35 additions and 41 deletions

View file

@ -14,6 +14,9 @@ Limit the throughput to a specific number of elements per time unit, or a specif
Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where Limit the throughput to a specific number of elements per time unit, or a specific total cost per time unit, where
a function has to be provided to calculate the individual cost of each element. a function has to be provided to calculate the individual cost of each element.
The throttle operator combines will with the @ref[`queue`](./../Source/queue.md) operator to adapt the speeds on both ends of the `queue`-`throttle` pair.
See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators. See also @ref:[Buffers and working with rate](../../stream-rate.md) for related operators.
## Example ## Example

View file

@ -1,3 +1,7 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow; package jdocs.stream.operators.sourceorflow;
import akka.NotUsed; import akka.NotUsed;
@ -11,39 +15,26 @@ import java.time.Duration;
import java.util.stream.Stream; import java.util.stream.Stream;
import docs.stream.operators.sourceorflow.ThrottleCommon.Frame; import docs.stream.operators.sourceorflow.ThrottleCommon.Frame;
/** /** */
*
*/
public class Throttle { public class Throttle {
public static void main(String[] args) { public static void main(String[] args) {
ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream"); ActorSystem actorSystem = ActorSystem.create("25fps-throttled-stream");
Materializer mat = Materializer.matFromSystem(actorSystem); Materializer mat = Materializer.matFromSystem(actorSystem);
Source<Frame, NotUsed> frameSource = Source<Frame, NotUsed> frameSource =
Source Source.fromIterator(() -> Stream.iterate(0, i -> i + 1).iterator())
.fromIterator(() -> Stream.iterate(0 , i -> i+1).iterator()) .map(i -> new Frame(i.intValue()));
.map(i -> new Frame(i.intValue())) ;
// #throttle // #throttle
int framesPerSecond = 24; int framesPerSecond = 24;
Source<Frame, NotUsed> videoThrottling = frameSource
.throttle(
framesPerSecond,
Duration.ofSeconds(1),
framesPerSecond * 30,
ThrottleMode.shaping()
);
// serialize `Frame` and send over the network.
// #throttle
videoThrottling
.map(f -> f.i())
.to(Sink.foreach(System.out::println))
.run(mat);
}
Source<Frame, NotUsed> videoThrottling =
frameSource.throttle(
framesPerSecond, Duration.ofSeconds(1), framesPerSecond * 30, ThrottleMode.shaping());
// serialize `Frame` and send over the network.
// #throttle
videoThrottling.map(f -> f.i()).to(Sink.foreach(System.out::println)).run(mat);
}
} }

View file

@ -1,3 +1,7 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow package docs.stream.operators.sourceorflow
import akka.NotUsed import akka.NotUsed
@ -9,7 +13,7 @@ import akka.stream.scaladsl.Source
import scala.concurrent.duration._ import scala.concurrent.duration._
/** /**
* *
*/ */
object Throttle extends App { object Throttle extends App {
@ -22,22 +26,18 @@ object Throttle extends App {
val framesPerSecond = 24 val framesPerSecond = 24
// val frameSource: Source[Frame,_] // val frameSource: Source[Frame,_]
val videoThrottling = frameSource val videoThrottling = frameSource.throttle(
.throttle( framesPerSecond,
framesPerSecond, 1.second,
1.second, framesPerSecond * 30, // maximumBurst
framesPerSecond * 30, // maximumBurst ThrottleMode.shaping)
ThrottleMode.shaping
)
// serialize `Frame` and send over the network. // serialize `Frame` and send over the network.
// #throttle // #throttle
videoThrottling videoThrottling.to(Sink.foreach(println)).run()
.to(Sink.foreach(println))
.run()
} }
object ThrottleCommon{ object ThrottleCommon {
// used in ThrottleJava // used in ThrottleJava
case class Frame(i: Int) case class Frame(i: Int)