Removing PassiveRemoteClient because of architectural problems

This commit is contained in:
Viktor Klang 2011-01-03 11:25:27 +01:00
parent 63a182afd7
commit 8e522f4a03
3 changed files with 29 additions and 101 deletions

View file

@ -8,7 +8,7 @@ object Address {
}
class Address(val hostname: String, val port: Int) {
override def hashCode: Int = {
override val hashCode: Int = {
var result = HashCode.SEED
result = HashCode.hash(result, hostname)
result = HashCode.hash(result, port)

View file

@ -37,19 +37,13 @@ import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
import akka.remoteinterface._
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
trait NettyRemoteShared {
def registerPassiveClient(channel: Channel): Boolean
def deregisterPassiveClient(channel: Channel): Boolean
}
/**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared { self: ListenerManagement with Logging =>
private val remoteClients = new HashMap[String, RemoteClient]
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging =>
private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid]
private val lock = new ReadWriteGuard
@ -81,76 +75,24 @@ trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case Some(client) => client //If already populated by other writer
case None => //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
client
}
} finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case Some(client) => client //If already populated by other writer
case None => //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
remoteClients += key -> client
client
}
} finally { lock.readLock.lock } //downgrade
} finally { lock.writeLock.unlock }
}
} finally { lock.readLock.unlock }
}
/**
* This method is called by the server module to register passive clients
*/
def registerPassiveClient(channel: Channel): Boolean = {
val address = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
val key = makeKey(address)
lock.readLock.lock
try {
remoteClients.get(key) match {
case Some(client) => false
case None =>
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
remoteClients.get(key) match {
case Some(client) => false
case None =>
val client = new PassiveRemoteClient(this, address, channel, self.notifyListeners _ )
client.connect()
remoteClients.put(key, client)
true
}
} finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade
}
} finally { lock.readLock.unlock }
}
/**
* This method is called by the server module to deregister passive clients
*/
def deregisterPassiveClient(channel: Channel): Boolean = {
val address = channel.getRemoteAddress.asInstanceOf[InetSocketAddress]
val key = makeKey(address)
lock.readLock.lock
try {
remoteClients.get(key) match {
case Some(client: PassiveRemoteClient) =>
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
remoteClients.get(key) match {
case Some(client: ActiveRemoteClient) => false
case None => false
case Some(client: PassiveRemoteClient) =>
remoteClients.remove(key)
true
}
} finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade
//Otherwise, unlock the readlock and return false
case _ => false
}
} finally { lock.readLock.unlock }
}
private def makeKey(a: InetSocketAddress): String = a match {
private def makeKey(a: InetSocketAddress): Address = a match {
case null => null
case address => address.getHostName + ':' + address.getPort
case address => Address(address.getHostName,address.getPort)
}
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
@ -308,30 +250,6 @@ abstract class RemoteClient private[akka] (
else supervisors.remove(actorRef.supervisor.get.uuid)
}
/**
* PassiveRemoteClient reuses an incoming connection
*/
class PassiveRemoteClient(module: NettyRemoteClientModule,
remoteAddress: InetSocketAddress,
val currentChannel : Channel,
notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) {
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { //Cannot reconnect, it's passive.
runSwitch.switchOn {
notifyListeners(RemoteClientStarted(module, remoteAddress))
}
false
}
def shutdown = runSwitch switchOff {
log.slf4j.info("Shutting down {}", name)
notifyListeners(RemoteClientShutdown(module, remoteAddress))
//try { currentChannel.close } catch { case _ => } //TODO: Add failure notification when currentchannel gets shut down?
log.slf4j.info("{} has been shut down", name)
}
def notifyListeners(msg: => Any) = notifyListenersFun(msg)
}
/**
* RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer.
*
@ -712,7 +630,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
}
}
trait NettyRemoteServerModule extends RemoteServerModule with NettyRemoteShared { self: RemoteModule =>
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
@ -980,7 +898,6 @@ class RemoteServerHandler(
}
})
} else {
server.registerPassiveClient(ctx.getChannel)
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
@ -1005,7 +922,7 @@ class RemoteServerHandler(
TypedActor.stop(channelTypedActorsIterator.nextElement)
}
}
server.deregisterPassiveClient(ctx.getChannel)
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
}

View file

@ -160,6 +160,17 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest {
remote.actorsByUuid.get(actor1.uuid) must be (null)
}
"shouldHandleOneWayReplyThroughPassiveRemoteClient" in {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
remote.register("foo", actor1)
val latch = new CountDownLatch(1)
val actor2 = actorOf(new Actor { def receive = { case "Pong" => latch.countDown } }).start
val remoteActor = remote.actorFor("foo", host, port)
remoteActor.!("Ping")(Some(actor2))
latch.await(3,TimeUnit.SECONDS) must be (true)
}
"should be able to remotely communicate between 2 server-managed actors" in {
val localFoo = actorOf[Decrementer]
val localBar = actorOf[Decrementer]