Akka Typed ask() for Akka Streams (#24892)
* +str,typ introduce akka typed ask for akka stream address feedback and add actor interop stages incl ask to docs more compile tests and adjusted things last docs * document adding stages to docs in CONTRIBUTING * address review comments * rebase conflicts
This commit is contained in:
parent
ae20ecaf99
commit
256f81f97e
13 changed files with 549 additions and 16 deletions
|
|
@ -383,6 +383,32 @@ tested it becomes an officially supported Akka feature.
|
|||
|
||||
[List of Akka features marked as may change](http://doc.akka.io/docs/akka/current/common/may-change.html)
|
||||
|
||||
## Contributing new Akka Streams stages/operators
|
||||
|
||||
Documentation of Akka Streams operators and stages is automatically enforced.
|
||||
If a method exists on Source / Sink / Flow, or any other class listed in `project/StreamOperatorsIndexGenerator.scala`,
|
||||
it must also have a corresponding documentation page under `akka-docs/src/main/paradox/streams/operators/...`.
|
||||
|
||||
The pages structure is well-defined, and must be the same on all documentation pages, please refer to any neighbouring
|
||||
docs pages in there to see the pattern in action. In general though the page must consist of:
|
||||
|
||||
- the title, including where the operator is defined (e.g. `ActorFlow.ask` or `Source.map`)
|
||||
- a short explanation of what this stage does, 1 sentence is optimal
|
||||
- an image explaining the stage more visually (whenever possible)
|
||||
- a link to the stages' "category" (these are listed in `akka-docs/src/main/paradox/categories`)
|
||||
- the method signature snippet (use the built in directives to generate it)
|
||||
- a longer explanation about the stage and it's exact semantics (when it pulls, cancels, signals elements)
|
||||
- at least one usage example
|
||||
|
||||
Using this structure, the surrounding infrastructure will **generate the index pages**, so you do not need to maintain
|
||||
the index or category pages manually.
|
||||
|
||||
### Adding new top-level objects/classes containing operators
|
||||
|
||||
In case you are adding not only a new operator, but also a new class/object, you need to add it to the
|
||||
`project/StreamOperatorsIndexGenerator.scala` so it can be included in the automatic docs generation and enforcing the
|
||||
existence of those docs.
|
||||
|
||||
# Supporting infrastructure
|
||||
|
||||
## Continuous integration
|
||||
|
|
|
|||
1
akka-docs/src/main/categories/actor-interop-stages.md
Normal file
1
akka-docs/src/main/categories/actor-interop-stages.md
Normal file
|
|
@ -0,0 +1 @@
|
|||
Stages and operations meant for inter-operating between Akka Streams and Actors:
|
||||
38
akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md
Normal file
38
akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
# ActorFlow.ask
|
||||
|
||||
Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.
|
||||
|
||||
@ref[Actor interop stages](../index.md#actor-interop-stages)
|
||||
|
||||
## Dependency
|
||||
|
||||
This operator is included in:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group="com.typesafe.akka"
|
||||
artifact="akka-stream-typed_$scala.binary_version$"
|
||||
version="$akka.version$"
|
||||
}
|
||||
|
||||
@@@div { .group-scala }
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature []($akka$/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala) { #ask }
|
||||
|
||||
@@@
|
||||
|
||||
## Description
|
||||
|
||||
Emit the contents of a file, as `ByteString`s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with
|
||||
a `IOResult` upon reaching the end of the file or if there is a failure.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [ask.scala]($akka$/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask }
|
||||
|
||||
Java
|
||||
: @@snip [ask.java]($akka$/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask }
|
||||
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
# ActorSink.actorRef
|
||||
|
||||
Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`].
|
||||
|
||||
@ref[Actor interop stages](../index.md#actor-interop-stages)
|
||||
|
||||
@@@div { .group-scala }
|
||||
|
||||
## Dependency
|
||||
|
||||
This operator is included in:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
group="com.typesafe.akka"
|
||||
artifact="akka-stream-typed_$scala.binary_version$"
|
||||
version="$akka.version$"
|
||||
}
|
||||
|
||||
## Signature
|
||||
|
||||
@@signature []($akka$/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala) { #actorRef }
|
||||
|
||||
@@@
|
||||
|
||||
## Description
|
||||
|
||||
Sends the elements of the stream to the given `ActorRef`.
|
||||
If the target actor terminates the stream will be canceled.
|
||||
When the stream is completed successfully the given `onCompleteMessage`
|
||||
will be sent to the destination actor.
|
||||
When the stream is completed with failure a the throwable that was signaled
|
||||
to the stream is adapted to the Actors protocol using `onFailureMessage` and
|
||||
then then sent to the destination actor.
|
||||
|
||||
It will request at most `maxInputBufferSize` number of elements from
|
||||
upstream, but there is no back-pressure signal from the destination actor,
|
||||
i.e. if the actor is not consuming the messages fast enough the mailbox
|
||||
of the actor will grow. For potentially slow consumer actors it is recommended
|
||||
to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
|
||||
limiting stage in front of this `Sink`.
|
||||
|
||||
## Examples
|
||||
|
||||
TODO (in progress)
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
<!-- DO NOT EDIT DIRECTLY: This file is generated by `project/StreamOperatorsIndexGenerator`. See CONTRIBUTING.md for details. -->
|
||||
# Operators
|
||||
|
||||
## Source stages
|
||||
|
|
@ -251,6 +252,16 @@ the inputs in different ways.
|
|||
|Source/Flow|<a name="monitor"></a>@ref[monitor](Source-or-Flow/monitor.md)|Materializes to a `FlowMonitor` that monitors messages flowing through or completion of the stage.|
|
||||
|Source/Flow|<a name="watchtermination"></a>@ref[watchTermination](Source-or-Flow/watchTermination.md)|Materializes to a @scala[`Future`] @java[`CompletionStage`] that will be completed with Done or failed depending whether the upstream of the stage has been completed or failed.|
|
||||
|
||||
## Actor interop stages
|
||||
|
||||
Stages and operations meant for inter-operating between Akka Streams and Actors:
|
||||
|
||||
|
||||
| |Operator|Description|
|
||||
|--|--|--|
|
||||
|ActorSink|<a name="actorref"></a>@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorRef[T]`].|
|
||||
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.|
|
||||
|
||||
@@@ index
|
||||
|
||||
* [combine](Source/combine.md)
|
||||
|
|
@ -383,5 +394,7 @@ the inputs in different ways.
|
|||
* [fromJavaStream](StreamConverters/fromJavaStream.md)
|
||||
* [fromPath](FileIO/fromPath.md)
|
||||
* [toPath](FileIO/toPath.md)
|
||||
* [ask](ActorFlow/ask.md)
|
||||
* [actorRef](ActorSink/actorRef.md)
|
||||
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -87,8 +87,6 @@ class FlowAskSpec extends StreamSpec {
|
|||
|
||||
val statusReplier = system.actorOf(Props(new StatusReplier).withDispatcher("akka.test.stream-dispatcher"), "statusReplier")
|
||||
|
||||
def replierAndProxyTo(ref: ActorRef) = system.actorOf(Props(new ReplyAndProxy(ref)).withDispatcher("akka.test.stream-dispatcher"), s"reply-and-proxy-${ref.hashCode}")
|
||||
|
||||
def replierFailOn(n: Int) = system.actorOf(Props(new FailOn(n)).withDispatcher("akka.test.stream-dispatcher"), s"failureReplier-$n")
|
||||
val failsOn1 = replierFailOn(1)
|
||||
val failsOn3 = replierFailOn(3)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.javadsl
|
||||
|
||||
import java.util.function.BiFunction
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.stream.ActorAttributes
|
||||
import akka.stream.javadsl.Flow
|
||||
import akka.util.JavaDurationConverters
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Collection of Flows aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorFlow {
|
||||
|
||||
/**
|
||||
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
|
||||
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* Do not forget to include the expected response type in the method call, like so:
|
||||
*
|
||||
* {{{
|
||||
* flow.via(ActorFlow.<String, AskMe, String>ask(ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
|
||||
* // or simply
|
||||
* flow.via(ActorFlow.ask(ref, timeout, AskMe::new))
|
||||
* }}}
|
||||
*
|
||||
* otherwise `Nothing` will be assumed, which is most likely not what you want.
|
||||
*
|
||||
* Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one
|
||||
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
|
||||
* a slightly healthier throughput.
|
||||
*
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated,
|
||||
* or with an [[java.util.concurrent.TimeoutException]] in case the ask exceeds the timeout passed in.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
|
||||
*
|
||||
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
|
||||
*
|
||||
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @tparam I Incoming element type of the Flow
|
||||
* @tparam Q Question message type that is spoken by the target actor
|
||||
* @tparam A Answer type that the Actor is expected to reply with, it will become the Output type of this Flow
|
||||
*/
|
||||
def ask[I, Q, A](ref: ActorRef[Q], timeout: java.time.Duration, makeMessage: BiFunction[I, ActorRef[A], Q]): Flow[I, A, NotUsed] =
|
||||
akka.stream.typed.scaladsl.ActorFlow.ask[I, Q, A](parallelism = 2)(ref)((i, ref) ⇒ makeMessage(i, ref))(JavaDurationConverters.asFiniteDuration(timeout))
|
||||
.asJava
|
||||
|
||||
/**
|
||||
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
|
||||
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* Do not forget to include the expected response type in the method call, like so:
|
||||
*
|
||||
* {{{
|
||||
* flow.via(ActorFlow.<String, AskMe, String>ask(ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
|
||||
* // or simply
|
||||
* flow.via(ActorFlow.ask(ref, timeout, AskMe::new))
|
||||
* }}}
|
||||
*
|
||||
* otherwise `Nothing` will be assumed, which is most likely not what you want.
|
||||
*
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated,
|
||||
* or with an [[java.util.concurrent.TimeoutException]] in case the ask exceeds the timeout passed in.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
|
||||
*
|
||||
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
|
||||
*
|
||||
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @tparam I Incoming element type of the Flow
|
||||
* @tparam Q Question message type that is spoken by the target actor
|
||||
* @tparam A Answer type that the Actor is expected to reply with, it will become the Output type of this Flow
|
||||
*/
|
||||
def ask[I, Q, A](parallelism: Int, ref: ActorRef[Q], timeout: java.time.Duration, makeMessage: (I, ActorRef[A]) ⇒ Q): Flow[I, A, NotUsed] =
|
||||
akka.stream.typed.scaladsl.ActorFlow.ask[I, Q, A](parallelism)(ref)((i, ref) ⇒ makeMessage(i, ref))(timeout.toMillis.millis)
|
||||
.asJava
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,130 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.pattern.AskTimeoutException
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.Timeout
|
||||
|
||||
import scala.annotation.implicitNotFound
|
||||
import scala.concurrent.Future
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
/**
|
||||
* Collection of Flows aimed at integrating with typed Actors.
|
||||
*/
|
||||
object ActorFlow {
|
||||
|
||||
// TODO would be nice to provide Implicits to allow .ask() directly on Flow/Source
|
||||
|
||||
/**
|
||||
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
|
||||
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* Do not forget to include the expected response type in the method call, like so:
|
||||
*
|
||||
* {{{
|
||||
* flow.via(ActorFlow.ask[String, Asking, Reply](ref)((el, replyTo) => Asking(el, replyTo)))
|
||||
*
|
||||
* // or even:
|
||||
* flow.via(ActorFlow.ask[String, Asking, Reply](ref)(Asking(_, _)))
|
||||
* }}}
|
||||
*
|
||||
* otherwise `Nothing` will be assumed, which is most likely not what you want.
|
||||
*
|
||||
* Defaults to parallelism of 2 messages in flight, since while one ask message may be being worked on, the second one
|
||||
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
|
||||
* a slightly healthier throughput.
|
||||
*
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated,
|
||||
* or with an [[java.util.concurrent.TimeoutException]] in case the ask exceeds the timeout passed in.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
|
||||
*
|
||||
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
|
||||
*
|
||||
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @tparam I Incoming element type of the Flow
|
||||
* @tparam Q Question message type that is spoken by the target actor
|
||||
* @tparam A Answer type that the Actor is expected to reply with, it will become the Output type of this Flow
|
||||
*/
|
||||
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
|
||||
def ask[I, Q, A](ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) ⇒ Q)(implicit timeout: Timeout): Flow[I, A, NotUsed] =
|
||||
ask(parallelism = 2)(ref)(makeMessage)(timeout)
|
||||
|
||||
/**
|
||||
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
|
||||
* If any of the asks times out it will fail the stream with a [[java.util.concurrent.TimeoutException]].
|
||||
*
|
||||
* Do not forget to include the expected response type in the method call, like so:
|
||||
*
|
||||
* {{{
|
||||
* flow.via(ActorFlow.ask[String, Asking, Reply](parallelism = 4)(ref, (el, replyTo) => Asking(el, replyTo)))
|
||||
*
|
||||
* // or even:
|
||||
* flow.via(ActorFlow.ask[String, Asking, Reply](parallelism = 4)(ref, Asking(_, _)))
|
||||
* }}}
|
||||
*
|
||||
* otherwise `Nothing` will be assumed, which is most likely not what you want.
|
||||
*
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated,
|
||||
* or with an [[java.util.concurrent.TimeoutException]] in case the ask exceeds the timeout passed in.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Emits when''' the futures (in submission order) created by the ask pattern internally are completed
|
||||
*
|
||||
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
|
||||
*
|
||||
* '''Fails when''' the passed in actor terminates, or a timeout is exceeded in any of the asks performed
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
* @tparam I Incoming element type of the Flow
|
||||
* @tparam Q Question message type that is spoken by the target actor
|
||||
* @tparam A answer type that the Actor is expected to reply with, it will become the Output type of this Flow
|
||||
*/
|
||||
@implicitNotFound("Missing an implicit akka.util.Timeout for the ask() stage")
|
||||
def ask[I, Q, A](parallelism: Int)(ref: ActorRef[Q])(makeMessage: (I, ActorRef[A]) ⇒ Q)(implicit timeout: Timeout): Flow[I, A, NotUsed] = {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
val untypedRef = ref.toUntyped
|
||||
|
||||
val askFlow = Flow[I]
|
||||
.watch(untypedRef)
|
||||
.mapAsync(parallelism) { el ⇒
|
||||
val res = akka.pattern.extended.ask(untypedRef, (replyTo: akka.actor.ActorRef) ⇒ makeMessage(el, replyTo))
|
||||
// we need to cast manually (yet safely, by construction!) since otherwise we need a ClassTag,
|
||||
// which in Scala is fine, but then we would force JavaDSL to create one, which is a hassle in the Akka Typed DSL,
|
||||
// since one may say "but I already specified the type!", and that we have to go via the untyped ask is an implementation detail
|
||||
res.asInstanceOf[Future[A]]
|
||||
}
|
||||
.mapError {
|
||||
case ex: AskTimeoutException ⇒
|
||||
// in Akka Typed we use the `TimeoutException` everywhere
|
||||
new java.util.concurrent.TimeoutException(ex.getMessage)
|
||||
|
||||
// the purpose of this recovery is to change the name of the stage in that exception
|
||||
// we do so in order to help users find which stage caused the failure -- "the ask stage"
|
||||
case ex: WatchedActorTerminatedException ⇒
|
||||
new WatchedActorTerminatedException("ask()", ex.ref)
|
||||
}
|
||||
.named("ask")
|
||||
|
||||
askFlow
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
public class ActorFlowCompileTest {
|
||||
|
||||
interface Protocol {}
|
||||
class Init implements Protocol {}
|
||||
class Msg implements Protocol {}
|
||||
class Complete implements Protocol {}
|
||||
class Failure implements Protocol {
|
||||
public Exception ex;
|
||||
}
|
||||
|
||||
{
|
||||
final ActorSystem<String> system = null;
|
||||
final ActorMaterializer mat = akka.stream.typed.ActorMaterializer.create(system);
|
||||
}
|
||||
|
||||
static
|
||||
//#ask-actor
|
||||
class AskMe {
|
||||
final String payload;
|
||||
final ActorRef<String> replyTo;
|
||||
|
||||
AskMe(String payload, ActorRef<String> replyTo) {
|
||||
this.payload = payload;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
//#ask-actor
|
||||
|
||||
{
|
||||
final ActorRef<AskMe> ref = null;
|
||||
|
||||
//#ask
|
||||
Duration timeout = Duration.of(1, ChronoUnit.SECONDS);
|
||||
|
||||
Source.repeat("hello")
|
||||
.via(ActorFlow.ask(ref, timeout, AskMe::new))
|
||||
.to(Sink.ignore());
|
||||
|
||||
Source.repeat("hello")
|
||||
.via(ActorFlow.<String, AskMe, String>ask(ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
|
||||
.to(Sink.ignore());
|
||||
//#ask
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
//#imports
|
||||
import akka.stream.typed.ActorMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
//#imports
|
||||
import akka.actor.typed.{ DispatcherSelector, TypedAkkaSpecWithShutdown }
|
||||
import akka.stream.testkit.TestSubscriber
|
||||
import akka.testkit.typed.scaladsl.ActorTestKit
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Await, Future }
|
||||
|
||||
object ActorFlowSpec {
|
||||
final case class Asking(s: String, replyTo: ActorRef[Reply])
|
||||
final case class Reply(s: String)
|
||||
}
|
||||
|
||||
class ActorFlowSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
|
||||
import ActorFlowSpec._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
"ActorFlow" should {
|
||||
|
||||
val replier = spawn(Behaviors.receiveMessage[Asking] {
|
||||
case Asking("TERMINATE", _) ⇒
|
||||
Behaviors.stopped
|
||||
|
||||
case asking ⇒
|
||||
asking.replyTo ! Reply(asking.s + "!!!")
|
||||
Behaviors.same
|
||||
})
|
||||
|
||||
"produce asked elements" in {
|
||||
val in: Future[immutable.Seq[Reply]] =
|
||||
Source.repeat("hello")
|
||||
.via(ActorFlow.ask(replier)((el, replyTo: ActorRef[Reply]) ⇒ Asking(el, replyTo)))
|
||||
.take(3)
|
||||
.runWith(Sink.seq)
|
||||
|
||||
in.futureValue shouldEqual List.fill(3)(Reply("hello!!!"))
|
||||
}
|
||||
|
||||
"produce asked elements in order" in {
|
||||
//#ask-actor
|
||||
val ref = spawn(Behaviors.receiveMessage[Asking] { asking ⇒
|
||||
asking.replyTo ! Reply(asking.s + "!!!")
|
||||
Behaviors.same
|
||||
})
|
||||
|
||||
//#ask-actor
|
||||
|
||||
//#ask
|
||||
val in: Future[immutable.Seq[Reply]] =
|
||||
Source(1 to 50).map(_.toString)
|
||||
.via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) ⇒ Asking(el, replyTo)))
|
||||
.runWith(Sink.seq)
|
||||
//#ask
|
||||
|
||||
in.futureValue shouldEqual List.tabulate(51)(i ⇒ Reply(s"$i!!!")).drop(1)
|
||||
}
|
||||
|
||||
"signal ask timeout failure" in {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
val dontReply = spawn(Behaviors.ignore[Asking])
|
||||
|
||||
val c = TestSubscriber.manualProbe[Reply]()(system.toUntyped)
|
||||
implicit val ec = system.dispatchers.lookup(DispatcherSelector.default())
|
||||
implicit val timeout = akka.util.Timeout(10.millis)
|
||||
|
||||
Source(1 to 5).map(_ + " nope")
|
||||
.via(ActorFlow.ask[String, Asking, Reply](4)(dontReply)(Asking(_, _)))
|
||||
.to(Sink.fromSubscriber(c)).run()
|
||||
|
||||
c.expectSubscription().request(10)
|
||||
c.expectError().getMessage should startWith("Ask timed out on [Actor")
|
||||
}
|
||||
|
||||
"signal failure when target actor is terminated" in {
|
||||
val done = Source.maybe[String]
|
||||
.via(ActorFlow.ask(replier)((el, replyTo: ActorRef[Reply]) ⇒ Asking(el, replyTo)))
|
||||
.runWith(Sink.ignore)
|
||||
|
||||
intercept[RuntimeException] {
|
||||
replier ! Asking("TERMINATE", system.deadLetters)
|
||||
Await.result(done, 3.seconds)
|
||||
}.getMessage should startWith("Actor watched by [ask()] has terminated! Was: Actor[akka://ActorFlowSpec")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -920,8 +920,6 @@ trait FlowOps[+Out, +Mat] {
|
|||
* still be in the mailbox, so defaulting to sending the second one a bit earlier than when first ask has replied maintains
|
||||
* a slightly healthier throughput.
|
||||
*
|
||||
* The mapTo class parameter is used to cast the incoming responses to the expected response type.
|
||||
*
|
||||
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
|
||||
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
|
||||
*
|
||||
|
|
@ -959,12 +957,8 @@ trait FlowOps[+Out, +Mat] {
|
|||
* Please note that the elements emitted by this stage are in-order with regards to the asks being issued
|
||||
* (i.e. same behaviour as mapAsync).
|
||||
*
|
||||
* The mapTo class parameter is used to cast the incoming responses to the expected response type.
|
||||
*
|
||||
* Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
|
||||
* An `akka.util.Status#Failure` will cause the stage to fail with the cause carried in the `Failure` message.
|
||||
*
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated.
|
||||
* The stage fails with an [[akka.stream.WatchedActorTerminatedException]] if the target actor is terminated,
|
||||
* or with an [[java.util.concurrent.TimeoutException]] in case the ask exceeds the timeout passed in.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
|
|
@ -985,7 +979,7 @@ trait FlowOps[+Out, +Mat] {
|
|||
.mapAsync(parallelism) { el ⇒
|
||||
akka.pattern.ask(ref).?(el)(timeout).mapTo[S](tag)
|
||||
}
|
||||
.recover[S] {
|
||||
.mapError {
|
||||
// the purpose of this recovery is to change the name of the stage in that exception
|
||||
// we do so in order to help users find which stage caused the failure -- "the ask stage"
|
||||
case ex: WatchedActorTerminatedException ⇒
|
||||
|
|
|
|||
|
|
@ -446,6 +446,7 @@ lazy val streamTyped = akkaModule("akka-stream-typed")
|
|||
.dependsOn(
|
||||
actorTyped,
|
||||
stream,
|
||||
streamTestkit % "test->test",
|
||||
typedTestkit % "test->test",
|
||||
actorTypedTests % "test->test"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -5,6 +5,8 @@
|
|||
import sbt._
|
||||
import sbt.Keys._
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* Generate the "index" pages of stream operators.
|
||||
*/
|
||||
|
|
@ -32,7 +34,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"Fan-in stages",
|
||||
// TODO these don't show up as def's yet so don't show up in the index..
|
||||
// "Fan-out stages",
|
||||
"Watching status stages"
|
||||
"Watching status stages",
|
||||
"Actor interop stages",
|
||||
)
|
||||
|
||||
def categoryId(name: String): String = name.toLowerCase.replace(' ', '-')
|
||||
|
|
@ -106,6 +109,12 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"FileIO" -> Seq(
|
||||
"fromFile",
|
||||
"toFile"
|
||||
),
|
||||
"ActorSink" → Seq(
|
||||
"actorRefWithAck"
|
||||
),
|
||||
"ActorSource" → Seq(
|
||||
"actorRef"
|
||||
)
|
||||
)
|
||||
|
||||
|
|
@ -140,8 +149,17 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
"akka-stream/src/main/scala/akka/stream/javadsl/StreamConverters.scala",
|
||||
"akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala",
|
||||
"akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala",
|
||||
|
||||
// akka-stream-typed
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSource.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSource.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala",
|
||||
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala",
|
||||
).flatMap{ f ⇒
|
||||
val element = f.split("/")(7).split("\\.")(0)
|
||||
val slashesNr = f.count(_ == '/')
|
||||
val element = f.split("/")(slashesNr).split("\\.")(0)
|
||||
IO.read(new File(f)).split("\n")
|
||||
.map(_.trim).filter(_.startsWith("def "))
|
||||
.map(_.drop(4).takeWhile(c ⇒ c != '[' && c != '(' && c != ':'))
|
||||
|
|
@ -167,7 +185,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
}
|
||||
.groupBy(_._1)
|
||||
.mapValues(lines =>
|
||||
"| |Operator|Description|\n" ++
|
||||
"| |Operator|Description|\n" ++ // TODO mini images here too
|
||||
"|--|--|--|\n" ++
|
||||
lines
|
||||
.map(_._2)
|
||||
|
|
@ -183,14 +201,17 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
}.mkString("\n\n")
|
||||
|
||||
val content =
|
||||
"# Operators\n\n" + tables + "\n\n@@@ index\n\n" +
|
||||
"<!-- DO NOT EDIT DIRECTLY: This file is generated by `project/StreamOperatorsIndexGenerator`. See CONTRIBUTING.md for details. -->\n" +
|
||||
"# Operators\n\n" +
|
||||
tables +
|
||||
"\n\n@@@ index\n\n" +
|
||||
groupedDefs.map { case (_, method, md) => s"* [$method]($md)" }.mkString("\n") + "\n\n@@@\n"
|
||||
|
||||
if (!file.exists || IO.read(file) != content) IO.write(file, content)
|
||||
Seq(file)
|
||||
}
|
||||
|
||||
def getDetails(file: File): (String, String) = {
|
||||
def getDetails(file: File): (String, String) = try {
|
||||
val contents = IO.read(file)
|
||||
val lines = contents.split("\\r?\\n")
|
||||
require(
|
||||
|
|
@ -208,6 +229,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
require(categories.contains(categoryName), s"category $categoryName in $file should be known")
|
||||
require(categoryLinkId == categoryId(categoryName), s"category id $categoryLinkId in $file")
|
||||
(description, categoryName)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
throw new RuntimeException(s"Unable to extract details from $file", ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue