Fixed race-condition in creation and registration of RemoteServers
This commit is contained in:
parent
656e65cffa
commit
47b4e2b537
4 changed files with 45 additions and 44 deletions
|
|
@ -22,6 +22,8 @@ import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, Length
|
|||
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
|
||||
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
|
||||
|
||||
import scala.collection.mutable.Map
|
||||
|
||||
/**
|
||||
* Use this object if you need a single remote server on a specific node.
|
||||
*
|
||||
|
|
@ -88,38 +90,52 @@ object RemoteServer {
|
|||
}
|
||||
}
|
||||
|
||||
class RemoteActorSet {
|
||||
val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val activeObjects = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
private val remoteActorSets = new ConcurrentHashMap[Address, RemoteActorSet]
|
||||
private val remoteServers = new ConcurrentHashMap[Address, RemoteServer]
|
||||
private val guard = new ReadWriteGuard
|
||||
private val remoteActorSets = Map[Address, RemoteActorSet]()
|
||||
private val remoteServers = Map[Address, RemoteServer]()
|
||||
|
||||
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
val set = remoteActorSets.get(remoteServerAddress)
|
||||
if (set ne null) set
|
||||
else {
|
||||
val remoteActorSet = new RemoteActorSet
|
||||
remoteActorSets.put(remoteServerAddress, remoteActorSet)
|
||||
remoteActorSet
|
||||
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerActiveObject(address: InetSocketAddress, name: String, activeObject: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).activeObjects.put(name, activeObject)
|
||||
}
|
||||
|
||||
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
||||
serverFor(address) match {
|
||||
case Some(server) => server
|
||||
case None => (new RemoteServer).start(address)
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = {
|
||||
val server = remoteServers.get(Address(hostname, port))
|
||||
if (server eq null) None
|
||||
else Some(server)
|
||||
}
|
||||
|
||||
private[akka] def serverFor(address: InetSocketAddress): Option[RemoteServer] =
|
||||
serverFor(address.getHostName, address.getPort)
|
||||
|
||||
private[remote] def register(hostname: String, port: Int, server: RemoteServer) =
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
private[akka] def serverFor(hostname: String, port: Int): Option[RemoteServer] = guard.withReadGuard {
|
||||
remoteServers.get(Address(hostname, port))
|
||||
}
|
||||
|
||||
private[remote] def unregister(hostname: String, port: Int) =
|
||||
private[akka] def register(hostname: String, port: Int, server: RemoteServer) = guard.withWriteGuard {
|
||||
remoteServers.put(Address(hostname, port), server)
|
||||
}
|
||||
|
||||
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
|
||||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
remoteActorSets.get(remoteServerAddress).getOrElse {
|
||||
val remoteActorSet = new RemoteActorSet
|
||||
remoteActorSets.put(remoteServerAddress, remoteActorSet)
|
||||
remoteActorSet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue