Fixing a bug in JGroupsClusterActor

This commit is contained in:
Viktor Klang 2010-03-04 23:25:55 +01:00
parent 259b6c21bf
commit 8091b6cb26
4 changed files with 30 additions and 31 deletions

View file

@ -44,7 +44,8 @@ class JGroupsClusterActor extends BasicClusterActor {
log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer
}) })
}) })
channel.map(_.connect(name))
channel.foreach(_.connect(name))
} }
protected def toOneNode(dest : Address, msg: Array[Byte]): Unit = protected def toOneNode(dest : Address, msg: Array[Byte]): Unit =

View file

@ -85,6 +85,7 @@ class ShoalClusterActor extends BasicClusterActor {
*/ */
protected def createCallback : CallBack = { protected def createCallback : CallBack = {
import org.scala_tools.javautils.Imports._ import org.scala_tools.javautils.Imports._
import ClusterActor._
val me = this val me = this
new CallBack { new CallBack {
def processNotification(signal : Signal) { def processNotification(signal : Signal) {
@ -92,10 +93,10 @@ class ShoalClusterActor extends BasicClusterActor {
signal.acquire() signal.acquire()
if(isActive) { if(isActive) {
signal match { signal match {
case ms : MessageSignal => me send Message(ms.getMemberToken,ms.getMessage) case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage)
case jns : JoinNotificationSignal => me send View(Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName)
case fss : FailureSuspectedSignal => me send Zombie(fss.getMemberToken) case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken)
case fns : FailureNotificationSignal => me send Zombie(fns.getMemberToken) case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken)
case _ => log.debug("Unhandled signal: [%s]",signal) case _ => log.debug("Unhandled signal: [%s]",signal)
} }
} }

View file

@ -48,7 +48,15 @@ private[remote] object ClusterActor {
sealed trait ClusterMessage sealed trait ClusterMessage
private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
private[remote] case class Message[ADDR_T](sender : ADDR_T,msg : Array[Byte])
private[remote] case object PapersPlease extends ClusterMessage
private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
private[remote] case object Block extends ClusterMessage
private[remote] case object Unblock extends ClusterMessage
private[remote] case class View[ADDR_T](othersPresent : Set[ADDR_T]) extends ClusterMessage
private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage
private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
private[remote] case class Node(endpoints: List[RemoteAddress]) private[remote] case class Node(endpoints: List[RemoteAddress])
} }
@ -60,16 +68,6 @@ private[remote] object ClusterActor {
abstract class BasicClusterActor extends ClusterActor { abstract class BasicClusterActor extends ClusterActor {
import ClusterActor._ import ClusterActor._
case class Message(sender : ADDR_T,msg : Array[Byte])
case object PapersPlease extends ClusterMessage
case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
case object Block extends ClusterMessage
case object Unblock extends ClusterMessage
case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
case class Zombie(address: ADDR_T) extends ClusterMessage
case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
type ADDR_T type ADDR_T
@ -85,14 +83,14 @@ abstract class BasicClusterActor extends ClusterActor {
} }
def receive = { def receive = {
case v @ View(members) => { case v : View[ADDR_T] => {
// Not present in the cluster anymore = presumably zombies // Not present in the cluster anymore = presumably zombies
// Nodes we have no prior knowledge existed = unknowns // Nodes we have no prior knowledge existed = unknowns
val zombies = Set[ADDR_T]() ++ remotes.keySet -- members val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent
val unknown = members -- remotes.keySet val unknown = v.othersPresent -- remotes.keySet
log debug ("Updating view") log debug ("Updating view")
log debug ("Other memebers: [%s]",members) log debug ("Other memebers: [%s]",v.othersPresent)
log debug ("Zombies: [%s]",zombies) log debug ("Zombies: [%s]",zombies)
log debug ("Unknowns: [%s]",unknown) log debug ("Unknowns: [%s]",unknown)
@ -101,10 +99,10 @@ abstract class BasicClusterActor extends ClusterActor {
remotes = remotes -- zombies remotes = remotes -- zombies
} }
case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead case z : Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead
log debug ("Killing Zombie Node: %s", x) log debug ("Killing Zombie Node: %s", z.address)
broadcast(x :: Nil, PapersPlease) broadcast(z.address :: Nil, PapersPlease)
remotes = remotes - x remotes = remotes - z.address
} }
case rm @ RelayedMessage(_, _) => { case rm @ RelayedMessage(_, _) => {
@ -112,7 +110,8 @@ abstract class BasicClusterActor extends ClusterActor {
broadcast(rm) broadcast(rm)
} }
case m @ Message(src,msg) => { case m : Message[ADDR_T] => {
val (src,msg) = (m.sender,m.msg)
(Cluster.serializer in (msg, None)) match { (Cluster.serializer in (msg, None)) match {
case PapersPlease => { case PapersPlease => {
@ -207,7 +206,7 @@ abstract class BasicClusterActor extends ClusterActor {
*/ */
object Cluster extends Cluster with Logging { object Cluster extends Cluster with Logging {
@volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var clusterActor: Option[ClusterActor] = None
@volatile private[remote] var supervisor: Option[Supervisor] = None @volatile private[remote] var supervisor: Option[Supervisor] = None
private[remote] lazy val serializer: Serializer = { private[remote] lazy val serializer: Serializer = {
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
@ -219,9 +218,7 @@ object Cluster extends Cluster with Logging {
try { try {
name map { fqn => name map { fqn =>
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
a.start
a
} }
} }
catch { catch {
@ -235,7 +232,6 @@ object Cluster extends Cluster with Logging {
RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil) Supervise(actor, LifeCycle(Permanent)) :: Nil)
).newInstance ).newInstance
sup.start
Some(sup) Some(sup)
} }
@ -258,6 +254,7 @@ object Cluster extends Cluster with Logging {
sup <- createSupervisor(actor)) { sup <- createSupervisor(actor)) {
clusterActor = Some(actor) clusterActor = Some(actor)
supervisor = Some(sup) supervisor = Some(sup)
sup.start
} }
} }
} }

View file

@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer
*/ */
object RemoteServer { object RemoteServer {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999) val PORT = config.getInt("akka.remote.server.port", 9966)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)