Fix for local actor ref home address
- home address is set on deserialization - flag for client-managed actor ref so that having a home address doesn't imply client-managed
This commit is contained in:
parent
ad6498f3e1
commit
6a93610bda
5 changed files with 23 additions and 23 deletions
|
|
@ -551,7 +551,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
|
|||
*/
|
||||
class LocalActorRef private[akka] (
|
||||
private[this] val actorFactory: () => Actor,
|
||||
val homeAddress: Option[InetSocketAddress])
|
||||
val homeAddress: Option[InetSocketAddress],
|
||||
val clientManaged: Boolean = false)
|
||||
extends ActorRef with ScalaActorRef {
|
||||
|
||||
@volatile
|
||||
|
|
@ -598,7 +599,7 @@ class LocalActorRef private[akka] (
|
|||
/**
|
||||
* Returns whether this actor ref is client-managed remote or not
|
||||
*/
|
||||
private[akka] final def isClientManaged_? = homeAddress.isDefined && isRemotingEnabled
|
||||
private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled
|
||||
|
||||
// ========= PUBLIC FUNCTIONS =========
|
||||
|
||||
|
|
@ -644,7 +645,7 @@ class LocalActorRef private[akka] (
|
|||
initializeActorInstance
|
||||
|
||||
if (isClientManaged_?)
|
||||
Actor.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid)
|
||||
Actor.remote.registerClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
|
||||
|
||||
checkReceiveTimeout //Schedule the initial Receive timeout
|
||||
}
|
||||
|
|
@ -664,7 +665,7 @@ class LocalActorRef private[akka] (
|
|||
Actor.registry.unregister(this)
|
||||
if (isRemotingEnabled) {
|
||||
if (isClientManaged_?)
|
||||
Actor.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid)
|
||||
Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid)
|
||||
Actor.remote.unregister(this)
|
||||
}
|
||||
setActorSelfFields(actorInstance.get,null)
|
||||
|
|
|
|||
|
|
@ -527,7 +527,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
|
|||
return new LocalActorRef(factory, None) // Code is much simpler with return
|
||||
}
|
||||
|
||||
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)))
|
||||
val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true)
|
||||
//ref.timeout = timeout //removed because setting default timeout should be done after construction
|
||||
ref
|
||||
}
|
||||
|
|
@ -1241,4 +1241,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
|
|||
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -206,6 +206,11 @@ object ActorSerialization {
|
|||
else actorClass.newInstance.asInstanceOf[Actor]
|
||||
}
|
||||
|
||||
val homeAddress = {
|
||||
val address = protocol.getOriginalAddress
|
||||
Some(new InetSocketAddress(address.getHostname, address.getPort))
|
||||
}
|
||||
|
||||
val ar = new LocalActorRef(
|
||||
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
|
||||
protocol.getId,
|
||||
|
|
@ -215,7 +220,7 @@ object ActorSerialization {
|
|||
supervisor,
|
||||
hotswap,
|
||||
factory,
|
||||
None) //TODO: shouldn't originalAddress be optional?
|
||||
homeAddress)
|
||||
|
||||
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
|
||||
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
|
||||
|
|
|
|||
|
|
@ -14,20 +14,6 @@ import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnid
|
|||
import akka.actor.remote.AkkaRemoteTest
|
||||
|
||||
class TypedActorSerializationSpec extends AkkaRemoteTest {
|
||||
var typedActor: MyTypedActor = null
|
||||
|
||||
override def beforeAll = {
|
||||
super.beforeAll
|
||||
typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
|
||||
remote.registerTypedActor("typed-actor-service", typedActor)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
override def afterAll = {
|
||||
TypedActor.stop(typedActor)
|
||||
super.afterAll
|
||||
}
|
||||
|
||||
object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl]
|
||||
|
||||
class MyTypedActorFormat extends Format[MyTypedActorImpl] {
|
||||
|
|
@ -86,7 +72,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest {
|
|||
typedActor2.requestReply("hello") must equal("world 3 3")
|
||||
}
|
||||
|
||||
"should be able to serialize a local yped actor ref to a remote typed actor ref proxy" in {
|
||||
"should be able to serialize a local typed actor ref to a remote typed actor ref proxy" in {
|
||||
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
|
||||
typedActor1.requestReply("hello") must equal("world")
|
||||
typedActor1.requestReply("hello") must equal("world")
|
||||
|
|
|
|||
|
|
@ -575,8 +575,16 @@ object TypedActor extends Logging {
|
|||
if (config._id.isDefined) actorRef.id = config._id.get
|
||||
|
||||
actorRef.timeout = config.timeout
|
||||
|
||||
//log.slf4j.debug("config._host for {} is {} but homeAddress is {} and on ref {}",Array[AnyRef](intfClass, config._host, typedActor.context.homeAddress,actorRef.homeAddress))
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout))
|
||||
|
||||
val remoteAddress = actorRef match {
|
||||
case remote: RemoteActorRef => remote.homeAddress
|
||||
case local: LocalActorRef if local.clientManaged => local.homeAddress
|
||||
case _ => None
|
||||
}
|
||||
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue