added possibility to register and find remote actors by uuid

This commit is contained in:
Michael Kober 2010-09-20 12:33:30 +02:00
parent e90d5b1b69
commit 60dd1b9eea
4 changed files with 137 additions and 47 deletions

View file

@ -67,6 +67,7 @@ object RemoteNode extends RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object RemoteServer { object RemoteServer {
val UUID_PREFIX = "uuid:"
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999) val PORT = config.getInt("akka.remote.server.port", 9999)
@ -123,18 +124,20 @@ object RemoteServer {
private class RemoteActorSet { private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
} }
private val guard = new ReadWriteGuard private val guard = new ReadWriteGuard
private val remoteActorSets = Map[Address, RemoteActorSet]() private val remoteActorSets = Map[Address, RemoteActorSet]()
private val remoteServers = Map[Address, RemoteServer]() private val remoteServers = Map[Address, RemoteServer]()
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor)
} }
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
} }
@ -192,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteServer extends Logging with ListenerManagement { class RemoteServer extends Logging with ListenerManagement {
import RemoteServer._
def name = "RemoteServer@" + hostname + ":" + port def name = "RemoteServer@" + hostname + ":" + port
private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
@ -283,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement {
* @param typedActor typed actor to register * @param typedActor typed actor to register
*/ */
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
if (!typedActors.contains(id)) { if (id.startsWith(UUID_PREFIX)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
typedActors.put(id, typedActor) } else {
registerTypedActor(id, typedActor, typedActors())
} }
} }
@ -301,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement {
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/ */
def register(id: String, actorRef: ActorRef): Unit = synchronized { def register(id: String, actorRef: ActorRef): Unit = synchronized {
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) {
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
} else {
register(id, actorRef, actors())
}
}
private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) {
if (_isRunning) { if (_isRunning) {
val actorMap = actors() if (!registry.contains(id)) {
if (!actorMap.contains(id)) {
if (!actorRef.isRunning) actorRef.start if (!actorRef.isRunning) actorRef.start
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) registry.put(id, actorRef)
actorMap.put(id, actorRef) }
}
}
private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) {
if (_isRunning) {
if (!registry.contains(id)) {
registry.put(id, typedActor)
} }
} }
} }
@ -319,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement {
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
val actorMap = actors() val actorMap = actors()
actorMap remove actorRef.id actorMap remove actorRef.id
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid
} }
} }
@ -331,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement {
def unregister(id: String):Unit = synchronized { def unregister(id: String):Unit = synchronized {
if (_isRunning) { if (_isRunning) {
log.info("Unregistering server side remote actor with id [%s]", id) log.info("Unregistering server side remote actor with id [%s]", id)
val actorMap = actors() if (id.startsWith(UUID_PREFIX)) {
val actorRef = actorMap get id actorsByUuid().remove(id.substring(UUID_PREFIX.length))
actorMap remove id } else {
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid if (actorRef.registeredInRemoteNodeDuringSerialization) {
val actorRef = actors().get(id)
actorsByUuid() remove actorRef.uuid
}
actors() remove id
}
} }
} }
@ -346,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement {
def unregisterTypedActor(id: String):Unit = synchronized { def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) { if (_isRunning) {
log.info("Unregistering server side remote typed actor with id [%s]", id) log.info("Unregistering server side remote typed actor with id [%s]", id)
val registeredTypedActors = typedActors() if (id.startsWith(UUID_PREFIX)) {
registeredTypedActors.remove(id) typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
} else {
typedActors().remove(id)
}
} }
} }
@ -356,7 +384,9 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() = RemoteServer.actorsFor(address).actors private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid
} }
object RemoteServerSslContext { object RemoteServerSslContext {
@ -419,6 +449,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup, val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader], val applicationLoader: Option[ClassLoader],
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServer._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) applicationLoader.foreach(MessageSerializer.setClassLoader(_))
@ -565,32 +596,23 @@ class RemoteServerHandler(
} }
} }
/** private def findActorById(id: String) : ActorRef = {
* Find a registered actor by ID (default) or UUID. server.actors().get(id)
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
val registeredActors = server.actors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
} }
/** private def findActorByUuid(uuid: String) : ActorRef = {
* Find a registered typed actor by ID (default) or UUID. server.actorsByUuid().get(uuid)
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
val registeredActors = server.typedActors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
} }
private def findTypedActorById(id: String) : AnyRef = {
server.typedActors().get(id)
}
private def findTypedActorByUuid(uuid: String) : AnyRef = {
server.typedActorsByUuid().get(uuid)
}
/** /**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments. * Creates a new instance of the actor with name, uuid and timeout specified as arguments.
* *
@ -605,8 +627,12 @@ class RemoteServerHandler(
val name = actorInfo.getTarget val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout val timeout = actorInfo.getTimeout
val actorRefOrNull = findActorByIdOrUuid(id, uuid) val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
findActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findActorById(id)
}
if (actorRefOrNull eq null) { if (actorRefOrNull eq null) {
try { try {
log.info("Creating a new remote actor [%s:%s]", name, uuid) log.info("Creating a new remote actor [%s:%s]", name, uuid)
@ -632,7 +658,11 @@ class RemoteServerHandler(
val uuid = actorInfo.getUuid val uuid = actorInfo.getUuid
val id = actorInfo.getId val id = actorInfo.getId
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) {
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findTypedActorById(id)
}
if (typedActorOrNull eq null) { if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo val typedActorInfo = actorInfo.getTypedActorInfo

View file

@ -249,7 +249,7 @@ object RemoteActorSerialization {
if (!registeredInRemoteNodeDuringSerialization) { if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
RemoteServer.getOrCreateServer(homeAddress) RemoteServer.getOrCreateServer(homeAddress)
RemoteServer.registerActor(homeAddress, uuid, ar) RemoteServer.registerActorByUuid(homeAddress, uuid, ar)
registeredInRemoteNodeDuringSerialization = true registeredInRemoteNodeDuringSerialization = true
} }

View file

@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
} }
} }
@Test @Test
def shouldSendWithBang { def shouldSendWithBang {
val actor = RemoteClient.actorFor( val actor = RemoteClient.actorFor(
@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
assert(actor2.id == actor3.id) assert(actor2.id == actor3.id)
} }
@Test
def shouldFindActorByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
server.register("my-service", actor2)
val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT)
val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
ref1 ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
ref1.stop
ref2 ! "OneWay"
ref2.stop
}
@Test
def shouldRegisterAndUnregister {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("my-service-1", actor1)
assert(server.actors().get("my-service-1") != null, "actor registered")
server.unregister("my-service-1")
assert(server.actors().get("my-service-1") == null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
assert(server.actorsByUuid().get(actor1.uuid) != null, "actor registered")
server.unregister("uuid:" + actor1.uuid)
assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered")
}
} }

View file

@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends
it("should register and unregister typed actors") { it("should register and unregister typed actors") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("my-test-service", typedActor) server.registerTypedActor("my-test-service", typedActor)
assert(server.typedActors().get("my-test-service") != null) assert(server.typedActors().get("my-test-service") != null, "typed actor registered")
server.unregisterTypedActor("my-test-service") server.unregisterTypedActor("my-test-service")
assert(server.typedActors().get("my-test-service") == null) assert(server.typedActors().get("my-test-service") == null, "typed actor unregistered")
}
it("should register and unregister typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered")
server.unregisterTypedActor(uuid)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) == null, "typed actor unregistered")
}
it("should find typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered")
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
} }
} }
} }