From a679f6f59e81ec1f1cbbba5c1b0fb6e199f1cd41 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Sun, 27 May 2018 21:49:12 -0300 Subject: [PATCH] Add method foreachAsync to Sink object --- .../stream/operators/Sink/foreachAsync.md | 38 +++ .../main/paradox/stream/operators/index.md | 2 + .../java/jdocs/stream/SinkRecipeDocTest.java | 44 ++++ .../scala/docs/stream/SinkRecipeDocSpec.scala | 24 ++ .../scaladsl/SinkForeachAsyncSpec.scala | 246 ++++++++++++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 10 + .../scala/akka/stream/scaladsl/Sink.scala | 10 + 7 files changed, 374 insertions(+) create mode 100644 akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md create mode 100644 akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java create mode 100644 akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachAsyncSpec.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md b/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md new file mode 100644 index 0000000000..d3993930b9 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Sink/foreachAsync.md @@ -0,0 +1,38 @@ +# foreachAsync + +Invoke a given procedure asynchronously for each element received. + +@ref[Sink operators](../index.md#sink-operators) + +@@@div { .group-scala } + +## Signature + +@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreachAsync } + +@@@ + +## Description + +Invoke a given procedure asynchronously for each element received. Note that if shared state is mutated from the procedure that must be done in a thread-safe way. + +The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage`] which completes when the +stream completes, or fails if the stream fails. + +## Example + +Scala +: @@snip [SinkRecipeDocSpec.scala](/akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala) { #forseachAsync-processing } + +Java +: @@snip [SinkRecipeDocTest.java](/akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java) { #forseachAsync-processing } + +@@@div { .callout } + +**cancels** when a @scala[`Future`] @java[`CompletionStage`] fails + +**backpressures** when the number of @scala[`Future`s] @java[`CompletionStage`s] reaches the configured parallelism + +@@@ + + diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 020df2c90f..370a3df8e5 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -49,6 +49,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| +|Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| |Sink|@ref[fromSubscriber](Sink/fromSubscriber.md)|Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.| |Sink|@ref[head](Sink/head.md)|Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, after this the stream is canceled.| @@ -378,6 +379,7 @@ Operators meant for inter-operating between Akka Streams and Actors: * [asPublisher](Sink/asPublisher.md) * [ignore](Sink/ignore.md) * [foreach](Sink/foreach.md) +* [foreachAsync](Sink/foreachAsync.md) * [combine](Sink/combine.md) * [foreachParallel](Sink/foreachParallel.md) * [fold](Sink/fold.md) diff --git a/akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java b/akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java new file mode 100644 index 0000000000..732df1b35b --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.stream; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.function.Function; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.Sink; +import jdocs.AbstractJavaTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public class SinkRecipeDocTest extends AbstractJavaTest { + static ActorSystem system; + static Materializer mat; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("SinkRecipeDocTest"); + mat = ActorMaterializer.create(system); + } + + @Test + public void foreachAsync() { + final Function> asyncProcessing = param -> CompletableFuture.completedFuture(param).thenAccept(System.out::println); + + //#forseachAsync-processing + //final Function> asyncProcessing = _ + + final Source numberSource = Source.range(1, 100); + + numberSource + .runWith(Sink.foreachAsync(10, asyncProcessing), mat); + //#forseachAsync-processing + } +} diff --git a/akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala b/akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala new file mode 100644 index 0000000000..64d01bcd00 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.stream + +import akka.stream.scaladsl.{ Sink, Source } +import docs.stream.cookbook.RecipeSpec + +import scala.concurrent.Future + +class SinkRecipeDocSpec extends RecipeSpec { + "Sink.foreachAsync" must { + "processing each element asynchronously" in { + def asyncProcessing(value: Int): Future[Unit] = Future { println(value) }(system.dispatcher) + //#forseachAsync-processing + //def asyncProcessing(value: Int): Future[Unit] = _ + + Source(1 to 100) + .runWith(Sink.foreachAsync(10)(asyncProcessing)) + //#forseachAsync-processing + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachAsyncSpec.scala new file mode 100644 index 0000000000..c038265ced --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkForeachAsyncSpec.scala @@ -0,0 +1,246 @@ +/** + * Copyright (C) 2015-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import java.util.concurrent.{ CountDownLatch, Executors, TimeUnit } + +import akka.Done +import akka.stream.ActorAttributes.supervisionStrategy +import akka.stream.ActorMaterializer +import akka.stream.Supervision.{ resumingDecider, stoppingDecider } +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.scaladsl.StreamTestKit._ +import akka.testkit.{ TestLatch, TestProbe } + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext, Future } +import scala.language.postfixOps +import scala.util.control.NoStackTrace + +class SinkForeachAsyncSpec extends StreamSpec { + implicit val materializer = ActorMaterializer() + + "A foreachAsync" must { + "handle empty source" in assertAllStagesStopped { + import system.dispatcher + val p = Source(List.empty[Int]).runWith(Sink.foreachAsync(3)(a ⇒ Future {})) + Await.result(p, remainingOrDefault) + } + + "be able to run elements in parallel" in assertAllStagesStopped { + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4)) + + val probe = TestProbe() + val latch = (1 to 4).map(_ → TestLatch(1)).toMap + + val sink: Sink[Int, Future[Done]] = { + Sink.foreachAsync(4) { n: Int ⇒ + Future { + Await.result(latch(n), remainingOrDefault) + probe.ref ! n + } + } + } + + val p = Source(1 to 4).runWith(sink) + + latch(1).countDown() + probe.expectMsg(1) + latch(2).countDown() + probe.expectMsg(2) + latch(3).countDown() + probe.expectMsg(3) + latch(4).countDown() + probe.expectMsg(4) + + Await.result(p, 4.seconds) + assert(p.isCompleted) + } + + "back-pressure upstream elements when downstream is slow" in assertAllStagesStopped { + import scala.concurrent.duration._ + + implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1)) + + val probe = TestProbe() + val latch = (1 to 4).map(_ → TestLatch(1)).toMap + + val sink: Sink[() ⇒ Int, Future[Done]] = { + Sink.foreachAsync(1) { (n: () ⇒ Int) ⇒ + Future { + Await.result(latch(n()), remainingOrDefault) + probe.ref ! n() + Thread.sleep(2000) + } + } + } + + @volatile var oneCalled = false + @volatile var twoCalled = false + @volatile var threeCalled = false + @volatile var fourCalled = false + + def one = { + oneCalled = true; 1 + } + + def two = { + twoCalled = true; 2 + } + + def three = { + threeCalled = true; 3 + } + + def four = { + fourCalled = true; 4 + } + + val p = + Source(List(one _, two _, three _, four _)) + .runWith(sink) + + latch(1).countDown() + probe.expectMsg(1) + + assert(!twoCalled) + assert(!threeCalled) + assert(!fourCalled) + + probe.expectNoMessage(2 seconds) + + latch(2).countDown() + probe.expectMsg(2) + + assert(!threeCalled) + assert(!fourCalled) + + probe.expectNoMessage(2 seconds) + + latch(3).countDown() + probe.expectMsg(3) + + assert(!fourCalled) + + probe.expectNoMessage(2 seconds) + + latch(4).countDown() + probe.expectMsg(4) + + Await.result(p, 4.seconds) + assert(p.isCompleted) + + assert(oneCalled) + assert(twoCalled) + assert(threeCalled) + assert(fourCalled) + } + } + + "produce elements in the order they are ready" in assertAllStagesStopped { + import system.dispatcher + + val probe = TestProbe() + val latch = (1 to 4).map(_ → TestLatch(1)).toMap + val p = Source(1 to 4).runWith(Sink.foreachAsync(4)((n: Int) ⇒ { + Future { + Await.ready(latch(n), 5.seconds) + probe.ref ! n + } + })) + latch(2).countDown() + probe.expectMsg(2) + latch(4).countDown() + probe.expectMsg(4) + latch(3).countDown() + probe.expectMsg(3) + + assert(!p.isCompleted) + + latch(1).countDown() + probe.expectMsg(1) + + Await.result(p, 4.seconds) + assert(p.isCompleted) + } + + "not run more functions in parallel then specified" in { + import system.dispatcher + + val probe = TestProbe() + val latch = (1 to 5).map(_ → TestLatch()).toMap + + val p = Source(1 to 5).runWith(Sink.foreachAsync(4)((n: Int) ⇒ { + Future { + probe.ref ! n + Await.ready(latch(n), 5.seconds) + } + })) + probe.expectMsgAllOf(1, 2, 3, 4) + probe.expectNoMessage(200.millis) + + assert(!p.isCompleted) + + for (i ← 1 to 4) latch(i).countDown() + + latch(5).countDown() + probe.expectMsg(5) + + Await.result(p, 5.seconds) + assert(p.isCompleted) + + } + + "resume after failed future" in assertAllStagesStopped { + import system.dispatcher + + val probe = TestProbe() + val latch = TestLatch(1) + + val p = Source(1 to 5).runWith(Sink.foreachAsync(4)((n: Int) ⇒ { + Future { + if (n == 3) throw new RuntimeException("err1") with NoStackTrace + else { + probe.ref ! n + Await.ready(latch, 10.seconds) + } + } + }).withAttributes(supervisionStrategy(resumingDecider))) + + latch.countDown() + probe.expectMsgAllOf(1, 2, 4, 5) + + Await.result(p, 5.seconds) + } + + "finish after failed future" in assertAllStagesStopped { + import system.dispatcher + + val probe = TestProbe() + val element4Latch = new CountDownLatch(1) + val errorLatch = new CountDownLatch(2) + + val p = Source.fromIterator(() ⇒ Iterator.from(1)).runWith(Sink.foreachAsync(3)((n: Int) ⇒ { + Future { + if (n == 3) { + // Error will happen only after elements 1, 2 has been processed + errorLatch.await(5, TimeUnit.SECONDS) + throw new RuntimeException("err2") with NoStackTrace + } else { + probe.ref ! n + errorLatch.countDown() + element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering + } + } + }).withAttributes(supervisionStrategy(stoppingDecider))) + + // Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time + // of failure. + probe.expectMsgAllOf(1, 2) + element4Latch.countDown() // Release elements 4, 5, 6, ... + + a[RuntimeException] must be thrownBy Await.result(p, 3.seconds) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 690de6cc64..195af3dfb9 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -99,6 +99,15 @@ object Sink { def foreach[T](f: function.Procedure[T]): Sink[T, CompletionStage[Done]] = new Sink(scaladsl.Sink.foreach(f.apply).toCompletionStage()) + /** + * A `Sink` that will invoke the given procedure asynchronously for each received element. The sink is materialized + * into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the + * normal end of the stream, or completed with `Failure` if there is a failure signaled in + * the stream. + */ + def foreachAsync[T](parallelism: Int)(f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] = + new Sink(scaladsl.Sink.foreachAsync(parallelism)((x: T) ⇒ f(x).toScala.map(_ ⇒ ())(ExecutionContexts.sameThreadExecutionContext)).toCompletionStage()) + /** * A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized * into a [[java.util.concurrent.CompletionStage]]. @@ -110,6 +119,7 @@ object Sink { * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the * element is dropped and the stream continues. */ + @deprecated("Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.", since = "2.5.17") def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(ec: ExecutionContext): Sink[T, CompletionStage[Done]] = new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage()) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index e8f58aed92..ae7bee00bc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -277,6 +277,15 @@ object Sink { def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] = Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink") + /** + * A `Sink` that will invoke the given procedure asynchronously for each received element. The sink is materialized + * into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the + * normal end of the stream, or completed with `Failure` if there is a failure signaled in + * the stream. + */ + def foreachAsync[T](parallelism: Int)(f: T ⇒ Future[Unit]): Sink[T, Future[Done]] = + Flow[T].mapAsyncUnordered(parallelism)(f).toMat(Sink.ignore)(Keep.right).named("foreachAsyncSink") + /** * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. */ @@ -310,6 +319,7 @@ object Sink { * * See also [[Flow.mapAsyncUnordered]] */ + @deprecated("Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a Future or spawning a new Future.", since = "2.5.17") def foreachParallel[T](parallelism: Int)(f: T ⇒ Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] = Flow[T].mapAsyncUnordered(parallelism)(t ⇒ Future(f(t))).toMat(Sink.ignore)(Keep.right)