diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md index 72238e0740..405ec3aabe 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/expand.md @@ -4,13 +4,10 @@ Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` @ref[Backpressure aware operators](../index.md#backpressure-aware-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #expand } +@apidoc[Flow.expand](Flow) { scala="#expand[U](expander:Out=%3EIterator[U]):FlowOps.this.Repr[U]" java="#expand(akka.japi.function.Function)" } -@@@ ## Description @@ -20,6 +17,21 @@ element, allowing for it to be rewritten and/or filtered. See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +## Example + +Imagine a streaming client decoding a video. It is possible the network bandwidth is a bit +unreliable. It's fine, as long as the audio remains fluent, it doesn't matter if we can't decode +a frame or two (or more). But we also want to watermark every decoded frame with the name of +our colleague. `expand` provides access to the element flowing through the stream +and let's us create extra frames in case the producer slows down: + +Scala +: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #expand } + +Java +: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #expand } + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md index 10a85db494..37cef38b28 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/extrapolate.md @@ -4,13 +4,10 @@ Allow for a faster downstream by expanding the last emitted element to an `Itera @ref[Backpressure aware operators](../index.md#backpressure-aware-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #extrapolate } +@apidoc[Flow.extrapolate](Flow) { scala="#extrapolate%5BU%3E:Out](extrapolator:U=%3EIterator[U],initial:Option[U]):FlowOps.this.Repr[U]" java="#extrapolate(akka.japi.function.Function)" } -@@@ ## Description @@ -25,6 +22,19 @@ Includes an optional `initial` argument to prevent blocking the entire stream wh See @ref:[Understanding extrapolate and expand](../../stream-rate.md#understanding-extrapolate-and-expand) for more information and examples. +## Example + +Imagine a videoconference client decoding a video feed from a colleague working remotely. It is possible +the network bandwidth is a bit unreliable. It's fine, as long as the audio remains fluent, it doesn't matter +if we can't decode a frame or two (or more). When a frame is dropped, though, we want the UI to show the last +frame decoded: + +Scala +: @@snip [ExtrapolateAndExpand.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala) { #extrapolate } + +Java +: @@snip [ExtrapolateAndExpand.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java) { #extrapolate } + ## Reactive Streams semantics @@@div { .callout } @@ -36,4 +46,3 @@ and examples. **completes** when upstream completes @@@ - diff --git a/akka-docs/src/main/paradox/stream/stream-rate.md b/akka-docs/src/main/paradox/stream/stream-rate.md index fb4e405066..5482fdf30b 100644 --- a/akka-docs/src/main/paradox/stream/stream-rate.md +++ b/akka-docs/src/main/paradox/stream/stream-rate.md @@ -234,7 +234,7 @@ Scala Java : @@snip [RateTransformationDocTest.java](/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-seed } -`extrapolate` and `expand` also allow to produce metainformation based on demand signalled from the downstream. +`extrapolate` and `expand` also allow to produce meta-information based on demand signalled from the downstream. Leveraging this, here is a flow that tracks and reports a drift between a fast consumer and a slow producer. Scala @@ -260,3 +260,5 @@ This makes `expand` able to transform or even filter out (by providing an empty Regardless, since we provide a non-empty `Iterator` in both examples, this means that the output of this flow is going to report a drift of zero if the producer is fast enough - or a larger drift otherwise. + +See also @ref:[`extrapolate`](operators/Source-or-Flow/extrapolate.md) and @ref:[`expand`](operators/Source-or-Flow/expand.md) for more information and examples. \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java new file mode 100644 index 0000000000..4593d5ded3 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/ExtrapolateAndExpand.java @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.japi.function.Function; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon; +import docs.stream.operators.sourceorflow.ExtrapolateAndExpandCommon.Frame; + +import java.time.Duration; +import java.util.stream.Stream; + +/** */ +public class ExtrapolateAndExpand { + public static Function decodeAsFrame = + ExtrapolateAndExpandCommon.Frame$.MODULE$::decode; + + public static Frame BLACK_FRAME = ExtrapolateAndExpandCommon.Frame$.MODULE$.blackFrame(); + + public static long nowInSeconds() { + return ExtrapolateAndExpand.nowInSeconds(); + } + + public static void main(String[] args) { + ActorSystem actorSystem = ActorSystem.create("25fps-stream"); + + Source networkSource = ExtrapolateAndExpandCommon.networkSource().asJava(); + + Flow decode = Flow.of(ByteString.class).map(decodeAsFrame); + + // #extrapolate + // if upstream is too slow, produce copies of the last frame but grayed out. + Flow rateControl = + Flow.of(Frame.class) + .extrapolate( + lastFrame -> { + Frame gray = + new Frame( + ByteString.fromString( + "gray frame!! - " + lastFrame.pixels().utf8String())); + return Stream.iterate(gray, i -> i).iterator(); + }, + BLACK_FRAME // initial value + ); + + Source videoSource = networkSource.via(decode).via(rateControl); + + // let's create a 25fps stream (a Frame every 40.millis) + Source tickSource = + Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); + + Source videoAt25Fps = tickSource.zip(videoSource).map(Pair::second); + + // #extrapolate + + // #expand + // each element flowing through the stream is expanded to a watermark copy + // of the upstream frame and grayed out copies. The grayed out copies should + // only be used downstream if the producer is too slow. + Flow watermarkerRateControl = + Flow.of(Frame.class) + .expand( + lastFrame -> { + Frame watermarked = + new Frame( + lastFrame.pixels().$plus$plus(ByteString.fromString(" - watermark"))); + Frame gray = + new Frame(lastFrame.pixels().$plus$plus(ByteString.fromString(" - gray"))); + return Stream.concat(Stream.of(watermarked), Stream.iterate(gray, i -> i)) + .iterator(); + }); + + Source watermakedVideoSource = + networkSource.via(decode).via(watermarkerRateControl); + + // let's create a 25fps stream (a Frame every 40.millis) + Source ticks = Source.tick(Duration.ZERO, Duration.ofMillis(40), "tick"); + + Source watermarkedVideoAt25Fps = + ticks.zip(watermakedVideoSource).map(Pair::second); + + // #expand + videoAt25Fps + .map(Frame::pixels) + .map(ByteString::utf8String) + .map(pixels -> nowInSeconds() + " - " + pixels) + .to(Sink.foreach(System.out::println)) + .run(actorSystem); + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala new file mode 100644 index 0000000000..59fc31e473 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/ExtrapolateAndExpand.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.Cancellable +import akka.stream.DelayOverflowStrategy +import akka.stream.scaladsl.DelayStrategy +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.fps +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.nowInSeconds +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.periodInMillis +import docs.stream.operators.sourceorflow.ExtrapolateAndExpand.videoAt25Fps + +import scala.concurrent.duration._ +import scala.util.Random + +/** + * + */ +object ExtrapolateAndExpandMain extends App { + implicit val sys = ActorSystem("25fps-stream") + videoAt25Fps.map(_.pixels.utf8String).map(frame => s"$nowInSeconds - $frame").to(Sink.foreach(println)).run() + +} +object ExtrapolateAndExpand { + + val periodInMillis = 40 + val fps = 1000 / periodInMillis + + import ExtrapolateAndExpandCommon._ + + val decode: Flow[ByteString, Frame, NotUsed] = + Flow[ByteString].map(Frame.decode) + + // #extrapolate + // if upstream is too slow, produce copies of the last frame but grayed out. + val rateControl: Flow[Frame, Frame, NotUsed] = + Flow[Frame].extrapolate((frame: Frame) => { + val grayedOut = frame.withFilter(Gray) + Iterator.continually(grayedOut) + }, Some(Frame.blackFrame)) + + val videoSource: Source[Frame, NotUsed] = networkSource.via(decode).via(rateControl) + + // let's create a 25fps stream (a Frame every 40.millis) + val tickSource: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) + + val videoAt25Fps: Source[Frame, Cancellable] = + tickSource.zip(videoSource).map(_._2) + // #extrapolate + + // #expand + // each element flowing through the stream is expanded to a watermark copy + // of the upstream frame and grayed out copies. The grayed out copies should + // only be used downstream if the producer is too slow. + val watermarkerRateControl: Flow[Frame, Frame, NotUsed] = + Flow[Frame].expand((frame: Frame) => { + val watermarked = frame.withFilter(Watermark) + val grayedOut = frame.withFilter(Gray) + (Iterator.single(watermarked) ++ Iterator.continually(grayedOut)) + }) + + val watermarkedVideoSource: Source[Frame, NotUsed] = + networkSource.via(decode).via(rateControl) + + // let's create a 25fps stream (a Frame every 40.millis) + val ticks: Source[Tick.type, Cancellable] = Source.tick(0.seconds, 40.millis, Tick) + + val watermarkedVideoAt25Fps: Source[Frame, Cancellable] = + ticks.zip(watermarkedVideoSource).map(_._2) + + // #expand + + def nowInSeconds = System.nanoTime() / 1000000000 +} + +object ExtrapolateAndExpandCommon { + // This `networkSource` simulates a client sending frames over the network. There's a + // stage throttling the elements at 24fps and then a `delayWith` that randomly delays + // frames simulating network latency and bandwidth limitations (uses buffer of + // default capacity). + val networkSource: Source[ByteString, NotUsed] = + Source + .fromIterator(() => Iterator.from(0)) // produce frameIds + .throttle(fps, 1.second) + .map(i => ByteString.fromString(s"fakeFrame-$i")) + .delayWith( + () => + new DelayStrategy[ByteString] { + override def nextDelay(elem: ByteString): FiniteDuration = + Random.nextInt(periodInMillis * 10).millis + }, + DelayOverflowStrategy.dropBuffer) + + case object Tick + + sealed trait Filter { + def filter(fr: Frame): Frame + } + object Gray extends Filter { + override def filter(fr: Frame): Frame = + Frame(ByteString.fromString(s"gray frame!! - ${fr.pixels.utf8String}")) + } + object Watermark extends Filter { + override def filter(fr: Frame): Frame = + Frame(fr.pixels.++(ByteString.fromString(" - watermark"))) + } + + case class Frame(pixels: ByteString) { + def withFilter(f: Filter): Frame = f.filter(this) + } + object Frame { + val blackFrame: Frame = Frame(ByteString.empty) + def decode(bs: ByteString): Frame = Frame(bs) + } +}