From 130a200a859f64c694c91942bf65c9130fe79a39 Mon Sep 17 00:00:00 2001 From: Adrian <1502731+adrian-salajan@users.noreply.github.com> Date: Wed, 13 Jan 2021 08:48:59 +0200 Subject: [PATCH] Add askWithStatus Streams operator, #29504 (#29727) --- .../operators/ActorFlow/askWithStatus.md | 61 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../akka/stream/typed/javadsl/ActorFlow.scala | 30 ++++++++- .../stream/typed/scaladsl/ActorFlow.scala | 26 +++++++- .../docs/javadsl/ActorFlowCompileTest.java | 20 ++++++ .../scala/docs/scaladsl/ActorFlowSpec.scala | 47 ++++++++++++++ 6 files changed, 183 insertions(+), 3 deletions(-) create mode 100644 akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md diff --git a/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md new file mode 100644 index 0000000000..f57b937bae --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatus.md @@ -0,0 +1,61 @@ +# ActorFlow.askWithStatus + +Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream. + +@ref[Actor interop operators](../index.md#actor-interop-operators) + +## Dependency + +This operator is included in: + +@@dependency[sbt,Maven,Gradle] { + symbol1=AkkaVersion + value1="$akka.version$" + group="com.typesafe.akka" + artifact="akka-stream-typed_$scala.binary.version$" + version=AkkaVersion +} + +## Signature + +@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](parallelism:Int)(ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=>Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[I,A,akka.NotUsed]" java ="#askWithStatus[I,Q,A](parallelism:Int,ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q]):akka.stream.javadsl.Flow[I,A,akka.NotUsed]" } +@apidoc[ActorFlow.askWithStatus](ActorFlow$) { scala="#askWithStatus[I,Q,A](ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=>Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[I,A,akka.NotUsed]" java ="#askWithStatus[I,Q,A](ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q]):akka.stream.javadsl.Flow[I,A,akka.NotUsed]" } + +## Description + +Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor when you expect the reply to be `akka.pattern.StatusReply`. +If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException]. + +The `askWithStatus` operator requires + +* the actor `ref`, +* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message +* a timeout. + + +## Examples + +The `ActorFlow.askWithStatus` sends a message to the actor. The actor expects `AskingWithStatus` messages which contain the actor ref for replies of type @scala[`StatusReply[String]`]@java[`StatusReply`]. When the actor for replies receives a reply, the `ActorFlow.askWihStatus` stream stage emits the reply and the `map` extracts the message `String`. + +Scala +: @@snip [ask.scala](/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask } + +Java +: @@snip [ask.java](/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask } + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the futures (in submission order) created by the ask pattern internally are completed + +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +**fails** when the passed-in actor terminates, or when any of the `askWithStatus`s exceed a timeout + +**cancels** when downstream cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 3ae52747d8..af7d108932 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -325,6 +325,7 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.| +|ActorFlow|@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream.| |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| ## Compression operators @@ -368,6 +369,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [asJavaStream](StreamConverters/asJavaStream.md) * [ask](Source-or-Flow/ask.md) * [ask](ActorFlow/ask.md) +* [askWithStatus](ActorFlow/askWithStatus.md) * [asOutputStream](StreamConverters/asOutputStream.md) * [asPublisher](Sink/asPublisher.md) * [asSourceWithContext](Source/asSourceWithContext.md) diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala index ad085773d4..fcff69e564 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala @@ -7,9 +7,9 @@ package akka.stream.typed.javadsl import java.util.function.BiFunction import scala.concurrent.duration._ - import akka.NotUsed import akka.actor.typed.ActorRef +import akka.pattern.StatusReply import akka.stream.javadsl.Flow import akka.util.JavaDurationConverters @@ -64,6 +64,20 @@ object ActorFlow { JavaDurationConverters.asFiniteDuration(timeout)) .asJava + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatus[I, Q, A]( + ref: ActorRef[Q], + timeout: java.time.Duration, + makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = + akka.stream.typed.scaladsl.ActorFlow + .askWithStatus[I, Q, A](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))( + JavaDurationConverters.asFiniteDuration(timeout)) + .asJava + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]]. @@ -106,4 +120,18 @@ object ActorFlow { .ask[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis) .asJava + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatus[I, Q, A]( + parallelism: Int, + ref: ActorRef[Q], + timeout: java.time.Duration, + makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[I, A, NotUsed] = + akka.stream.typed.scaladsl.ActorFlow + .askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis) + .asJava + } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala index db29083802..dd6f11d7a4 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala @@ -6,10 +6,9 @@ package akka.stream.typed.scaladsl import scala.annotation.implicitNotFound import scala.concurrent.Future - import akka.NotUsed import akka.actor.typed.ActorRef -import akka.pattern.AskTimeoutException +import akka.pattern.{ AskTimeoutException, StatusReply } import akka.stream._ import akka.stream.scaladsl._ import akka.util.Timeout @@ -128,4 +127,27 @@ object ActorFlow { askFlow } + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatus[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( + implicit timeout: Timeout): Flow[I, A, NotUsed] = + askWithStatus(2)(ref)(makeMessage) + + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatus[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( + implicit timeout: Timeout): Flow[I, A, NotUsed] = { + ActorFlow.ask(parallelism)(ref)(makeMessage).map { + case StatusReply.Success(a) => a.asInstanceOf[A] + case StatusReply.Error(err) => throw err + } + + } + } diff --git a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java index 8eb6e63df7..44d685291d 100644 --- a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java +++ b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java @@ -8,6 +8,7 @@ import akka.NotUsed; // #ask-actor import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; +import akka.pattern.StatusReply; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; @@ -32,6 +33,16 @@ public class ActorFlowCompileTest { } } + static class AskingWithStatus { + final String payload; + final ActorRef> replyTo; + + public AskingWithStatus(String payload, ActorRef> replyTo) { + this.payload = payload; + this.replyTo = replyTo; + } + } + // #ask-actor static // #ask-actor @@ -51,6 +62,11 @@ public class ActorFlowCompileTest { // #ask null; + // #ask + final ActorRef actorWithStatusRef = // ??? + // #ask + null; + // #ask Duration timeout = Duration.ofSeconds(1); @@ -61,6 +77,10 @@ public class ActorFlowCompileTest { Flow askFlowExplicit = ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo)); + Flow askFlowExplicitWithStatus = + ActorFlow.askWithStatus( + actorWithStatusRef, timeout, (msg, replyTo) -> new AskingWithStatus(msg, replyTo)); + Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); // #ask } diff --git a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala index 9cdf0a4739..1b257c5229 100644 --- a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala @@ -5,6 +5,7 @@ package docs.scaladsl import akka.NotUsed +import akka.pattern.StatusReply //#imports import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.typed.scaladsl.ActorFlow @@ -25,6 +26,8 @@ object ActorFlowSpec { final case class Asking(s: String, replyTo: ActorRef[Reply]) final case class Reply(msg: String) + final case class AskingWithStatus(s: String, replyTo: ActorRef[StatusReply[String]]) + //#ask-actor } @@ -42,6 +45,24 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { Behaviors.same }) + val replierWithSuccess = spawn(Behaviors.receiveMessage[AskingWithStatus] { + case AskingWithStatus("TERMINATE", _) => + Behaviors.stopped + + case asking => + asking.replyTo ! StatusReply.success(asking.s + "!!!") + Behaviors.same + }) + + val replierWithError = spawn(Behaviors.receiveMessage[AskingWithStatus] { + case AskingWithStatus("TERMINATE", _) => + Behaviors.stopped + + case asking => + asking.replyTo ! StatusReply.error("error!!!" + asking.s) + Behaviors.same + }) + "produce asked elements" in { val in: Future[immutable.Seq[Reply]] = Source @@ -53,6 +74,32 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { in.futureValue shouldEqual List.fill(3)(Reply("hello!!!")) } + "produced status success elements unwrap " in { + val in: Future[immutable.Seq[String]] = + Source + .repeat("hello") + .via(ActorFlow.askWithStatus(replierWithSuccess)((el, replyTo: ActorRef[StatusReply[String]]) => + AskingWithStatus(el, replyTo))) + .take(3) + .runWith(Sink.seq) + + in.futureValue shouldEqual List.fill(3)("hello!!!") + } + + "produce status error elements unwrap " in { + val in: Future[immutable.Seq[String]] = + Source + .repeat("hello") + .via(ActorFlow.askWithStatus(replierWithError)((el, replyTo: ActorRef[StatusReply[String]]) => + AskingWithStatus(el, replyTo))) + .take(3) + .runWith(Sink.seq) + + val v = in.failed.futureValue + v shouldBe a[StatusReply.ErrorMessage] + v.getMessage shouldEqual "error!!!hello" + } + "produce asked elements in order" in { //#ask-actor val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>