From 279fd2b6ef4a5d8aef16f7f0dd2d188b9b378a1c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Oct 2012 18:13:08 +0200 Subject: [PATCH] Fix bug introduced in refactoring, see #2284 --- .../scala/akka/cluster/ClusterHeartbeat.scala | 17 ++++++-------- .../ClusterHeartbeatSenderStateSpec.scala | 22 ++++++++++++++++++- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 325e0aae25..fef88ece20 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -128,9 +128,6 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address - def removeJoinInProgress(address: Address): Unit = if (address != selfAddress) - state = state.removeJoinInProgress(address) - def addJoinInProgress(address: Address, deadline: Deadline): Unit = if (address != selfAddress) state = state.addJoinInProgress(address, deadline) @@ -208,8 +205,7 @@ private[cluster] object ClusterHeartbeatSenderState { // start ending process for nodes not selected any more // abort ending process for nodes that have been selected again val end = old.ending ++ (old.current -- curr).map(_ -> 0) -- curr - old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end, - joinInProgress = old.joinInProgress -- all) + old.copy(consistentHash = consistentHash, all = all, current = curr, ending = end) } } @@ -252,16 +248,17 @@ private[cluster] case class ClusterHeartbeatSenderState private ( val active: Set[Address] = current ++ joinInProgress.keySet def reset(nodes: Set[Address]): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), + ClusterHeartbeatSenderState(nodes.foldLeft(this) { _ removeJoinInProgress _ }, + consistentHash = ConsistentHash(nodes, consistentHash.virtualNodesFactor), all = nodes) def addMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, all = all + a, consistentHash = consistentHash :+ a) + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all + a, consistentHash = consistentHash :+ a) def removeMember(a: Address): ClusterHeartbeatSenderState = - ClusterHeartbeatSenderState(this, all = all - a, consistentHash = consistentHash :- a) + ClusterHeartbeatSenderState(removeJoinInProgress(a), all = all - a, consistentHash = consistentHash :- a) - def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { + private def removeJoinInProgress(address: Address): ClusterHeartbeatSenderState = { if (joinInProgress contains address) copy(joinInProgress = joinInProgress - address, ending = ending + (address -> 0)) else this @@ -269,7 +266,7 @@ private[cluster] case class ClusterHeartbeatSenderState private ( def addJoinInProgress(address: Address, deadline: Deadline): ClusterHeartbeatSenderState = { if (all contains address) this - else copy(joinInProgress = joinInProgress + (address -> deadline)) + else copy(joinInProgress = joinInProgress + (address -> deadline), ending = ending - address) } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index bd378ed0fe..3850524c24 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -40,13 +40,33 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with MustMatchers { val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).removeOverdueJoinInProgress() s.joinInProgress must be(Map.empty) s.active must be(Set.empty) + s.ending must be(Map(aa -> 0)) } "remove joinInProgress after reset" in { - val s = emptyState.addJoinInProgress(aa, Deadline.now - 30.seconds).reset(Set(aa, bb)) + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)) s.joinInProgress must be(Map.empty) } + "remove joinInProgress after addMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).addMember(aa) + s.joinInProgress must be(Map.empty) + } + + "remove joinInProgress after removeMember" in { + val s = emptyState.addJoinInProgress(aa, Deadline.now + 30.seconds).reset(Set(aa, bb)).removeMember(aa) + s.joinInProgress must be(Map.empty) + s.ending must be(Map(aa -> 0)) + } + + "remove from ending after addJoinInProgress" in { + val s = emptyState.reset(Set(aa, bb)).removeMember(aa) + s.ending must be(Map(aa -> 0)) + val s2 = s.addJoinInProgress(aa, Deadline.now + 30.seconds) + s2.joinInProgress.keySet must be(Set(aa)) + s2.ending must be(Map.empty) + } + "include nodes from reset in active set" in { val nodes = Set(aa, bb, cc) val s = emptyState.reset(nodes)