fix usage of the the leader sorted set in Replicator
* since the ordering can change based on the member's status it's not possible to use ordinary - for removal * similar issue at a few places where ageOrdering was used
This commit is contained in:
parent
f5a40bfcb2
commit
0b1ce7223d
3 changed files with 22 additions and 8 deletions
|
|
@ -479,13 +479,23 @@ private[akka] class ShardRegion(
|
||||||
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
|
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
|
||||||
case MemberUp(m) ⇒
|
case MemberUp(m) ⇒
|
||||||
if (matchingRole(m))
|
if (matchingRole(m))
|
||||||
changeMembers(membersByAge - m + m) // replace
|
changeMembers {
|
||||||
|
// replace, it's possible that the upNumber is changed
|
||||||
|
membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
|
membersByAge += m
|
||||||
|
membersByAge
|
||||||
|
}
|
||||||
|
|
||||||
case MemberRemoved(m, _) ⇒
|
case MemberRemoved(m, _) ⇒
|
||||||
if (m.uniqueAddress == cluster.selfUniqueAddress)
|
if (m.uniqueAddress == cluster.selfUniqueAddress)
|
||||||
context.stop(self)
|
context.stop(self)
|
||||||
else if (matchingRole(m))
|
else if (matchingRole(m))
|
||||||
changeMembers(membersByAge - m)
|
changeMembers {
|
||||||
|
// filter, it's possible that the upNumber is changed
|
||||||
|
membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
|
membersByAge += m
|
||||||
|
membersByAge
|
||||||
|
}
|
||||||
|
|
||||||
case _: MemberEvent ⇒ // these are expected, no need to warn about them
|
case _: MemberEvent ⇒ // these are expected, no need to warn about them
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -222,7 +222,8 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
||||||
def add(m: Member): Unit = {
|
def add(m: Member): Unit = {
|
||||||
if (matchingRole(m))
|
if (matchingRole(m))
|
||||||
trackChange { () ⇒
|
trackChange { () ⇒
|
||||||
membersByAge -= m // replace
|
// replace, it's possible that the upNumber is changed
|
||||||
|
membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
membersByAge += m
|
membersByAge += m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -233,8 +234,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
||||||
*/
|
*/
|
||||||
def remove(m: Member): Unit = {
|
def remove(m: Member): Unit = {
|
||||||
if (matchingRole(m))
|
if (matchingRole(m))
|
||||||
trackChange {
|
trackChange { () ⇒
|
||||||
() ⇒ membersByAge -= m
|
// filter, it's possible that the upNumber is changed
|
||||||
|
membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1740,7 +1740,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
if (m.address == selfAddress)
|
if (m.address == selfAddress)
|
||||||
context stop self
|
context stop self
|
||||||
else if (matchingRole(m)) {
|
else if (matchingRole(m)) {
|
||||||
leader -= m
|
// filter, it's possible that the ordering is changed since it based on MemberStatus
|
||||||
|
leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
nodes -= m.address
|
nodes -= m.address
|
||||||
weaklyUpNodes -= m.address
|
weaklyUpNodes -= m.address
|
||||||
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
|
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
|
||||||
|
|
@ -1752,8 +1753,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
|
|
||||||
def receiveOtherMemberEvent(m: Member): Unit =
|
def receiveOtherMemberEvent(m: Member): Unit =
|
||||||
if (matchingRole(m)) {
|
if (matchingRole(m)) {
|
||||||
// update changed status
|
// replace, it's possible that the ordering is changed since it based on MemberStatus
|
||||||
leader = (leader - m) + m
|
leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress)
|
||||||
|
leader += m
|
||||||
}
|
}
|
||||||
|
|
||||||
def receiveUnreachable(m: Member): Unit =
|
def receiveUnreachable(m: Member): Unit =
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue