diff --git a/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md new file mode 100644 index 0000000000..d7e27630bb --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithContext.md @@ -0,0 +1,51 @@ +# ActorFlow.askWithContext + +Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream. + +@ref[Actor interop operators](../index.md#actor-interop-operators) + +## Dependency + +This operator is included in: + +@@dependency[sbt,Maven,Gradle] { + bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion + symbol1=AkkaVersion + value1="$akka.version$" + group="com.typesafe.akka" + artifact="akka-stream-typed_$scala.binary.version$" + version=AkkaVersion +} + +## Signature + +@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:akka.actor.typed.ActorRef%5BQ])(makeMessage:(I,akka.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),akka.NotUsed]" java="#askWithContext(akka.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" } + +## Description + +Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor. +The stream context is not sent, instead it is locally recombined to the actor's reply. + +If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException]. + +The `ask` operator requires + +* the actor `ref`, +* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message +* a timeout. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the futures (in submission order) created by the ask pattern internally are completed + +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +**fails** when the passed-in actor terminates, or when any of the `ask`s exceed a timeout + +**cancels** when downstream cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md new file mode 100644 index 0000000000..fb8d068412 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/ActorFlow/askWithStatusAndContext.md @@ -0,0 +1,51 @@ +# ActorFlow.askWithContext + +Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream. + +@ref[Actor interop operators](../index.md#actor-interop-operators) + +## Dependency + +This operator is included in: + +@@dependency[sbt,Maven,Gradle] { + bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion + symbol1=AkkaVersion + value1="$akka.version$" + group="com.typesafe.akka" + artifact="akka-stream-typed_$scala.binary.version$" + version=AkkaVersion +} + +## Signature + +@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=>Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),akka.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q])" } + +## Description + +Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor when you expect the reply to be `akka.pattern.StatusReply`. +The stream context is not sent, instead it is locally recombined to the actor's reply. + +If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException]. + +The `ask` operator requires + +* the actor `ref`, +* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message +* a timeout. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the futures (in submission order) created by the ask pattern internally are completed + +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +**fails** when the passed-in actor terminates, or when any of the `ask`s exceed a timeout + +**cancels** when downstream cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index cae12a8068..a3b777f49e 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -330,7 +330,9 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.| +|ActorFlow|@ref[askWithContext](ActorFlow/askWithContext.md)|Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.| |ActorFlow|@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream.| +|ActorFlow|@ref[askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md)|Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream.| |PubSub|@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].| |PubSub|@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. | |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| @@ -382,7 +384,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [asJavaStream](StreamConverters/asJavaStream.md) * [ask](Source-or-Flow/ask.md) * [ask](ActorFlow/ask.md) +* [askWithContext](ActorFlow/askWithContext.md) * [askWithStatus](ActorFlow/askWithStatus.md) +* [askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md) * [asOutputStream](StreamConverters/asOutputStream.md) * [asPublisher](Sink/asPublisher.md) * [asSourceWithContext](Source/asSourceWithContext.md) diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala index e613e93b6c..f6e4c870d0 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala @@ -5,10 +5,10 @@ package akka.stream.typed.javadsl import java.util.function.BiFunction - import scala.concurrent.duration._ import akka.NotUsed import akka.actor.typed.ActorRef +import akka.japi.Pair import akka.pattern.StatusReply import akka.stream.javadsl.Flow import akka.util.JavaDurationConverters @@ -134,4 +134,58 @@ object ActorFlow { .askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis) .asJava + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context. + */ + def askWithContext[I, Q, A, Ctx]( + ref: ActorRef[Q], + timeout: java.time.Duration, + makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = + akka.stream.scaladsl + .Flow[Pair[I, Ctx]] + .map(_.toScala) + .via( + akka.stream.typed.scaladsl.ActorFlow + .askWithContext[I, Q, A, Ctx](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))( + JavaDurationConverters.asFiniteDuration(timeout)) + .map { case (a, ctx) => Pair(a, ctx) }) + .asJava + + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatusAndContext[I, Q, A, Ctx]( + ref: ActorRef[Q], + timeout: java.time.Duration, + makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = + akka.stream.scaladsl + .Flow[Pair[I, Ctx]] + .map(_.toScala) + .via( + akka.stream.typed.scaladsl.ActorFlow + .askWithStatusAndContext[I, Q, A, Ctx](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))( + JavaDurationConverters.asFiniteDuration(timeout)) + .map { case (a, ctx) => Pair(a, ctx) }) + .asJava + + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context. + */ + def askWithContext[I, Q, A, Ctx]( + parallelism: Int, + ref: ActorRef[Q], + timeout: java.time.Duration, + makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = { + akka.stream.scaladsl + .Flow[Pair[I, Ctx]] + .map(_.toScala) + .via( + akka.stream.typed.scaladsl.ActorFlow + .askWithContext[I, Q, A, Ctx](parallelism)(ref)((i, ref) => makeMessage(i, ref))( + JavaDurationConverters.asFiniteDuration(timeout)) + .map { case (a, ctx) => Pair(a, ctx) }) + .asJava + } } diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala index d3c7aa2314..c1a9bac9bb 100644 --- a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala @@ -8,6 +8,7 @@ import scala.annotation.implicitNotFound import scala.concurrent.Future import akka.NotUsed import akka.actor.typed.ActorRef +import akka.dispatch.ExecutionContexts import akka.pattern.{ AskTimeoutException, StatusReply } import akka.stream._ import akka.stream.scaladsl._ @@ -20,6 +21,36 @@ object ActorFlow { // TODO would be nice to provide Implicits to allow .ask() directly on Flow/Source + private def askImpl[I, Q, A, O](parallelism: Int)(ref: ActorRef[Q])( + makeMessage: (I, ActorRef[A]) => Q, + makeOut: (I, Future[A]) => Future[O])(implicit timeout: Timeout): Flow[I, O, NotUsed] = { + import akka.actor.typed.scaladsl.adapter._ + val classicRef = ref.toClassic + + val askFlow = Flow[I] + .watch(classicRef) + .mapAsync(parallelism) { el => + val res = akka.pattern.extended.ask(classicRef, (replyTo: akka.actor.ActorRef) => makeMessage(el, replyTo)) + // we need to cast manually (yet safely, by construction!) since otherwise we need a ClassTag, + // which in Scala is fine, but then we would force JavaDSL to create one, which is a hassle in the Akka Typed DSL, + // since one may say "but I already specified the type!", and that we have to go via the classic ask is an implementation detail + makeOut(el, res.asInstanceOf[Future[A]]) + } + .mapError { + case ex: AskTimeoutException => + // in Akka Typed we use the `TimeoutException` everywhere + new java.util.concurrent.TimeoutException(ex.getMessage) + + // the purpose of this recovery is to change the name of the stage in that exception + // we do so in order to help users find which stage caused the failure -- "the ask stage" + case ex: WatchedActorTerminatedException => + new WatchedActorTerminatedException("ask()", ex.ref) + } + .named("ask") + + askFlow + } + /** * Use the `ask` pattern to send a request-reply message to the target `ref` actor. * If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]]. @@ -99,33 +130,7 @@ object ActorFlow { */ @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage") def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)( - implicit timeout: Timeout): Flow[I, A, NotUsed] = { - import akka.actor.typed.scaladsl.adapter._ - val classicRef = ref.toClassic - - val askFlow = Flow[I] - .watch(classicRef) - .mapAsync(parallelism) { el => - val res = akka.pattern.extended.ask(classicRef, (replyTo: akka.actor.ActorRef) => makeMessage(el, replyTo)) - // we need to cast manually (yet safely, by construction!) since otherwise we need a ClassTag, - // which in Scala is fine, but then we would force JavaDSL to create one, which is a hassle in the Akka Typed DSL, - // since one may say "but I already specified the type!", and that we have to go via the classic ask is an implementation detail - res.asInstanceOf[Future[A]] - } - .mapError { - case ex: AskTimeoutException => - // in Akka Typed we use the `TimeoutException` everywhere - new java.util.concurrent.TimeoutException(ex.getMessage) - - // the purpose of this recovery is to change the name of the stage in that exception - // we do so in order to help users find which stage caused the failure -- "the ask stage" - case ex: WatchedActorTerminatedException => - new WatchedActorTerminatedException("ask()", ex.ref) - } - .named("ask") - - askFlow - } + implicit timeout: Timeout): Flow[I, A, NotUsed] = askImpl(parallelism)(ref)(makeMessage, (_, o: Future[A]) => o) /** * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response @@ -151,4 +156,48 @@ object ActorFlow { } + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context. + */ + @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage") + def askWithContext[I, Q, A, Ctx](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)( + implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = + askWithContext(parallelism = 2)(ref)(makeMessage) + + /** + * Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context. + */ + @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage") + def askWithContext[I, Q, A, Ctx](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)( + implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = + askImpl[(I, Ctx), Q, A, (A, Ctx)](parallelism)(ref)( + (in, r) => makeMessage(in._1, r), + (in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic)) + + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatusAndContext[I, Q, A, Ctx](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)( + implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = + askWithStatusAndContext(2)(ref)(makeMessage) + + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatusAndContext[I, Q, A, Ctx](parallelism: Int)(ref: ActorRef[Q])( + makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = { + askImpl[(I, Ctx), Q, StatusReply[A], (StatusReply[A], Ctx)](parallelism)(ref)( + (in, r) => makeMessage(in._1, r), + (in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic)).map { + case (StatusReply.Success(a), ctx) => a.asInstanceOf[A] -> ctx + case (StatusReply.Error(err), _) => throw err + case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser + } + + } + } diff --git a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java index 6cda12583c..46fca67558 100644 --- a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java +++ b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java @@ -83,5 +83,28 @@ public class ActorFlowCompileTest { Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); // #ask + + // #askWithContext + + // method reference notation + Flow, akka.japi.Pair, NotUsed> askFlowWithContext = + ActorFlow.askWithContext(actorRef, timeout, Asking::new); + + // explicit creation of the sent message + Flow, akka.japi.Pair, NotUsed> + askFlowExplicitWithContext = + ActorFlow.askWithContext(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo)); + + Flow, akka.japi.Pair, NotUsed> + askFlowExplicitWithStatusAndContext = + ActorFlow.askWithStatusAndContext( + actorWithStatusRef, timeout, (msg, replyTo) -> new AskingWithStatus(msg, replyTo)); + + Source.repeat("hello") + .zipWithIndex() + .via(askFlowWithContext) + .map(pair -> pair.first().msg) + .runWith(Sink.seq(), system); + // #askWithContext } } diff --git a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala index 29e8f94d39..eebc335e98 100644 --- a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala @@ -6,6 +6,7 @@ package docs.scaladsl import akka.NotUsed import akka.pattern.StatusReply + //#imports import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.typed.scaladsl.ActorFlow @@ -75,6 +76,18 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { in.futureValue shouldEqual List.fill(3)(Reply("hello!!!")) } + "produce asked elements with context " in { + val in: Future[immutable.Seq[(Reply, Long)]] = + Source + .repeat("hello") + .zipWithIndex + .via(ActorFlow.askWithContext(replier)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))) + .take(3) + .runWith(Sink.seq) + + in.futureValue shouldEqual List.fill(3)(Reply("hello!!!")).zipWithIndex.map { case (r, i) => r -> i.toLong } + } + "produced status success elements unwrap " in { val in: Future[immutable.Seq[String]] = Source @@ -87,6 +100,19 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { in.futureValue shouldEqual List.fill(3)("hello!!!") } + "produced status success elements unwrap with context " in { + val in: Future[immutable.Seq[(String, Long)]] = + Source + .repeat("hello") + .zipWithIndex + .via(ActorFlow.askWithStatusAndContext(replierWithSuccess)((el, replyTo: ActorRef[StatusReply[String]]) => + AskingWithStatus(el, replyTo))) + .take(3) + .runWith(Sink.seq) + + in.futureValue shouldEqual List.fill(3)("hello!!!").zipWithIndex.map { case (r, i) => r -> i.toLong } + } + "produce status error elements unwrap " in { val in: Future[immutable.Seq[String]] = Source @@ -101,6 +127,21 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { v.getMessage shouldEqual "error!!!hello" } + "produce status error elements unwrap with context" in { + val in: Future[immutable.Seq[(String, Long)]] = + Source + .repeat("hello") + .zipWithIndex + .via(ActorFlow.askWithStatusAndContext(replierWithError)((el, replyTo: ActorRef[StatusReply[String]]) => + AskingWithStatus(el, replyTo))) + .take(3) + .runWith(Sink.seq) + + val v = in.failed.futureValue + v shouldBe a[StatusReply.ErrorMessage] + v.getMessage shouldEqual "error!!!hello" + } + "produce asked elements in order" in { //#ask-actor val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>