Merge pull request #25152 from gabfssilva/master

Add method foreachAsync to Sink object
This commit is contained in:
Patrik Nordwall 2018-09-14 17:50:57 +02:00 committed by GitHub
commit c132a81f0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 374 additions and 0 deletions

View file

@ -0,0 +1,38 @@
# foreachAsync
Invoke a given procedure asynchronously for each element received.
@ref[Sink operators](../index.md#sink-operators)
@@@div { .group-scala }
## Signature
@@signature [Sink.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala) { #foreachAsync }
@@@
## Description
Invoke a given procedure asynchronously for each element received. Note that if shared state is mutated from the procedure, it must be done in a thread-safe way.
The sink materializes into a @scala[`Future[Done]`] @java[`CompletionStage<Done>`] which completes when the
stream completes, or fails if the stream fails.
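As a quick orientation, here is a minimal, self-contained Scala sketch of typical usage (illustrative names; requires Akka 2.5.17+, where `foreachAsync` was introduced); the project's own snippets follow below:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

object ForeachAsyncExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // at most 4 futures are in flight at a time (further elements are backpressured);
  // the materialized Future[Done] completes when the whole stream completes
  val done = Source(1 to 100)
    .runWith(Sink.foreachAsync(4)(i => Future { println(i) }))

  done.onComplete(_ => system.terminate())
}
```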
## Example
Scala
: @@snip [SinkRecipeDocSpec.scala](/akka-docs/src/test/scala/docs/stream/SinkRecipeDocSpec.scala) { #foreachAsync-processing }
Java
: @@snip [SinkRecipeDocTest.java](/akka-docs/src/test/java/jdocs/stream/SinkRecipeDocTest.java) { #foreachAsync-processing }
@@@div { .callout }
**cancels** when a @scala[`Future`] @java[`CompletionStage`] fails
**backpressures** when the number of @scala[`Future`s] @java[`CompletionStage`s] reaches the configured parallelism
@@@
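By default the stream is failed (and the sink cancels) when one of these @scala[`Future`s] @java[`CompletionStage`s] fails; with a resuming supervision strategy the failed element is dropped instead and processing continues, as exercised in `SinkForeachAsyncSpec` further down. A minimal Scala sketch (illustrative names):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.ActorMaterializer
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

object ForeachAsyncSupervisionExample extends App {
  implicit val system: ActorSystem = ActorSystem("supervision-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // with the resuming decider, elements whose future fails are dropped
  // and the rest of the stream is still processed
  val sink = Sink
    .foreachAsync[Int](4) { i =>
      Future {
        if (i % 10 == 0) throw new RuntimeException(s"boom on $i")
        println(i)
      }
    }
    .withAttributes(supervisionStrategy(resumingDecider))

  Source(1 to 100).runWith(sink).onComplete(_ => system.terminate())
}
```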

View file

@ -49,6 +49,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
|Sink|<a name="fromsubscriber"></a>@ref[fromSubscriber](Sink/fromSubscriber.md)|Integration with Reactive Streams, wraps a `org.reactivestreams.Subscriber` as a sink.|
|Sink|<a name="head"></a>@ref[head](Sink/head.md)|Materializes into a @scala[`Future`] @java[`CompletionStage`] which completes with the first value arriving, after this the stream is canceled.|
@ -378,6 +379,7 @@ Operators meant for inter-operating between Akka Streams and Actors:
* [asPublisher](Sink/asPublisher.md)
* [ignore](Sink/ignore.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [combine](Sink/combine.md)
* [foreachParallel](Sink/foreachParallel.md)
* [fold](Sink/fold.md)

View file

@ -0,0 +1,44 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.function.Function;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import jdocs.AbstractJavaTest;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class SinkRecipeDocTest extends AbstractJavaTest {
static ActorSystem system;
static Materializer mat;
@BeforeClass
public static void setup() {
system = ActorSystem.create("SinkRecipeDocTest");
mat = ActorMaterializer.create(system);
}
@Test
public void foreachAsync() {
final Function<Integer, CompletionStage<Void>> asyncProcessing = param -> CompletableFuture.completedFuture(param).thenAccept(System.out::println);
//#foreachAsync-processing
//final Function<Integer, CompletionStage<Void>> asyncProcessing = _
final Source<Integer, NotUsed> numberSource = Source.range(1, 100);
numberSource
.runWith(Sink.foreachAsync(10, asyncProcessing), mat);
//#foreachAsync-processing
}
}

View file

@ -0,0 +1,24 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream
import akka.stream.scaladsl.{ Sink, Source }
import docs.stream.cookbook.RecipeSpec
import scala.concurrent.Future
class SinkRecipeDocSpec extends RecipeSpec {
"Sink.foreachAsync" must {
"processing each element asynchronously" in {
def asyncProcessing(value: Int): Future[Unit] = Future { println(value) }(system.dispatcher)
//#foreachAsync-processing
//def asyncProcessing(value: Int): Future[Unit] = _
Source(1 to 100)
.runWith(Sink.foreachAsync(10)(asyncProcessing))
//#foreachAsync-processing
}
}
}

View file

@ -0,0 +1,246 @@
/**
* Copyright (C) 2015-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.{ CountDownLatch, Executors, TimeUnit }
import akka.Done
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.ActorMaterializer
import akka.stream.Supervision.{ resumingDecider, stoppingDecider }
import akka.stream.testkit.StreamSpec
import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.testkit.{ TestLatch, TestProbe }
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.language.postfixOps
import scala.util.control.NoStackTrace
class SinkForeachAsyncSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
"A foreachAsync" must {
"handle empty source" in assertAllStagesStopped {
import system.dispatcher
val p = Source(List.empty[Int]).runWith(Sink.foreachAsync(3)(a ⇒ Future {}))
Await.result(p, remainingOrDefault)
}
"be able to run elements in parallel" in assertAllStagesStopped {
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
val probe = TestProbe()
val latch = (1 to 4).map(_ → TestLatch(1)).toMap
val sink: Sink[Int, Future[Done]] = {
Sink.foreachAsync(4) { n: Int ⇒
Future {
Await.result(latch(n), remainingOrDefault)
probe.ref ! n
}
}
}
val p = Source(1 to 4).runWith(sink)
latch(1).countDown()
probe.expectMsg(1)
latch(2).countDown()
probe.expectMsg(2)
latch(3).countDown()
probe.expectMsg(3)
latch(4).countDown()
probe.expectMsg(4)
Await.result(p, 4.seconds)
assert(p.isCompleted)
}
"back-pressure upstream elements when downstream is slow" in assertAllStagesStopped {
import scala.concurrent.duration._
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
val probe = TestProbe()
val latch = (1 to 4).map(_ → TestLatch(1)).toMap
val sink: Sink[() ⇒ Int, Future[Done]] = {
Sink.foreachAsync(1) { (n: () ⇒ Int) ⇒
Future {
Await.result(latch(n()), remainingOrDefault)
probe.ref ! n()
Thread.sleep(2000)
}
}
}
@volatile var oneCalled = false
@volatile var twoCalled = false
@volatile var threeCalled = false
@volatile var fourCalled = false
def one = {
oneCalled = true; 1
}
def two = {
twoCalled = true; 2
}
def three = {
threeCalled = true; 3
}
def four = {
fourCalled = true; 4
}
val p =
Source(List(one _, two _, three _, four _))
.runWith(sink)
latch(1).countDown()
probe.expectMsg(1)
assert(!twoCalled)
assert(!threeCalled)
assert(!fourCalled)
probe.expectNoMessage(2 seconds)
latch(2).countDown()
probe.expectMsg(2)
assert(!threeCalled)
assert(!fourCalled)
probe.expectNoMessage(2 seconds)
latch(3).countDown()
probe.expectMsg(3)
assert(!fourCalled)
probe.expectNoMessage(2 seconds)
latch(4).countDown()
probe.expectMsg(4)
Await.result(p, 4.seconds)
assert(p.isCompleted)
assert(oneCalled)
assert(twoCalled)
assert(threeCalled)
assert(fourCalled)
}
}
"produce elements in the order they are ready" in assertAllStagesStopped {
import system.dispatcher
val probe = TestProbe()
val latch = (1 to 4).map(_ → TestLatch(1)).toMap
val p = Source(1 to 4).runWith(Sink.foreachAsync(4)((n: Int) ⇒ {
Future {
Await.ready(latch(n), 5.seconds)
probe.ref ! n
}
}))
latch(2).countDown()
probe.expectMsg(2)
latch(4).countDown()
probe.expectMsg(4)
latch(3).countDown()
probe.expectMsg(3)
assert(!p.isCompleted)
latch(1).countDown()
probe.expectMsg(1)
Await.result(p, 4.seconds)
assert(p.isCompleted)
}
"not run more functions in parallel then specified" in {
import system.dispatcher
val probe = TestProbe()
val latch = (1 to 5).map(_ → TestLatch()).toMap
val p = Source(1 to 5).runWith(Sink.foreachAsync(4)((n: Int) ⇒ {
Future {
probe.ref ! n
Await.ready(latch(n), 5.seconds)
}
}))
probe.expectMsgAllOf(1, 2, 3, 4)
probe.expectNoMessage(200.millis)
assert(!p.isCompleted)
for (i ← 1 to 4) latch(i).countDown()
latch(5).countDown()
probe.expectMsg(5)
Await.result(p, 5.seconds)
assert(p.isCompleted)
}
"resume after failed future" in assertAllStagesStopped {
import system.dispatcher
val probe = TestProbe()
val latch = TestLatch(1)
val p = Source(1 to 5).runWith(Sink.foreachAsync(4)((n: Int) ⇒ {
Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else {
probe.ref ! n
Await.ready(latch, 10.seconds)
}
}
}).withAttributes(supervisionStrategy(resumingDecider)))
latch.countDown()
probe.expectMsgAllOf(1, 2, 4, 5)
Await.result(p, 5.seconds)
}
"finish after failed future" in assertAllStagesStopped {
import system.dispatcher
val probe = TestProbe()
val element4Latch = new CountDownLatch(1)
val errorLatch = new CountDownLatch(2)
val p = Source.fromIterator(() ⇒ Iterator.from(1)).runWith(Sink.foreachAsync(3)((n: Int) ⇒ {
Future {
if (n == 3) {
// Error will happen only after elements 1 and 2 have been processed
errorLatch.await(5, TimeUnit.SECONDS)
throw new RuntimeException("err2") with NoStackTrace
} else {
probe.ref ! n
errorLatch.countDown()
element4Latch.await(5, TimeUnit.SECONDS) // Block element 4, 5, 6, ... from entering
}
}
}).withAttributes(supervisionStrategy(stoppingDecider)))
// Only the first two messages are guaranteed to arrive due to their enforced ordering related to the time
// of failure.
probe.expectMsgAllOf(1, 2)
element4Latch.countDown() // Release elements 4, 5, 6, ...
a[RuntimeException] must be thrownBy Await.result(p, 3.seconds)
}
}

View file

@ -99,6 +99,15 @@ object Sink {
def foreach[T](f: function.Procedure[T]): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.foreach(f.apply).toCompletionStage())
/**
* A `Sink` that will invoke the given procedure asynchronously for each received element. The sink is materialized
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream.
*/
def foreachAsync[T](parallelism: Int)(f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.foreachAsync(parallelism)((x: T) ⇒ f(x).toScala.map(_ ⇒ ())(ExecutionContexts.sameThreadExecutionContext)).toCompletionStage())
/**
* A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized
* into a [[java.util.concurrent.CompletionStage]].
@ -110,6 +119,7 @@ object Sink {
* [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the
* element is dropped and the stream continues.
*/
@deprecated("Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a CompletionStage or using CompletableFuture.supplyAsync.", since = "2.5.17")
def foreachParallel[T](parallel: Int)(f: function.Procedure[T])(ec: ExecutionContext): Sink[T, CompletionStage[Done]] =
new Sink(scaladsl.Sink.foreachParallel(parallel)(f.apply)(ec).toCompletionStage())

View file

@ -277,6 +277,15 @@ object Sink {
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] =
Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")
/**
* A `Sink` that will invoke the given procedure asynchronously for each received element. The sink is materialized
* into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the
* normal end of the stream, or completed with `Failure` if there is a failure signaled in
* the stream.
*/
def foreachAsync[T](parallelism: Int)(f: T ⇒ Future[Unit]): Sink[T, Future[Done]] =
Flow[T].mapAsyncUnordered(parallelism)(f).toMat(Sink.ignore)(Keep.right).named("foreachAsyncSink")
/**
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
*/
@ -310,6 +319,7 @@ object Sink {
*
* See also [[Flow.mapAsyncUnordered]]
*/
@deprecated("Use `foreachAsync` instead, it allows you to choose how to run the procedure, by calling some other API returning a Future or spawning a new Future.", since = "2.5.17")
def foreachParallel[T](parallelism: Int)(f: T ⇒ Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] =
Flow[T].mapAsyncUnordered(parallelism)(t ⇒ Future(f(t))).toMat(Sink.ignore)(Keep.right)
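Since both `foreachParallel` variants above are deprecated in favour of `foreachAsync`, a minimal migration sketch for the Scala API (illustrative names; the caller wraps the procedure in a `Future`, as the deprecation message suggests):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

object ForeachParallelMigration extends App {
  implicit val system: ActorSystem = ActorSystem("migration")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // before (deprecated): foreachParallel takes an ExecutionContext and spawns the Future internally
  Source(1 to 100).runWith(Sink.foreachParallel(4)((i: Int) => println(i)))

  // after: foreachAsync lets the caller choose how the procedure runs,
  // here by wrapping it in a Future on the same dispatcher
  Source(1 to 100).runWith(Sink.foreachAsync(4)((i: Int) => Future(println(i))))
}
```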