From 45e40f66ee9834e0fb23df8f2229852535a33b0b Mon Sep 17 00:00:00 2001 From: Eckart Hertzler Date: Wed, 3 Mar 2010 20:06:06 +0100 Subject: [PATCH 1/2] shutdown (and unbind) Remote Server even if the remoteServerThread is not alive --- .../remote/BootableRemoteActorService.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 429fdb61ec..1c31c3025c 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -35,14 +35,21 @@ trait BootableRemoteActorService extends Bootable with Logging { super.onLoad } - + abstract override def onUnload = { - super.onUnload - if (remoteServerThread.isAlive) { - log.info("Shutting down Remote Actors Service") - RemoteNode.shutdown - remoteServerThread.join(1000) - } + super.onUnload + + log.info("Shutting down Remote Actors Service") + + RemoteNode.shutdown + + if (remoteServerThread.isAlive) + remoteServerThread.join(1000) + + log.info("Shutting down Cluster") Cluster.shutdown + + log.info("Remote Actors Service has been shut down") } + } \ No newline at end of file From 2519db5aed50eb1923e4c6973d0236d4a2d76295 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 4 Mar 2010 23:25:55 +0100 Subject: [PATCH 2/2] Fixing a bug in JGroupsClusterActor --- .../src/main/scala/JGroupsClusterActor.scala | 3 +- .../src/main/scala/ShoalClusterActor.scala | 9 ++-- akka-core/src/main/scala/remote/Cluster.scala | 47 +++++++++---------- .../src/main/scala/remote/RemoteServer.scala | 2 +- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala index 2b6730dd60..12d93ef272 100644 --- a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala +++ b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala @@ -44,7 +44,8 @@ class JGroupsClusterActor extends BasicClusterActor { log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer }) }) - channel.map(_.connect(name)) + + channel.foreach(_.connect(name)) } protected def toOneNode(dest : Address, msg: Array[Byte]): Unit = diff --git a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala index c656bd4b81..3d83a46ef3 100644 --- a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala +++ b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala @@ -85,6 +85,7 @@ class ShoalClusterActor extends BasicClusterActor { */ protected def createCallback : CallBack = { import org.scala_tools.javautils.Imports._ + import ClusterActor._ val me = this new CallBack { def processNotification(signal : Signal) { @@ -92,10 +93,10 @@ class ShoalClusterActor extends BasicClusterActor { signal.acquire() if(isActive) { signal match { - case ms : MessageSignal => me send Message(ms.getMemberToken,ms.getMessage) - case jns : JoinNotificationSignal => me send View(Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) - case fss : FailureSuspectedSignal => me send Zombie(fss.getMemberToken) - case fns : FailureNotificationSignal => me send Zombie(fns.getMemberToken) + case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage) + case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) + case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken) + case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken) case _ => log.debug("Unhandled signal: [%s]",signal) } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index c2e9069a01..4313cfe98c 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -48,7 +48,15 @@ private[remote] object ClusterActor { sealed trait ClusterMessage private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage - + private[remote] case class Message[ADDR_T](sender : ADDR_T,msg : Array[Byte]) + private[remote] case object PapersPlease extends ClusterMessage + private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage + private[remote] case object Block extends ClusterMessage + private[remote] case object Unblock extends ClusterMessage + private[remote] case class View[ADDR_T](othersPresent : Set[ADDR_T]) extends ClusterMessage + private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage + private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage + private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[remote] case class Node(endpoints: List[RemoteAddress]) } @@ -60,16 +68,6 @@ private[remote] object ClusterActor { abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ - case class Message(sender : ADDR_T,msg : Array[Byte]) - case object PapersPlease extends ClusterMessage - case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage - case object Block extends ClusterMessage - case object Unblock extends ClusterMessage - case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage - case class Zombie(address: ADDR_T) extends ClusterMessage - case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage - case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage - type ADDR_T @@ -85,14 +83,14 @@ abstract class BasicClusterActor extends ClusterActor { } def receive = { - case v @ View(members) => { + case v : View[ADDR_T] => { // Not present in the cluster anymore = presumably zombies // Nodes we have no prior knowledge existed = unknowns - val zombies = Set[ADDR_T]() ++ remotes.keySet -- members - val unknown = members -- remotes.keySet + val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent + val unknown = v.othersPresent -- remotes.keySet log debug ("Updating view") - log debug ("Other memebers: [%s]",members) + log debug ("Other memebers: [%s]",v.othersPresent) log debug ("Zombies: [%s]",zombies) log debug ("Unknowns: [%s]",unknown) @@ -101,10 +99,10 @@ abstract class BasicClusterActor extends ClusterActor { remotes = remotes -- zombies } - case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead - log debug ("Killing Zombie Node: %s", x) - broadcast(x :: Nil, PapersPlease) - remotes = remotes - x + case z : Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead + log debug ("Killing Zombie Node: %s", z.address) + broadcast(z.address :: Nil, PapersPlease) + remotes = remotes - z.address } case rm @ RelayedMessage(_, _) => { @@ -112,7 +110,8 @@ abstract class BasicClusterActor extends ClusterActor { broadcast(rm) } - case m @ Message(src,msg) => { + case m : Message[ADDR_T] => { + val (src,msg) = (m.sender,m.msg) (Cluster.serializer in (msg, None)) match { case PapersPlease => { @@ -207,7 +206,7 @@ abstract class BasicClusterActor extends ClusterActor { */ object Cluster extends Cluster with Logging { @volatile private[remote] var clusterActor: Option[ClusterActor] = None - @volatile private[remote] var supervisor: Option[Supervisor] = None + @volatile private[remote] var supervisor: Option[Supervisor] = None private[remote] lazy val serializer: Serializer = { val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) @@ -219,9 +218,7 @@ object Cluster extends Cluster with Logging { try { name map { fqn => - val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] - a.start - a + Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] } } catch { @@ -235,7 +232,6 @@ object Cluster extends Cluster with Logging { RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil) ).newInstance - sup.start Some(sup) } @@ -258,6 +254,7 @@ object Cluster extends Cluster with Logging { sup <- createSupervisor(actor)) { clusterActor = Some(actor) supervisor = Some(sup) + sup.start } } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 6da2ceea99..02cf98bcd2 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer */ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val PORT = config.getInt("akka.remote.server.port", 9966) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)