Docs: Sink.foreach (#28693)

This commit is contained in:
Enno 2020-03-10 09:37:34 +01:00 committed by GitHub
parent e819ae9f51
commit 52c83dca34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 23 deletions

View file

@ -4,23 +4,34 @@ Invoke a given procedure for each element received.
@ref[Sink operators](../index.md#sink-operators)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #forEach }
@@@
@apidoc[Sink.foreach](Sink$) { java="#foreach(akka.japi.function.Procedure)" scala="#foreach[T](f:T=%3EUnit):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[akka.Done]]" }
## Description
Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure.
The sink materializes into a @scala[`Future[Option[Done]]`] @java[`CompletionStage<Optional<Done>`] which completes when the
The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage<Done>`] which completes when the
stream completes, or fails if the stream fails.
Note that it is not safe to mutate state from the procedure.
See also:
* @ref[`foreachAsync`](foreachAsync.md) Invoke a given procedure asynchronously for each element received.
* @ref[`actorRef`](actorRef.md) Send the elements from the stream to an `ActorRef`.
## Example
This prints out every element to standard out.
Scala
: @@snip [snip](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #foreach }
Java
: @@snip [snip](/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java) { #foreach }
## Reactive Streams semantics
@@@div { .callout }

View file

@ -19,6 +19,11 @@ Invoke a given procedure asynchronously for each element received. Note that if
The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage<Done>`] which completes when the
stream completes, or fails if the stream fails.
See also:
* @ref[`foreach`](foreach.md) Invoke a given procedure for each element received.
* @ref[`actorRef`](actorRef.md) Send the elements from the stream to an `ActorRef`.
## Example
Scala

View file

@ -4,18 +4,12 @@ Like `foreach` but allows up to `parallellism` procedure calls to happen in para
@ref[Sink operators](../index.md#sink-operators)
@@@div { .group-scala }
@@@warning { title="Deprecated" }
## Signature
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreachParallel }
Use @ref[`foreachAsync`](foreachAsync.md) instead (this is deprecated since Akka 2.5.17).
@@@
## Description
Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.
## Reactive Streams semantics
@@@div { .callout }

View file

@ -9,11 +9,10 @@ import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.function.Function;
@ -153,4 +152,20 @@ public class SinkTest extends StreamTest {
akka.stream.scaladsl.Sink.cancelled();
Sink<Integer, NotUsed> javaSink = scalaSink.asJava();
}
@Test
public void sinkForeachMustBeDocumented()
throws InterruptedException, ExecutionException, TimeoutException {
// #foreach
Sink<Integer, CompletionStage<Done>> printlnSink = Sink.foreach(System.out::println);
CompletionStage<Done> cs = Source.from(Arrays.asList(1, 2, 3, 4)).runWith(printlnSink, system);
Done done = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
// will print
// 1
// 2
// 3
// 4
// #foreach
assertEquals(Done.done(), done);
}
}

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl
import akka.Done
import akka.stream._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
@ -12,10 +13,9 @@ import com.github.ghik.silencer.silent
import org.reactivestreams.Publisher
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
@silent("deprecated")
class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import GraphDSL.Implicits._
@ -153,7 +153,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name")
s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("name"), Name("headSink"))
s.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary))
@silent("deprecated")
val res = s.traversalBuilder.attributes.getFirst[Attributes.AsyncBoundary.type]
res shouldEqual (Some(AsyncBoundary))
}
"given one attribute of a class should correctly get it as first attribute with default value" in {
@ -174,7 +176,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async
s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("default")
@silent("deprecated")
val res = s.traversalBuilder.attributes.getFirst[Name](Name("default"))
res shouldEqual Name("default")
}
"given no attributes of a class when getting last attribute with default value should get default value" in {
@ -187,8 +191,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"given multiple attributes of a class when getting first attribute with default value should get first attribute" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async.named("name").named("another_name")
s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name")
@silent("deprecated")
val res = s.traversalBuilder.attributes.getFirst[Name](Name("default"))
res shouldEqual Name("name")
}
"given multiple attributes of a class when getting last attribute with default value should get last attribute" in {
@ -206,6 +211,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"The ignore sink" should {
"fail its materialized value on abrupt materializer termination" in {
@silent("deprecated")
val mat = ActorMaterializer()
val matVal = Source.maybe[Int].runWith(Sink.ignore)(mat)
@ -222,6 +228,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
val source = Source(1 to 10)
val result = source.runWith(Sink.reduce[Int]((a, b) => a + b))
result.map(println)(system.dispatcher)
// will print
// 55
//#reduce-operator-example
assert(result.futureValue == (1 to 10).sum)
@ -235,6 +242,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
val result = source.runWith(Sink.seq[Int])
val seq = result.futureValue
seq.foreach(println)
// will print
// 1
// 2
// 3
@ -243,6 +251,22 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}
"The foreach sink" must {
"illustrate println" in {
// #foreach
val printlnSink: Sink[Any, Future[Done]] = Sink.foreach(println)
val f = Source(1 to 4).runWith(printlnSink)
val done = Await.result(f, 100.millis)
// will print
// 1
// 2
// 3
// 4
// #foreach
done shouldBe Done
}
}
"Sink pre-materialization" must {
"materialize the sink and wrap its exposed publisher in a Source" in {
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)