Merge pull request #25401 from richardimaoka/doc-23948-futures-pipeto-richard

Add the pipe pattern section
This commit is contained in:
Johan Andrén 2018-09-03 12:13:12 +02:00 committed by GitHub
commit 8294f6262c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 334 additions and 34 deletions

View file

@ -7,8 +7,7 @@ package docs.future
import language.postfixOps
import akka.testkit._
import akka.actor.{ Actor, Props }
import akka.actor.Status
import akka.actor.{ Actor, ActorRef, Props, Status }
import akka.util.Timeout
import scala.concurrent.duration._
import java.lang.IllegalStateException
@ -35,6 +34,97 @@ object FutureDocSpec {
n += 2
}
}
//#pipe-to-usage
class ActorUsingPipeTo(target: ActorRef) extends Actor {
// akka.pattern.pipe needs to be imported
import akka.pattern.{ ask, pipe }
// implicit ExecutionContext should be in scope
implicit val ec: ExecutionContext = context.dispatcher
implicit val timeout: Timeout = 5.seconds
def receive = {
case _
val future = target ? "some message"
future pipeTo sender() // use the pipe pattern
}
}
//#pipe-to-usage
//#pipe-to-returned-data
case class UserData(data: String)
case class UserActivity(activity: String)
//#pipe-to-returned-data
//#pipe-to-user-data-actor
class UserDataActor extends Actor {
import UserDataActor._
//holds the user data internally
var internalData: UserData = UserData("initial data")
def receive = {
case Get
sender() ! internalData
}
}
object UserDataActor {
case object Get
}
//#pipe-to-user-data-actor
//#pipe-to-user-activity-actor
trait UserActivityRepository {
def queryHistoricalActivities(userId: String): Future[List[UserActivity]]
}
class UserActivityActor(val userId: String, repository: UserActivityRepository) extends Actor {
import akka.pattern.pipe
import UserActivityActor._
implicit val ec: ExecutionContext = context.dispatcher
def receive = {
case Get
// user's historical activities are retrieved
// via the separate repository
repository.queryHistoricalActivities(userId) pipeTo sender()
}
}
object UserActivityActor {
case object Get
}
//#pipe-to-user-activity-actor
//#pipe-to-proxy-actor
class UserProxyActor(
userData: ActorRef,
userActivities: ActorRef
) extends Actor {
import UserProxyActor._
import akka.pattern.{ ask, pipe }
implicit val ec: ExecutionContext = context.dispatcher
implicit val timeout = Timeout(5 seconds)
def receive = {
case GetUserData
(userData ? UserDataActor.Get) pipeTo sender()
case GetUserActivities
(userActivities ? UserActivityActor.Get) pipeTo sender()
}
}
//#pipe-to-proxy-actor
//#pipe-to-proxy-messages
object UserProxyActor {
sealed trait Message
case object GetUserData extends Message
case object GetUserActivities extends Message
}
//#pipe-to-proxy-messages
}
class FutureDocSpec extends AkkaSpec {
@ -73,11 +163,6 @@ class FutureDocSpec extends AkkaSpec {
val result = Await.result(future, timeout.duration).asInstanceOf[String]
//#ask-blocking
//#pipe-to
import akka.pattern.pipe
future pipeTo actor
//#pipe-to
result should be("HELLO")
}