From 4375f82ae5000a805a08b4fe125e6e28e3ea7fbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 20 May 2010 19:14:31 +0200 Subject: [PATCH] Fixed race-condition in creation and registration of RemoteServers --- akka-core/src/main/scala/actor/ActorRef.scala | 11 +--- .../src/main/scala/actor/Supervisor.scala | 8 +-- .../ActiveObjectGuiceConfigurator.scala | 12 +--- .../src/main/scala/remote/RemoteServer.scala | 58 ++++++++++++------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 4c92b33df6..13d0d2959c 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -571,12 +571,7 @@ trait ActorRef extends TransactionManagement { protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = { senderOption.foreach { sender => - val address = sender.homeAddress - val server = RemoteServer.serverFor(address) match { - case Some(server) => server - case None => (new RemoteServer).start(address) - } - server.register(sender.uuid, sender) + RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) requestBuilder.setSender(sender.toProtocol) } } @@ -620,8 +615,8 @@ sealed class LocalActorRef private[akka]( if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) - if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port) - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this) + RemoteServer.getOrCreateServer(homeAddress) + RemoteServer.registerActor(homeAddress, uuid, this) registeredInRemoteNodeDuringSerialization = true } diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index 3f05f18548..8029a892ca 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -11,6 +11,7 @@ import se.scalablesolutions.akka.remote.RemoteServer import Actor._ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} +import java.net.InetSocketAddress class SupervisorException private[akka](message: String) extends RuntimeException(message) @@ -204,11 +205,8 @@ sealed class Supervisor private[akka] ( childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = Some(lifeCycle) supervisor.link(actorRef) - remoteAddress.foreach { address => RemoteServer - .actorsFor(RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorRef.id, actorRef) - } - + remoteAddress.foreach(address => + RemoteServer.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 6f3a6177dc..98f1e4e4f0 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -89,11 +89,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) else None val proxy = ActiveObject.newInstance(targetClass, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef] - if (remoteAddress.isDefined) { - RemoteServer - .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) - .activeObjects.put(targetClass.getName, proxy) - } + remoteAddress.foreach(address => RemoteServer.registerActiveObject(address, targetClass.getName, proxy)) supervised ::= Supervise(actorRef, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, proxy, component)) new DependencyBinding(targetClass, proxy) @@ -111,11 +107,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat else None val proxy = ActiveObject.newInstance( targetClass, targetInstance, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef] - if (remoteAddress.isDefined) { - RemoteServer - .actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) - .activeObjects.put(targetClass.getName, proxy) - } + remoteAddress.foreach(address => RemoteServer.registerActiveObject(address, targetClass.getName, proxy)) supervised ::= Supervise(actorRef, component.lifeCycle) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) new DependencyBinding(targetClass, proxy) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 2617134d28..5658bdc672 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -22,6 +22,8 @@ import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, Length import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} +import scala.collection.mutable.Map + /** * Use this object if you need a single remote server on a specific node. * @@ -88,38 +90,52 @@ object RemoteServer { } } - class RemoteActorSet { - val actors = new ConcurrentHashMap[String, ActorRef] - val activeObjects = new ConcurrentHashMap[String, AnyRef] + private class RemoteActorSet { + private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val activeObjects = new ConcurrentHashMap[String, AnyRef] } - private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet] - private val remoteServers = new ConcurrentHashMap[Address, RemoteServer] + private val guard = new ReadWriteGuard + private val remoteActorSets = Map[Address, RemoteActorSet]() + private val remoteServers = Map[Address, RemoteServer]() - private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { - val set = remoteActorSets.get(remoteServerAddress) - if (set ne null) set - else { - val remoteActorSet = new RemoteActorSet - remoteActorSets.put(remoteServerAddress, remoteActorSet) - remoteActorSet + private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + } + + private[akka] def registerActiveObject(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).activeObjects.put(name, activeObject) + } + + private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { + serverFor(address) match { + case Some(server) => server + case None => (new RemoteServer).start(address) } } - private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = { - val server = remoteServers.get(Address(hostname, port)) - if (server eq null) None - else Some(server) - } - private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] = serverFor(address.getHostName, address.getPort) - private[remote] def register(hostname: String, port: Int, server: RemoteServer) = - remoteServers.put(Address(hostname, port), server) + private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard { + remoteServers.get(Address(hostname, port)) + } - private[remote] def unregister(hostname: String, port: Int) = + private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard { + remoteServers.put(Address(hostname, port), server) + } + + private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) + } + + private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = { + remoteActorSets.get(remoteServerAddress).getOrElse { + val remoteActorSet = new RemoteActorSet + remoteActorSets.put(remoteServerAddress, remoteActorSet) + remoteActorSet + } + } } /**