diff --git a/akka-actor/src/main/scala/akka/util/Address.scala b/akka-actor/src/main/scala/akka/util/Address.scala index 07ffbec303..66af8aec6a 100644 --- a/akka-actor/src/main/scala/akka/util/Address.scala +++ b/akka-actor/src/main/scala/akka/util/Address.scala @@ -8,7 +8,7 @@ object Address { } class Address(val hostname: String, val port: Int) { - override def hashCode: Int = { + override val hashCode: Int = { var result = HashCode.SEED result = HashCode.hash(result, hostname) result = HashCode.hash(result, port) diff --git a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala index 39a32e9557..dd670b5bd4 100644 --- a/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/NettyRemoteSupport.scala @@ -37,19 +37,13 @@ import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} import akka.remoteinterface._ import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} - -trait NettyRemoteShared { - def registerPassiveClient(channel: Channel): Boolean - def deregisterPassiveClient(channel: Channel): Boolean -} - /** * The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles. * * @author Jonas Bonér */ -trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared { self: ListenerManagement with Logging => - private val remoteClients = new HashMap[String, RemoteClient] +trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging => + private val remoteClients = new HashMap[Address, RemoteClient] private val remoteActors = new Index[Address, Uuid] private val lock = new ReadWriteGuard @@ -81,76 +75,24 @@ trait NettyRemoteClientModule extends RemoteClientModule with NettyRemoteShared lock.readLock.unlock lock.writeLock.lock //Lock upgrade, not supported natively try { - remoteClients.get(key) match { //Recheck for addition, race between upgrades - case Some(client) => client //If already populated by other writer - case None => //Populate map - val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) - client.connect() - remoteClients += key -> client - client - } - } finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade + try { + remoteClients.get(key) match { //Recheck for addition, race between upgrades + case Some(client) => client //If already populated by other writer + case None => //Populate map + val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) + client.connect() + remoteClients += key -> client + client + } + } finally { lock.readLock.lock } //downgrade + } finally { lock.writeLock.unlock } } } finally { lock.readLock.unlock } } - /** - * This method is called by the server module to register passive clients - */ - def registerPassiveClient(channel: Channel): Boolean = { - val address = channel.getRemoteAddress.asInstanceOf[InetSocketAddress] - val key = makeKey(address) - lock.readLock.lock - try { - remoteClients.get(key) match { - case Some(client) => false - case None => - lock.readLock.unlock - lock.writeLock.lock //Lock upgrade, not supported natively - try { - remoteClients.get(key) match { - case Some(client) => false - case None => - val client = new PassiveRemoteClient(this, address, channel, self.notifyListeners _ ) - client.connect() - remoteClients.put(key, client) - true - } - } finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade - } - } finally { lock.readLock.unlock } - } - - /** - * This method is called by the server module to deregister passive clients - */ - def deregisterPassiveClient(channel: Channel): Boolean = { - val address = channel.getRemoteAddress.asInstanceOf[InetSocketAddress] - val key = makeKey(address) - lock.readLock.lock - try { - remoteClients.get(key) match { - case Some(client: PassiveRemoteClient) => - lock.readLock.unlock - lock.writeLock.lock //Lock upgrade, not supported natively - try { - remoteClients.get(key) match { - case Some(client: ActiveRemoteClient) => false - case None => false - case Some(client: PassiveRemoteClient) => - remoteClients.remove(key) - true - } - } finally { lock.readLock.lock; lock.writeLock.unlock } //downgrade - //Otherwise, unlock the readlock and return false - case _ => false - } - } finally { lock.readLock.unlock } - } - - private def makeKey(a: InetSocketAddress): String = a match { + private def makeKey(a: InetSocketAddress): Address = a match { case null => null - case address => address.getHostName + ':' + address.getPort + case address => Address(address.getHostName,address.getPort) } def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { @@ -308,30 +250,6 @@ abstract class RemoteClient private[akka] ( else supervisors.remove(actorRef.supervisor.get.uuid) } -/** - * PassiveRemoteClient reuses an incoming connection - */ -class PassiveRemoteClient(module: NettyRemoteClientModule, - remoteAddress: InetSocketAddress, - val currentChannel : Channel, - notifyListenersFun: (=> Any) => Unit) extends RemoteClient(module, remoteAddress) { - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { //Cannot reconnect, it's passive. - runSwitch.switchOn { - notifyListeners(RemoteClientStarted(module, remoteAddress)) - } - false - } - - def shutdown = runSwitch switchOff { - log.slf4j.info("Shutting down {}", name) - notifyListeners(RemoteClientShutdown(module, remoteAddress)) - //try { currentChannel.close } catch { case _ => } //TODO: Add failure notification when currentchannel gets shut down? - log.slf4j.info("{} has been shut down", name) - } - - def notifyListeners(msg: => Any) = notifyListenersFun(msg) -} - /** * RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer. * @@ -712,7 +630,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, } } -trait NettyRemoteServerModule extends RemoteServerModule with NettyRemoteShared { self: RemoteModule => +trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => import RemoteServer._ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) @@ -980,7 +898,6 @@ class RemoteServerHandler( } }) } else { - server.registerPassiveClient(ctx.getChannel) server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) } if (RemoteServer.REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication @@ -1005,7 +922,7 @@ class RemoteServerHandler( TypedActor.stop(channelTypedActorsIterator.nextElement) } } - server.deregisterPassiveClient(ctx.getChannel) + server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index c648217410..b119a1e1fc 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -160,6 +160,17 @@ class ServerInitiatedRemoteActorSpec extends AkkaRemoteTest { remote.actorsByUuid.get(actor1.uuid) must be (null) } + "shouldHandleOneWayReplyThroughPassiveRemoteClient" in { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + remote.register("foo", actor1) + val latch = new CountDownLatch(1) + val actor2 = actorOf(new Actor { def receive = { case "Pong" => latch.countDown } }).start + + val remoteActor = remote.actorFor("foo", host, port) + remoteActor.!("Ping")(Some(actor2)) + latch.await(3,TimeUnit.SECONDS) must be (true) + } + "should be able to remotely communicate between 2 server-managed actors" in { val localFoo = actorOf[Decrementer] val localBar = actorOf[Decrementer]