Code example for collect and collectType (#27744)

Co-Authored-By: Helena Edelson <helena@users.noreply.github.com>
This commit is contained in:
Patrik Nordwall 2019-09-19 11:22:37 +02:00 committed by GitHub
parent edad69b38c
commit d8c8121c4d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 143 additions and 5 deletions

View file

@ -15,8 +15,42 @@ Apply a partial function to each incoming element, if the partial function is de
## Description
Apply a partial function to each incoming element, if the partial function is defined for a value the returned
value is passed downstream. Can often replace `filter` followed by `map` to achieve the same in one single operators.
value is passed downstream. This can often replace `filter` followed by `map` to achieve the same in one single operator.
@java[`collect` is supposed to be used with @apidoc[akka.japi.pf.PFBuilder] to construct the partial function.
There is also a @ref:[collectType](collectType.md) that often can be easier to use than the `PFBuilder` and
then combine with ordinary `filter` and `map` operators.]
## Example
Given stream element classes `Message`, `Ping`, and `Pong`, where `Ping` extends `Message` and `Pong` is an
unrelated class.
Scala
: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect-elements }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect-elements }
From a stream of `Message` elements we would like to collect all elements of type `Ping` that have an `id != 0`,
and then covert to `Pong` with same id.
Scala
: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect }
@@@div { .group-java }
An alternative is to use `collectType`. The same conversion be written as follows, and it is as efficient.
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectType }
@@@
## Reactive Streams semantics
@@@div { .callout }
@ -27,4 +61,3 @@ value is passed downstream. Can often replace `filter` followed by `map` to achi
**completes** when upstream completes
@@@

View file

@ -12,8 +12,39 @@ Transform this stream by testing the type of each of the elements on which the e
@@@
## Description
TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646
Filter elements that is of a given type.
## Example
Given stream element classes `Message`, `Ping`, and `Pong`, where `Ping` extends `Message` and `Pong` is an
unrelated class.
Scala
: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collect-elements }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collect-elements }
From a stream of `Message` elements we would like to collect all elements of type `Ping` that have an `id != 0`,
and then covert to `Pong` with same id.
Scala
: @@snip [Collect.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collectType }
Java
: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectType }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the element is of the given type
**backpressures** the element is of the given type and downstream backpressures
**completes** when upstream completes
@@@

View file

@ -4,6 +4,7 @@
package jdocs.stream.operators;
import akka.japi.pf.PFBuilder;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
@ -210,4 +211,45 @@ class SourceOrFlow {
.throttle(1, Duration.ofSeconds(1)); // slow downstream
// #conflateWithSeed
}
// #collect-elements
static interface Message {}
static class Ping implements Message {
final int id;
Ping(int id) {
this.id = id;
}
}
static class Pong {
final int id;
Pong(int id) {
this.id = id;
}
}
// #collect-elements
void collectExample() {
// #collect
Flow<Message, Pong, NotUsed> flow =
Flow.of(Message.class)
.collect(
new PFBuilder<Message, Pong>()
.match(Ping.class, p -> p.id != 0, p -> new Pong(p.id))
.build());
// #collect
}
void collectTypeExample() {
// #collectType
Flow<Message, Pong, NotUsed> flow =
Flow.of(Message.class)
.collectType(Ping.class)
.filter(p -> p.id != 0)
.map(p -> new Pong(p.id));
// #collectType
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.stream.scaladsl.Flow
object Collect {
//#collect-elements
trait Message
final case class Ping(id: Int) extends Message
final case class Pong(id: Int)
//#collect-elements
def collectExample(): Unit = {
//#collect
val flow: Flow[Message, Pong, NotUsed] =
Flow[Message].collect {
case Ping(id) if id != 0 => Pong(id)
}
//#collect
}
def collectType(): Unit = {
//#collectType
val flow: Flow[Message, Pong, NotUsed] =
Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id))
//#collectType
}
}