Added clean automatic shutdown of RemoteClient, based on reference counting + fixed bug in shutdown of RemoteClient
This commit is contained in:
parent
8829ceacbb
commit
c0bbcc70d2
7 changed files with 108 additions and 34 deletions
|
|
@ -44,7 +44,7 @@
|
|||
<dependency>
|
||||
<groupId>org.jboss.netty</groupId>
|
||||
<artifactId>netty</artifactId>
|
||||
<version>3.2.0.ALPHA2</version>
|
||||
<version>3.2.0.ALPHA3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
|
|
|
|||
|
|
@ -4,10 +4,9 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
|
||||
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
|
||||
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.util._
|
||||
|
|
@ -16,8 +15,8 @@ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
|
|||
import org.codehaus.aspectwerkz.proxy.Proxy
|
||||
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.lang.reflect.{InvocationTargetException, Method}
|
||||
import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult}
|
||||
|
||||
object Annotations {
|
||||
import se.scalablesolutions.akka.annotation._
|
||||
|
|
@ -234,6 +233,7 @@ private[akka] sealed case class AspectInit(
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@Aspect("perInstance")
|
||||
// TODO: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
|
||||
private[akka] sealed class ActiveObjectAspect {
|
||||
@volatile private var isInitialized = false
|
||||
private var target: Class[_] = _
|
||||
|
|
|
|||
|
|
@ -435,6 +435,7 @@ trait Actor extends TransactionManagement {
|
|||
_isShutDown = true
|
||||
shutdown
|
||||
ActorRegistry.unregister(this)
|
||||
// _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -470,7 +471,7 @@ trait Actor extends TransactionManagement {
|
|||
* </pre>
|
||||
*/
|
||||
def !(message: Any)(implicit sender: Option[Actor]) = {
|
||||
//FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
|
||||
//FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
|
||||
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
|
||||
if (_isRunning) postMessageToMailbox(message, sender)
|
||||
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
|
|
@ -625,8 +626,8 @@ trait Actor extends TransactionManagement {
|
|||
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
|
||||
else {
|
||||
_remoteAddress = Some(address)
|
||||
if(_replyToAddress.isEmpty)
|
||||
setReplyToAddress(Actor.HOSTNAME,Actor.PORT)
|
||||
RemoteClient.register(address.getHostName, address.getPort, uuid)
|
||||
if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@
|
|||
|
||||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import scala.collection.mutable.HashMap
|
||||
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
|
||||
|
|
@ -13,6 +11,7 @@ import se.scalablesolutions.akka.util.{UUID, Logging}
|
|||
import se.scalablesolutions.akka.Config.config
|
||||
|
||||
import org.jboss.netty.channel._
|
||||
import group.DefaultChannelGroup
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.bootstrap.ClientBootstrap
|
||||
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
|
||||
|
|
@ -25,6 +24,8 @@ import java.net.{SocketAddress, InetSocketAddress}
|
|||
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import scala. collection.mutable.{HashSet, HashMap}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -41,27 +42,62 @@ object RemoteClient extends Logging {
|
|||
val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
|
||||
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
|
||||
|
||||
private val clients = new HashMap[String, RemoteClient]
|
||||
private val remoteClients = new HashMap[String, RemoteClient]
|
||||
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
|
||||
|
||||
def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
|
||||
|
||||
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
|
||||
val hostname = address.getHostName
|
||||
val port = address.getPort
|
||||
val hash = hostname + ':' + port
|
||||
if (clients.contains(hash)) clients(hash)
|
||||
if (remoteClients.contains(hash)) remoteClients(hash)
|
||||
else {
|
||||
val client = new RemoteClient(hostname, port)
|
||||
client.connect
|
||||
clients += hash -> client
|
||||
remoteClients += hash -> client
|
||||
client
|
||||
}
|
||||
}
|
||||
|
||||
def shutdownClientFor(address: InetSocketAddress) = synchronized {
|
||||
val hostname = address.getHostName
|
||||
val port = address.getPort
|
||||
val hash = hostname + ':' + port
|
||||
if (remoteClients.contains(hash)) {
|
||||
val client = remoteClients(hash)
|
||||
client.shutdown
|
||||
remoteClients - hash
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean-up all open connections.
|
||||
*/
|
||||
def shutdownAll() = synchronized {
|
||||
clients.foreach({case (addr, client) => client.shutdown})
|
||||
clients.clear
|
||||
def shutdownAll = synchronized {
|
||||
remoteClients.foreach({case (addr, client) => client.shutdown})
|
||||
remoteClients.clear
|
||||
}
|
||||
|
||||
private[akka] def register(hostname: String, port: Int, uuid: String) = synchronized {
|
||||
actorsFor(RemoteServer.Address(hostname, port)) + uuid
|
||||
}
|
||||
|
||||
// TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
|
||||
private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized {
|
||||
val set = actorsFor(RemoteServer.Address(hostname, port))
|
||||
set - uuid
|
||||
if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
|
||||
}
|
||||
|
||||
private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[String] = {
|
||||
val set = remoteActors.get(remoteServerAddress)
|
||||
if (set.isDefined && (set.get ne null)) set.get
|
||||
else {
|
||||
val remoteActorSet = new HashSet[String]
|
||||
remoteActors.put(remoteServerAddress, remoteActorSet)
|
||||
remoteActorSet
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -69,9 +105,9 @@ object RemoteClient extends Logging {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClient(hostname: String, port: Int) extends Logging {
|
||||
val name = "RemoteClient@" + hostname
|
||||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private var isRunning = false
|
||||
@volatile private[remote] var isRunning = false
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
|
||||
private val supervisors = new ConcurrentHashMap[String, Actor]
|
||||
|
||||
|
|
@ -80,6 +116,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
Executors.newCachedThreadPool)
|
||||
|
||||
private val bootstrap = new ClientBootstrap(channelFactory)
|
||||
private val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName);
|
||||
|
||||
private val timer = new HashedWheelTimer
|
||||
private val remoteAddress = new InetSocketAddress(hostname, port)
|
||||
|
|
@ -93,20 +130,22 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
if (!isRunning) {
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
||||
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
connection.awaitUninterruptibly
|
||||
if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
|
||||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
isRunning = true
|
||||
}
|
||||
}
|
||||
|
||||
def shutdown = synchronized {
|
||||
if (!isRunning) {
|
||||
connection.getChannel.getCloseFuture.awaitUninterruptibly
|
||||
channelFactory.releaseExternalResources
|
||||
}
|
||||
if (isRunning) {
|
||||
isRunning = false
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
timer.stop
|
||||
log.info("%s has been shut down", name)
|
||||
}
|
||||
}
|
||||
|
||||
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
|
||||
|
|
@ -216,7 +255,7 @@ class RemoteClientHandler(val name: String,
|
|||
}
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
log.debug("Remote client reconnecting to [%s]", remoteAddress)
|
||||
|
|
|
|||
|
|
@ -180,8 +180,7 @@ class RemoteServer extends Logging {
|
|||
def shutdown = {
|
||||
RemoteServer.unregister(hostname, port)
|
||||
openChannels.disconnect
|
||||
openChannels.unbind
|
||||
openChannels.close.awaitUninterruptibly(1000)
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
Cluster.deregisterLocalNode(hostname, port)
|
||||
}
|
||||
|
|
|
|||
30
akka-core/src/test/scala/RemoteClientShutdownTest.scala
Normal file
30
akka-core/src/test/scala/RemoteClientShutdownTest.scala
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import Actor.Sender.Self
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
|
||||
class RemoteClientShutdownTest extends JUnitSuite {
|
||||
@Test def shouldShutdownRemoteClient = {
|
||||
RemoteNode.start("localhost", 9999)
|
||||
|
||||
var remote = new TravelingActor
|
||||
remote.start
|
||||
remote ! "sending a remote message"
|
||||
remote.stop
|
||||
|
||||
Thread.sleep(1000)
|
||||
RemoteNode.shutdown
|
||||
println("======= REMOTE CLIENT SHUT DOWN FINE =======")
|
||||
assert(true)
|
||||
}
|
||||
}
|
||||
|
||||
class TravelingActor extends RemoteActor("localhost", 9999) {
|
||||
def receive = {
|
||||
case _ => log.info("message received")
|
||||
}
|
||||
}
|
||||
5
akka.iml
5
akka.iml
|
|
@ -1,5 +1,10 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
|
||||
<component name="FacetManager">
|
||||
<facet type="Scala" name="Scala">
|
||||
<configuration />
|
||||
</facet>
|
||||
</component>
|
||||
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_5" inherit-compiler-output="false">
|
||||
<output url="file://$MODULE_DIR$/target/classes" />
|
||||
<output-test url="file://$MODULE_DIR$/target/test-classes" />
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue