Add askWithStatus Streams operator, #29504 (#29727)

This commit is contained in:
Adrian 2021-01-13 08:48:59 +02:00 committed by GitHub
parent 96548837c6
commit 130a200a85
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 183 additions and 3 deletions

View file

@ -0,0 +1,61 @@
# ActorFlow.askWithStatus
Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.
@ref[Actor interop operators](../index.md#actor-interop-operators)
## Dependency
This operator is included in:
@@dependency[sbt,Maven,Gradle] {
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}
## Signature
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[I,A,akka.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q]):akka.stream.javadsl.Flow[I,A,akka.NotUsed]" }
@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[I,A,akka.NotUsed]" java ="#askWithStatus[I,Q,A](ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q]):akka.stream.javadsl.Flow[I,A,akka.NotUsed]" }
## Description
Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor when you expect the reply to be `akka.pattern.StatusReply`.
If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException].
The `askWithStatus` operator requires
* the actor `ref`,
* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message
* a timeout.
## Examples
The `ActorFlow.askWithStatus` sends a message to the actor. The actor expects `AskingWithStatus` messages which contain the actor ref for replies of type @scala[`StatusReply[String]`]@java[`StatusReply<String>`]. When the actor for replies receives a reply, the `ActorFlow.askWihStatus` stream stage emits the reply and the `map` extracts the message `String`.
Scala
: @@snip [ask.scala](/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask }
Java
: @@snip [ask.java](/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the futures (in submission order) created by the ask pattern internally are completed
**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all futures have been completed and all elements have been emitted
**fails** when the passed-in actor terminates, or when any of the `askWithStatus`s exceed a timeout
**cancels** when downstream cancels
@@@

View file

@ -325,6 +325,7 @@ Operators meant for inter-operating between Akka Streams and Actors:
|ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.|
|ActorFlow|<a name="askwithstatus"></a>@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.|
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
## Compression operators
@ -368,6 +369,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [asJavaStream](StreamConverters/asJavaStream.md)
* [ask](Source-or-Flow/ask.md)
* [ask](ActorFlow/ask.md)
* [askWithStatus](ActorFlow/askWithStatus.md)
* [asOutputStream](StreamConverters/asOutputStream.md)
* [asPublisher](Sink/asPublisher.md)
* [asSourceWithContext](Source/asSourceWithContext.md)

View file

@ -7,9 +7,9 @@ package akka.stream.typed.javadsl
import java.util.function.BiFunction
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.pattern.StatusReply
import akka.stream.javadsl.Flow
import akka.util.JavaDurationConverters
@ -64,6 +64,20 @@ object ActorFlow {
JavaDurationConverters.asFiniteDuration(timeout))
.asJava
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatus[I, Q, A](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
akka.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
.asJava
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
@ -106,4 +120,18 @@ object ActorFlow {
.ask[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis)
.asJava
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatus[I, Q, A](
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] =
akka.stream.typed.scaladsl.ActorFlow
.askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis)
.asJava
}

View file

@ -6,10 +6,9 @@ package akka.stream.typed.scaladsl
import scala.annotation.implicitNotFound
import scala.concurrent.Future
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.pattern.AskTimeoutException
import akka.pattern.{ AskTimeoutException, StatusReply }
import akka.stream._
import akka.stream.scaladsl._
import akka.util.Timeout
@ -128,4 +127,27 @@ object ActorFlow {
askFlow
}
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatus[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(
implicit timeout: Timeout): Flow[I, A, NotUsed] =
askWithStatus(2)(ref)(makeMessage)
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatus[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(
implicit timeout: Timeout): Flow[I, A, NotUsed] = {
ActorFlow.ask(parallelism)(ref)(makeMessage).map {
case StatusReply.Success(a) => a.asInstanceOf[A]
case StatusReply.Error(err) => throw err
}
}
}

View file

@ -8,6 +8,7 @@ import akka.NotUsed;
// #ask-actor
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.pattern.StatusReply;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
@ -32,6 +33,16 @@ public class ActorFlowCompileTest {
}
}
static class AskingWithStatus {
final String payload;
final ActorRef<StatusReply<String>> replyTo;
public AskingWithStatus(String payload, ActorRef<StatusReply<String>> replyTo) {
this.payload = payload;
this.replyTo = replyTo;
}
}
// #ask-actor
static
// #ask-actor
@ -51,6 +62,11 @@ public class ActorFlowCompileTest {
// #ask
null;
// #ask
final ActorRef<AskingWithStatus> actorWithStatusRef = // ???
// #ask
null;
// #ask
Duration timeout = Duration.ofSeconds(1);
@ -61,6 +77,10 @@ public class ActorFlowCompileTest {
Flow<String, Reply, NotUsed> askFlowExplicit =
ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo));
Flow<String, String, NotUsed> askFlowExplicitWithStatus =
ActorFlow.askWithStatus(
actorWithStatusRef, timeout, (msg, replyTo) -> new AskingWithStatus(msg, replyTo));
Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system);
// #ask
}

View file

@ -5,6 +5,7 @@
package docs.scaladsl
import akka.NotUsed
import akka.pattern.StatusReply
//#imports
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.typed.scaladsl.ActorFlow
@ -25,6 +26,8 @@ object ActorFlowSpec {
final case class Asking(s: String, replyTo: ActorRef[Reply])
final case class Reply(msg: String)
final case class AskingWithStatus(s: String, replyTo: ActorRef[StatusReply[String]])
//#ask-actor
}
@ -42,6 +45,24 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
Behaviors.same
})
val replierWithSuccess = spawn(Behaviors.receiveMessage[AskingWithStatus] {
case AskingWithStatus("TERMINATE", _) =>
Behaviors.stopped
case asking =>
asking.replyTo ! StatusReply.success(asking.s + "!!!")
Behaviors.same
})
val replierWithError = spawn(Behaviors.receiveMessage[AskingWithStatus] {
case AskingWithStatus("TERMINATE", _) =>
Behaviors.stopped
case asking =>
asking.replyTo ! StatusReply.error("error!!!" + asking.s)
Behaviors.same
})
"produce asked elements" in {
val in: Future[immutable.Seq[Reply]] =
Source
@ -53,6 +74,32 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
in.futureValue shouldEqual List.fill(3)(Reply("hello!!!"))
}
"produced status success elements unwrap " in {
val in: Future[immutable.Seq[String]] =
Source
.repeat("hello")
.via(ActorFlow.askWithStatus(replierWithSuccess)((el, replyTo: ActorRef[StatusReply[String]]) =>
AskingWithStatus(el, replyTo)))
.take(3)
.runWith(Sink.seq)
in.futureValue shouldEqual List.fill(3)("hello!!!")
}
"produce status error elements unwrap " in {
val in: Future[immutable.Seq[String]] =
Source
.repeat("hello")
.via(ActorFlow.askWithStatus(replierWithError)((el, replyTo: ActorRef[StatusReply[String]]) =>
AskingWithStatus(el, replyTo)))
.take(3)
.runWith(Sink.seq)
val v = in.failed.futureValue
v shouldBe a[StatusReply.ErrorMessage]
v.getMessage shouldEqual "error!!!hello"
}
"produce asked elements in order" in {
//#ask-actor
val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>