diff --git a/akka-core/pom.xml b/akka-core/pom.xml
index 356931e5b3..d6ca57ebfe 100644
--- a/akka-core/pom.xml
+++ b/akka-core/pom.xml
@@ -44,7 +44,7 @@
org.jboss.netty
netty
- 3.2.0.ALPHA2
+ 3.2.0.ALPHA3
org.scala-tools
diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala
index 86e08c1a87..1858952f40 100644
--- a/akka-core/src/main/scala/actor/ActiveObject.scala
+++ b/akka-core/src/main/scala/actor/ActiveObject.scala
@@ -4,10 +4,9 @@
package se.scalablesolutions.akka.actor
-import java.net.InetSocketAddress
-
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util._
@@ -16,8 +15,8 @@ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
+import java.net.InetSocketAddress
import java.lang.reflect.{InvocationTargetException, Method}
-import se.scalablesolutions.akka.dispatch.{Dispatchers, MessageDispatcher, FutureResult}
object Annotations {
import se.scalablesolutions.akka.annotation._
@@ -234,6 +233,7 @@ private[akka] sealed case class AspectInit(
* @author Jonas Bonér
*/
@Aspect("perInstance")
+// TODO: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
private[akka] sealed class ActiveObjectAspect {
@volatile private var isInitialized = false
private var target: Class[_] = _
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index 8a30c1da28..2fd4a217db 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -435,6 +435,7 @@ trait Actor extends TransactionManagement {
_isShutDown = true
shutdown
ActorRegistry.unregister(this)
+// _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid))
}
}
@@ -470,7 +471,7 @@ trait Actor extends TransactionManagement {
*
*/
def !(message: Any)(implicit sender: Option[Actor]) = {
-//FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
+ //FIXME 2.8 def !(message: Any)(implicit sender: Option[Actor] = None) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) postMessageToMailbox(message, sender)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
@@ -624,9 +625,9 @@ trait Actor extends TransactionManagement {
def makeRemote(address: InetSocketAddress): Unit =
if (_isRunning) throw new IllegalStateException("Can't make a running actor remote. Make sure you call 'makeRemote' before 'start'.")
else {
- _remoteAddress = Some(address)
- if(_replyToAddress.isEmpty)
- setReplyToAddress(Actor.HOSTNAME,Actor.PORT)
+ _remoteAddress = Some(address)
+ RemoteClient.register(address.getHostName, address.getPort, uuid)
+ if (_replyToAddress.isEmpty) setReplyToAddress(Actor.HOSTNAME, Actor.PORT)
}
diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala
index e2f1e6d032..0a7bcfa2c8 100644
--- a/akka-core/src/main/scala/remote/RemoteClient.scala
+++ b/akka-core/src/main/scala/remote/RemoteClient.scala
@@ -4,8 +4,6 @@
package se.scalablesolutions.akka.remote
-import scala.collection.mutable.HashMap
-
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
import se.scalablesolutions.akka.actor.{Exit, Actor}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
@@ -13,6 +11,7 @@ import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.Config.config
import org.jboss.netty.channel._
+import group.DefaultChannelGroup
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
@@ -25,6 +24,8 @@ import java.net.{SocketAddress, InetSocketAddress}
import java.util.concurrent.{TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap}
import java.util.concurrent.atomic.AtomicLong
+import scala. collection.mutable.{HashSet, HashMap}
+
/**
* @author Jonas Bonér
*/
@@ -41,27 +42,62 @@ object RemoteClient extends Logging {
val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
- private val clients = new HashMap[String, RemoteClient]
+ private val remoteClients = new HashMap[String, RemoteClient]
+ private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
+
+ def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port))
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
- if (clients.contains(hash)) clients(hash)
+ if (remoteClients.contains(hash)) remoteClients(hash)
else {
val client = new RemoteClient(hostname, port)
client.connect
- clients += hash -> client
+ remoteClients += hash -> client
client
}
}
+ def shutdownClientFor(address: InetSocketAddress) = synchronized {
+ val hostname = address.getHostName
+ val port = address.getPort
+ val hash = hostname + ':' + port
+ if (remoteClients.contains(hash)) {
+ val client = remoteClients(hash)
+ client.shutdown
+ remoteClients - hash
+ }
+ }
+
/**
* Clean-up all open connections.
*/
- def shutdownAll() = synchronized {
- clients.foreach({case (addr, client) => client.shutdown})
- clients.clear
+ def shutdownAll = synchronized {
+ remoteClients.foreach({case (addr, client) => client.shutdown})
+ remoteClients.clear
+ }
+
+ private[akka] def register(hostname: String, port: Int, uuid: String) = synchronized {
+ actorsFor(RemoteServer.Address(hostname, port)) + uuid
+ }
+
+ // TODO: add RemoteClient.unregister for ActiveObject, but first need a @shutdown callback
+ private[akka] def unregister(hostname: String, port: Int, uuid: String) = synchronized {
+ val set = actorsFor(RemoteServer.Address(hostname, port))
+ set - uuid
+ if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
+ }
+
+ private[akka] def actorsFor(remoteServerAddress: RemoteServer.Address): HashSet[String] = {
+ val set = remoteActors.get(remoteServerAddress)
+ if (set.isDefined && (set.get ne null)) set.get
+ else {
+ val remoteActorSet = new HashSet[String]
+ remoteActors.put(remoteServerAddress, remoteActorSet)
+ remoteActorSet
+ }
}
}
@@ -69,9 +105,9 @@ object RemoteClient extends Logging {
* @author Jonas Bonér
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
- val name = "RemoteClient@" + hostname
-
- @volatile private var isRunning = false
+ val name = "RemoteClient@" + hostname + "::" + port
+
+ @volatile private[remote] var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@@ -80,6 +116,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
Executors.newCachedThreadPool)
private val bootstrap = new ClientBootstrap(channelFactory)
+ private val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName);
private val timer = new HashedWheelTimer
private val remoteAddress = new InetSocketAddress(hostname, port)
@@ -93,20 +130,22 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
if (!isRunning) {
connection = bootstrap.connect(remoteAddress)
log.info("Starting remote client connection to [%s:%s]", hostname, port)
-
// Wait until the connection attempt succeeds or fails.
- connection.awaitUninterruptibly
- if (!connection.isSuccess) log.error(connection.getCause, "Remote connection to [%s:%s] has failed", hostname, port)
+ val channel = connection.awaitUninterruptibly.getChannel
+ openChannels.add(channel)
+ if (!connection.isSuccess) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
isRunning = true
}
}
def shutdown = synchronized {
- if (!isRunning) {
- connection.getChannel.getCloseFuture.awaitUninterruptibly
- channelFactory.releaseExternalResources
+ if (isRunning) {
+ isRunning = false
+ openChannels.close.awaitUninterruptibly
+ bootstrap.releaseExternalResources
+ timer.stop
+ log.info("%s has been shut down", name)
}
- timer.stop
}
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
@@ -120,7 +159,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
futures.put(request.getId, futureResult)
connection.getChannel.write(request)
Some(futureResult)
- }
+ }
}
} else throw new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
@@ -131,14 +170,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
def deregisterSupervisorForActor(actor: Actor) =
if (!actor._supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actor + " since it is not under supervision")
else supervisors.remove(actor._supervisor.get.uuid)
-
+
def deregisterSupervisorWithUuid(uuid: String) = supervisors.remove(uuid)
}
/**
* @author Jonas Bonér
*/
-class RemoteClientPipelineFactory(name: String,
+class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap,
@@ -158,7 +197,7 @@ class RemoteClientPipelineFactory(name: String,
}
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
- val stages: Array[ChannelHandler] =
+ val stages: Array[ChannelHandler] =
zipCodec.map(codec => Array(timeout, codec.decoder, lenDec, protobufDec, codec.encoder, lenPrep, protobufEnc, remoteClient))
.getOrElse(Array(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient))
new StaticChannelPipeline(stages: _*)
@@ -214,9 +253,9 @@ class RemoteClientHandler(val name: String,
log.error("Unexpected exception in remote client handler: %s", e)
throw e
}
- }
+ }
- override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
+ override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = if (client.isRunning) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", remoteAddress)
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index c52bd75afa..4edcb60168 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -180,8 +180,7 @@ class RemoteServer extends Logging {
def shutdown = {
RemoteServer.unregister(hostname, port)
openChannels.disconnect
- openChannels.unbind
- openChannels.close.awaitUninterruptibly(1000)
+ openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources
Cluster.deregisterLocalNode(hostname, port)
}
diff --git a/akka-core/src/test/scala/RemoteClientShutdownTest.scala b/akka-core/src/test/scala/RemoteClientShutdownTest.scala
new file mode 100644
index 0000000000..f6fbea1bb9
--- /dev/null
+++ b/akka-core/src/test/scala/RemoteClientShutdownTest.scala
@@ -0,0 +1,30 @@
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.remote.RemoteNode
+import se.scalablesolutions.akka.actor._
+import Actor.Sender.Self
+
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+class RemoteClientShutdownTest extends JUnitSuite {
+ @Test def shouldShutdownRemoteClient = {
+ RemoteNode.start("localhost", 9999)
+
+ var remote = new TravelingActor
+ remote.start
+ remote ! "sending a remote message"
+ remote.stop
+
+ Thread.sleep(1000)
+ RemoteNode.shutdown
+ println("======= REMOTE CLIENT SHUT DOWN FINE =======")
+ assert(true)
+ }
+}
+
+class TravelingActor extends RemoteActor("localhost", 9999) {
+ def receive = {
+ case _ => log.info("message received")
+ }
+}
diff --git a/akka.iml b/akka.iml
index c418d66936..2f07a75716 100644
--- a/akka.iml
+++ b/akka.iml
@@ -1,5 +1,10 @@
+
+
+
+
+