add usage examples for conflate / conflate with seed (#25498)

* +doc conflate and conflateWithSeed docs examples

* fixes

* show throttle in the example so its more obvious that "slow upstream" etc
This commit is contained in:
Konrad `ktoso` Malawski 2018-09-20 16:22:11 +09:00 committed by GitHub
parent 4206e16954
commit bf3c11464e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 18 deletions

View file

@ -18,6 +18,17 @@ Allow for a slower downstream by passing incoming elements and a summary into an
there is backpressure. The summary value must be of the same type as the incoming elements, for example the sum or
average of incoming numbers, if aggregation should lead to a different type `conflateWithSeed` can be used:
## Example
Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflate }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflate }
If downstream is slower the elements is conflated by summing them. This means that upstream can continue producing elements while downstream is applying backpressure. For example: downstream is backpressuring while 1, 10 and 100 arrives from upstream, then backpressure stops and the conflated 111 is emitted downstream.
## Reactive Streams semantics
@@@div { .callout }

View file

@ -18,6 +18,25 @@ Allow for a slower downstream by passing incoming elements and a summary into an
is backpressure. When backpressure starts or there is no backpressure element is passed into a `seed` function to
transform it to the summary type.
## Example
Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflateWithSeed }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflateWithSeed-type #conflateWithSeed }
If downstream is slower, the "seed" function is called which is able to change the type of the to be conflated
elements if needed (it can also be an identity function, in which case this `conflateWithSeed` is equivalent to
a plain `conflate`). Next, the conflating function is applied while there is back-pressure from the downstream,
such that the upstream can produce elements at an rate independent of the downstream.
You may want to use this operation for example to apply an average operation on the upstream elements,
while the downstream backpressures. This allows us to keep processing upstream elements, and give an average
number to the downstream once it is ready to process the next one.
## Reactive Streams semantics
@@@div { .callout }

View file

@ -18,6 +18,15 @@ Log elements flowing through the stream as well as completion and erroring. By d
completion signals are logged on debug level, and errors are logged on Error level.
This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attributes.createLogLevels(...)`] on the given Flow.
## Example
Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #log }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #log }
## Reactive Streams semantics
@@@div { .callout }
@ -28,11 +37,3 @@ This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attrib
**completes** when upstream completes
@@@
## Example
Scala
: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #log }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #log }

View file

@ -405,7 +405,6 @@ public class IntegrationDocTest extends AbstractJavaTest {
//#actorRefWithAck
}
@Test
public void callingExternalServiceWithMapAsync() throws Exception {
new TestKit(system) {

View file

@ -8,9 +8,13 @@ import akka.stream.javadsl.Flow;
//#log
import akka.stream.Attributes;
import akka.stream.javadsl.Source;
//#log
import java.time.Duration;
import java.util.Arrays;
class SourceOrFlow {
void logExample() {
@ -24,4 +28,38 @@ class SourceOrFlow {
//#log
;
}
void conflateExample() {
//#conflate
Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
.throttle(10, Duration.ofSeconds(1)) // fast upstream
.conflate((Integer acc, Integer el) -> acc + el)
.throttle(1, Duration.ofSeconds(1)); // slow downstream
//#conflate
}
static //#conflateWithSeed-type
class Summed {
private final Integer el;
public Summed(Integer el) {
this.el = el;
}
public Summed sum(Summed other) {
return new Summed(this.el + other.el);
}
}
//#conflateWithSeed-type
void conflateWithSeedExample() {
//#conflateWithSeed
Source.cycle(() -> Arrays.asList(1, 10, 100).iterator())
.throttle(10, Duration.ofSeconds(1)) // fast upstream
.conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el)))
.throttle(1, Duration.ofSeconds(1)); // slow downstream
//#conflateWithSeed
}
}

View file

@ -26,4 +26,30 @@ object SourceOrFlow {
//#log
}
def conflateExample(): Unit = {
//#conflate
import scala.concurrent.duration._
Source.cycle(() List(1, 10, 100, 1000).iterator)
.throttle(10, per = 1.second) // faster upstream
.conflate((acc, el) acc + el) // acc: Int, el: Int
.throttle(1, per = 1.second) // slow downstream
//#conflate
}
def conflateWithSeedExample(): Unit = {
//#conflateWithSeed
import scala.concurrent.duration._
case class Summed(i: Int) {
def sum(other: Summed) = Summed(this.i + other.i)
}
Source.cycle(() List(1, 10, 100, 1000).iterator)
.throttle(10, per = 1.second) // faster upstream
.conflateWithSeed(el Summed(el))((acc, el) acc sum Summed(el)) // (Summed, Int) => Summed
.throttle(1, per = 1.second) // slow downstream
//#conflateWithSeed
}
}

View file

@ -66,7 +66,7 @@ class FlowConflateSpec extends StreamSpec {
for (i 1 to 100) {
publisher.sendNext(i)
}
subscriber.expectNoMsg(1.second)
subscriber.expectNoMessage(1.second)
sub.request(1)
subscriber.expectNext(5050)
sub.cancel()
@ -82,7 +82,7 @@ class FlowConflateSpec extends StreamSpec {
for (i 1 to 100) {
publisher.sendNext(i)
}
subscriber.expectNoMsg(1.second)
subscriber.expectNoMessage(1.second)
sub.request(1)
subscriber.expectNext(5050)
sub.cancel()