Remote deployed typed actors need a local proxy. See #3380
This commit is contained in:
parent
51ed174432
commit
a91a86c6f3
2 changed files with 36 additions and 9 deletions
|
|
@ -74,7 +74,8 @@ trait TypedActorFactory {
|
|||
def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T]): R = {
|
||||
val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
|
||||
val c = props.creator //Cache this to avoid closing over the Props
|
||||
val ap = props.actorProps.withCreator(new TypedActor.TypedActor[R, T](proxyVar, c()))
|
||||
val i = props.interfaces //Cache this to avoid closing over the Props
|
||||
val ap = props.actorProps.withCreator(new TypedActor.TypedActor[R, T](proxyVar, c(), i))
|
||||
typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap))
|
||||
}
|
||||
|
||||
|
|
@ -84,7 +85,8 @@ trait TypedActorFactory {
|
|||
def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], name: String): R = {
|
||||
val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
|
||||
val c = props.creator //Cache this to avoid closing over the Props
|
||||
val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c()))
|
||||
val i = props.interfaces //Cache this to avoid closing over the Props
|
||||
val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c(), i))
|
||||
typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap, name))
|
||||
}
|
||||
|
||||
|
|
@ -245,8 +247,13 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
*
|
||||
* Implementation of TypedActor as an Actor
|
||||
*/
|
||||
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor {
|
||||
val me = withContext[T](createInstance)
|
||||
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T, interfaces: immutable.Seq[Class[_]]) extends Actor {
|
||||
// if we were remote deployed we need to create a local proxy
|
||||
if (!context.parent.asInstanceOf[InternalActorRef].isLocal)
|
||||
TypedActor.get(context.system).createActorRefProxy(
|
||||
TypedProps(interfaces, createInstance), proxyVar, context.self)
|
||||
|
||||
private val me = withContext[T](createInstance)
|
||||
|
||||
override def supervisorStrategy: SupervisorStrategy = me match {
|
||||
case l: Supervisor ⇒ l.supervisorStrategy
|
||||
|
|
@ -506,6 +513,12 @@ object TypedProps {
|
|||
*/
|
||||
def apply[T <: AnyRef: ClassTag](): TypedProps[T] =
|
||||
new TypedProps[T](implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def apply[T <: AnyRef](interfaces: immutable.Seq[Class[_]], creator: ⇒ T): TypedProps[T] =
|
||||
new TypedProps[T](interfaces, () ⇒ creator)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import scala.concurrent.{ Await, Future }
|
|||
import TypedActorRemoteDeploySpec._
|
||||
import akka.actor.{ Deploy, ActorSystem, TypedProps, TypedActor }
|
||||
import scala.concurrent.duration._
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
|
||||
object TypedActorRemoteDeploySpec {
|
||||
val conf = ConfigFactory.parseString("""
|
||||
|
|
@ -18,10 +19,12 @@ object TypedActorRemoteDeploySpec {
|
|||
|
||||
trait RemoteNameService {
|
||||
def getName: Future[String]
|
||||
def getNameSelfDeref: Future[String]
|
||||
}
|
||||
|
||||
class RemoteNameServiceImpl extends RemoteNameService {
|
||||
def getName: Future[String] = Future.successful(TypedActor.context.system.name)
|
||||
def getNameSelfDeref: Future[String] = TypedActor.self[RemoteNameService].getName
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -31,19 +34,30 @@ class TypedActorRemoteDeploySpec extends AkkaSpec(conf) {
|
|||
val remoteSystem = ActorSystem(remoteName, conf)
|
||||
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
|
||||
|
||||
def verify[T](f: RemoteNameService ⇒ Future[T], expected: T) = {
|
||||
val ts = TypedActor(system)
|
||||
val echoService: RemoteNameService = ts.typedActorOf(
|
||||
TypedProps[RemoteNameServiceImpl].withDeploy(Deploy(scope = RemoteScope(remoteAddress))))
|
||||
Await.result(f(echoService), 3.seconds) must be(expected)
|
||||
val actor = ts.getActorRefFor(echoService)
|
||||
system.stop(actor)
|
||||
verifyActorTermination(actor)
|
||||
}
|
||||
|
||||
"Typed actors" must {
|
||||
|
||||
"be possible to deploy remotely and communicate with" in {
|
||||
val echoService: RemoteNameService = TypedActor(system).typedActorOf(
|
||||
TypedProps[RemoteNameServiceImpl].withDeploy(Deploy(scope = RemoteScope(remoteAddress))))
|
||||
Await.result(echoService.getName, 3.seconds) must be === remoteName
|
||||
verify({ _.getName }, remoteName)
|
||||
}
|
||||
|
||||
"be possible to deploy remotely and be able to dereference self" in {
|
||||
verify({ _.getNameSelfDeref }, remoteName)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
remoteSystem.shutdown()
|
||||
remoteSystem.awaitTermination(5.seconds)
|
||||
shutdown(remoteSystem)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue