From c0bbcc70d2f0d18f8d757ec019b3311613f4ee93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Mon, 15 Feb 2010 16:21:26 +0100 Subject: [PATCH] Added clean automatic shutdown of RemoteClient, based on reference counting + fixed bug in shutdown of RemoteClient --- akka-core/pom.xml | 2 +- .../src/main/scala/actor/ActiveObject.scala | 6 +- akka-core/src/main/scala/actor/Actor.scala | 9 +- .../src/main/scala/remote/RemoteClient.scala | 87 ++++++++++++++----- .../src/main/scala/remote/RemoteServer.scala | 3 +- .../test/scala/RemoteClientShutdownTest.scala | 30 +++++++ akka.iml | 5 ++ 7 files changed, 108 insertions(+), 34 deletions(-) create mode 100644 akka-core/src/test/scala/RemoteClientShutdownTest.scala diff --git a/akka-core/pom.xml b/akka-core/pom.xml index 356931e5b3..d6ca57ebfe 100644 --- a/akka-core/pom.xml +++ b/akka-core/pom.xml @@ -44,7 +44,7 @@ org.jboss.netty netty - 3.2.0.ALPHA2 + 3.2.0.ALPHA3 org.scala-tools diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 86e08c1a87..1858952f40 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -4,10 +4,9 @@ package se.scalablesolutions.akka.actor -import java.net.InetSocketAddress - import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util._ @@ -16,8 +15,8 @@ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} +import java.net.InetSocketAddress import java.lang.reflect.{InvocationTargetException, Method} -import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult} object Annotations { import se.scalablesolutions.akka.annotation._ @@ -234,6 +233,7 @@ private[akka] sealed case class AspectInit( * @author Jonas Bonér */ @Aspect("perInstance") +// TODO: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 8a30c1da28..2fd4a217db 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -435,6 +435,7 @@ trait Actor extends TransactionManagement { _isShutDown = true shutdown ActorRegistry.unregister(this) +// _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) } } @@ -470,7 +471,7 @@ trait Actor extends TransactionManagement { * */ def !(message: Any)(implicit sender: Option[Actor]) = { -//FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = { + //FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) postMessageToMailbox(message, sender) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") @@ -624,9 +625,9 @@ trait Actor extends TransactionManagement { def makeRemote(address: InetSocketAddress): Unit = if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.") else { - _remoteAddress = Some(address) - if(_replyToAddress.isEmpty) - setReplyToAddress(Actor.HOSTNAME,Actor.PORT) + _remoteAddress = Some(address) + RemoteClient.register(address.getHostName, address.getPort, uuid) + if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT) } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index e2f1e6d032..0a7bcfa2c8 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -4,8 +4,6 @@ package se.scalablesolutions.akka.remote -import scala.collection.mutable.HashMap - import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult} @@ -13,6 +11,7 @@ import se.scalablesolutions.akka.util.{UUID, Logging} import se.scalablesolutions.akka.Config.config import org.jboss.netty.channel._ +import group.DefaultChannelGroup import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} @@ -25,6 +24,8 @@ import java.net.{SocketAddress, InetSocketAddress} import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicLong +import scala. collection.mutable.{HashSet, HashMap} + /** * @author Jonas Bonér */ @@ -41,27 +42,62 @@ object RemoteClient extends Logging { val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000) val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000) - private val clients = new HashMap[String, RemoteClient] + private val remoteClients = new HashMap[String, RemoteClient] + private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] + + def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) def clientFor(address: InetSocketAddress): RemoteClient = synchronized { val hostname = address.getHostName val port = address.getPort val hash = hostname + ':' + port - if (clients.contains(hash)) clients(hash) + if (remoteClients.contains(hash)) remoteClients(hash) else { val client = new RemoteClient(hostname, port) client.connect - clients += hash -> client + remoteClients += hash -> client client } } + def shutdownClientFor(address: InetSocketAddress) = synchronized { + val hostname = address.getHostName + val port = address.getPort + val hash = hostname + ':' + port + if (remoteClients.contains(hash)) { + val client = remoteClients(hash) + client.shutdown + remoteClients - hash + } + } + /** * Clean-up all open connections. */ - def shutdownAll() = synchronized { - clients.foreach({case (addr, client) => client.shutdown}) - clients.clear + def shutdownAll = synchronized { + remoteClients.foreach({case (addr, client) => client.shutdown}) + remoteClients.clear + } + + private[akka] def register(hostname: String, port: Int, uuid: String) = synchronized { + actorsFor(RemoteServer.Address(hostname, port)) + uuid + } + + // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback + private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized { + val set = actorsFor(RemoteServer.Address(hostname, port)) + set - uuid + if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port)) + } + + private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[String] = { + val set = remoteActors.get(remoteServerAddress) + if (set.isDefined && (set.get ne null)) set.get + else { + val remoteActorSet = new HashSet[String] + remoteActors.put(remoteServerAddress, remoteActorSet) + remoteActorSet + } } } @@ -69,9 +105,9 @@ object RemoteClient extends Logging { * @author Jonas Bonér */ class RemoteClient(hostname: String, port: Int) extends Logging { - val name = "RemoteClient@" + hostname - - @volatile private var isRunning = false + val name = "RemoteClient@" + hostname + "::" + port + + @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val supervisors = new ConcurrentHashMap[String, Actor] @@ -80,6 +116,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { Executors.newCachedThreadPool) private val bootstrap = new ClientBootstrap(channelFactory) + private val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName); private val timer = new HashedWheelTimer private val remoteAddress = new InetSocketAddress(hostname, port) @@ -93,20 +130,22 @@ class RemoteClient(hostname: String, port: Int) extends Logging { if (!isRunning) { connection = bootstrap.connect(remoteAddress) log.info("Starting remote client connection to [%s:%s]", hostname, port) - // Wait until the connection attempt succeeds or fails. - connection.awaitUninterruptibly - if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port) + val channel = connection.awaitUninterruptibly.getChannel + openChannels.add(channel) + if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) isRunning = true } } def shutdown = synchronized { - if (!isRunning) { - connection.getChannel.getCloseFuture.awaitUninterruptibly - channelFactory.releaseExternalResources + if (isRunning) { + isRunning = false + openChannels.close.awaitUninterruptibly + bootstrap.releaseExternalResources + timer.stop + log.info("%s has been shut down", name) } - timer.stop } def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) { @@ -120,7 +159,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { futures.put(request.getId, futureResult) connection.getChannel.write(request) Some(futureResult) - } + } } } else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") @@ -131,14 +170,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging { def deregisterSupervisorForActor(actor: Actor) = if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision") else supervisors.remove(actor._supervisor.get.uuid) - + def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid) } /** * @author Jonas Bonér */ -class RemoteClientPipelineFactory(name: String, +class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFutureResult], supervisors: ConcurrentMap[String, Actor], bootstrap: ClientBootstrap, @@ -158,7 +197,7 @@ class RemoteClientPipelineFactory(name: String, } val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages: Array[ChannelHandler] = + val stages: Array[ChannelHandler] = zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient)) .getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)) new StaticChannelPipeline(stages: _*) @@ -214,9 +253,9 @@ class RemoteClientHandler(val name: String, log.error("Unexpected exception in remote client handler: %s", e) throw e } - } + } - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { log.debug("Remote client reconnecting to [%s]", remoteAddress) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index c52bd75afa..4edcb60168 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -180,8 +180,7 @@ class RemoteServer extends Logging { def shutdown = { RemoteServer.unregister(hostname, port) openChannels.disconnect - openChannels.unbind - openChannels.close.awaitUninterruptibly(1000) + openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources Cluster.deregisterLocalNode(hostname, port) } diff --git a/akka-core/src/test/scala/RemoteClientShutdownTest.scala b/akka-core/src/test/scala/RemoteClientShutdownTest.scala new file mode 100644 index 0000000000..f6fbea1bb9 --- /dev/null +++ b/akka-core/src/test/scala/RemoteClientShutdownTest.scala @@ -0,0 +1,30 @@ +package se.scalablesolutions.akka.actor + +import se.scalablesolutions.akka.remote.RemoteNode +import se.scalablesolutions.akka.actor._ +import Actor.Sender.Self + +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +class RemoteClientShutdownTest extends JUnitSuite { + @Test def shouldShutdownRemoteClient = { + RemoteNode.start("localhost", 9999) + + var remote = new TravelingActor + remote.start + remote ! "sending a remote message" + remote.stop + + Thread.sleep(1000) + RemoteNode.shutdown + println("======= REMOTE CLIENT SHUT DOWN FINE =======") + assert(true) + } +} + +class TravelingActor extends RemoteActor("localhost", 9999) { + def receive = { + case _ => log.info("message received") + } +} diff --git a/akka.iml b/akka.iml index c418d66936..2f07a75716 100644 --- a/akka.iml +++ b/akka.iml @@ -1,5 +1,10 @@ + + + + +