=str add simplified ask(ref) that defaults parallism 2

This commit is contained in:
Konrad Malawski 2018-02-22 19:31:04 +09:00 committed by Konrad `ktoso` Malawski
parent 4714f16dcf
commit d6000df367
4 changed files with 114 additions and 2 deletions

View file

@ -114,6 +114,19 @@ class FlowAskSpec extends StreamSpec {
c.expectNext(Reply(3))
c.expectComplete()
}
"produce asked elements (simple ask)" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Reply]()
implicit val ec = system.dispatcher
val p = Source(1 to 3).ask[Reply](replyOnInts).runWith(Sink.fromSubscriber(c))
val sub = c.expectSubscription()
sub.request(2)
c.expectNext(Reply(1))
c.expectNext(Reply(2))
c.expectNoMessage(200.millis)
sub.request(2)
c.expectNext(Reply(3))
c.expectComplete()
}
"produce asked elements, when replies are akka.actor.Status.Success" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Reply]()
implicit val ec = system.dispatcher

View file

@ -575,6 +575,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
*
* The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
*
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
*
* Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
* a slightly healthier throughput.
*
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' any of the CompletionStages returned by the provided function complete
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
*
* '''Cancels when''' downstream cancels
*/
def ask[S](ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Flow[In, S, Mat] =
ask(2, ref, mapTo, timeout)
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].

View file

@ -1253,6 +1253,36 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism)(x f(x).toScala))
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
*
* The `mapTo` class parameter is used to cast the incoming responses to the expected response type.
*
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
*
* Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
* a slightly healthier throughput.
*
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' any of the CompletionStages returned by the provided function complete
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
*
* '''Cancels when''' downstream cancels
*/
def ask[S](ref: ActorRef, mapTo: Class[S], timeout: Timeout): javadsl.Source[S, Mat] =
ask(2, ref, mapTo, timeout)
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].

View file

@ -853,9 +853,48 @@ trait FlowOps[+Out, +Mat] {
*
* Do not forget to include the expected response type in the method call, like so:
*
* '''
* {{{
* flow.ask[ExpectedReply](ref)
* '''
* }}}
*
* otherwise `Nothing` will be assumed, which is most likely not what you want.
*
* Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
* a slightly healthier throughput.
*
* The mapTo class parameter is used to cast the incoming responses to the expected response type.
*
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
*
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
*
* '''Cancels when''' downstream cancels
*/
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
def ask[S](ref: ActorRef)(implicit timeout: Timeout, tag: ClassTag[S]): Repr[S] =
ask(2)(ref)(timeout, tag)
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[akka.pattern.AskTimeoutException]].
*
* Do not forget to include the expected response type in the method call, like so:
*
* {{{
* flow.ask[ExpectedReply](parallelism = 4)(ref)
* }}}
*
* otherwise `Nothing` will be assumed, which is most likely not what you want.
*