Minor code cleanup
This commit is contained in:
parent
f679dd06ab
commit
d5095be95d
2 changed files with 16 additions and 36 deletions
|
|
@ -37,8 +37,8 @@ class ReentrantGuard {
|
||||||
*/
|
*/
|
||||||
class ReadWriteGuard {
|
class ReadWriteGuard {
|
||||||
private val rwl = new ReentrantReadWriteLock
|
private val rwl = new ReentrantReadWriteLock
|
||||||
private val readLock = rwl.readLock
|
val readLock = rwl.readLock
|
||||||
private val writeLock = rwl.writeLock
|
val writeLock = rwl.writeLock
|
||||||
|
|
||||||
def withWriteGuard[T](body: => T): T = {
|
def withWriteGuard[T](body: => T): T = {
|
||||||
writeLock.lock
|
writeLock.lock
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,6 @@ import org.jboss.netty.handler.ssl.SslHandler
|
||||||
|
|
||||||
import java.net.{ SocketAddress, InetSocketAddress }
|
import java.net.{ SocketAddress, InetSocketAddress }
|
||||||
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
|
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
|
||||||
import scala.collection.mutable.{ HashSet, HashMap }
|
import scala.collection.mutable.{ HashSet, HashMap }
|
||||||
import scala.reflect.BeanProperty
|
import scala.reflect.BeanProperty
|
||||||
import java.lang.reflect.InvocationTargetException
|
import java.lang.reflect.InvocationTargetException
|
||||||
|
|
@ -52,7 +51,7 @@ trait NettyRemoteShared {
|
||||||
trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared { self: ListenerManagement with Logging =>
|
trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared { self: ListenerManagement with Logging =>
|
||||||
private val remoteClients = new HashMap[String, RemoteClient]
|
private val remoteClients = new HashMap[String, RemoteClient]
|
||||||
private val remoteActors = new Index[Address, Uuid]
|
private val remoteActors = new Index[Address, Uuid]
|
||||||
private val lock = new ReentrantReadWriteLock
|
private val lock = new ReadWriteGuard
|
||||||
|
|
||||||
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
|
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
|
||||||
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
|
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, AkkaActorType.TypedActor))
|
||||||
|
|
@ -148,43 +147,27 @@ trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared
|
||||||
case address => address.getHostName + ':' + address.getPort
|
case address => address.getHostName + ':' + address.getPort
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdownClientConnection(address: InetSocketAddress): Boolean = {
|
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
|
||||||
lock.writeLock.lock
|
remoteClients.remove(makeKey(address)) match {
|
||||||
try {
|
case Some(client) => client.shutdown
|
||||||
remoteClients.remove(makeKey(address)) match {
|
case None => false
|
||||||
case Some(client) => client.shutdown
|
|
||||||
case None => false
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.writeLock.unlock
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def restartClientConnection(address: InetSocketAddress): Boolean = {
|
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
|
||||||
lock.readLock.lock
|
remoteClients.get(makeKey(address)) match {
|
||||||
try {
|
case Some(client) => client.connect(reconnectIfAlreadyConnected = true)
|
||||||
remoteClients.get(makeKey(address)) match {
|
case None => false
|
||||||
case Some(client) => client.connect(reconnectIfAlreadyConnected = true)
|
|
||||||
case None => false
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.readLock.unlock
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
|
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
|
||||||
clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef)
|
clientFor(actorRef.homeAddress.get, None).registerSupervisorForActor(actorRef)
|
||||||
|
|
||||||
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = {
|
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard {
|
||||||
val key = makeKey(actorRef.homeAddress.get)
|
remoteClients.get(makeKey(actorRef.homeAddress.get)) match {
|
||||||
lock.readLock.lock //TODO: perhaps use writelock here
|
case Some(client) => client.deregisterSupervisorForActor(actorRef)
|
||||||
try {
|
case None => actorRef
|
||||||
remoteClients.get(key) match {
|
|
||||||
case Some(client) => client.deregisterSupervisorForActor(actorRef)
|
|
||||||
case None => actorRef
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.readLock.unlock
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -197,12 +180,9 @@ trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared
|
||||||
//remoteActors.clear
|
//remoteActors.clear
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdownRemoteClients = try {
|
def shutdownRemoteClients = lock withWriteGuard {
|
||||||
lock.writeLock.lock
|
|
||||||
remoteClients.foreach({ case (addr, client) => client.shutdown })
|
remoteClients.foreach({ case (addr, client) => client.shutdown })
|
||||||
remoteClients.clear
|
remoteClients.clear
|
||||||
} finally {
|
|
||||||
lock.writeLock.unlock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = {
|
def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue