Docs: mergeLatest (#28586)

This commit is contained in:
Christopher Batey 2020-04-27 16:45:04 +01:00 committed by GitHub
parent 8e0c8d07b9
commit 0e3cfbf584
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 6 deletions

View file

@ -4,20 +4,30 @@ Merge multiple sources.
@ref[Fan-in operators](../index.md#fan-in-operators) @ref[Fan-in operators](../index.md#fan-in-operators)
@@@div { .group-scala }
## Signature ## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mergeLatest } @apidoc[Flow.mergeLatest](Flow) { scala="#mergeLatest[U%3E:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],eagerComplete:Boolean):FlowOps.this.Repr[scala.collection.immutable.Seq[U]]" java="#mergeLatest(akka.stream.Graph,boolean)" }
@@@
## Description ## Description
MergeLatest joins elements from N input streams into stream of lists of size N. MergeLatest joins elements from N input streams into stream of lists of size N.
i-th element in list is the latest emitted element from i-th input stream. The i-th element in list is the latest emitted element from i-th input stream.
MergeLatest emits list for each element emitted from some input stream, MergeLatest emits list for each element emitted from some input stream,
but only after each input stream emitted at least one element but only after each input stream emitted at least one element
If `eagerComplete` is set to true then it completes as soon as the first upstream
completes otherwise when all upstreams complete.
## Example
This example takes a stream of prices and quantities and emits the price each time the
price of quantity changes:
Scala
: @@snip [MergeLatest.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MergeLatest.scala) { #mergeLatest }
Java
: @@snip [MergeLatest.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MergeLatest.java) { #mergeLatest }
## Reactive Streams semantics ## Reactive Streams semantics

View file

@ -0,0 +1,37 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Arrays;
public class MergeLatest {
private static final ActorSystem<Void> system = null;
public static void example() {
// #mergeLatest
Source<Integer, NotUsed> prices = Source.from(Arrays.asList(100, 101, 99, 103));
Source<Integer, NotUsed> quantities = Source.from(Arrays.asList(1, 3, 4, 2));
prices
.mergeLatest(quantities, true)
.map(priceAndQuantity -> priceAndQuantity.get(0) * priceAndQuantity.get(1))
.runForeach(System.out::println, system);
// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206
// #mergeLatest
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
object MergeLatest extends App {
implicit val system = ActorSystem()
//#mergeLatest
val prices = Source(List(100, 101, 99, 103))
val quantity = Source(List(1, 3, 4, 2))
prices
.mergeLatest(quantity)
.map {
case price :: quantity :: Nil => price * quantity
}
.runForeach(println)
// prints something like:
// 100
// 101
// 303
// 297
// 396
// 412
// 206
//#mergeLatest
}