Added support for context propagation to ActorFlow #31308 (#31314)

* Added askWithContext and askWithStatusAndContext to ActorFlow scala
  and java API
This commit is contained in:
Andrea Zito 2022-05-04 16:50:56 +02:00 committed by GitHub
parent 74600b046b
commit cc70d71aac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 301 additions and 28 deletions

View file

@ -0,0 +1,51 @@
# ActorFlow.askWithContext
Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.
@ref[Actor interop operators](../index.md#actor-interop-operators)
## Dependency
This operator is included in:
@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}
## Signature
@apidoc[ActorFlow.askWithContext](ActorFlow$) { scala="#askWithContext%5BI,Q,A,Ctx](ref:akka.actor.typed.ActorRef%5BQ])(makeMessage:(I,akka.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow%5B(I,Ctx),(A,Ctx),akka.NotUsed]" java="#askWithContext(akka.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" }
## Description
Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor.
The stream context is not sent, instead it is locally recombined to the actor's reply.
If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException].
The `ask` operator requires
* the actor `ref`,
* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message
* a timeout.
## Reactive Streams semantics
@@@div { .callout }
**emits** when the futures (in submission order) created by the ask pattern internally are completed
**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all futures have been completed and all elements have been emitted
**fails** when the passed-in actor terminates, or when any of the `ask`s exceed a timeout
**cancels** when downstream cancels
@@@

View file

@ -0,0 +1,51 @@
# ActorFlow.askWithContext
Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.
@ref[Actor interop operators](../index.md#actor-interop-operators)
## Dependency
This operator is included in:
@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}
## Signature
@apidoc[ActorFlow.askWithStatusAndContext](ActorFlow$) { scala="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int)(ref:akka.actor.typed.ActorRef[Q])(makeMessage:(I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]])=&gt;Q)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow[(I,Ctx),(A,Ctx),akka.NotUsed]" java ="#askWithStatusAndContext[I,Q,A,Ctx](parallelism:Int,ref:akka.actor.typed.ActorRef[Q],timeout:java.time.Duration,makeMessage:java.util.function.BiFunction[I,akka.actor.typed.ActorRef[akka.pattern.StatusReply[A]],Q])" }
## Description
Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor when you expect the reply to be `akka.pattern.StatusReply`.
The stream context is not sent, instead it is locally recombined to the actor's reply.
If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException].
The `ask` operator requires
* the actor `ref`,
* a `makeMessage` function to create the message sent to the actor from the incoming element, and the actor ref accepting the actor's reply message
* a timeout.
## Reactive Streams semantics
@@@div { .callout }
**emits** when the futures (in submission order) created by the ask pattern internally are completed
**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
**completes** when upstream completes and all futures have been completed and all elements have been emitted
**fails** when the passed-in actor terminates, or when any of the `ask`s exceed a timeout
**cancels** when downstream cancels
@@@

View file

@ -330,7 +330,9 @@ Operators meant for inter-operating between Akka Streams and Actors:
|ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |ActorSink|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`] of the new actors API with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.| |ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.|
|ActorFlow|<a name="askwithcontext"></a>@ref[askWithContext](ActorFlow/askWithContext.md)|Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.|
|ActorFlow|<a name="askwithstatus"></a>@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.| |ActorFlow|<a name="askwithstatus"></a>@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.|
|ActorFlow|<a name="askwithstatusandcontext"></a>@ref[askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md)|Use the "Ask Pattern" to send each stream element (without the context) as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.|
|PubSub|<a name="sink"></a>@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].| |PubSub|<a name="sink"></a>@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].|
|PubSub|<a name="source"></a>@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. | |PubSub|<a name="source"></a>@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. |
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| |Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
@ -382,7 +384,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [asJavaStream](StreamConverters/asJavaStream.md) * [asJavaStream](StreamConverters/asJavaStream.md)
* [ask](Source-or-Flow/ask.md) * [ask](Source-or-Flow/ask.md)
* [ask](ActorFlow/ask.md) * [ask](ActorFlow/ask.md)
* [askWithContext](ActorFlow/askWithContext.md)
* [askWithStatus](ActorFlow/askWithStatus.md) * [askWithStatus](ActorFlow/askWithStatus.md)
* [askWithStatusAndContext](ActorFlow/askWithStatusAndContext.md)
* [asOutputStream](StreamConverters/asOutputStream.md) * [asOutputStream](StreamConverters/asOutputStream.md)
* [asPublisher](Sink/asPublisher.md) * [asPublisher](Sink/asPublisher.md)
* [asSourceWithContext](Source/asSourceWithContext.md) * [asSourceWithContext](Source/asSourceWithContext.md)

View file

@ -5,10 +5,10 @@
package akka.stream.typed.javadsl package akka.stream.typed.javadsl
import java.util.function.BiFunction import java.util.function.BiFunction
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.NotUsed import akka.NotUsed
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.japi.Pair
import akka.pattern.StatusReply import akka.pattern.StatusReply
import akka.stream.javadsl.Flow import akka.stream.javadsl.Flow
import akka.util.JavaDurationConverters import akka.util.JavaDurationConverters
@ -134,4 +134,58 @@ object ActorFlow {
.askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis) .askWithStatus[I, Q, A](parallelism)(ref)((i, ref) => makeMessage(i, ref))(timeout.toMillis.millis)
.asJava .asJava
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context.
*/
def askWithContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
akka.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
.via(
akka.stream.typed.scaladsl.ActorFlow
.askWithContext[I, Q, A, Ctx](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
.map { case (a, ctx) => Pair(a, ctx) })
.asJava
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatusAndContext[I, Q, A, Ctx](
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[StatusReply[A]], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] =
akka.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
.via(
akka.stream.typed.scaladsl.ActorFlow
.askWithStatusAndContext[I, Q, A, Ctx](parallelism = 2)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
.map { case (a, ctx) => Pair(a, ctx) })
.asJava
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context.
*/
def askWithContext[I, Q, A, Ctx](
parallelism: Int,
ref: ActorRef[Q],
timeout: java.time.Duration,
makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[Pair[I, Ctx], Pair[A, Ctx], NotUsed] = {
akka.stream.scaladsl
.Flow[Pair[I, Ctx]]
.map(_.toScala)
.via(
akka.stream.typed.scaladsl.ActorFlow
.askWithContext[I, Q, A, Ctx](parallelism)(ref)((i, ref) => makeMessage(i, ref))(
JavaDurationConverters.asFiniteDuration(timeout))
.map { case (a, ctx) => Pair(a, ctx) })
.asJava
}
} }

View file

@ -8,6 +8,7 @@ import scala.annotation.implicitNotFound
import scala.concurrent.Future import scala.concurrent.Future
import akka.NotUsed import akka.NotUsed
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.dispatch.ExecutionContexts
import akka.pattern.{ AskTimeoutException, StatusReply } import akka.pattern.{ AskTimeoutException, StatusReply }
import akka.stream._ import akka.stream._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
@ -20,6 +21,36 @@ object ActorFlow {
// TODO would be nice to provide Implicits to allow .ask() directly on Flow/Source // TODO would be nice to provide Implicits to allow .ask() directly on Flow/Source
private def askImpl[I, Q, A, O](parallelism: Int)(ref: ActorRef[Q])(
makeMessage: (I, ActorRef[A]) => Q,
makeOut: (I, Future[A]) => Future[O])(implicit timeout: Timeout): Flow[I, O, NotUsed] = {
import akka.actor.typed.scaladsl.adapter._
val classicRef = ref.toClassic
val askFlow = Flow[I]
.watch(classicRef)
.mapAsync(parallelism) { el =>
val res = akka.pattern.extended.ask(classicRef, (replyTo: akka.actor.ActorRef) => makeMessage(el, replyTo))
// we need to cast manually (yet safely, by construction!) since otherwise we need a ClassTag,
// which in Scala is fine, but then we would force JavaDSL to create one, which is a hassle in the Akka Typed DSL,
// since one may say "but I already specified the type!", and that we have to go via the classic ask is an implementation detail
makeOut(el, res.asInstanceOf[Future[A]])
}
.mapError {
case ex: AskTimeoutException =>
// in Akka Typed we use the `TimeoutException` everywhere
new java.util.concurrent.TimeoutException(ex.getMessage)
// the purpose of this recovery is to change the name of the stage in that exception
// we do so in order to help users find which stage caused the failure -- "the ask stage"
case ex: WatchedActorTerminatedException =>
new WatchedActorTerminatedException("ask()", ex.ref)
}
.named("ask")
askFlow
}
/** /**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor. * Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]]. * If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
@ -99,33 +130,7 @@ object ActorFlow {
*/ */
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage") @implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)( def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(
implicit timeout: Timeout): Flow[I, A, NotUsed] = { implicit timeout: Timeout): Flow[I, A, NotUsed] = askImpl(parallelism)(ref)(makeMessage, (_, o: Future[A]) => o)
import akka.actor.typed.scaladsl.adapter._
val classicRef = ref.toClassic
val askFlow = Flow[I]
.watch(classicRef)
.mapAsync(parallelism) { el =>
val res = akka.pattern.extended.ask(classicRef, (replyTo: akka.actor.ActorRef) => makeMessage(el, replyTo))
// we need to cast manually (yet safely, by construction!) since otherwise we need a ClassTag,
// which in Scala is fine, but then we would force JavaDSL to create one, which is a hassle in the Akka Typed DSL,
// since one may say "but I already specified the type!", and that we have to go via the classic ask is an implementation detail
res.asInstanceOf[Future[A]]
}
.mapError {
case ex: AskTimeoutException =>
// in Akka Typed we use the `TimeoutException` everywhere
new java.util.concurrent.TimeoutException(ex.getMessage)
// the purpose of this recovery is to change the name of the stage in that exception
// we do so in order to help users find which stage caused the failure -- "the ask stage"
case ex: WatchedActorTerminatedException =>
new WatchedActorTerminatedException("ask()", ex.ref)
}
.named("ask")
askFlow
}
/** /**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
@ -151,4 +156,48 @@ object ActorFlow {
} }
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context.
*/
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
def askWithContext[I, Q, A, Ctx](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(
implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] =
askWithContext(parallelism = 2)(ref)(makeMessage)
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor without including the context.
*/
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
def askWithContext[I, Q, A, Ctx](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) => Q)(
implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] =
askImpl[(I, Ctx), Q, A, (A, Ctx)](parallelism)(ref)(
(in, r) => makeMessage(in._1, r),
(in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic))
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatusAndContext[I, Q, A, Ctx](ref: ActorRef[Q])(makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(
implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] =
askWithStatusAndContext(2)(ref)(makeMessage)
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatusAndContext[I, Q, A, Ctx](parallelism: Int)(ref: ActorRef[Q])(
makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = {
askImpl[(I, Ctx), Q, StatusReply[A], (StatusReply[A], Ctx)](parallelism)(ref)(
(in, r) => makeMessage(in._1, r),
(in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic)).map {
case (StatusReply.Success(a), ctx) => a.asInstanceOf[A] -> ctx
case (StatusReply.Error(err), _) => throw err
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
}
} }

View file

@ -83,5 +83,28 @@ public class ActorFlowCompileTest {
Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system);
// #ask // #ask
// #askWithContext
// method reference notation
Flow<akka.japi.Pair<String, Long>, akka.japi.Pair<Reply, Long>, NotUsed> askFlowWithContext =
ActorFlow.askWithContext(actorRef, timeout, Asking::new);
// explicit creation of the sent message
Flow<akka.japi.Pair<String, Long>, akka.japi.Pair<Reply, Long>, NotUsed>
askFlowExplicitWithContext =
ActorFlow.askWithContext(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo));
Flow<akka.japi.Pair<String, Long>, akka.japi.Pair<String, Long>, NotUsed>
askFlowExplicitWithStatusAndContext =
ActorFlow.askWithStatusAndContext(
actorWithStatusRef, timeout, (msg, replyTo) -> new AskingWithStatus(msg, replyTo));
Source.repeat("hello")
.zipWithIndex()
.via(askFlowWithContext)
.map(pair -> pair.first().msg)
.runWith(Sink.seq(), system);
// #askWithContext
} }
} }

View file

@ -6,6 +6,7 @@ package docs.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.pattern.StatusReply import akka.pattern.StatusReply
//#imports //#imports
import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.typed.scaladsl.ActorFlow import akka.stream.typed.scaladsl.ActorFlow
@ -75,6 +76,18 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
in.futureValue shouldEqual List.fill(3)(Reply("hello!!!")) in.futureValue shouldEqual List.fill(3)(Reply("hello!!!"))
} }
"produce asked elements with context " in {
val in: Future[immutable.Seq[(Reply, Long)]] =
Source
.repeat("hello")
.zipWithIndex
.via(ActorFlow.askWithContext(replier)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo)))
.take(3)
.runWith(Sink.seq)
in.futureValue shouldEqual List.fill(3)(Reply("hello!!!")).zipWithIndex.map { case (r, i) => r -> i.toLong }
}
"produced status success elements unwrap " in { "produced status success elements unwrap " in {
val in: Future[immutable.Seq[String]] = val in: Future[immutable.Seq[String]] =
Source Source
@ -87,6 +100,19 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
in.futureValue shouldEqual List.fill(3)("hello!!!") in.futureValue shouldEqual List.fill(3)("hello!!!")
} }
"produced status success elements unwrap with context " in {
val in: Future[immutable.Seq[(String, Long)]] =
Source
.repeat("hello")
.zipWithIndex
.via(ActorFlow.askWithStatusAndContext(replierWithSuccess)((el, replyTo: ActorRef[StatusReply[String]]) =>
AskingWithStatus(el, replyTo)))
.take(3)
.runWith(Sink.seq)
in.futureValue shouldEqual List.fill(3)("hello!!!").zipWithIndex.map { case (r, i) => r -> i.toLong }
}
"produce status error elements unwrap " in { "produce status error elements unwrap " in {
val in: Future[immutable.Seq[String]] = val in: Future[immutable.Seq[String]] =
Source Source
@ -101,6 +127,21 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
v.getMessage shouldEqual "error!!!hello" v.getMessage shouldEqual "error!!!hello"
} }
"produce status error elements unwrap with context" in {
val in: Future[immutable.Seq[(String, Long)]] =
Source
.repeat("hello")
.zipWithIndex
.via(ActorFlow.askWithStatusAndContext(replierWithError)((el, replyTo: ActorRef[StatusReply[String]]) =>
AskingWithStatus(el, replyTo)))
.take(3)
.runWith(Sink.seq)
val v = in.failed.futureValue
v shouldBe a[StatusReply.ErrorMessage]
v.getMessage shouldEqual "error!!!hello"
}
"produce asked elements in order" in { "produce asked elements in order" in {
//#ask-actor //#ask-actor
val ref = spawn(Behaviors.receiveMessage[Asking] { asking => val ref = spawn(Behaviors.receiveMessage[Asking] { asking =>