From 4cf7aaa002f8d61cbf66cab17468a8d472d3e730 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 12 Dec 2009 00:36:38 +0100 Subject: [PATCH] Tidying some code --- akka-cluster/src/main/scala/Cluster.scala | 34 ++++++++++++++--------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/akka-cluster/src/main/scala/Cluster.scala b/akka-cluster/src/main/scala/Cluster.scala index 88987a79a6..91b5ac78f6 100644 --- a/akka-cluster/src/main/scala/Cluster.scala +++ b/akka-cluster/src/main/scala/Cluster.scala @@ -9,17 +9,18 @@ import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress import scala.collection.immutable.{Map,HashMap,HashSet} import org.jgroups.util.Util import se.scalablesolutions.akka.nio.RemoteServer -import se.scalablesolutions.akka.cluster.Cluster.{DeregisterLocalNode, RegisterLocalNode, Node} +import se.scalablesolutions.akka.cluster.Cluster.Node -abstract class ClusterActor(name : String) extends Actor { - // def clusterSpawn[T <: Actor](clazz : Class[T]) : T - def members : List[Node] +trait Cluster { + def members : List[Node] + def name : String + def registerLocalNode(server : RemoteAddress) : Unit + def deregisterLocalNode(server : RemoteAddress) : Unit } -object Cluster { +abstract class ClusterActor(val name : String) extends Actor with Cluster - case class RegisterLocalNode(server : RemoteAddress) - case class DeregisterLocalNode(server : RemoteAddress) +object Cluster extends Cluster { case class Node(endpoints : List[RemoteAddress]) lazy val impl : Option[ClusterActor] = { @@ -30,9 +31,11 @@ object Cluster { .asInstanceOf[ClusterActor] }) } - - //def registerLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! RegisterLocalNode(server)) - //def deregisterLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! DeregisterLocalNode(server)) + + def name = impl.map(_.name).getOrElse("No cluster") + def members = impl.map(_.members).getOrElse(Nil) + def registerLocalNode(server : RemoteAddress) : Unit = impl.map(_.registerLocalNode(server)) + def deregisterLocalNode(server : RemoteAddress) : Unit = impl.map(_.deregisterLocalNode(server)) } object JGroupsClusterActor { @@ -42,12 +45,15 @@ object JGroupsClusterActor { case object Block case object Unblock case class Zombie(address : Address) + case class RegisterLocalNode(server : RemoteAddress) + case class DeregisterLocalNode(server : RemoteAddress) } class JGroupsClusterActor(name : String) extends ClusterActor(name) { import JGroupsClusterActor._ import org.scala_tools.javautils.Implicits._ + private var local : Node = Node(Nil) private var channel : JChannel = null private var remotes : Map[Address,Node] = Map() @@ -89,10 +95,10 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) //Not present in the cluster anymore = presumably zombies //Nodes we have no prior knowledge existed = unknowns val members = Set[Address]() ++ v.getMembers.asScala - val zombies = Set[Address]() ++ remotes.keySet.filter( members contains _ ) + val zombies = Set[Address]() ++ remotes.keySet -- members val unknown : Set[Address] = members -- remotes.keySet - //Tell the zombies and unknowns to provide papers and prematurely treat them as dead + //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead broadcast(zombies ++ unknown, PapersPlease) remotes = remotes -- zombies } @@ -114,7 +120,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) case RegisterLocalNode(s) => { log debug "RegisterLocalNode"+s local = Node(local.endpoints + s) - broadcast(Nil,Papers(local.endpoints)) + broadcast(Papers(local.endpoints)) } case Block => log debug "Asked to block" //TODO HotSwap to a buffering body @@ -122,6 +128,8 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) } def members = remotes.values.toList + def registerLocalNode(server : RemoteAddress) : Unit = this ! RegisterLocalNode(server) + def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server) override def shutdown = { remotes = Map()