Fixed race-condition in creation and registration of RemoteServers
This commit is contained in:
parent
a162f4705d
commit
4375f82ae5
4 changed files with 45 additions and 44 deletions
|
|
@ -571,12 +571,7 @@ trait ActorRef extends TransactionManagement {
|
|||
|
||||
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
|
||||
senderOption.foreach { sender =>
|
||||
val address = sender.homeAddress
|
||||
val server = RemoteServer.serverFor(address) match {
|
||||
case Some(server) => server
|
||||
case None => (new RemoteServer).start(address)
|
||||
}
|
||||
server.register(sender.uuid, sender)
|
||||
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
||||
requestBuilder.setSender(sender.toProtocol)
|
||||
}
|
||||
}
|
||||
|
|
@ -620,8 +615,8 @@ sealed class LocalActorRef private[akka](
|
|||
|
||||
if (!registeredInRemoteNodeDuringSerialization) {
|
||||
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
|
||||
if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
|
||||
RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
|
||||
RemoteServer.getOrCreateServer(homeAddress)
|
||||
RemoteServer.registerActor(homeAddress, uuid, this)
|
||||
registeredInRemoteNodeDuringSerialization = true
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import se.scalablesolutions.akka.remote.RemoteServer
|
|||
import Actor._
|
||||
|
||||
import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
class SupervisorException private[akka](message: String) extends RuntimeException(message)
|
||||
|
||||
|
|
@ -204,11 +205,8 @@ sealed class Supervisor private[akka] (
|
|||
childActors.put(className, actorRef :: currentActors)
|
||||
actorRef.lifeCycle = Some(lifeCycle)
|
||||
supervisor.link(actorRef)
|
||||
remoteAddress.foreach { address => RemoteServer
|
||||
.actorsFor(RemoteServer.Address(address.hostname, address.port))
|
||||
.actors.put(actorRef.id, actorRef)
|
||||
}
|
||||
|
||||
remoteAddress.foreach(address =>
|
||||
RemoteServer.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef))
|
||||
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
|
||||
val childSupervisor = Supervisor(supervisorConfig)
|
||||
supervisor.link(childSupervisor.supervisor)
|
||||
|
|
|
|||
|
|
@ -89,11 +89,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
|
|||
Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
|
||||
else None
|
||||
val proxy = ActiveObject.newInstance(targetClass, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef]
|
||||
if (remoteAddress.isDefined) {
|
||||
RemoteServer
|
||||
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
|
||||
.activeObjects.put(targetClass.getName, proxy)
|
||||
}
|
||||
remoteAddress.foreach(address => RemoteServer.registerActiveObject(address, targetClass.getName, proxy))
|
||||
supervised ::= Supervise(actorRef, component.lifeCycle)
|
||||
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
|
||||
new DependencyBinding(targetClass, proxy)
|
||||
|
|
@ -111,11 +107,7 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
|
|||
else None
|
||||
val proxy = ActiveObject.newInstance(
|
||||
targetClass, targetInstance, actorRef, remoteAddress, component.timeout).asInstanceOf[AnyRef]
|
||||
if (remoteAddress.isDefined) {
|
||||
RemoteServer
|
||||
.actorsFor(RemoteServer.Address(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
|
||||
.activeObjects.put(targetClass.getName, proxy)
|
||||
}
|
||||
remoteAddress.foreach(address => RemoteServer.registerActiveObject(address, targetClass.getName, proxy))
|
||||
supervised ::= Supervise(actorRef, component.lifeCycle)
|
||||
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
|
||||
new DependencyBinding(targetClass, proxy)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, Length
|
|||
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
||||
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
||||
|
||||
import scala.collection.mutable.Map
|
||||
|
||||
/**
|
||||
* Use this object if you need a single remote server on a specific node.
|
||||
*
|
||||
|
|
@ -88,38 +90,52 @@ object RemoteServer {
|
|||
}
|
||||
}
|
||||
|
||||
class RemoteActorSet {
|
||||
val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
|
||||
private val remoteServers = new ConcurrentHashMap[Address, RemoteServer]
|
||||
private val guard = new ReadWriteGuard
|
||||
private val remoteActorSets = Map[Address, RemoteActorSet]()
|
||||
private val remoteServers = Map[Address, RemoteServer]()
|
||||
|
||||
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
val set = remoteActorSets.get(remoteServerAddress)
|
||||
if (set ne null) set
|
||||
else {
|
||||
val remoteActorSet = new RemoteActorSet
|
||||
remoteActorSets.put(remoteServerAddress, remoteActorSet)
|
||||
remoteActorSet
|
||||
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerActiveObject(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).activeObjects.put(name, activeObject)
|
||||
}
|
||||
|
||||
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
||||
serverFor(address) match {
|
||||
case Some(server) => server
|
||||
case None => (new RemoteServer).start(address)
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
|
||||
val server = remoteServers.get(Address(hostname, port))
|
||||
if (server eq null) None
|
||||
else Some(server)
|
||||
}
|
||||
|
||||
private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
|
||||
serverFor(address.getHostName, address.getPort)
|
||||
|
||||
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
|
||||
remoteServers.get(Address(hostname, port))
|
||||
}
|
||||
|
||||
private[remote] def unregister(hostname: String, port: Int) =
|
||||
private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
}
|
||||
|
||||
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
remoteActorSets.get(remoteServerAddress).getOrElse {
|
||||
val remoteActorSet = new RemoteActorSet
|
||||
remoteActorSets.put(remoteServerAddress, remoteActorSet)
|
||||
remoteActorSet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue