diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 70fe1bd965..b3ca8d4cb7 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -32,11 +32,12 @@ trait BootableRemoteActorService extends Bootable with Logging { } abstract override def onUnload = { - super.onUnload - + super.onUnload if (remoteServerThread.isAlive) { log.info("Shutting down Remote Actors Service") RemoteNode.shutdown + log.info("Shutting down Cluster Service") + Cluster.shutdown remoteServerThread.join(1000) } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index e1fd0c9e56..547b303567 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -8,11 +8,11 @@ import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener import se.scalablesolutions.akka.Config.config import se.scalablesolutions.akka.config.ScalaConfig._ -import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry} import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage} +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.actor.{Supervisor, SupervisorFactory, Actor, ActorRegistry} import scala.collection.immutable.{Map, HashMap} -import se.scalablesolutions.akka.serialization.Serializer /** * Interface for interacting with the Cluster Membership API. @@ -48,20 +48,24 @@ object Cluster extends Cluster { private[remote] case class Node(endpoints: List[RemoteAddress]) extends ClusterMessage private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage - private[remote] lazy val clusterActor: Option[ClusterActor] = { - config.getString("akka.remote.cluster.actor") map (name => { - val actor = Class.forName(name) - .newInstance - .asInstanceOf[ClusterActor] - SupervisorFactory( - SupervisorConfig( - RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), - Supervise(actor, LifeCycle(Permanent)) :: Nil) - ).newInstance.start - actor - }) - } + private[remote] val clusterActor: Option[ClusterActor] = + config.getString("akka.remote.cluster.actor") map { name => + val a = Class.forName(name).newInstance.asInstanceOf[ClusterActor] + a.start + a + } + + private[remote] val supervisor: Option[Supervisor] = if (clusterActor.isDefined) { + val sup = SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), + Supervise(clusterActor.get, LifeCycle(Permanent)) :: Nil) + ).newInstance + sup.start + Some(sup) + } else None + private[remote] lazy val serializer: Serializer = { val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) Class.forName(className).newInstance.asInstanceOf[Serializer] @@ -71,11 +75,13 @@ object Cluster extends Cluster { def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) - def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.registerLocalNode(hostname, port)) + def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.registerLocalNode(hostname, port)) - def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.deregisterLocalNode(hostname, port)) + def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port)) - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.map(_.relayMessage(to, msg)) + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) + + def shutdown = supervisor.foreach(_.stop) } /** @@ -99,6 +105,8 @@ class JGroupsClusterActor extends ClusterActor { import JGroupsClusterActor._ import org.scala_tools.javautils.Implicits._ + + @volatile private var isActive = false @volatile private var local: Node = Node(Nil) @volatile private var channel: Option[JChannel] = None @volatile private var remotes: Map[Address, Node] = Map() @@ -107,6 +115,7 @@ class JGroupsClusterActor extends ClusterActor { log debug "Initiating JGroups-based cluster actor" remotes = new HashMap[Address, Node] val me = this + isActive = true // Set up the JGroups local endpoint channel = Some(new JChannel { @@ -115,15 +124,15 @@ class JGroupsClusterActor extends ClusterActor { def setState(state: Array[Byte]): Unit = () - def receive(msg: Message): Unit = me send msg + def receive(msg: Message): Unit = if (isActive) me send msg - def viewAccepted(view: View): Unit = me send view + def viewAccepted(view: View): Unit = if (isActive) me send view - def suspect(a: Address): Unit = me send Zombie(a) + def suspect(a: Address): Unit = if (isActive) me send Zombie(a) - def block: Unit = me send Block + def block: Unit = if (isActive) me send Block - def unblock: Unit = me send Unblock + def unblock: Unit = if (isActive) me send Unblock }) }) channel.map(_.connect(name)) @@ -213,8 +222,9 @@ class JGroupsClusterActor extends ClusterActor { } override def shutdown = { - log debug ("Shutting down %s", this.getClass.getName) - channel.map(_.shutdown) + log debug ("Shutting down %s", toString) + isActive = false + channel.foreach(_.shutdown) remotes = Map() channel = None } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 46f6f13eb7..315d6b8e49 100755 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -43,7 +43,7 @@ import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder} * * @author Jonas Bonér */ -object RemoteNode extends RemoteServer +object RemoteNode extends RemoteServer(true) /** * This object holds configuration variables. @@ -116,8 +116,9 @@ object RemoteServer { * * @author Jonas Bonér */ -class RemoteServer extends Logging { +class RemoteServer(val registerNodeInCluster: Boolean) extends Logging { val name = "RemoteServer@" + hostname + ":" + port + def this() = this(false) private var hostname = RemoteServer.HOSTNAME private var port = RemoteServer.PORT @@ -155,7 +156,7 @@ class RemoteServer extends Logging { bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) isRunning = true - Cluster.registerLocalNode(hostname, port) + if (registerNodeInCluster) Cluster.registerLocalNode(hostname, port) } } catch { case e => log.error(e, "Could not start up remote server") @@ -163,9 +164,14 @@ class RemoteServer extends Logging { } def shutdown = { - openChannels.close.awaitUninterruptibly() + openChannels.disconnect + openChannels.unbind + openChannels.close.awaitUninterruptibly(1000) bootstrap.releaseExternalResources - Cluster.deregisterLocalNode(hostname, port) + if (registerNodeInCluster) { + Cluster.deregisterLocalNode(hostname, port) + Cluster.shutdown + } } } diff --git a/akka-core/src/test/scala/Test.scala b/akka-core/src/test/scala/Test.scala new file mode 100644 index 0000000000..9b8d9a1fa9 --- /dev/null +++ b/akka-core/src/test/scala/Test.scala @@ -0,0 +1,34 @@ +package test + +import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.remote.RemoteNode + +object AkkaTest1 { + def main(args: Array[String]) { + + class MyActor extends Actor { + def receive = { + case "test" => println("received test") + case m@_ => println("received unknown message " + m) + } + } + + val myActor = new MyActor + myActor.start + myActor.send("test") + myActor.stop + // does not exit + } +} + + +// case 2 + +object AkkaTest2 { + def main(args: Array[String]) { + RemoteNode.start("localhost", 9999) + Thread.sleep(3000) + RemoteNode.shutdown + // does not exit + } +} \ No newline at end of file