Add the pipe pattern section (#23948)

This commit is contained in:
Richard Imaoka 2018-07-28 23:52:51 +09:00
parent 5b3b191bac
commit 52864ffb28
5 changed files with 284 additions and 16 deletions

View file

@ -91,15 +91,78 @@ When using non-blocking it is better to use the `mapTo` method to safely try to
The `mapTo` method will return a new `Future` that contains the result if the cast was successful,
or a `ClassCastException` if not. Handling `Exception`s will be discussed further within this documentation.
@@@
@@@
To send the result of a `Future` to an `Actor`, you can use the `pipe` construct:
## Use the pipe pattern
scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to }
Another useful message-transfer pattern is "pipe", which is to send the result of `Future` to another actor, upon completion of the `Future`.
The pipe pattern can be used by importing @java[`akka.pattern.PatternsCS.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.
java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to }
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-usage }
To see how this works in more detail, let's introduce a small example consisting of three different actors,
`UserProxyActor`, `UserDataActor` and `UserActivityActor`.
In this example, when you need information about a user, you send a request message to `UserProxyActor`,
then it gets the corresponding result from the appropriate backend actor based on the request message type.
![futures-pipeto1](images/futures-pipeto1.png)
![futures-pipeto2](images/futures-pipeto2.png)
The message types you send to `UserProxyActor` are `GetUserData` and `GetUserActivities`:
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-messages }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-proxy-messages }
and `UserData` and @scala[`List[UserActivity]`]@java[`ArrayList<UserActivity>`] are returned to the original sender in the end.
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-returned-data }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-returned-data }
The backend `UserDataActor` and `UserActivityActor` are defined as follows:
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-data-actor }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #pipe-to-user-data-actor }
`UserDataActor` holds the data in memory, so that it can return the current state of the user data quickly upon a request.
On the other hand, `UserActivityActor` queries into a `repository` to retrieve historical user activities then
sends the result to the `sender()` which is `UserProxy` in this case, with the pipe pattern.
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-user-activity-actor }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-pipe #pipe-to-user-activity-actor }
Since it needs to talk to the separate `repository`, it takes time to retrieve the list of `UserActivity`,
hence the return type of `queryHistoricalActivities` is @scala[`Future`]@java[`CompletableFuture`].
To send back the result to the `sender()` we used the @scala[`pipeTo`]@java[`pipe`] method,
so that the result of the @scala[`Future`]@java[`CompletableFuture`] is sent to `sender()` upon @scala[`Future`]@java[`CompletableFuture`]'s completion.
Finally, the definition of `UserProxyActor` is as below.
Scala
: @@snip [FutureDocSpec.scala]($code$/scala/docs/future/FutureDocSpec.scala) { #pipe-to-proxy-actor }
Java
: @@snip [FutureDocTest.java]($code$/java/jdocs/future/FutureDocTest.java) { #imports-ask #imports-pipe #pipe-to-proxy-actor }
Note that the @scala[`pipeTo`]@java[`pipe`] method used with the @scala[`?`]@java[`ask`] method.
Using @scala[`pipeTo`]@java[`pipe`] with the @scala[`?`]@java[`ask`] method is a common practice when you want to relay a message from one actor to another.
## Use Directly

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

View file

@ -68,6 +68,14 @@ import static akka.pattern.PatternsCS.retry;
//#imports8
//#imports-ask
import static akka.pattern.PatternsCS.ask;
//#imports-ask
//#imports-pipe
import static akka.pattern.PatternsCS.pipe;
//#imports-pipe
import java.util.ArrayList;
import java.util.List;
@ -112,6 +120,122 @@ public class FutureDocTest extends AbstractJavaTest {
}
//#print-result
}
//#pipe-to-usage
public class ActorUsingPipeTo extends AbstractActor {
ActorRef target;
Timeout timeout;
ActorUsingPipeTo(ActorRef target) {
this.target = target;
this.timeout = new Timeout(Duration.create(5, "seconds"));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
CompletableFuture<Object> fut =
ask(target, "some message", timeout).toCompletableFuture();
// the pipe pattern
pipe(fut, getContext().dispatcher()).to(sender());
})
.build();
}
}
//#pipe-to-usage
//#pipe-to-returned-data
public class UserData { final String data; UserData(String data){ this.data = data; } }
public class UserActivity { final String activity; UserActivity(String activity){ this.activity = activity; } }
//#pipe-to-returned-data
//#pipe-to-user-data-actor
public class UserDataActor extends AbstractActor {
UserData internalData;
UserDataActor(){
this.internalData = new UserData("initial data");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(GetFromUserDataActor.class, msg -> sender().tell(internalData, self()))
.build();
}
}
public class GetFromUserDataActor {}
//#pipe-to-user-data-actor
//#pipe-to-user-activity-actor
interface UserActivityRepository {
CompletableFuture<ArrayList<UserActivity>> queryHistoricalActivities(String userId);
}
public class UserActivityActor extends AbstractActor {
String userId;
UserActivityRepository repository;
UserActivityActor(String userId, UserActivityRepository repository) {
this.userId = userId;
this.repository = repository;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(GetFromUserActivityActor.class, msg -> {
CompletableFuture<ArrayList<UserActivity>> fut =
repository.queryHistoricalActivities(userId);
pipe(fut, getContext().dispatcher()).to(sender());
})
.build();
}
}
public class GetFromUserActivityActor {}
//#pipe-to-user-activity-actor
//#pipe-to-proxy-actor
public class UserProxyActor extends AbstractActor {
ActorRef userActor;
ActorRef userActivityActor;
Timeout timeout = new Timeout(Duration.create(5, "seconds"));
UserProxyActor(ActorRef userActor, ActorRef userActivityActor) {
this.userActor = userActor;
this.userActivityActor = userActivityActor;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(GetUserData.class, msg -> {
CompletableFuture<Object> fut =
ask(userActor, new GetUserData(), timeout).toCompletableFuture();
pipe(fut, getContext().dispatcher());
})
.match(GetUserActivities.class, msg -> {
CompletableFuture<Object> fut =
ask(userActivityActor, new GetFromUserActivityActor(), timeout).toCompletableFuture();
pipe(fut, getContext().dispatcher()).to(sender());
})
.build();
}
}
//#pipe-to-proxy-actor
//#pipe-to-proxy-messages
public class GetUserData {}
public class GetUserActivities {}
//#pipe-to-proxy-messages
@SuppressWarnings("unchecked") @Test public void useCustomExecutionContext() throws Exception {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
@ -135,9 +259,7 @@ public class FutureDocTest extends AbstractJavaTest {
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
//#pipe-to
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(actor);
//#pipe-to
assertEquals("HELLO", result);
}

View file

@ -7,8 +7,7 @@ package docs.future
import language.postfixOps
import akka.testkit._
import akka.actor.{ Actor, Props }
import akka.actor.Status
import akka.actor.{ Actor, ActorRef, Props, Status }
import akka.util.Timeout
import scala.concurrent.duration._
import java.lang.IllegalStateException
@ -35,6 +34,95 @@ object FutureDocSpec {
n += 2
}
}
//#pipe-to-usage
class ActorUsingPipeTo(target: ActorRef) extends Actor {
// akka.pattern.pipe needs to be imported
import akka.pattern.{ask, pipe}
// implicit ExecutionContext should be in scope
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
val future = target ? "some message"
future pipeTo sender() // use the pipe pattern
}
}
//#pipe-to-usage
//#pipe-to-returned-data
case class UserData(data: String)
case class UserActivity(activity: String)
//#pipe-to-returned-data
//#pipe-to-user-data-actor
class UserDataActor extends Actor {
import UserDataActor._
//holds the user data internally
var internalData: UserData = UserData("initial data")
def receive = {
case Get
sender() ! internalData
}
}
object UserDataActor {
case object Get
}
//#pipe-to-user-data-actor
//#pipe-to-user-activity-actor
trait UserActivityRepository {
def queryHistoricalActivities(userId: String): Future[List[UserActivity]]
}
class UserActivityActor(val userId: String, repository: UserActivityRepository) extends Actor {
import akka.pattern.pipe
import UserActivityActor._
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
case Get
// user's historical activities are retrieved
// via the separate repository
repository.queryHistoricalActivities(userId) pipeTo sender()
}
}
object UserActivityActor {
case object Get
}
//#pipe-to-user-activity-actor
//#pipe-to-proxy-actor
class UserProxyActor(
userData: ActorRef,
userActivities: ActorRef
) extends Actor {
import UserProxyActor._
import akka.pattern.{ ask, pipe }
implicit val ec: ExecutionContext = context.dispatcher
implicit val timeout = Timeout(5 seconds)
def receive = {
case GetUserData
(userData ? UserDataActor.Get) pipeTo sender()
case GetUserActivities
(userActivities ? UserActivityActor.Get) pipeTo sender()
}
}
//#pipe-to-proxy-actor
//#pipe-to-proxy-messages
object UserProxyActor {
sealed trait Message
case object GetUserData extends Message
case object GetUserActivities extends Message
}
//#pipe-to-proxy-messages
}
class FutureDocSpec extends AkkaSpec {
@ -73,11 +161,6 @@ class FutureDocSpec extends AkkaSpec {
val result = Await.result(future, timeout.duration).asInstanceOf[String]
//#ask-blocking
//#pipe-to
import akka.pattern.pipe
future pipeTo actor
//#pipe-to
result should be("HELLO")
}