From d6000df36788678ad7f0d893053981a096289134 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Thu, 22 Feb 2018 19:31:04 +0900 Subject: [PATCH] =str add simplified ask(ref) that defaults parallism 2 --- .../akka/stream/scaladsl/FlowAskSpec.scala | 13 ++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 30 +++++++++++++ .../scala/akka/stream/javadsl/Source.scala | 30 +++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 43 ++++++++++++++++++- 4 files changed, 114 insertions(+), 2 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala index 184a20ee30..0c9d49df7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAskSpec.scala @@ -114,6 +114,19 @@ class FlowAskSpec extends StreamSpec { c.expectNext(Reply(3)) c.expectComplete() } + "produce asked elements (simple ask)" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Reply]() + implicit val ec = system.dispatcher + val p = Source(1 to 3).ask[Reply](replyOnInts).runWith(Sink.fromSubscriber(c)) + val sub = c.expectSubscription() + sub.request(2) + c.expectNext(Reply(1)) + c.expectNext(Reply(2)) + c.expectNoMessage(200.millis) + sub.request(2) + c.expectNext(Reply(3)) + c.expectComplete() + } "produce asked elements, when replies are akka.actor.Status.Success" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Reply]() implicit val ec = system.dispatcher diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ec3b01e65c..2d5064fc81 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -575,6 +575,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor. + * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. + * + * The `mapTo` class parameter is used to cast the incoming responses to the expected response type. + * + * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`. + * An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message. + * + * Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one + * still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains + * a slightly healthier throughput. + * + * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' any of the CompletionStages returned by the provided function complete + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed + * + * '''Cancels when''' downstream cancels + */ + def ask[S](ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Flow[In, S, Mat] = + ask(2, ref, mapTo, timeout) + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index edafd5097a..b2b94f23d0 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1253,6 +1253,36 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapAsyncUnordered(parallelism)(x ⇒ f(x).toScala)) + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor. + * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. + * + * The `mapTo` class parameter is used to cast the incoming responses to the expected response type. + * + * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`. + * An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message. + * + * Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one + * still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains + * a slightly healthier throughput. + * + * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' any of the CompletionStages returned by the provided function complete + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed + * + * '''Cancels when''' downstream cancels + */ + def ask[S](ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Source[S, Mat] = + ask(2, ref, mapTo, timeout) + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7d129ce092..22c649bd4c 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -853,9 +853,48 @@ trait FlowOps[+Out, +Mat] { * * Do not forget to include the expected response type in the method call, like so: * - * ''' + * {{{ * flow.ask[ExpectedReply](ref) - * ''' + * }}} + * + * otherwise `Nothing` will be assumed, which is most likely not what you want. + * + * Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one + * still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains + * a slightly healthier throughput. + * + * The mapTo class parameter is used to cast the incoming responses to the expected response type. + * + * Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`. + * An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message. + * + * The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed + * + * '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures + * + * '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted + * + * '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed + * + * '''Cancels when''' downstream cancels + */ + @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage") + def ask[S](ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] = + ask(2)(ref)(timeout, tag) + + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor. + * If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]]. + * + * Do not forget to include the expected response type in the method call, like so: + * + * {{{ + * flow.ask[ExpectedReply](parallelism = 4)(ref) + * }}} * * otherwise `Nothing` will be assumed, which is most likely not what you want. *