diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala index 185e380adf..be59219400 100644 --- a/akka-actors/src/main/scala/nio/Cluster.scala +++ b/akka-actors/src/main/scala/nio/Cluster.scala @@ -62,40 +62,40 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) val me = this channel = new JChannel { setReceiver(new Receiver with ExtendedMembershipListener { - def getState : Array[Byte] = null - def setState(state : Array[Byte]) : Unit = () - def receive(msg : Message) : Unit = me ! msg - def viewAccepted(view : View) : Unit = me ! view - def suspect(a : Address) : Unit = me ! Zombie(a) - def block : Unit = me ! Block - def unblock : Unit = me ! Unblock - }) + def getState : Array[Byte] = null + def setState(state : Array[Byte]) : Unit = () + def receive(msg : Message) : Unit = me ! msg + def viewAccepted(view : View) : Unit = me ! view + def suspect(a : Address) : Unit = me ! Zombie(a) + def block : Unit = me ! Block + def unblock : Unit = me ! Unblock + }) } channel connect name } - protected def serializer = Protobuf + protected def serializer = Protobuf //FIXME make this configurable - private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = { - for(r <- recipients) - channel.send(new Message(r,null,serializer out msg)) + private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) = { + recipients.foreach( to => channel.send(new Message(to,null,serializer out msg))) } private def broadcast[T <: AnyRef](msg : T) : Unit = channel.send(new Message(null,null,serializer out msg)) override def receive = { case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead + log info "Zombie: "+x broadcast(x :: Nil,PapersPlease) remotes = remotes - x } case v : View => { - log debug v.printDetails + log info v.printDetails //Not present in the cluster anymore = presumably zombies //Nodes we have no prior knowledge existed = unknowns val members = Set[Address]() ++ v.getMembers.asScala val zombies = Set[Address]() ++ remotes.keySet -- members - val unknown : Set[Address] = members -- remotes.keySet + val unknown = members -- remotes.keySet //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead broadcast(zombies ++ unknown, PapersPlease) @@ -106,27 +106,27 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) ( serializer in(m.getRawBuffer,None) ) match { case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints)) case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) - case unknown => log debug unknown.toString + case unknown => log info unknown.toString } } - case DeregisterLocalNode(s) => { - log debug "DeregisterLocalNode"+s - local = Node(local.endpoints - s) - broadcast(Papers(local.endpoints)) - } - case RegisterLocalNode(s) => { - log debug "RegisterLocalNode"+s + log info "RegisterLocalNode: "+s local = Node(local.endpoints + s) broadcast(Papers(local.endpoints)) } + + case DeregisterLocalNode(s) => { + log info "DeregisterLocalNode: "+s + local = Node(local.endpoints - s) + broadcast(Papers(local.endpoints)) + } - case Block => log debug "Asked to block" //TODO HotSwap to a buffering body - case Unblock => log debug "Asked to unblock" //TODO HotSwap back and flush the buffer + case Block => log info "Asked to block" //TODO HotSwap to a buffering body + case Unblock => log info "Asked to unblock" //TODO HotSwap back and flush the buffer } - def members = remotes.values.toList + def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes def registerLocalNode(server : RemoteAddress) : Unit = this ! RegisterLocalNode(server) def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server)