diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collect.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collect.md index 7c13b08487..cc7e3d0e8d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collect.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collect.md @@ -15,8 +15,42 @@ Apply a partial function to each incoming element, if the partial function is de ## Description Apply a partial function to each incoming element, if the partial function is defined for a value the returned -value is passed downstream. Can often replace `filter` followed by `map` to achieve the same in one single operators. +value is passed downstream. This can often replace `filter` followed by `map` to achieve the same in one single operator. +@java[`collect` is supposed to be used with @apidoc[akka.japi.pf.PFBuilder] to construct the partial function. +There is also a @ref:[collectType](collectType.md) that often can be easier to use than the `PFBuilder` and +then combine with ordinary `filter` and `map` operators.] + +## Example + +Given stream element classes `Message`, `Ping`, and `Pong`, where `Ping` extends `Message` and `Pong` is an +unrelated class. + +Scala +: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect-elements } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect-elements } + + +From a stream of `Message` elements we would like to collect all elements of type `Ping` that have an `id != 0`, +and then covert to `Pong` with same id. + +Scala +: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect } + +@@@div { .group-java } +An alternative is to use `collectType`. The same conversion be written as follows, and it is as efficient. + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectType } + +@@@ + +## Reactive Streams semantics @@@div { .callout } @@ -27,4 +61,3 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi **completes** when upstream completes @@@ - diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collectType.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collectType.md index 9327761a5f..5601481615 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collectType.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/collectType.md @@ -12,8 +12,39 @@ Transform this stream by testing the type of each of the elements on which the e @@@ - - ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +Filter elements that is of a given type. + +## Example + +Given stream element classes `Message`, `Ping`, and `Pong`, where `Ping` extends `Message` and `Pong` is an +unrelated class. + +Scala +: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect-elements } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect-elements } + + +From a stream of `Message` elements we would like to collect all elements of type `Ping` that have an `id != 0`, +and then covert to `Pong` with same id. + +Scala +: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collectType } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectType } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the element is of the given type + +**backpressures** the element is of the given type and downstream backpressures + +**completes** when upstream completes + +@@@ diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index bc98492f47..ab6a7aa214 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -4,6 +4,7 @@ package jdocs.stream.operators; +import akka.japi.pf.PFBuilder; import akka.stream.Materializer; import akka.stream.javadsl.Flow; @@ -210,4 +211,45 @@ class SourceOrFlow { .throttle(1, Duration.ofSeconds(1)); // slow downstream // #conflateWithSeed } + + // #collect-elements + static interface Message {} + + static class Ping implements Message { + final int id; + + Ping(int id) { + this.id = id; + } + } + + static class Pong { + final int id; + + Pong(int id) { + this.id = id; + } + } + // #collect-elements + + void collectExample() { + // #collect + Flow flow = + Flow.of(Message.class) + .collect( + new PFBuilder() + .match(Ping.class, p -> p.id != 0, p -> new Pong(p.id)) + .build()); + // #collect + } + + void collectTypeExample() { + // #collectType + Flow flow = + Flow.of(Message.class) + .collectType(Ping.class) + .filter(p -> p.id != 0) + .map(p -> new Pong(p.id)); + // #collectType + } } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala new file mode 100644 index 0000000000..3036da5e15 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.stream.scaladsl.Flow + +object Collect { + //#collect-elements + trait Message + final case class Ping(id: Int) extends Message + final case class Pong(id: Int) + //#collect-elements + + def collectExample(): Unit = { + //#collect + val flow: Flow[Message, Pong, NotUsed] = + Flow[Message].collect { + case Ping(id) if id != 0 => Pong(id) + } + //#collect + } + + def collectType(): Unit = { + //#collectType + val flow: Flow[Message, Pong, NotUsed] = + Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id)) + //#collectType + } +}