diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c1f25b6d4f..3763bcb17b 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -67,6 +67,7 @@ object RemoteNode extends RemoteServer * @author Jonas Bonér */ object RemoteServer { + val UUID_PREFIX = "uuid:" val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) @@ -123,18 +124,20 @@ object RemoteServer { private class RemoteActorSet { private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard private val remoteActorSets = Map[Address, RemoteActorSet]() private val remoteServers = Map[Address, RemoteServer]() - private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { + private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } @@ -192,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer * @author Jonas Bonér */ class RemoteServer extends Logging with ListenerManagement { + import RemoteServer._ def name = "RemoteServer@" + hostname + ":" + port private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) @@ -283,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement { * @param typedActor typed actor to register */ def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { - val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors - if (!typedActors.contains(id)) { - log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) - typedActors.put(id, typedActor) + log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid()) + } else { + registerTypedActor(id, typedActor, typedActors()) } } @@ -301,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement { * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ def register(id: String, actorRef: ActorRef): Unit = synchronized { + log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid()) + } else { + register(id, actorRef, actors()) + } + } + + private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) { if (_isRunning) { - val actorMap = actors() - if (!actorMap.contains(id)) { + if (!registry.contains(id)) { if (!actorRef.isRunning) actorRef.start - log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) - actorMap.put(id, actorRef) + registry.put(id, actorRef) + } + } + } + + private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) { + if (_isRunning) { + if (!registry.contains(id)) { + registry.put(id, typedActor) } } } @@ -319,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement { log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) val actorMap = actors() actorMap remove actorRef.id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid } } @@ -331,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote actor with id [%s]", id) - val actorMap = actors() - val actorRef = actorMap get id - actorMap remove id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (id.startsWith(UUID_PREFIX)) { + actorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + if (actorRef.registeredInRemoteNodeDuringSerialization) { + val actorRef = actors().get(id) + actorsByUuid() remove actorRef.uuid + } + actors() remove id + } } } @@ -346,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement { def unregisterTypedActor(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote typed actor with id [%s]", id) - val registeredTypedActors = typedActors() - registeredTypedActors.remove(id) + if (id.startsWith(UUID_PREFIX)) { + typedActorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + typedActors().remove(id) + } } } @@ -356,7 +384,9 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors + private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid } object RemoteServerSslContext { @@ -419,6 +449,7 @@ class RemoteServerHandler( val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { + import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -565,32 +596,23 @@ class RemoteServerHandler( } } - /** - * Find a registered actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { - val registeredActors = server.actors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorById(id: String) : ActorRef = { + server.actors().get(id) } - /** - * Find a registered typed actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = { - val registeredActors = server.typedActors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorByUuid(uuid: String) : ActorRef = { + server.actorsByUuid().get(uuid) } + private def findTypedActorById(id: String) : AnyRef = { + server.typedActors().get(id) + } + + private def findTypedActorByUuid(uuid: String) : AnyRef = { + server.typedActorsByUuid().get(uuid) + } + + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -605,8 +627,12 @@ class RemoteServerHandler( val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val actorRefOrNull = findActorByIdOrUuid(id, uuid) - + val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) { + findActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findActorById(id) + } + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) @@ -632,7 +658,11 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) + val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) { + findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findTypedActorById(id) + } if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 1577b62e13..43e52a92c8 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -249,7 +249,7 @@ object RemoteActorSerialization { if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActor(homeAddress, uuid, ar) + RemoteServer.registerActorByUuid(homeAddress, uuid, ar) registeredInRemoteNodeDuringSerialization = true } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 59f122c656..fbf723ece5 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } } - @Test def shouldSendWithBang { val actor = RemoteClient.actorFor( @@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { assert(actor2.id == actor3.id) } + @Test + def shouldFindActorByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + val actor2 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + server.register("my-service", actor2) + + val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT) + val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + + ref1 ! "OneWay" + assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + ref1.stop + ref2 ! "OneWay" + ref2.stop + + } + + @Test + def shouldRegisterAndUnregister { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("my-service-1", actor1) + assert(server.actors().get("my-service-1") != null, "actor registered") + server.unregister("my-service-1") + assert(server.actors().get("my-service-1") == null, "actor unregistered") + } + + @Test + def shouldRegisterAndUnregisterByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + assert(server.actorsByUuid().get(actor1.uuid) != null, "actor registered") + server.unregister("uuid:" + actor1.uuid) + assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered") + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index b800fbf2c3..f50c3e6652 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends it("should register and unregister typed actors") { val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) server.registerTypedActor("my-test-service", typedActor) - assert(server.typedActors().get("my-test-service") != null) + assert(server.typedActors().get("my-test-service") != null, "typed actor registered") server.unregisterTypedActor("my-test-service") - assert(server.typedActors().get("my-test-service") == null) + assert(server.typedActors().get("my-test-service") == null, "typed actor unregistered") + } + + it("should register and unregister typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered") + server.unregisterTypedActor(uuid) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) == null, "typed actor unregistered") + } + + it("should find typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered") + + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } } }