=ddata add WeaklyUp cluster member node (#20634)

* add WeaklyUp cluster member node

* fix some small point
This commit is contained in:
netcomm 2016-07-08 05:44:14 +08:00 committed by Konrad Malawski
parent 2769b5e1cb
commit 33283da77a

View file

@ -276,12 +276,14 @@ object Replicator {
final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/** /**
* Unregister a subscriber. * Unregister a subscriber.
* @see [[Replicator.Subscribe]] *
* @see [[Replicator.Subscribe]]
*/ */
final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage
/** /**
* The data value is retrieved with [[#get]] using the typed key. * The data value is retrieved with [[#get]] using the typed key.
* @see [[Replicator.Subscribe]] *
* @see [[Replicator.Subscribe]]
*/ */
final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage { final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
/** /**
@ -752,6 +754,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// cluster nodes, doesn't contain selfAddress // cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty var nodes: Set[Address] = Set.empty
// cluster weaklyUp nodes, doesn't contain selfAddress
var weaklyUpNodes: Set[Address] = Set.empty
// nodes removed from cluster, to be pruned, and tombstoned // nodes removed from cluster, to be pruned, and tombstoned
var removedNodes: Map[UniqueAddress, Long] = Map.empty var removedNodes: Map[UniqueAddress, Long] = Map.empty
var pruningPerformed: Map[UniqueAddress, Long] = Map.empty var pruningPerformed: Map[UniqueAddress, Long] = Map.empty
@ -810,6 +815,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Subscribe(key, subscriber) receiveSubscribe(key, subscriber) case Subscribe(key, subscriber) receiveSubscribe(key, subscriber)
case Unsubscribe(key, subscriber) receiveUnsubscribe(key, subscriber) case Unsubscribe(key, subscriber) receiveUnsubscribe(key, subscriber)
case Terminated(ref) receiveTerminated(ref) case Terminated(ref) receiveTerminated(ref)
case MemberWeaklyUp(m) receiveWeaklyUpMemberUp(m)
case MemberUp(m) receiveMemberUp(m) case MemberUp(m) receiveMemberUp(m)
case MemberRemoved(m, _) receiveMemberRemoved(m) case MemberRemoved(m, _) receiveMemberRemoved(m)
case _: MemberEvent // not of interest case _: MemberEvent // not of interest
@ -998,7 +1004,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
changed = Set.empty[String] changed = Set.empty[String]
} }
def receiveGossipTick(): Unit = selectRandomNode(nodes.toVector) foreach gossipTo def receiveGossipTick(): Unit = selectRandomNode(nodes.union(weaklyUpNodes).toVector) foreach gossipTo
def gossipTo(address: Address): Unit = { def gossipTo(address: Address): Unit = {
val to = replica(address) val to = replica(address)
@ -1113,15 +1119,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} }
} }
def receiveMemberUp(m: Member): Unit = def receiveWeaklyUpMemberUp(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress) if (matchingRole(m) && m.address != selfAddress)
weaklyUpNodes += m.address
def receiveMemberUp(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress) {
nodes += m.address nodes += m.address
weaklyUpNodes -= m.address
}
def receiveMemberRemoved(m: Member): Unit = { def receiveMemberRemoved(m: Member): Unit = {
if (m.address == selfAddress) if (m.address == selfAddress)
context stop self context stop self
else if (matchingRole(m)) { else if (matchingRole(m)) {
nodes -= m.address nodes -= m.address
weaklyUpNodes -= m.address
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime) removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address unreachable -= m.address
} }