From 0b1ce7223da19dac94b6f6676f5e4663dd2b208d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Jul 2017 14:52:42 +0200 Subject: [PATCH] fix usage of the the leader sorted set in Replicator * since the ordering can change based on the member's status it's not possible to use ordinary - for removal * similar issue at a few places where ageOrdering was used --- .../scala/akka/cluster/sharding/ShardRegion.scala | 14 ++++++++++++-- .../cluster/singleton/ClusterSingletonProxy.scala | 8 +++++--- .../main/scala/akka/cluster/ddata/Replicator.scala | 8 +++++--- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 8de68d4cd4..bc3a42d1e8 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -479,13 +479,23 @@ private[akka] class ShardRegion( def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { case MemberUp(m) ⇒ if (matchingRole(m)) - changeMembers(membersByAge - m + m) // replace + changeMembers { + // replace, it's possible that the upNumber is changed + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) + membersByAge += m + membersByAge + } case MemberRemoved(m, _) ⇒ if (m.uniqueAddress == cluster.selfUniqueAddress) context.stop(self) else if (matchingRole(m)) - changeMembers(membersByAge - m) + changeMembers { + // filter, it's possible that the upNumber is changed + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) + membersByAge += m + membersByAge + } case _: MemberEvent ⇒ // these are expected, no need to warn about them diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 0c017d31dd..57dd041218 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -222,7 +222,8 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste def add(m: Member): Unit = { if (matchingRole(m)) trackChange { () ⇒ - membersByAge -= m // replace + // replace, it's possible that the upNumber is changed + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) membersByAge += m } } @@ -233,8 +234,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste */ def remove(m: Member): Unit = { if (matchingRole(m)) - trackChange { - () ⇒ membersByAge -= m + trackChange { () ⇒ + // filter, it's possible that the upNumber is changed + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) } } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index ca79c9be89..0c2291c936 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -1740,7 +1740,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (m.address == selfAddress) context stop self else if (matchingRole(m)) { - leader -= m + // filter, it's possible that the ordering is changed since it based on MemberStatus + leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress) nodes -= m.address weaklyUpNodes -= m.address log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress) @@ -1752,8 +1753,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveOtherMemberEvent(m: Member): Unit = if (matchingRole(m)) { - // update changed status - leader = (leader - m) + m + // replace, it's possible that the ordering is changed since it based on MemberStatus + leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress) + leader += m } def receiveUnreachable(m: Member): Unit =