From 6a93610bdaf55a8bf8819419a3082db2b634e24e Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 4 Feb 2011 15:14:49 +1300 Subject: [PATCH] Fix for local actor ref home address - home address is set on deserialization - flag for client-managed actor ref so that having a home address doesn't imply client-managed --- .../src/main/scala/akka/actor/ActorRef.scala | 9 +++++---- .../akka/remote/netty/NettyRemoteSupport.scala | 4 ++-- .../serialization/SerializationProtocol.scala | 7 ++++++- .../TypedActorSerializationSpec.scala | 16 +--------------- .../src/main/scala/akka/actor/TypedActor.scala | 10 +++++++++- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 83b78c8645..d1092a7d0a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -551,7 +551,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ class LocalActorRef private[akka] ( private[this] val actorFactory: () => Actor, - val homeAddress: Option[InetSocketAddress]) + val homeAddress: Option[InetSocketAddress], + val clientManaged: Boolean = false) extends ActorRef with ScalaActorRef { @volatile @@ -598,7 +599,7 @@ class LocalActorRef private[akka] ( /** * Returns whether this actor ref is client-managed remote or not */ - private[akka] final def isClientManaged_? = homeAddress.isDefined && isRemotingEnabled + private[akka] final def isClientManaged_? = clientManaged && homeAddress.isDefined && isRemotingEnabled // ========= PUBLIC FUNCTIONS ========= @@ -644,7 +645,7 @@ class LocalActorRef private[akka] ( initializeActorInstance if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid) + Actor.remote.registerClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) checkReceiveTimeout //Schedule the initial Receive timeout } @@ -664,7 +665,7 @@ class LocalActorRef private[akka] ( Actor.registry.unregister(this) if (isRemotingEnabled) { if (isClientManaged_?) - Actor.remote.registerClientManagedActor(homeAddress.get.getHostName,homeAddress.get.getPort, uuid) + Actor.remote.unregisterClientManagedActor(homeAddress.get.getHostName, homeAddress.get.getPort, uuid) Actor.remote.unregister(this) } setActorSelfFields(actorInstance.get,null) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f35fbdd5f6..1f05ea7452 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -527,7 +527,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with return new LocalActorRef(factory, None) // Code is much simpler with return } - val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port))) + val ref = new LocalActorRef(factory, Some(new InetSocketAddress(host, port)), clientManaged = true) //ref.timeout = timeout //removed because setting default timeout should be done after construction ref } @@ -1241,4 +1241,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") } } -} \ No newline at end of file +} diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 3494bed0b0..e3491fc12f 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -206,6 +206,11 @@ object ActorSerialization { else actorClass.newInstance.asInstanceOf[Actor] } + val homeAddress = { + val address = protocol.getOriginalAddress + Some(new InetSocketAddress(address.getHostname, address.getPort)) + } + val ar = new LocalActorRef( uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow), protocol.getId, @@ -215,7 +220,7 @@ object ActorSerialization { supervisor, hotswap, factory, - None) //TODO: shouldn't originalAddress be optional? + homeAddress) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage)) diff --git a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala index 78ba95d60a..d3b8e1f991 100644 --- a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala +++ b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala @@ -14,20 +14,6 @@ import akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnid import akka.actor.remote.AkkaRemoteTest class TypedActorSerializationSpec extends AkkaRemoteTest { - var typedActor: MyTypedActor = null - - override def beforeAll = { - super.beforeAll - typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) - remote.registerTypedActor("typed-actor-service", typedActor) - } - - // make sure the servers shutdown cleanly after the test has finished - override def afterAll = { - TypedActor.stop(typedActor) - super.afterAll - } - object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl] class MyTypedActorFormat extends Format[MyTypedActorImpl] { @@ -86,7 +72,7 @@ class TypedActorSerializationSpec extends AkkaRemoteTest { typedActor2.requestReply("hello") must equal("world 3 3") } - "should be able to serialize a local yped actor ref to a remote typed actor ref proxy" in { + "should be able to serialize a local typed actor ref to a remote typed actor ref proxy" in { val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) typedActor1.requestReply("hello") must equal("world") typedActor1.requestReply("hello") must equal("world") diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 874342e5dc..9a19dd66a3 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -575,8 +575,16 @@ object TypedActor extends Logging { if (config._id.isDefined) actorRef.id = config._id.get actorRef.timeout = config.timeout + //log.slf4j.debug("config._host for {} is {} but homeAddress is {} and on ref {}",Array[AnyRef](intfClass, config._host, typedActor.context.homeAddress,actorRef.homeAddress)) - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.homeAddress, actorRef.timeout)) + + val remoteAddress = actorRef match { + case remote: RemoteActorRef => remote.homeAddress + case local: LocalActorRef if local.clientManaged => local.homeAddress + case _ => None + } + + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, remoteAddress, actorRef.timeout)) actorRef.start proxy.asInstanceOf[T] }