diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index 2908be4df3..664d089640 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -91,15 +91,78 @@ When using non-blocking it is better to use the `mapTo` method to safely try to The `mapTo` method will return a new `Future` that contains the result if the cast was successful, or a `ClassCastException` if not. Handling `Exception`s will be discussed further within this documentation. -@@@ +@@@ -To send the result of a `Future` to an `Actor`, you can use the `pipe` construct: +## Use the pipe pattern -scala -: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to } +Another useful message-transfer pattern is "pipe", which is to send the result of `Future` to another actor, upon completion of the `Future`. +The pipe pattern can be used by importing @java[`akka.pattern.PatternsCS.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope. -java -: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to } +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-usage } + +To see how this works in more detail, let's introduce a small example consisting of three different actors, +`UserProxyActor`, `UserDataActor` and `UserActivityActor`. +In this example, when you need information about a user, you send a request message to `UserProxyActor`, +then it gets the corresponding result from the appropriate backend actor based on the request message type. + +![futures-pipeto1](images/futures-pipeto1.png) + +![futures-pipeto2](images/futures-pipeto2.png) + +The message types you send to `UserProxyActor` are `GetUserData` and `GetUserActivities`: + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-messages } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-proxy-messages } + +and `UserData` and @scala[`List[UserActivity]`]@java[`ArrayList`] are returned to the original sender in the end. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-returned-data } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-returned-data } + +The backend `UserDataActor` and `UserActivityActor` are defined as follows: + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-data-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-user-data-actor } + +`UserDataActor` holds the data in memory, so that it can return the current state of the user data quickly upon a request. + +On the other hand, `UserActivityActor` queries into a `repository` to retrieve historical user activities then +sends the result to the `sender()` which is `UserProxy` in this case, with the pipe pattern. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-activity-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-pipe #pipe-to-user-activity-actor } + +Since it needs to talk to the separate `repository`, it takes time to retrieve the list of `UserActivity`, +hence the return type of `queryHistoricalActivities` is @scala[`Future`]@java[`CompletableFuture`]. +To send back the result to the `sender()` we used the @scala[`pipeTo`]@java[`pipe`] method, +so that the result of the @scala[`Future`]@java[`CompletableFuture`] is sent to `sender()` upon @scala[`Future`]@java[`CompletableFuture`]'s completion. + +Finally, the definition of `UserProxyActor` is as below. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-proxy-actor } + +Note that the @scala[`pipeTo`]@java[`pipe`] method used with the @scala[`?`]@java[`ask`] method. +Using @scala[`pipeTo`]@java[`pipe`] with the @scala[`?`]@java[`ask`] method is a common practice when you want to relay a message from one actor to another. ## Use Directly diff --git a/akka-docs/src/main/paradox/images/futures-pipeto1.png b/akka-docs/src/main/paradox/images/futures-pipeto1.png new file mode 100644 index 0000000000..aa8258b110 Binary files /dev/null and b/akka-docs/src/main/paradox/images/futures-pipeto1.png differ diff --git a/akka-docs/src/main/paradox/images/futures-pipeto2.png b/akka-docs/src/main/paradox/images/futures-pipeto2.png new file mode 100644 index 0000000000..5d4fae5426 Binary files /dev/null and b/akka-docs/src/main/paradox/images/futures-pipeto2.png differ diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 1b5c4886f9..4324861ec2 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -68,6 +68,14 @@ import static akka.pattern.PatternsCS.retry; //#imports8 +//#imports-ask +import static akka.pattern.PatternsCS.ask; +//#imports-ask +//#imports-pipe +import static akka.pattern.PatternsCS.pipe; +//#imports-pipe + + import java.util.ArrayList; import java.util.List; @@ -112,6 +120,122 @@ public class FutureDocTest extends AbstractJavaTest { } //#print-result } + + //#pipe-to-usage + public class ActorUsingPipeTo extends AbstractActor { + ActorRef target; + Timeout timeout; + + ActorUsingPipeTo(ActorRef target) { + this.target = target; + this.timeout = new Timeout(Duration.create(5, "seconds")); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(String.class, msg -> { + CompletableFuture fut = + ask(target, "some message", timeout).toCompletableFuture(); + + // the pipe pattern + pipe(fut, getContext().dispatcher()).to(sender()); + }) + .build(); + } + } + //#pipe-to-usage + + //#pipe-to-returned-data + public class UserData { final String data; UserData(String data){ this.data = data; } } + public class UserActivity { final String activity; UserActivity(String activity){ this.activity = activity; } } + //#pipe-to-returned-data + + //#pipe-to-user-data-actor + public class UserDataActor extends AbstractActor { + UserData internalData; + + UserDataActor(){ + this.internalData = new UserData("initial data"); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetFromUserDataActor.class, msg -> sender().tell(internalData, self())) + .build(); + } + } + + public class GetFromUserDataActor {} + //#pipe-to-user-data-actor + + //#pipe-to-user-activity-actor + interface UserActivityRepository { + CompletableFuture> queryHistoricalActivities(String userId); + } + + public class UserActivityActor extends AbstractActor { + String userId; + UserActivityRepository repository; + + UserActivityActor(String userId, UserActivityRepository repository) { + this.userId = userId; + this.repository = repository; + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetFromUserActivityActor.class, msg -> { + CompletableFuture> fut = + repository.queryHistoricalActivities(userId); + + pipe(fut, getContext().dispatcher()).to(sender()); + }) + .build(); + } + } + + public class GetFromUserActivityActor {} + //#pipe-to-user-activity-actor + + //#pipe-to-proxy-actor + public class UserProxyActor extends AbstractActor { + ActorRef userActor; + ActorRef userActivityActor; + Timeout timeout = new Timeout(Duration.create(5, "seconds")); + + UserProxyActor(ActorRef userActor, ActorRef userActivityActor) { + this.userActor = userActor; + this.userActivityActor = userActivityActor; + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetUserData.class, msg -> { + CompletableFuture fut = + ask(userActor, new GetUserData(), timeout).toCompletableFuture(); + + pipe(fut, getContext().dispatcher()); + }) + .match(GetUserActivities.class, msg -> { + CompletableFuture fut = + ask(userActivityActor, new GetFromUserActivityActor(), timeout).toCompletableFuture(); + + pipe(fut, getContext().dispatcher()).to(sender()); + }) + .build(); + } + } + //#pipe-to-proxy-actor + + //#pipe-to-proxy-messages + public class GetUserData {} + public class GetUserActivities {} + //#pipe-to-proxy-messages + @SuppressWarnings("unchecked") @Test public void useCustomExecutionContext() throws Exception { ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor(); //#diy-execution-context @@ -135,9 +259,7 @@ public class FutureDocTest extends AbstractJavaTest { Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking - //#pipe-to - akka.pattern.Patterns.pipe(future, system.dispatcher()).to(actor); - //#pipe-to + assertEquals("HELLO", result); } diff --git a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala index 8c962346a5..f30009f7c7 100644 --- a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala +++ b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala @@ -7,8 +7,7 @@ package docs.future import language.postfixOps import akka.testkit._ -import akka.actor.{ Actor, Props } -import akka.actor.Status +import akka.actor.{ Actor, ActorRef, Props, Status } import akka.util.Timeout import scala.concurrent.duration._ import java.lang.IllegalStateException @@ -35,6 +34,95 @@ object FutureDocSpec { n += 2 } } + + //#pipe-to-usage + class ActorUsingPipeTo(target: ActorRef) extends Actor { + // akka.pattern.pipe needs to be imported + import akka.pattern.{ask, pipe} + // implicit ExecutionContext should be in scope + implicit val ec: ExecutionContext = context.dispatcher + + def receive = { + val future = target ? "some message" + future pipeTo sender() // use the pipe pattern + } + } + //#pipe-to-usage + + //#pipe-to-returned-data + case class UserData(data: String) + case class UserActivity(activity: String) + //#pipe-to-returned-data + + //#pipe-to-user-data-actor + class UserDataActor extends Actor { + import UserDataActor._ + + //holds the user data internally + var internalData: UserData = UserData("initial data") + + def receive = { + case Get ⇒ + sender() ! internalData + } + } + + object UserDataActor { + case object Get + } + //#pipe-to-user-data-actor + + //#pipe-to-user-activity-actor + trait UserActivityRepository { + def queryHistoricalActivities(userId: String): Future[List[UserActivity]] + } + + class UserActivityActor(val userId: String, repository: UserActivityRepository) extends Actor { + import akka.pattern.pipe + import UserActivityActor._ + implicit val ec: ExecutionContext = context.dispatcher + + def receive = { + case Get ⇒ + // user's historical activities are retrieved + // via the separate repository + repository.queryHistoricalActivities(userId) pipeTo sender() + } + } + + object UserActivityActor { + case object Get + } + //#pipe-to-user-activity-actor + + //#pipe-to-proxy-actor + class UserProxyActor( + userData: ActorRef, + userActivities: ActorRef + ) extends Actor { + import UserProxyActor._ + import akka.pattern.{ ask, pipe } + implicit val ec: ExecutionContext = context.dispatcher + + implicit val timeout = Timeout(5 seconds) + + def receive = { + case GetUserData ⇒ + (userData ? UserDataActor.Get) pipeTo sender() + case GetUserActivities ⇒ + (userActivities ? UserActivityActor.Get) pipeTo sender() + } + } + //#pipe-to-proxy-actor + + //#pipe-to-proxy-messages + object UserProxyActor { + sealed trait Message + case object GetUserData extends Message + case object GetUserActivities extends Message + } + //#pipe-to-proxy-messages + } class FutureDocSpec extends AkkaSpec { @@ -73,11 +161,6 @@ class FutureDocSpec extends AkkaSpec { val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking - //#pipe-to - import akka.pattern.pipe - future pipeTo actor - //#pipe-to - result should be("HELLO") }