diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index 9f0a260ba6..a310994df2 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -91,15 +91,82 @@ When using non-blocking it is better to use the `mapTo` method to safely try to The `mapTo` method will return a new `Future` that contains the result if the cast was successful, or a `ClassCastException` if not. Handling `Exception`s will be discussed further within this documentation. -@@@ +@@@ -To send the result of a `Future` to an `Actor`, you can use the `pipe` construct: +## Use the pipe pattern -scala -: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to } +Another useful message-transfer pattern is "pipe", which is to send the result of @scala[`Future`]@java[`CompletableFuture`] to another actor, upon completion of the @scala[`Future`]@java[`CompletableFuture`]. +The pipe pattern can be used by importing @java[`akka.pattern.PatternsCS.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.] -java -: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to } +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-usage } + +To see how this works in more detail, let's introduce a small example consisting of three different actors, +`UserProxyActor`, `UserDataActor` and `UserActivityActor`. +In this example, when you need information about a user, you send a request message to `UserProxyActor`, +then it gets the corresponding result from the appropriate backend actor based on the request message type. + +

+ +

+ +

+ +

+ +The message types you send to `UserProxyActor` are `GetUserData` and `GetUserActivities`: + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-messages } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-proxy-messages } + +and `UserData` and @scala[`List[UserActivity]`]@java[`ArrayList`] are returned to the original sender in the end. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-returned-data } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-returned-data } + +The backend `UserDataActor` and `UserActivityActor` are defined as follows: + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-data-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-user-data-actor } + +`UserDataActor` holds the data in memory, so that it can return the current state of the user data quickly upon a request. + +On the other hand, `UserActivityActor` queries into a `repository` to retrieve historical user activities then +sends the result to the `sender()` which is `UserProxy` in this case, with the pipe pattern. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-activity-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-pipe #pipe-to-user-activity-actor } + +Since it needs to talk to the separate `repository`, it takes time to retrieve the list of `UserActivity`, +hence the return type of `queryHistoricalActivities` is @scala[`Future`]@java[`CompletableFuture`]. +To send back the result to the `sender()` we used the @scala[`pipeTo`]@java[`pipe`] method, +so that the result of the @scala[`Future`]@java[`CompletableFuture`] is sent to `sender()` upon @scala[`Future`]@java[`CompletableFuture`]'s completion. + +Finally, the definition of `UserProxyActor` is as below. + +Scala +: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-actor } + +Java +: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-proxy-actor } + +Note that the @scala[`pipeTo`]@java[`pipe`] method used with the @scala[`?`]@java[`ask`] method. +Using @scala[`pipeTo`]@java[`pipe`] with the @scala[`?`]@java[`ask`] method is a common practice when you want to relay a message from one actor to another. ## Use Directly diff --git a/akka-docs/src/main/paradox/images/futures-pipeto1.png b/akka-docs/src/main/paradox/images/futures-pipeto1.png new file mode 100644 index 0000000000..aa8258b110 Binary files /dev/null and b/akka-docs/src/main/paradox/images/futures-pipeto1.png differ diff --git a/akka-docs/src/main/paradox/images/futures-pipeto2.png b/akka-docs/src/main/paradox/images/futures-pipeto2.png new file mode 100644 index 0000000000..5d4fae5426 Binary files /dev/null and b/akka-docs/src/main/paradox/images/futures-pipeto2.png differ diff --git a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java index 1b5c4886f9..6edfb81232 100644 --- a/akka-docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/akka-docs/src/test/java/jdocs/future/FutureDocTest.java @@ -18,7 +18,7 @@ import akka.util.Timeout; //#imports1 //#imports2 -import scala.concurrent.duration.Duration; +import java.time.Duration; import akka.japi.Function; import java.util.concurrent.*; @@ -68,6 +68,14 @@ import static akka.pattern.PatternsCS.retry; //#imports8 +//#imports-ask +import static akka.pattern.PatternsCS.ask; +//#imports-ask +//#imports-pipe +import static akka.pattern.PatternsCS.pipe; +//#imports-pipe + + import java.util.ArrayList; import java.util.List; @@ -112,6 +120,134 @@ public class FutureDocTest extends AbstractJavaTest { } //#print-result } + + //#pipe-to-usage + public class ActorUsingPipeTo extends AbstractActor { + ActorRef target; + Timeout timeout; + + ActorUsingPipeTo(ActorRef target) { + this.target = target; + this.timeout = Timeout.create(Duration.ofSeconds(5)); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(String.class, msg -> { + CompletableFuture fut = + ask(target, "some message", timeout).toCompletableFuture(); + + // the pipe pattern + pipe(fut, getContext().dispatcher()).to(getSender()); + }) + .build(); + } + } + //#pipe-to-usage + + //#pipe-to-returned-data + public class UserData { + final String data; + + UserData(String data){ + this.data = data; + } + } + + public class UserActivity { + final String activity; + UserActivity(String activity){ + this.activity = activity; + } + } + //#pipe-to-returned-data + + //#pipe-to-user-data-actor + public class UserDataActor extends AbstractActor { + UserData internalData; + + UserDataActor(){ + this.internalData = new UserData("initial data"); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetFromUserDataActor.class, msg -> sender().tell(internalData, self())) + .build(); + } + } + + public class GetFromUserDataActor {} + //#pipe-to-user-data-actor + + //#pipe-to-user-activity-actor + interface UserActivityRepository { + CompletableFuture> queryHistoricalActivities(String userId); + } + + public class UserActivityActor extends AbstractActor { + String userId; + UserActivityRepository repository; + + UserActivityActor(String userId, UserActivityRepository repository) { + this.userId = userId; + this.repository = repository; + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetFromUserActivityActor.class, msg -> { + CompletableFuture> fut = + repository.queryHistoricalActivities(userId); + + pipe(fut, getContext().dispatcher()).to(sender()); + }) + .build(); + } + } + + public class GetFromUserActivityActor {} + //#pipe-to-user-activity-actor + + //#pipe-to-proxy-actor + public class UserProxyActor extends AbstractActor { + ActorRef userActor; + ActorRef userActivityActor; + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + + UserProxyActor(ActorRef userActor, ActorRef userActivityActor) { + this.userActor = userActor; + this.userActivityActor = userActivityActor; + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(GetUserData.class, msg -> { + CompletableFuture fut = + ask(userActor, new GetUserData(), timeout).toCompletableFuture(); + + pipe(fut, getContext().dispatcher()); + }) + .match(GetUserActivities.class, msg -> { + CompletableFuture fut = + ask(userActivityActor, new GetFromUserActivityActor(), timeout).toCompletableFuture(); + + pipe(fut, getContext().dispatcher()).to(sender()); + }) + .build(); + } + } + //#pipe-to-proxy-actor + + //#pipe-to-proxy-messages + public class GetUserData {} + public class GetUserActivities {} + //#pipe-to-proxy-messages + @SuppressWarnings("unchecked") @Test public void useCustomExecutionContext() throws Exception { ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor(); //#diy-execution-context @@ -131,13 +267,11 @@ public class FutureDocTest extends AbstractJavaTest { ActorRef actor = system.actorOf(Props.create(MyActor.class)); String msg = "hello"; //#ask-blocking - Timeout timeout = new Timeout(Duration.create(5, "seconds")); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking - //#pipe-to - akka.pattern.Patterns.pipe(future, system.dispatcher()).to(actor); - //#pipe-to + assertEquals("HELLO", result); } @@ -152,7 +286,8 @@ public class FutureDocTest extends AbstractJavaTest { f.onSuccess(new PrintResult(), system.dispatcher()); //#future-eval - String result = (String) Await.result(f, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + String result = (String) Await.result(f, timeout.duration()); assertEquals("HelloWorld", result); } @@ -175,7 +310,8 @@ public class FutureDocTest extends AbstractJavaTest { f2.onSuccess(new PrintResult(), system.dispatcher()); //#map - int result = Await.result(f2, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + int result = Await.result(f2, timeout.duration()); assertEquals(10, result); } @@ -202,7 +338,8 @@ public class FutureDocTest extends AbstractJavaTest { f2.onSuccess(new PrintResult(), system.dispatcher()); //#flat-map - int result = Await.result(f2, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + int result = Await.result(f2, timeout.duration()); assertEquals(10, result); } @@ -233,7 +370,8 @@ public class FutureDocTest extends AbstractJavaTest { futureSum.onSuccess(new PrintResult(), system.dispatcher()); //#sequence - long result = Await.result(futureSum, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + long result = Await.result(futureSum, timeout.duration()); assertEquals(3L, result); } @@ -258,7 +396,8 @@ public class FutureDocTest extends AbstractJavaTest { //Returns the sequence of strings as upper case futureResult.onSuccess(new PrintResult>(), system.dispatcher()); //#traverse - Iterable result = Await.result(futureResult, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + Iterable result = Await.result(futureResult, timeout.duration()); assertEquals(Arrays.asList("A", "B", "C"), result); } @@ -284,7 +423,8 @@ public class FutureDocTest extends AbstractJavaTest { resultFuture.onSuccess(new PrintResult(), system.dispatcher()); //#fold - String result = Await.result(resultFuture, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + String result = Await.result(resultFuture, timeout.duration()); assertEquals("ab", result); } @@ -308,7 +448,8 @@ public class FutureDocTest extends AbstractJavaTest { resultFuture.onSuccess(new PrintResult(), system.dispatcher()); //#reduce - Object result = Await.result(resultFuture, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + Object result = Await.result(resultFuture, timeout.duration()); assertEquals("ab", result); } @@ -328,12 +469,13 @@ public class FutureDocTest extends AbstractJavaTest { Future theFuture = promise.future(); promise.success("hello"); //#promise - Object result = Await.result(future, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + Object result = Await.result(future, timeout.duration()); assertEquals("Yay!", result); Throwable result2 = Await.result(otherFuture.failed(), - Duration.create(5, SECONDS)); + timeout.duration()); assertEquals("Bang!", result2.getMessage()); - String out = Await.result(theFuture, Duration.create(5, SECONDS)); + String out = Await.result(theFuture, timeout.duration()); assertEquals("hello", out); } @@ -406,7 +548,8 @@ public class FutureDocTest extends AbstractJavaTest { future.onSuccess(new PrintResult(), system.dispatcher()); //#recover - int result = Await.result(future, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + int result = Await.result(future, timeout.duration()); assertEquals(result, 0); } @@ -434,7 +577,8 @@ public class FutureDocTest extends AbstractJavaTest { future.onSuccess(new PrintResult(), system.dispatcher()); //#try-recover - int result = Await.result(future, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + int result = Await.result(future, timeout.duration()); assertEquals(result, 0); } @@ -507,7 +651,8 @@ public class FutureDocTest extends AbstractJavaTest { future3.onSuccess(new PrintResult(), system.dispatcher()); //#zip - String result = Await.result(future3, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + String result = Await.result(future3, timeout.duration()); assertEquals("foo bar", result); } @@ -520,7 +665,8 @@ public class FutureDocTest extends AbstractJavaTest { Future future4 = future1.fallbackTo(future2).fallbackTo(future3); future4.onSuccess(new PrintResult(), system.dispatcher()); //#fallback-to - String result = Await.result(future4, Duration.create(5, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(5)); + String result = Await.result(future4, timeout.duration()); assertEquals("bar", result); } @@ -532,7 +678,8 @@ public class FutureDocTest extends AbstractJavaTest { //#after final ExecutionContext ec = system.dispatcher(); Future failExc = Futures.failed(new IllegalStateException("OHNOES1")); - Future delayed = Patterns.after(Duration.create(200, "millis"), + Timeout delay = Timeout.create(Duration.ofMillis(200)); + Future delayed = Patterns.after(delay.duration(), system.scheduler(), ec, failExc); Future future = future(new Callable() { public String call() throws InterruptedException { @@ -543,7 +690,8 @@ public class FutureDocTest extends AbstractJavaTest { Future result = Futures.firstCompletedOf( Arrays.>asList(future, delayed), ec); //#after - Await.result(result, Duration.create(2, SECONDS)); + Timeout timeout = Timeout.create(Duration.ofSeconds(2)); + Await.result(result, timeout.duration()); } @Test diff --git a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala index 8c962346a5..82d5891909 100644 --- a/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala +++ b/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala @@ -7,8 +7,7 @@ package docs.future import language.postfixOps import akka.testkit._ -import akka.actor.{ Actor, Props } -import akka.actor.Status +import akka.actor.{ Actor, ActorRef, Props, Status } import akka.util.Timeout import scala.concurrent.duration._ import java.lang.IllegalStateException @@ -35,6 +34,97 @@ object FutureDocSpec { n += 2 } } + + //#pipe-to-usage + class ActorUsingPipeTo(target: ActorRef) extends Actor { + // akka.pattern.pipe needs to be imported + import akka.pattern.{ ask, pipe } + // implicit ExecutionContext should be in scope + implicit val ec: ExecutionContext = context.dispatcher + implicit val timeout: Timeout = 5.seconds + + def receive = { + case _ ⇒ + val future = target ? "some message" + future pipeTo sender() // use the pipe pattern + } + } + //#pipe-to-usage + + //#pipe-to-returned-data + case class UserData(data: String) + case class UserActivity(activity: String) + //#pipe-to-returned-data + + //#pipe-to-user-data-actor + class UserDataActor extends Actor { + import UserDataActor._ + + //holds the user data internally + var internalData: UserData = UserData("initial data") + + def receive = { + case Get ⇒ + sender() ! internalData + } + } + + object UserDataActor { + case object Get + } + //#pipe-to-user-data-actor + + //#pipe-to-user-activity-actor + trait UserActivityRepository { + def queryHistoricalActivities(userId: String): Future[List[UserActivity]] + } + + class UserActivityActor(val userId: String, repository: UserActivityRepository) extends Actor { + import akka.pattern.pipe + import UserActivityActor._ + implicit val ec: ExecutionContext = context.dispatcher + + def receive = { + case Get ⇒ + // user's historical activities are retrieved + // via the separate repository + repository.queryHistoricalActivities(userId) pipeTo sender() + } + } + + object UserActivityActor { + case object Get + } + //#pipe-to-user-activity-actor + + //#pipe-to-proxy-actor + class UserProxyActor( + userData: ActorRef, + userActivities: ActorRef + ) extends Actor { + import UserProxyActor._ + import akka.pattern.{ ask, pipe } + implicit val ec: ExecutionContext = context.dispatcher + + implicit val timeout = Timeout(5 seconds) + + def receive = { + case GetUserData ⇒ + (userData ? UserDataActor.Get) pipeTo sender() + case GetUserActivities ⇒ + (userActivities ? UserActivityActor.Get) pipeTo sender() + } + } + //#pipe-to-proxy-actor + + //#pipe-to-proxy-messages + object UserProxyActor { + sealed trait Message + case object GetUserData extends Message + case object GetUserActivities extends Message + } + //#pipe-to-proxy-messages + } class FutureDocSpec extends AkkaSpec { @@ -73,11 +163,6 @@ class FutureDocSpec extends AkkaSpec { val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking - //#pipe-to - import akka.pattern.pipe - future pipeTo actor - //#pipe-to - result should be("HELLO") }