From bf3c11464e8a522dadc5f06d86978e4f68890f74 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 20 Sep 2018 16:22:11 +0900 Subject: [PATCH] add usage examples for conflate / conflate with seed (#25498) * +doc conflate and conflateWithSeed docs examples * fixes * show throttle in the example so its more obvious that "slow upstream" etc --- .../operators/Source-or-Flow/conflate.md | 11 ++++ .../Source-or-Flow/conflateWithSeed.md | 19 +++++++ .../stream/operators/Source-or-Flow/log.md | 17 +++--- .../java/jdocs/stream/IntegrationDocTest.java | 1 - .../jdocs/stream/operators/SourceOrFlow.java | 52 ++++++++++++++++--- .../docs/stream/operators/SourceOrFlow.scala | 26 ++++++++++ .../stream/scaladsl/FlowConflateSpec.scala | 4 +- 7 files changed, 112 insertions(+), 18 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md index 453b10bd6a..4fecb22ee2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md @@ -18,6 +18,17 @@ Allow for a slower downstream by passing incoming elements and a summary into an there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or average of incoming numbers, if aggregation should lead to a different type `conflateWithSeed` can be used: +## Example + +Scala +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflate } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflate } + +If downstream is slower the elements is conflated by summing them. This means that upstream can continue producing elements while downstream is applying backpressure. For example: downstream is backpressuring while 1, 10 and 100 arrives from upstream, then backpressure stops and the conflated 111 is emitted downstream. + +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md index 85003a6487..a013ad08ac 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md @@ -18,6 +18,25 @@ Allow for a slower downstream by passing incoming elements and a summary into an is backpressure. When backpressure starts or there is no backpressure element is passed into a `seed` function to transform it to the summary type. +## Example + +Scala +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflateWithSeed } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflateWithSeed-type #conflateWithSeed } + + +If downstream is slower, the "seed" function is called which is able to change the type of the to be conflated +elements if needed (it can also be an identity function, in which case this `conflateWithSeed` is equivalent to +a plain `conflate`). Next, the conflating function is applied while there is back-pressure from the downstream, +such that the upstream can produce elements at an rate independent of the downstream. + +You may want to use this operation for example to apply an average operation on the upstream elements, +while the downstream backpressures. This allows us to keep processing upstream elements, and give an average +number to the downstream once it is ready to process the next one. + +## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md index 1dc9e16684..cd8f80560d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md @@ -18,6 +18,15 @@ Log elements flowing through the stream as well as completion and erroring. By d completion signals are logged on debug level, and errors are logged on Error level. This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow. +## Example + +Scala +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #log } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #log } + +## Reactive Streams semantics @@@div { .callout } @@ -28,11 +37,3 @@ This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attrib **completes** when upstream completes @@@ - -## Example - -Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #log } - -Java -: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #log } diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index 912b00b288..87e3ac7841 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -405,7 +405,6 @@ public class IntegrationDocTest extends AbstractJavaTest { //#actorRefWithAck } - @Test public void callingExternalServiceWithMapAsync() throws Exception { new TestKit(system) { diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 5db22ab610..3a3b56a59a 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -8,20 +8,58 @@ import akka.stream.javadsl.Flow; //#log import akka.stream.Attributes; - +import akka.stream.javadsl.Source; //#log +import java.time.Duration; +import java.util.Arrays; + + class SourceOrFlow { void logExample() { Flow.of(String.class) - //#log - .log("myStream") - .addAttributes(Attributes.createLogLevels( - Attributes.logLevelOff(), // onElement - Attributes.logLevelError(), // onFailure - Attributes.logLevelInfo())) // onFinish + //#log + .log("myStream") + .addAttributes(Attributes.createLogLevels( + Attributes.logLevelOff(), // onElement + Attributes.logLevelError(), // onFailure + Attributes.logLevelInfo())) // onFinish //#log ; } + + void conflateExample() { + //#conflate + Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) + .throttle(10, Duration.ofSeconds(1)) // fast upstream + .conflate((Integer acc, Integer el) -> acc + el) + .throttle(1, Duration.ofSeconds(1)); // slow downstream + //#conflate + } + + static //#conflateWithSeed-type + class Summed { + + private final Integer el; + + public Summed(Integer el) { + this.el = el; + } + + public Summed sum(Summed other) { + return new Summed(this.el + other.el); + } + } + //#conflateWithSeed-type + + void conflateWithSeedExample() { + //#conflateWithSeed + + Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) + .throttle(10, Duration.ofSeconds(1)) // fast upstream + .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el))) + .throttle(1, Duration.ofSeconds(1)); // slow downstream + //#conflateWithSeed + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala index 900c057e90..18000a350b 100644 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala +++ b/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala @@ -26,4 +26,30 @@ object SourceOrFlow { //#log } + def conflateExample(): Unit = { + //#conflate + import scala.concurrent.duration._ + + Source.cycle(() ⇒ List(1, 10, 100, 1000).iterator) + .throttle(10, per = 1.second) // faster upstream + .conflate((acc, el) ⇒ acc + el) // acc: Int, el: Int + .throttle(1, per = 1.second) // slow downstream + //#conflate + } + + def conflateWithSeedExample(): Unit = { + //#conflateWithSeed + import scala.concurrent.duration._ + + case class Summed(i: Int) { + def sum(other: Summed) = Summed(this.i + other.i) + } + + Source.cycle(() ⇒ List(1, 10, 100, 1000).iterator) + .throttle(10, per = 1.second) // faster upstream + .conflateWithSeed(el ⇒ Summed(el))((acc, el) ⇒ acc sum Summed(el)) // (Summed, Int) => Summed + .throttle(1, per = 1.second) // slow downstream + //#conflateWithSeed + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 4aebedbc13..0272e61972 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -66,7 +66,7 @@ class FlowConflateSpec extends StreamSpec { for (i ← 1 to 100) { publisher.sendNext(i) } - subscriber.expectNoMsg(1.second) + subscriber.expectNoMessage(1.second) sub.request(1) subscriber.expectNext(5050) sub.cancel() @@ -82,7 +82,7 @@ class FlowConflateSpec extends StreamSpec { for (i ← 1 to 100) { publisher.sendNext(i) } - subscriber.expectNoMsg(1.second) + subscriber.expectNoMessage(1.second) sub.request(1) subscriber.expectNext(5050) sub.cancel()