Moved Cluster package

This commit is contained in:
Viktor Klang 2009-12-18 22:28:32 +01:00
parent 3f91c3a257
commit 93cdf1faed

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.nio
package se.scalablesolutions.akka.remote
import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver, SetStateEvent}
import org.jgroups.util.Util
@ -11,8 +11,8 @@ import se.scalablesolutions.akka.Config.config
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{Init, SupervisorFactory, Actor, ActorRegistry}
import se.scalablesolutions.akka.nio.Cluster.{Node, RelayedMessage}
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry}
import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage}
import scala.collection.immutable.{Map, HashMap, HashSet}
@ -59,7 +59,7 @@ object Cluster extends Cluster {
Supervise(actor, LifeCycle(Permanent)) :: Nil
)
).newInstance.start
actor send Init(None)
actor
})
}
@ -101,8 +101,8 @@ class JGroupsClusterActor extends ClusterActor {
@volatile private var channel: Option[JChannel] = None
@volatile private var remotes: Map[Address, Node] = Map()
override def init(config: AnyRef) = {
log info "Initiating cluster actor"
override def init = {
log error "Initiating cluster actor"
remotes = new HashMap[Address, Node]
val me = this
//Set up the JGroups local endpoint
@ -142,28 +142,26 @@ class JGroupsClusterActor extends ClusterActor {
}
private def broadcast[T <: AnyRef](msg: T): Unit =
channel.map(_.send(new Message(null, null, serializer out msg)))
private def broadcast[T <: AnyRef](msg: T): Unit = {
if(!remotes.isEmpty) //Don't broadcast if we are not connected anywhere...
channel.map(_.send(new Message(null, null, serializer out msg)))
}
def receive = {
case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
log info ("Zombie: %s", x)
log debug ("Zombie: %s", x)
broadcast(x :: Nil, PapersPlease)
remotes = remotes - x
}
case v: View => {
log info v.printDetails
//Not present in the cluster anymore = presumably zombies
//Nodes we have no prior knowledge existed = unknowns
val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress //Exclude ourselves
val zombies = Set[Address]() ++ remotes.keySet -- members
val unknown = members -- remotes.keySet
log info ("Updating view, zombies: %s", zombies)
log info (" , unknown: %s", unknown)
log info (" , members: %s", members)
log info (" , known: %s", remotes.keySet)
log debug v.printDetails
//Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
broadcast(zombies ++ unknown, PapersPlease)
@ -173,51 +171,47 @@ class JGroupsClusterActor extends ClusterActor {
case m: Message => {
if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected
(serializer in (m.getRawBuffer, None)) match {
case PapersPlease => {
log info ("Asked for papers by %s", m.getSrc)
log debug ("Asked for papers by %s", m.getSrc)
broadcast(m.getSrc :: Nil, Papers(local.endpoints))
remotes.get(m.getSrc) match {
case Some(x) =>
case None => broadcast(m.getSrc :: Nil, PapersPlease) //If we were asked for papers from someone we don't know, ask them!
}
if(remotes.get(m.getSrc).isEmpty) //If we were asked for papers from someone we don't know, ask them!
broadcast(m.getSrc :: Nil, PapersPlease)
}
case Papers(x) => {
log info ("Got papers from %s = %s", m.getSrc, x)
remotes = remotes + (m.getSrc -> Node(x))
log info ("Installed nodes: %s", remotes.keySet)
}
case RelayedMessage(c, m) => {
log info ("Relaying [%s] to [%s]", m, c)
ActorRegistry.actorsFor(c).map(_ send m)
}
case unknown => log info ("Unknown message: %s", unknown.toString)
case Papers(x) => remotes = remotes + (m.getSrc -> Node(x))
case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).map(_ send m)
case unknown => log debug ("Unknown message: %s", unknown.toString)
}
}
case rm @ RelayedMessage(_,_) => {
log info ("Relaying message: %s", rm)
log debug ("Relaying message: %s", rm)
broadcast(rm)
}
case RegisterLocalNode(s) => {
log info ("RegisterLocalNode: %s", s)
log debug ("RegisterLocalNode: %s", s)
local = Node(local.endpoints + s)
broadcast(Papers(local.endpoints))
}
case DeregisterLocalNode(s) => {
log info ("DeregisterLocalNode: %s", s)
log debug ("DeregisterLocalNode: %s", s)
local = Node(local.endpoints - s)
broadcast(Papers(local.endpoints))
}
case Block => log info "UNSUPPORTED: block" //TODO HotSwap to a buffering body
case Unblock => log info "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer
case Block => log debug "UNSUPPORTED: block" //TODO HotSwap to a buffering body
case Unblock => log debug "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer
}
override def shutdown = {
log info ("Shutting down %s", this.getClass.getName)
channel.map(_.close)
log debug ("Shutting down %s", this.getClass.getName)
channel.map(_.shutdown)
remotes = Map()
channel = None
}