Tidying some code
This commit is contained in:
parent
54ab0392ae
commit
b93738ba76
1 changed files with 21 additions and 13 deletions
|
|
@ -9,17 +9,18 @@ import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress
|
|||
import scala.collection.immutable.{Map,HashMap,HashSet}
|
||||
import org.jgroups.util.Util
|
||||
import se.scalablesolutions.akka.nio.RemoteServer
|
||||
import se.scalablesolutions.akka.cluster.Cluster.{DeregisterLocalNode, RegisterLocalNode, Node}
|
||||
import se.scalablesolutions.akka.cluster.Cluster.Node
|
||||
|
||||
abstract class ClusterActor(name : String) extends Actor {
|
||||
// def clusterSpawn[T <: Actor](clazz : Class[T]) : T
|
||||
def members : List[Node]
|
||||
trait Cluster {
|
||||
def members : List[Node]
|
||||
def name : String
|
||||
def registerLocalNode(server : RemoteAddress) : Unit
|
||||
def deregisterLocalNode(server : RemoteAddress) : Unit
|
||||
}
|
||||
|
||||
object Cluster {
|
||||
abstract class ClusterActor(val name : String) extends Actor with Cluster
|
||||
|
||||
case class RegisterLocalNode(server : RemoteAddress)
|
||||
case class DeregisterLocalNode(server : RemoteAddress)
|
||||
object Cluster extends Cluster {
|
||||
case class Node(endpoints : List[RemoteAddress])
|
||||
|
||||
lazy val impl : Option[ClusterActor] = {
|
||||
|
|
@ -30,9 +31,11 @@ object Cluster {
|
|||
.asInstanceOf[ClusterActor]
|
||||
})
|
||||
}
|
||||
|
||||
//def registerLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! RegisterLocalNode(server))
|
||||
//def deregisterLocalNode(server : RemoteAddress) : Unit = impl.foreach(_ ! DeregisterLocalNode(server))
|
||||
|
||||
def name = impl.map(_.name).getOrElse("No cluster")
|
||||
def members = impl.map(_.members).getOrElse(Nil)
|
||||
def registerLocalNode(server : RemoteAddress) : Unit = impl.map(_.registerLocalNode(server))
|
||||
def deregisterLocalNode(server : RemoteAddress) : Unit = impl.map(_.deregisterLocalNode(server))
|
||||
}
|
||||
|
||||
object JGroupsClusterActor {
|
||||
|
|
@ -42,12 +45,15 @@ object JGroupsClusterActor {
|
|||
case object Block
|
||||
case object Unblock
|
||||
case class Zombie(address : Address)
|
||||
case class RegisterLocalNode(server : RemoteAddress)
|
||||
case class DeregisterLocalNode(server : RemoteAddress)
|
||||
}
|
||||
|
||||
class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
||||
{
|
||||
import JGroupsClusterActor._
|
||||
import org.scala_tools.javautils.Implicits._
|
||||
|
||||
private var local : Node = Node(Nil)
|
||||
private var channel : JChannel = null
|
||||
private var remotes : Map[Address,Node] = Map()
|
||||
|
|
@ -89,10 +95,10 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
|||
//Not present in the cluster anymore = presumably zombies
|
||||
//Nodes we have no prior knowledge existed = unknowns
|
||||
val members = Set[Address]() ++ v.getMembers.asScala
|
||||
val zombies = Set[Address]() ++ remotes.keySet.filter( members contains _ )
|
||||
val zombies = Set[Address]() ++ remotes.keySet -- members
|
||||
val unknown : Set[Address] = members -- remotes.keySet
|
||||
|
||||
//Tell the zombies and unknowns to provide papers and prematurely treat them as dead
|
||||
//Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
|
||||
broadcast(zombies ++ unknown, PapersPlease)
|
||||
remotes = remotes -- zombies
|
||||
}
|
||||
|
|
@ -114,7 +120,7 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
|||
case RegisterLocalNode(s) => {
|
||||
log debug "RegisterLocalNode"+s
|
||||
local = Node(local.endpoints + s)
|
||||
broadcast(Nil,Papers(local.endpoints))
|
||||
broadcast(Papers(local.endpoints))
|
||||
}
|
||||
|
||||
case Block => log debug "Asked to block" //TODO HotSwap to a buffering body
|
||||
|
|
@ -122,6 +128,8 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
|||
}
|
||||
|
||||
def members = remotes.values.toList
|
||||
def registerLocalNode(server : RemoteAddress) : Unit = this ! RegisterLocalNode(server)
|
||||
def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server)
|
||||
|
||||
override def shutdown = {
|
||||
remotes = Map()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue