From de7dc9cdf8754c6381842234b6ebfd359a437838 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 18 Mar 2020 18:23:09 +0100 Subject: [PATCH 1/6] Docs: sync and refresh ask stream operators --- .../paradox/stream/operators/ActorFlow/ask.md | 43 +++++++++--- .../stream/operators/Source-or-Flow/ask.md | 21 +++--- .../main/paradox/stream/operators/index.md | 3 +- .../typed/javadsl/ActorFlowCompileTest.java | 63 ----------------- .../docs/javadsl/ActorFlowCompileTest.java | 67 +++++++++++++++++++ .../scaladsl/ActorFlowSpec.scala | 34 ++++++---- 6 files changed, 135 insertions(+), 96 deletions(-) delete mode 100644 akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java create mode 100644 akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java rename akka-stream-typed/src/test/scala/{akka/stream/typed => docs}/scaladsl/ActorFlowSpec.scala (79%) diff --git a/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md b/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md index b70f1a4b43..8f65f6aa7f 100644 --- a/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md +++ b/akka-docs/src/main/paradox/stream/operators/ActorFlow/ask.md @@ -1,6 +1,6 @@ # ActorFlow.ask -Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream. +Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream. @ref[Actor interop operators](../index.md#actor-interop-operators) @@ -14,25 +14,48 @@ This operator is included in: version="$akka.version$" } -@@@div { .group-scala } - ## Signature -@@signature [ActorFlow.scala](/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala) { #ask } - -@@@ +@apidoc[ActorFlow.ask](ActorFlow$) { scala="#ask%5BI,Q,A](ref:akka.actor.typed.ActorRef%5BQ])(makeMessage:(I,akka.actor.typed.ActorRef%5BA])=%3EQ)(implicittimeout:akka.util.Timeout):akka.stream.scaladsl.Flow%5BI,A,akka.NotUsed]" java="#ask(akka.actor.typed.ActorRef,java.time.Duration,java.util.function.BiFunction)" } ## Description -Emit the contents of a file, as `ByteString`s, materializes into a @scala[`Future`] @java[`CompletionStage`] which will be completed with -a `IOResult` upon reaching the end of the file or if there is a failure. +Use the @ref[Ask pattern](../../../typed/interaction-patterns.md#request-response-with-ask-from-outside-an-actor) to send a request-reply message to the target `ref` actor. +If any of the asks times out it will fail the stream with an @apidoc[AskTimeoutException]. + +The `ask` operator requires + +* the actor `ref`, +* a `makeMessage` function to create the message sent to the actor from the incoming element and the actor ref accepting the actor's reply message, +* and a timeout. + +See also: + +* @ref[Flow.ask](../Source-or-Flow/ask.md) for the classic actors variant ## Examples +The `ActorFlow.ask` sends a message to the actor which expects `Asking` messages which contain the actor ref for replies of type `Reply`. The replies are emitted when received and the `map` extracts the message `String`. Scala -: @@snip [ask.scala](/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask } +: @@snip [ask.scala](/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala) { #imports #ask-actor #ask } Java -: @@snip [ask.java](/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask } +: @@snip [ask.java](/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java) { #ask-actor #ask } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the futures (in submission order) created by the ask pattern internally are completed + +**backpressures** when the number of futures reaches the configured parallelism and the downstream backpressures + +**completes** when upstream completes and all futures have been completed and all elements have been emitted + +**fails** when the passed in actor terminates, or a timeout is exceeded in any of the asks performed + +**cancels** when downstream cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md index 09f3d343ad..ff5a538f80 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/ask.md @@ -1,27 +1,28 @@ # Flow.ask -Use the `ask` pattern to send a request-reply message to the target `ref` actor. +Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API). -@ref[Asynchronous operators](../index.md#asynchronous-operators) +@ref[Actor interop operators](../index.md#actor-interop-operators) -@@@ div { .group-scala } ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #ask } -@@@ +@apidoc[Flow.ask](Flow$) { scala="#ask%5BS](ref:akka.actor.ActorRef)(implicittimeout:akka.util.Timeout,implicittag:scala.reflect.ClassTag%5BS]):FlowOps.this.Repr%5BS]" java="#ask(akka.actor.ActorRef,java.lang.Class,akka.util.Timeout)" } ## Description -Use the `ask` pattern to send a request-reply message to the target `ref` actor. +Use the @ref[Ask Pattern](../../../actors.md#ask-send-and-receive-future) to send a request-reply message to the target `ref` actor. If any of the asks times out it will fail the stream with a @apidoc[AskTimeoutException]. The @java[`mapTo` class]@scala[`S` generic] parameter is used to cast the responses from the actor to the expected outgoing flow type. -Similar to the plain ask pattern, the target actor is allowed to reply with `akka.util.Status`. -An `akka.util.Status#Failure` will cause the operator to fail with the cause carried in the `Failure` message. +Similar to the plain ask pattern, the target actor is allowed to reply with @apidoc[akka.actor.Status]. +An @apidoc[akka.actor.Status.Failure] will cause the operator to fail with the cause carried in the `Failure` message. -Adheres to the @scala[@scaladoc[`ActorAttributes.SupervisionStrategy`](akka.stream.ActorAttributes$$SupervisionStrategy)] -@java[`ActorAttributes.SupervisionStrategy`] attribute. +Adheres to the @apidoc[ActorAttributes.SupervisionStrategy] attribute. + +See also: + +* @ref[ActorFlow.ask](../ActorFlow/ask.md) for the `akka.actor.typed.ActorRef[_]` variant ## Reactive Streams semantics diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index e96095d028..31691d2694 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -321,7 +321,8 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSink|@ref[actorRef](ActorSink/actorRef.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`], without considering backpressure.| |ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.| -|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the `AskPattern` to send each element as an `ask` to the target actor, and expect a reply back that will be sent further downstream.| +|Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| +|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be sent further downstream.| |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| ## Compression operators diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java deleted file mode 100644 index 2dd6561901..0000000000 --- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorFlowCompileTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (C) 2018-2020 Lightbend Inc. - */ - -package akka.stream.typed.javadsl; - -import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; -import akka.stream.ActorMaterializer; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; - -import java.time.Duration; -import java.time.temporal.ChronoUnit; - -public class ActorFlowCompileTest { - - interface Protocol {} - - class Init implements Protocol {} - - class Msg implements Protocol {} - - class Complete implements Protocol {} - - class Failure implements Protocol { - public Exception ex; - } - - { - final ActorSystem system = null; - } - - static - // #ask-actor - class AskMe { - final String payload; - final ActorRef replyTo; - - AskMe(String payload, ActorRef replyTo) { - this.payload = payload; - this.replyTo = replyTo; - } - } - - // #ask-actor - - { - final ActorRef ref = null; - - // #ask - Duration timeout = Duration.of(1, ChronoUnit.SECONDS); - - Source.repeat("hello").via(ActorFlow.ask(ref, timeout, AskMe::new)).to(Sink.ignore()); - - Source.repeat("hello") - .via( - ActorFlow.ask( - ref, timeout, (msg, replyTo) -> new AskMe(msg, replyTo))) - .to(Sink.ignore()); - // #ask - } -} diff --git a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java new file mode 100644 index 0000000000..1af3201b34 --- /dev/null +++ b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package docs.javadsl; + +import akka.NotUsed; +// #ask-actor +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.typed.javadsl.ActorFlow; + +// #ask-actor +import java.time.Duration; + +public class ActorFlowCompileTest { + + final ActorSystem system = null; + + static + // #ask-actor + class Asking { + final String payload; + final ActorRef replyTo; + + public Asking(String payload, ActorRef replyTo) { + this.payload = payload; + this.replyTo = replyTo; + } + } + + // #ask-actor + static + // #ask-actor + class Reply { + public final String msg; + + public Reply(String msg) { + this.msg = msg; + } + } + + // #ask-actor + + { + // #ask + final ActorRef actorRef = // ??? + // #ask + null; + + // #ask + Duration timeout = Duration.ofSeconds(1); + + // method reference notation + Flow askFlow = ActorFlow.ask(actorRef, timeout, Asking::new); + + // explicit creation of the sent message + Flow askFlowExplicit = + ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo)); + + Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); + // #ask + } +} diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala similarity index 79% rename from akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala rename to akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala index 11232ba4dd..782f2a8a6e 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala @@ -2,26 +2,30 @@ * Copyright (C) 2018-2020 Lightbend Inc. */ -package akka.stream.typed.scaladsl +package docs.scaladsl +import akka.NotUsed //#imports -import akka.stream.scaladsl._ +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.stream.typed.scaladsl.ActorFlow import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors -import scala.concurrent.duration._ +//#imports import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.stream.testkit.TestSubscriber import org.scalatest.wordspec.AnyWordSpecLike -//#imports -import akka.stream.testkit.TestSubscriber - import scala.collection.immutable +import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } object ActorFlowSpec { + //#ask-actor final case class Asking(s: String, replyTo: ActorRef[Reply]) - final case class Reply(s: String) + final case class Reply(msg: String) + + //#ask-actor } class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { @@ -59,11 +63,17 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { //#ask-actor //#ask - val in: Future[immutable.Seq[Reply]] = - Source(1 to 50) - .map(_.toString) - .via(ActorFlow.ask(ref)((el, replyTo: ActorRef[Reply]) => Asking(el, replyTo))) - .runWith(Sink.seq) + implicit val timeout: akka.util.Timeout = 1.second + + val askFlow: Flow[String, Reply, NotUsed] = + ActorFlow.ask(ref)(Asking.apply) + + // explicit creation of the sent message + val askFlowExplicit: Flow[String, Reply, NotUsed] = + ActorFlow.ask(ref)(makeMessage = (el, replyTo: ActorRef[Reply]) => Asking(el, replyTo)) + + val in: Future[immutable.Seq[String]] = + Source(1 to 50).map(_.toString).via(askFlow).map(_.msg).runWith(Sink.seq) //#ask in.futureValue shouldEqual List.tabulate(51)(i => Reply(s"$i!!!")).drop(1) From 22a78ee22fb55a0141c56796e170956e22fbb828 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 18 Mar 2020 20:09:41 +0100 Subject: [PATCH 2/6] Avoid unused val --- .../src/test/scala/docs/scaladsl/ActorFlowSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala index 782f2a8a6e..e3c79a23ac 100644 --- a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala @@ -75,6 +75,7 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { val in: Future[immutable.Seq[String]] = Source(1 to 50).map(_.toString).via(askFlow).map(_.msg).runWith(Sink.seq) //#ask + askFlowExplicit.map(identity) in.futureValue shouldEqual List.tabulate(51)(i => Reply(s"$i!!!")).drop(1) } From ebd181e037cb7916bf472f2bb4af8aa9e34e57d1 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 18 Mar 2020 21:04:47 +0100 Subject: [PATCH 3/6] javafmtAll --- .../docs/javadsl/ActorFlowCompileTest.java | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java index 1af3201b34..625585abfa 100644 --- a/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java +++ b/akka-stream-typed/src/test/java/docs/javadsl/ActorFlowCompileTest.java @@ -18,50 +18,50 @@ import java.time.Duration; public class ActorFlowCompileTest { - final ActorSystem system = null; + final ActorSystem system = null; - static - // #ask-actor - class Asking { - final String payload; - final ActorRef replyTo; + static + // #ask-actor + class Asking { + final String payload; + final ActorRef replyTo; - public Asking(String payload, ActorRef replyTo) { - this.payload = payload; - this.replyTo = replyTo; - } + public Asking(String payload, ActorRef replyTo) { + this.payload = payload; + this.replyTo = replyTo; } + } - // #ask-actor - static - // #ask-actor - class Reply { - public final String msg; + // #ask-actor + static + // #ask-actor + class Reply { + public final String msg; - public Reply(String msg) { - this.msg = msg; - } + public Reply(String msg) { + this.msg = msg; } + } - // #ask-actor + // #ask-actor - { + { + // #ask + final ActorRef actorRef = // ??? // #ask - final ActorRef actorRef = // ??? - // #ask - null; + null; - // #ask - Duration timeout = Duration.ofSeconds(1); + // #ask + Duration timeout = Duration.ofSeconds(1); - // method reference notation - Flow askFlow = ActorFlow.ask(actorRef, timeout, Asking::new); + // method reference notation + Flow askFlow = ActorFlow.ask(actorRef, timeout, Asking::new); - // explicit creation of the sent message - Flow askFlowExplicit = - ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo)); + // explicit creation of the sent message + Flow askFlowExplicit = + ActorFlow.ask(actorRef, timeout, (msg, replyTo) -> new Asking(msg, replyTo)); - Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); - // #ask - } + Source.repeat("hello").via(askFlow).map(reply -> reply.msg).runWith(Sink.seq(), system); + // #ask + } } From 7f3b00fe448e80dd5da0596ed81d99565a3ea87a Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 19 Mar 2020 09:15:22 +0100 Subject: [PATCH 4/6] Regenerate index.md --- akka-docs/src/main/paradox/stream/operators/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 31691d2694..a9b4540514 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -322,7 +322,7 @@ Operators meant for inter-operating between Akka Streams and Actors: |ActorSource|@ref[actorRefWithBackpressure](ActorSource/actorRefWithBackpressure.md)|Materialize an @java[`ActorRef`]@scala[`ActorRef[T]`]; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |ActorSink|@ref[actorRefWithBackpressure](ActorSink/actorRefWithBackpressure.md)|Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorRef[T]`] with backpressure, to be able to signal demand when the actor is ready to receive more elements.| |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| -|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be sent further downstream.| +|ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply back that will be emitted downstream.| |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| ## Compression operators From c9f66f5a78ac4752c25a2bd24a73985210ae46e7 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 19 Mar 2020 10:18:27 +0100 Subject: [PATCH 5/6] Change to assert text --- .../src/test/scala/docs/scaladsl/ActorFlowSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala index e3c79a23ac..d55184193e 100644 --- a/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala +++ b/akka-stream-typed/src/test/scala/docs/scaladsl/ActorFlowSpec.scala @@ -77,7 +77,7 @@ class ActorFlowSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { //#ask askFlowExplicit.map(identity) - in.futureValue shouldEqual List.tabulate(51)(i => Reply(s"$i!!!")).drop(1) + in.futureValue shouldEqual List.tabulate(51)(i => s"$i!!!").drop(1) } "signal ask timeout failure" in { From 38570fe0c0102aa2b1f92fdb2509c0ca22ce13aa Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 1 Apr 2020 12:48:09 +0200 Subject: [PATCH 6/6] Rebased and regenerated index. --- akka-docs/src/main/paradox/stream/operators/index.md | 1 - 1 file changed, 1 deletion(-) diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index a9b4540514..7ee779d9cb 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -197,7 +197,6 @@ operation at the same time (usually handling the completion of a @scala[`Future` | |Operator|Description| |--|--|--| -|Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the `ask` pattern to send a request-reply message to the target `ref` actor.| |Source/Flow|@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result.| |Source/Flow|@ref[mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)|Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.|