diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md index 897564e1d6..28af71722f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeLatest.md @@ -4,20 +4,30 @@ Merge multiple sources. @ref[Fan-in operators](../index.md#fan-in-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #mergeLatest } - -@@@ +@apidoc[Flow.mergeLatest](Flow) { scala="#mergeLatest[U%3E:Out,M](that:akka.stream.Graph[akka.stream.SourceShape[U],M],eagerComplete:Boolean):FlowOps.this.Repr[scala.collection.immutable.Seq[U]]" java="#mergeLatest(akka.stream.Graph,boolean)" } ## Description MergeLatest joins elements from N input streams into stream of lists of size N. -i-th element in list is the latest emitted element from i-th input stream. +The i-th element in list is the latest emitted element from i-th input stream. MergeLatest emits list for each element emitted from some input stream, but only after each input stream emitted at least one element +If `eagerComplete` is set to true then it completes as soon as the first upstream +completes otherwise when all upstreams complete. + +## Example + +This example takes a stream of prices and quantities and emits the price each time the +price of quantity changes: + +Scala +: @@snip [MergeLatest.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MergeLatest.scala) { #mergeLatest } + +Java +: @@snip [MergeLatest.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MergeLatest.java) { #mergeLatest } + ## Reactive Streams semantics diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MergeLatest.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MergeLatest.java new file mode 100644 index 0000000000..41ca921ce2 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/MergeLatest.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.typed.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; + +public class MergeLatest { + + private static final ActorSystem system = null; + + public static void example() { + // #mergeLatest + Source prices = Source.from(Arrays.asList(100, 101, 99, 103)); + Source quantities = Source.from(Arrays.asList(1, 3, 4, 2)); + + prices + .mergeLatest(quantities, true) + .map(priceAndQuantity -> priceAndQuantity.get(0) * priceAndQuantity.get(1)) + .runForeach(System.out::println, system); + + // prints something like: + // 100 + // 101 + // 303 + // 297 + // 396 + // 412 + // 206 + // #mergeLatest + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MergeLatest.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MergeLatest.scala new file mode 100644 index 0000000000..7da08732b7 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/MergeLatest.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object MergeLatest extends App { + implicit val system = ActorSystem() + + //#mergeLatest + val prices = Source(List(100, 101, 99, 103)) + val quantity = Source(List(1, 3, 4, 2)) + + prices + .mergeLatest(quantity) + .map { + case price :: quantity :: Nil => price * quantity + } + .runForeach(println) + + // prints something like: + // 100 + // 101 + // 303 + // 297 + // 396 + // 412 + // 206 + //#mergeLatest +}