From 33283da77a7cd88cfb2d78515a0169098fa822ec Mon Sep 17 00:00:00 2001 From: netcomm Date: Fri, 8 Jul 2016 05:44:14 +0800 Subject: [PATCH] =ddata add WeaklyUp cluster member node (#20634) * add WeaklyUp cluster member node * fix some small point --- .../scala/akka/cluster/ddata/Replicator.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 5a067d0090..8ccb288c0e 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -276,12 +276,14 @@ object Replicator { final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage /** * Unregister a subscriber. - * @see [[Replicator.Subscribe]] + * + * @see [[Replicator.Subscribe]] */ final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage /** * The data value is retrieved with [[#get]] using the typed key. - * @see [[Replicator.Subscribe]] + * + * @see [[Replicator.Subscribe]] */ final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage { /** @@ -752,6 +754,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // cluster nodes, doesn't contain selfAddress var nodes: Set[Address] = Set.empty + // cluster weaklyUp nodes, doesn't contain selfAddress + var weaklyUpNodes: Set[Address] = Set.empty + // nodes removed from cluster, to be pruned, and tombstoned var removedNodes: Map[UniqueAddress, Long] = Map.empty var pruningPerformed: Map[UniqueAddress, Long] = Map.empty @@ -810,6 +815,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case Subscribe(key, subscriber) ⇒ receiveSubscribe(key, subscriber) case Unsubscribe(key, subscriber) ⇒ receiveUnsubscribe(key, subscriber) case Terminated(ref) ⇒ receiveTerminated(ref) + case MemberWeaklyUp(m) ⇒ receiveWeaklyUpMemberUp(m) case MemberUp(m) ⇒ receiveMemberUp(m) case MemberRemoved(m, _) ⇒ receiveMemberRemoved(m) case _: MemberEvent ⇒ // not of interest @@ -998,7 +1004,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog changed = Set.empty[String] } - def receiveGossipTick(): Unit = selectRandomNode(nodes.toVector) foreach gossipTo + def receiveGossipTick(): Unit = selectRandomNode(nodes.union(weaklyUpNodes).toVector) foreach gossipTo def gossipTo(address: Address): Unit = { val to = replica(address) @@ -1113,15 +1119,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def receiveMemberUp(m: Member): Unit = + def receiveWeaklyUpMemberUp(m: Member): Unit = if (matchingRole(m) && m.address != selfAddress) + weaklyUpNodes += m.address + + def receiveMemberUp(m: Member): Unit = + if (matchingRole(m) && m.address != selfAddress) { nodes += m.address + weaklyUpNodes -= m.address + } def receiveMemberRemoved(m: Member): Unit = { if (m.address == selfAddress) context stop self else if (matchingRole(m)) { nodes -= m.address + weaklyUpNodes -= m.address removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime) unreachable -= m.address }