From 52c83dca34afa550e7b68cdce2862f0b48007f92 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Tue, 10 Mar 2020 09:37:34 +0100 Subject: [PATCH] Docs: Sink.foreach (#28693) --- .../paradox/stream/operators/Sink/foreach.md | 23 ++++++++---- .../stream/operators/Sink/foreachAsync.md | 5 +++ .../stream/operators/Sink/foreachParallel.md | 10 ++---- .../java/akka/stream/javadsl/SinkTest.java | 21 +++++++++-- .../scala/akka/stream/scaladsl/SinkSpec.scala | 36 +++++++++++++++---- 5 files changed, 72 insertions(+), 23 deletions(-) diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/foreach.md b/akka-docs/src/main/paradox/stream/operators/Sink/foreach.md index add1fb238a..7eabfba126 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/foreach.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/foreach.md @@ -4,23 +4,34 @@ Invoke a given procedure for each element received. @ref[Sink operators](../index.md#sink-operators) -@@@div { .group-scala } - ## Signature -@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #forEach } - -@@@ +@apidoc[Sink.foreach](Sink$) { java="#foreach(akka.japi.function.Procedure)" scala="#foreach[T](f:T=%3EUnit):akka.stream.scaladsl.Sink[T,scala.concurrent.Future[akka.Done]]" } ## Description Invoke a given procedure for each element received. Note that it is not safe to mutate shared state from the procedure. -The sink materializes into a @scala[`Future[Option[Done]]`] @java[`CompletionStage`] which completes when the +The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage`] which completes when the stream completes, or fails if the stream fails. Note that it is not safe to mutate state from the procedure. +See also: + +* @ref[`foreachAsync`](foreachAsync.md) Invoke a given procedure asynchronously for each element received. +* @ref[`actorRef`](actorRef.md) Send the elements from the stream to an `ActorRef`. + +## Example + +This prints out every element to standard out. + +Scala +: @@snip [snip](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala) { #foreach } + +Java +: @@snip [snip](/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java) { #foreach } + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md b/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md index ccfc1e72cb..ec38ddf497 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md @@ -19,6 +19,11 @@ Invoke a given procedure asynchronously for each element received. Note that if The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage`] which completes when the stream completes, or fails if the stream fails. +See also: + +* @ref[`foreach`](foreach.md) Invoke a given procedure for each element received. +* @ref[`actorRef`](actorRef.md) Send the elements from the stream to an `ActorRef`. + ## Example Scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/foreachParallel.md b/akka-docs/src/main/paradox/stream/operators/Sink/foreachParallel.md index ca34252429..6ff2a7be0e 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/foreachParallel.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/foreachParallel.md @@ -4,18 +4,12 @@ Like `foreach` but allows up to `parallellism` procedure calls to happen in para @ref[Sink operators](../index.md#sink-operators) -@@@div { .group-scala } +@@@warning { title="Deprecated" } -## Signature - -@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreachParallel } +Use @ref[`foreachAsync`](foreachAsync.md) instead (this is deprecated since Akka 2.5.17). @@@ -## Description - -Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel. - ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 2a4eedf181..7241f028f8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -9,11 +9,10 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.stream.Collectors; +import akka.Done; import akka.NotUsed; import akka.japi.Pair; import akka.japi.function.Function; @@ -153,4 +152,20 @@ public class SinkTest extends StreamTest { akka.stream.scaladsl.Sink.cancelled(); Sink javaSink = scalaSink.asJava(); } + + @Test + public void sinkForeachMustBeDocumented() + throws InterruptedException, ExecutionException, TimeoutException { + // #foreach + Sink> printlnSink = Sink.foreach(System.out::println); + CompletionStage cs = Source.from(Arrays.asList(1, 2, 3, 4)).runWith(printlnSink, system); + Done done = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); + // will print + // 1 + // 2 + // 3 + // 4 + // #foreach + assertEquals(Done.done(), done); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 307bc8de85..d51b77be55 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl +import akka.Done import akka.stream._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink @@ -12,10 +13,9 @@ import com.github.ghik.silencer.silent import org.reactivestreams.Publisher import org.scalatest.concurrent.ScalaFutures -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -@silent("deprecated") class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import GraphDSL.Implicits._ @@ -153,7 +153,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name") s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("name"), Name("headSink")) - s.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary)) + @silent("deprecated") + val res = s.traversalBuilder.attributes.getFirst[Attributes.AsyncBoundary.type] + res shouldEqual (Some(AsyncBoundary)) } "given one attribute of a class should correctly get it as first attribute with default value" in { @@ -174,7 +176,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async - s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("default") + @silent("deprecated") + val res = s.traversalBuilder.attributes.getFirst[Name](Name("default")) + res shouldEqual Name("default") } "given no attributes of a class when getting last attribute with default value should get default value" in { @@ -187,8 +191,9 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "given multiple attributes of a class when getting first attribute with default value should get first attribute" in { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async.named("name").named("another_name") - - s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name") + @silent("deprecated") + val res = s.traversalBuilder.attributes.getFirst[Name](Name("default")) + res shouldEqual Name("name") } "given multiple attributes of a class when getting last attribute with default value should get last attribute" in { @@ -206,6 +211,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "The ignore sink" should { "fail its materialized value on abrupt materializer termination" in { + @silent("deprecated") val mat = ActorMaterializer() val matVal = Source.maybe[Int].runWith(Sink.ignore)(mat) @@ -222,6 +228,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { val source = Source(1 to 10) val result = source.runWith(Sink.reduce[Int]((a, b) => a + b)) result.map(println)(system.dispatcher) + // will print // 55 //#reduce-operator-example assert(result.futureValue == (1 to 10).sum) @@ -235,6 +242,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { val result = source.runWith(Sink.seq[Int]) val seq = result.futureValue seq.foreach(println) + // will print // 1 // 2 // 3 @@ -243,6 +251,22 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The foreach sink" must { + "illustrate println" in { + // #foreach + val printlnSink: Sink[Any, Future[Done]] = Sink.foreach(println) + val f = Source(1 to 4).runWith(printlnSink) + val done = Await.result(f, 100.millis) + // will print + // 1 + // 2 + // 3 + // 4 + // #foreach + done shouldBe Done + } + } + "Sink pre-materialization" must { "materialize the sink and wrap its exposed publisher in a Source" in { val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)