diff --git a/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md b/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md
index b70f1a4b43..8f65f6aa7f 100644
--- a/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md
+++ b/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md
@@ -1,6 +1,6 @@
# ActorFlow.ask
-Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.
+Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.
@ref[Actor interop operators](../index.md#actor-interop-operators)
@@ -14,25 +14,48 @@ This operator is included in:
version="$akka.version$"
}
-@@@div { .group-scala }
-
## Signature
-@@signature [ActorFlow.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala) { #ask }
-
-@@@
+@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:akka.actor.typed.ActorRef%5BQ])(makeMessage:(I,akka.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow%5BI,A,akka.NotUsed]" java="#ask(akka.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" }
## Description
-Emit the contents of a file, as `ByteString`s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with
-a `IOResult` upon reaching the end of the file or if there is a failure.
+Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor.
+If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException].
+
+The `ask` operator requires
+
+* the actor `ref`,
+* a `makeMessage` function to create the message sent to the actor from the incoming element and the actor ref accepting the actor's reply message,
+* and a timeout.
+
+See also:
+
+* @ref[Flow.ask](../Source-or-Flow/ask.md) for the classic actors variant
## Examples
+The `ActorFlow.ask` sends a message to the actor which expects `Asking` messages which contain the actor ref for replies of type `Reply`. The replies are emitted when received and the `map` extracts the message `String`.
Scala
-: @@snip [ask.scala](/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask }
+: @@snip [ask.scala](/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask }
Java
-: @@snip [ask.java](/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask }
+: @@snip [ask.java](/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the futures (in submission order) created by the ask pattern internally are completed
+
+**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures
+
+**completes** when upstream completes and all futures have been completed and all elements have been emitted
+
+**fails** when the passed in actor terminates, or a timeout is exceeded in any of the asks performed
+
+**cancels** when downstream cancels
+
+@@@
diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md
index 09f3d343ad..ff5a538f80 100644
--- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md
@@ -1,27 +1,28 @@
# Flow.ask
-Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).
-@ref[Asynchronous operators](../index.md#asynchronous-operators)
+@ref[Actor interop operators](../index.md#actor-interop-operators)
-@@@ div { .group-scala }
## Signature
-@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #ask }
-@@@
+@apidoc[Flow.ask](Flow$) { scala="#ask%5BS](ref:akka.actor.ActorRef)(implicittimeout:akka.util.Timeout,implicittag:scala.reflect.ClassTag%5BS]):FlowOps.this.Repr%5BS]" java="#ask(akka.actor.ActorRef,java.lang.Class,akka.util.Timeout)" }
## Description
-Use the `ask` pattern to send a request-reply message to the target `ref` actor.
+Use the @ref[Ask Pattern](../../../actors.md#ask-send-and-receive-future) to send a request-reply message to the target `ref` actor.
If any of the asks times out it will fail the stream with a @apidoc[AskTimeoutException].
The @java[`mapTo` class]@scala[`S` generic] parameter is used to cast the responses from the actor to the expected outgoing flow type.
-Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`.
-An `akka.util.Status#Failure` will cause the operator to fail with the cause carried in the `Failure` message.
+Similar to the plain ask pattern, the target actor is allowed to reply with @apidoc[akka.actor.Status].
+An @apidoc[akka.actor.Status.Failure] will cause the operator to fail with the cause carried in the `Failure` message.
-Adheres to the @scala[@scaladoc[`ActorAttributes.SupervisionStrategy`](akka.stream.ActorAttributes$$SupervisionStrategy)]
-@java[`ActorAttributes.SupervisionStrategy`] attribute.
+Adheres to the @apidoc[ActorAttributes.SupervisionStrategy] attribute.
+
+See also:
+
+* @ref[ActorFlow.ask](../ActorFlow/ask.md) for the `akka.actor.typed.ActorRef[_]` variant
## Reactive Streams semantics
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index bf3191c578..1cdb7a980f 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -198,7 +198,6 @@ operation at the same time (usually handling the completion of a @scala[`Future`
| |Operator|Description|
|--|--|--|
-|Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the `ask` pattern to send a request-reply message to the target `ref` actor.|
|Source/Flow|@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result.|
|Source/Flow|@ref[mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)|Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.|
@@ -322,7 +321,8 @@ Operators meant for inter-operating between Akka Streams and Actors:
|ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`], without considering backpressure.|
|ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.|
-|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.|
+|Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
+|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.|
|Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
## Compression operators
diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java
deleted file mode 100644
index 2dd6561901..0000000000
--- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2018-2020 Lightbend Inc.
- */
-
-package akka.stream.typed.javadsl;
-
-import akka.actor.typed.ActorRef;
-import akka.actor.typed.ActorSystem;
-import akka.stream.ActorMaterializer;
-import akka.stream.javadsl.Sink;
-import akka.stream.javadsl.Source;
-
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-
-public class ActorFlowCompileTest {
-
- interface Protocol {}
-
- class Init implements Protocol {}
-
- class Msg implements Protocol {}
-
- class Complete implements Protocol {}
-
- class Failure implements Protocol {
- public Exception ex;
- }
-
- {
- final ActorSystem system = null;
- }
-
- static
- // #ask-actor
- class AskMe {
- final String payload;
- final ActorRef replyTo;
-
- AskMe(String payload, ActorRef replyTo) {
- this.payload = payload;
- this.replyTo = replyTo;
- }
- }
-
- // #ask-actor
-
- {
- final ActorRef ref = null;
-
- // #ask
- Duration timeout = Duration.of(1, ChronoUnit.SECONDS);
-
- Source.repeat("hello").via(ActorFlow.ask(ref, timeout, AskMe::new)).to(Sink.ignore());
-
- Source.repeat("hello")
- .via(
- ActorFlow.ask(
- ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo)))
- .to(Sink.ignore());
- // #ask
- }
-}
diff --git a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java
new file mode 100644
index 0000000000..625585abfa
--- /dev/null
+++ b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2018-2020 Lightbend Inc.
+ */
+
+package docs.javadsl;
+
+import akka.NotUsed;
+// #ask-actor
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akka.stream.javadsl.Flow;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+import akka.stream.typed.javadsl.ActorFlow;
+
+// #ask-actor
+import java.time.Duration;
+
+public class ActorFlowCompileTest {
+
+ final ActorSystem system = null;
+
+ static
+ // #ask-actor
+ class Asking {
+ final String payload;
+ final ActorRef replyTo;
+
+ public Asking(String payload, ActorRef replyTo) {
+ this.payload = payload;
+ this.replyTo = replyTo;
+ }
+ }
+
+ // #ask-actor
+ static
+ // #ask-actor
+ class Reply {
+ public final String msg;
+
+ public Reply(String msg) {
+ this.msg = msg;
+ }
+ }
+
+ // #ask-actor
+
+ {
+ // #ask
+ final ActorRef actorRef = // ???
+ // #ask
+ null;
+
+ // #ask
+ Duration timeout = Duration.ofSeconds(1);
+
+ // method reference notation
+ Flow askFlow = ActorFlow.ask(actorRef, timeout, Asking::new);
+
+ // explicit creation of the sent message
+ Flow askFlowExplicit =
+ ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo));
+
+ Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system);
+ // #ask
+ }
+}
diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala
similarity index 76%
rename from akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala
rename to akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala
index 11232ba4dd..d55184193e 100644
--- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala
+++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala
@@ -2,26 +2,30 @@
* Copyright (C) 2018-2020 Lightbend Inc.
*/
-package akka.stream.typed.scaladsl
+package docs.scaladsl
+import akka.NotUsed
//#imports
-import akka.stream.scaladsl._
+import akka.stream.scaladsl.{ Flow, Sink, Source }
+import akka.stream.typed.scaladsl.ActorFlow
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
-import scala.concurrent.duration._
+//#imports
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.stream.testkit.TestSubscriber
import org.scalatest.wordspec.AnyWordSpecLike
-//#imports
-import akka.stream.testkit.TestSubscriber
-
import scala.collection.immutable
+import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
object ActorFlowSpec {
+ //#ask-actor
final case class Asking(s: String, replyTo: ActorRef[Reply])
- final case class Reply(s: String)
+ final case class Reply(msg: String)
+
+ //#ask-actor
}
class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
@@ -59,14 +63,21 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
//#ask-actor
//#ask
- val in: Future[immutable.Seq[Reply]] =
- Source(1 to 50)
- .map(_.toString)
- .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo)))
- .runWith(Sink.seq)
- //#ask
+ implicit val timeout: akka.util.Timeout = 1.second
- in.futureValue shouldEqual List.tabulate(51)(i => Reply(s"$i!!!")).drop(1)
+ val askFlow: Flow[String, Reply, NotUsed] =
+ ActorFlow.ask(ref)(Asking.apply)
+
+ // explicit creation of the sent message
+ val askFlowExplicit: Flow[String, Reply, NotUsed] =
+ ActorFlow.ask(ref)(makeMessage = (el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))
+
+ val in: Future[immutable.Seq[String]] =
+ Source(1 to 50).map(_.toString).via(askFlow).map(_.msg).runWith(Sink.seq)
+ //#ask
+ askFlowExplicit.map(identity)
+
+ in.futureValue shouldEqual List.tabulate(51)(i => s"$i!!!").drop(1)
}
"signal ask timeout failure" in {