Fix bug introduced in refactoring, see #2284
This commit is contained in:
parent
66c81e915e
commit
279fd2b6ef
2 changed files with 28 additions and 11 deletions
|
|
@ -128,9 +128,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
def removeMember(m: Member): Unit = if (m.address != selfAddress)
|
||||
state = state removeMember m.address
|
||||
|
||||
def removeJoinInProgress(address: Address): Unit = if (address != selfAddress)
|
||||
state = state.removeJoinInProgress(address)
|
||||
|
||||
def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress)
|
||||
state = state.addJoinInProgress(address, deadline)
|
||||
|
||||
|
|
@ -208,8 +205,7 @@ private[cluster] object ClusterHeartbeatSenderState {
|
|||
// start ending process for nodes not selected any more
|
||||
// abort ending process for nodes that have been selected again
|
||||
val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr
|
||||
old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end,
|
||||
joinInProgress = old.joinInProgress -- all)
|
||||
old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -252,16 +248,17 @@ private[cluster] case class ClusterHeartbeatSenderState private (
|
|||
val active: Set[Address] = current ++ joinInProgress.keySet
|
||||
|
||||
def reset(nodes: Set[Address]): ClusterHeartbeatSenderState =
|
||||
ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
|
||||
ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ },
|
||||
consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor),
|
||||
all = nodes)
|
||||
|
||||
def addMember(a: Address): ClusterHeartbeatSenderState =
|
||||
ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a)
|
||||
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a)
|
||||
|
||||
def removeMember(a: Address): ClusterHeartbeatSenderState =
|
||||
ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a)
|
||||
ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a)
|
||||
|
||||
def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
|
||||
private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = {
|
||||
if (joinInProgress contains address)
|
||||
copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0))
|
||||
else this
|
||||
|
|
@ -269,7 +266,7 @@ private[cluster] case class ClusterHeartbeatSenderState private (
|
|||
|
||||
def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = {
|
||||
if (all contains address) this
|
||||
else copy(joinInProgress = joinInProgress + (address -> deadline))
|
||||
else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -40,13 +40,33 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers {
|
|||
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress()
|
||||
s.joinInProgress must be(Map.empty)
|
||||
s.active must be(Set.empty)
|
||||
s.ending must be(Map(aa -> 0))
|
||||
}
|
||||
|
||||
"remove joinInProgress after reset" in {
|
||||
val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb))
|
||||
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb))
|
||||
s.joinInProgress must be(Map.empty)
|
||||
}
|
||||
|
||||
"remove joinInProgress after addMember" in {
|
||||
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa)
|
||||
s.joinInProgress must be(Map.empty)
|
||||
}
|
||||
|
||||
"remove joinInProgress after removeMember" in {
|
||||
val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa)
|
||||
s.joinInProgress must be(Map.empty)
|
||||
s.ending must be(Map(aa -> 0))
|
||||
}
|
||||
|
||||
"remove from ending after addJoinInProgress" in {
|
||||
val s = emptyState.reset(Set(aa, bb)).removeMember(aa)
|
||||
s.ending must be(Map(aa -> 0))
|
||||
val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds)
|
||||
s2.joinInProgress.keySet must be(Set(aa))
|
||||
s2.ending must be(Map.empty)
|
||||
}
|
||||
|
||||
"include nodes from reset in active set" in {
|
||||
val nodes = Set(aa, bb, cc)
|
||||
val s = emptyState.reset(nodes)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue