diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 24adea2c7b..bf27cb186f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -65,138 +65,36 @@ object ClusterEvent { /** * Marker interface for membership events. - * Only published after convergence, when all members have seen current + * Published when the state change is first seen on a node. + * The state change was performed by the leader when there was + * convergence on the leader node, i.e. all members had seen previous * state. */ sealed trait MemberEvent extends ClusterDomainEvent { def member: Member } - /** - * A new member joined the cluster. - * Only published after convergence, when all members have seen current - * state. - */ - case class MemberJoined(member: Member) extends MemberEvent { - if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) - } - /** * Member status changed to Up. - * Only published after convergence, when all members have seen current - * state. */ case class MemberUp(member: Member) extends MemberEvent { if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) } - /** - * Member status changed to Leaving. - * Only published after convergence, when all members have seen current - * state. - */ - case class MemberLeft(member: Member) extends MemberEvent { - if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) - } - /** * Member status changed to Exiting. - * Only published after convergence, when all members have seen current - * state. */ case class MemberExited(member: Member) extends MemberEvent { if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) } /** - * Member status changed to Down. - * Only published after convergence, when all members have seen current - * state. - */ - case class MemberDowned(member: Member) extends MemberEvent { - if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) - } - - /** - * Member completely removed from the cluster. Only published after convergence, - * when all other members have seen the state. + * Member completely removed from the cluster. */ case class MemberRemoved(member: Member) extends MemberEvent { if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) } - /** - * Current snapshot state of the cluster. Sent to new subscriber of - * [akka.cluster.ClusterEvent.InstantMemberEvent]. - */ - case class InstantClusterState(members: immutable.SortedSet[Member] = immutable.SortedSet.empty) - extends ClusterDomainEvent { - - /** - * Java API: get current member list - */ - def getMembers: java.lang.Iterable[Member] = { - import scala.collection.JavaConverters._ - members.asJava - } - } - - /** - * Marker interface for membership events published immediately when - * it happened. All other members might not have seen the state. - */ - sealed trait InstantMemberEvent extends ClusterDomainEvent { - def member: Member - } - - /** - * A new member joined the cluster. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberJoined(member: Member) extends InstantMemberEvent { - if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) - } - - /** - * Member status changed to Up. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberUp(member: Member) extends InstantMemberEvent { - if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) - } - - /** - * Member status changed to Leaving. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberLeft(member: Member) extends InstantMemberEvent { - if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) - } - - /** - * Member status changed to Exiting. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberExited(member: Member) extends InstantMemberEvent { - if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) - } - - /** - * Member status changed to Down. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberDowned(member: Member) extends InstantMemberEvent { - if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) - } - - /** - * Member completely removed from the cluster. Published immediately when it happened. - * All other members might not have seen the state. - */ - case class InstantMemberRemoved(member: Member) extends InstantMemberEvent { - if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) - } - /** * Leader of the cluster members changed. Only published after convergence. */ @@ -260,19 +158,13 @@ object ClusterEvent { val changedMembers = membersGroupedByAddress collect { case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember } - val memberEvents = (newMembers ++ changedMembers) map { m ⇒ - m.status match { - case Joining ⇒ MemberJoined(m) - case Up ⇒ MemberUp(m) - case Leaving ⇒ MemberLeft(m) - case Exiting ⇒ MemberExited(m) - case _ ⇒ throw new IllegalStateException("Unexpected member status: " + m) - } + val memberEvents = (newMembers ++ changedMembers) collect { + case m if m.status == Up ⇒ MemberUp(m) + case m if m.status == Exiting ⇒ MemberExited(m) + // no events for other transitions } val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable - val newDowned = allNewUnreachable filter { _.status == Down } - val downedEvents = newDowned map MemberDowned val unreachableGroupedByAddress = List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.address) @@ -280,27 +172,12 @@ object ClusterEvent { case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ newMember } - val unreachableDownedEvents = unreachableDownMembers map MemberDowned val removedMembers = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) ++ (oldGossip.overview.unreachable -- newGossip.overview.unreachable) val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed))) - (new VectorBuilder[MemberEvent]() ++= memberEvents ++= downedEvents ++= unreachableDownedEvents - ++= removedEvents).result() - } - - /** - * INTERNAL API - */ - private[cluster] def convertToInstantMemberEvents(memberEvents: immutable.Seq[MemberEvent]): immutable.Seq[InstantMemberEvent] = - memberEvents map { - case MemberJoined(m) ⇒ InstantMemberJoined(m) - case MemberUp(m) ⇒ InstantMemberUp(m) - case MemberDowned(m) ⇒ InstantMemberDowned(m) - case MemberLeft(m) ⇒ InstantMemberLeft(m) - case MemberExited(m) ⇒ InstantMemberExited(m) - case MemberRemoved(m) ⇒ InstantMemberRemoved(m) + (new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result() } /** @@ -358,12 +235,12 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def eventStream: EventStream = context.system.eventStream /** - * The current snapshot state that is a mix of converged and latest gossip + * The current snapshot state corresponding to latest gossip * to mimic what you would have seen if you where listening to the events. */ def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { val state = CurrentClusterState( - members = latestConvergedGossip.members, + members = latestGossip.members, unreachable = latestGossip.overview.unreachable, seenBy = latestGossip.seenBy, leader = latestConvergedGossip.leader) @@ -373,20 +250,8 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } } - /** - * Publish the snapshot state that is based on latest gossip to mimic what you - * would have seen if you where listening to the InstantMemberEvent stream. - */ - def publishInstantClusterState(receiver: ActorRef): Unit = - receiver ! InstantClusterState(members = latestGossip.members) - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - val isInstantMemberEvent = classOf[InstantMemberEvent].isAssignableFrom(to) - if (classOf[ClusterDomainEvent] == to || isInstantMemberEvent) - publishInstantClusterState(subscriber) - if (!isInstantMemberEvent) - publishCurrentClusterState(Some(subscriber)) - + publishCurrentClusterState(Some(subscriber)) eventStream.subscribe(subscriber, to) } @@ -401,25 +266,22 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestGossip = newGossip // first publish the diffUnreachable between the last two gossips diffUnreachable(oldGossip, newGossip) foreach publish - val newMemberEvents = diffMemberEvents(oldGossip, newGossip) - convertToInstantMemberEvents(newMemberEvents) foreach publish - // buffer up the MemberEvents waiting for convergence - bufferedEvents ++= newMemberEvents + diffMemberEvents(oldGossip, newGossip) foreach { event ⇒ + event match { + case MemberRemoved(m) ⇒ + publish(event) + // notify DeathWatch about downed node + publish(AddressTerminated(m.address)) + case _ ⇒ publish(event) + } + } // buffer up the LeaderChanged waiting for convergence bufferedEvents ++= diffLeader(oldGossip, newGossip) // if we have convergence then publish the MemberEvents and LeaderChanged if (newGossip.convergence) { val previousConvergedGossip = latestConvergedGossip latestConvergedGossip = newGossip - bufferedEvents foreach { event ⇒ - event match { - case m: MemberEvent if m.isInstanceOf[MemberRemoved] ⇒ - publish(event) - // notify DeathWatch about downed node - publish(AddressTerminated(m.member.address)) - case _ ⇒ publish(event) - } - } + bufferedEvents foreach publish bufferedEvents = Vector.empty } // publish internal SeenState for testing purposes diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 309b0d039a..6c4166f95f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -101,7 +101,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg HeartbeatInterval, self, HeartbeatTick) override def preStart(): Unit = { - cluster.subscribe(self, classOf[InstantMemberEvent]) + cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[UnreachableMember]) } @@ -123,19 +123,17 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def receive = { case HeartbeatTick ⇒ heartbeat() - case InstantMemberUp(m) ⇒ addMember(m) + case MemberUp(m) ⇒ addMember(m) case UnreachableMember(m) ⇒ removeMember(m) - case InstantMemberDowned(m) ⇒ removeMember(m) - case InstantMemberRemoved(m) ⇒ removeMember(m) - case s: InstantClusterState ⇒ reset(s) - case _: CurrentClusterState ⇒ // enough with InstantClusterState - case _: InstantMemberEvent ⇒ // not interested in other types of InstantMemberEvent + case MemberRemoved(m) ⇒ removeMember(m) + case s: CurrentClusterState ⇒ reset(s) + case _: MemberEvent ⇒ // not interested in other types of MemberEvent case HeartbeatRequest(from) ⇒ addHeartbeatRequest(from) case SendHeartbeatRequest(to) ⇒ sendHeartbeatRequest(to) case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } - def reset(snapshot: InstantClusterState): Unit = state = state.reset(snapshot.members.map(_.address)) + def reset(snapshot: CurrentClusterState): Unit = state = state.reset(snapshot.members.map(_.address)) def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 4c3f70108a..2b756e75a1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -76,7 +76,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto MetricsInterval, self, MetricsTick) override def preStart(): Unit = { - cluster.subscribe(self, classOf[InstantMemberEvent]) + cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[UnreachableMember]) log.info("Metrics collection has started successfully on node [{}]", selfAddress) } @@ -85,13 +85,11 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case GossipTick ⇒ gossip() case MetricsTick ⇒ collect() case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) - case state: InstantClusterState ⇒ receiveState(state) - case state: CurrentClusterState ⇒ // enough with InstantClusterState - case InstantMemberUp(m) ⇒ addMember(m) - case InstantMemberDowned(m) ⇒ removeMember(m) - case InstantMemberRemoved(m) ⇒ removeMember(m) + case state: CurrentClusterState ⇒ receiveState(state) + case MemberUp(m) ⇒ addMember(m) + case MemberRemoved(m) ⇒ removeMember(m) case UnreachableMember(m) ⇒ removeMember(m) - case _: InstantMemberEvent ⇒ // not interested in other types of InstantMemberEvent + case _: MemberEvent ⇒ // not interested in other types of MemberEvent } @@ -119,7 +117,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto /** * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus.Up]]. */ - def receiveState(state: InstantClusterState): Unit = + def receiveState(state: CurrentClusterState): Unit = nodes = state.members collect { case m if m.status == Up ⇒ m.address } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 00b0f18435..807fe85e52 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -53,18 +53,14 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case UnreachableMember(member) ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) - case MemberDowned(member) ⇒ - // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member, unreachable = state.unreachable - event.member) - case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes - case _: InstantClusterState | _: InstantMemberEvent ⇒ // not used here + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") @@ -130,6 +126,12 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { */ def clusterMetrics: Set[NodeMetrics] = _clusterMetrics + /** + * INTERNAL API + */ + private[cluster] def refreshCurrentState(): Unit = + cluster.sendCurrentClusterState(eventBusListener) + /** * INTERNAL API * The nodes that has seen current version of the Gossip. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 79f5a72c4b..687f49faaa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -52,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow cluster.down(thirdAddress) enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -63,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 92622dd13a..d1b6e7cdc4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT markNodeAsUnavailable(thirdAddress) - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -61,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 763d2623c2..0554144015 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -92,33 +92,15 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) def memberStatus(address: Address): Option[MemberStatus] = clusterView.members.collectFirst { case m if m.address == address ⇒ m.status } - def assertNotMovedUp(joining: Boolean): Unit = { - within(20 seconds) { - if (joining) awaitCond(clusterView.members.size == 0) - else awaitCond(clusterView.members.size == 2) - awaitSeenSameState(first, second, fourth) - if (joining) memberStatus(first) must be(None) - else memberStatus(first) must be(Some(MemberStatus.Up)) - if (joining) memberStatus(second) must be(None) - else memberStatus(second) must be(Some(MemberStatus.Up)) - // leader is not allowed to move the new node to Up - memberStatus(fourth) must be(None) - } - } - enterBarrier("after-join") - runOn(first, second) { + runOn(first, second, fourth) { for (n ← 1 to 5) { - assertNotMovedUp(joining = false) - // wait and then check again - Thread.sleep(1.second.dilated.toMillis) - } - } - - runOn(fourth) { - for (n ← 1 to 5) { - assertNotMovedUp(joining = true) + awaitCond(clusterView.members.size == 2) + awaitSeenSameState(first, second, fourth) + memberStatus(first) must be(Some(MemberStatus.Up)) + memberStatus(second) must be(Some(MemberStatus.Up)) + memberStatus(fourth) must be(None) // wait and then check again Thread.sleep(1.second.dilated.toMillis) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala index 4e6bae48d5..707b6a9c47 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialHeartbeatSpec.scala @@ -8,8 +8,7 @@ import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.Props -import akka.cluster.ClusterEvent.InstantClusterState -import akka.cluster.ClusterEvent.InstantMemberJoined +import akka.cluster.ClusterEvent.CurrentClusterState import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction @@ -48,20 +47,11 @@ abstract class InitialHeartbeatSpec awaitClusterUp(first) runOn(first) { - val joinLatch = TestLatch() - cluster.subscribe(system.actorOf(Props(new Actor { - def receive = { - case state: InstantClusterState ⇒ - if (state.members.exists(_.address == secondAddress)) - joinLatch.countDown() - case InstantMemberJoined(m) ⇒ - if (m.address == secondAddress) - joinLatch.countDown() - } - })), classOf[InstantMemberJoined]) - within(10 seconds) { - joinLatch.await + awaitCond { + cluster.sendCurrentClusterState(testActor) + expectMsgType[CurrentClusterState].members.exists(_.address == secondAddress) + } } } runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 577ea213ec..f01f8da317 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -48,7 +48,7 @@ abstract class JoinSeedNodeSpec runOn(seed1, seed2, seed3) { cluster.joinSeedNodes(seedNodes) - awaitUpConvergence(3) + awaitMembersUp(3) } enterBarrier("after-1") } @@ -57,7 +57,7 @@ abstract class JoinSeedNodeSpec runOn(ordinary1, ordinary2) { cluster.joinSeedNodes(seedNodes) } - awaitUpConvergence(roles.size) + awaitMembersUp(roles.size) enterBarrier("after-2") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 46ec2780b8..db5e618a22 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -52,7 +52,7 @@ abstract class JoinTwoClustersSpec cluster.join(c1) } - awaitUpConvergence(numberOfMembers = 2) + awaitMembersUp(numberOfMembers = 2) assertLeader(a1, a2) assertLeader(b1, b2) @@ -65,7 +65,7 @@ abstract class JoinTwoClustersSpec } runOn(a1, a2, b1, b2) { - awaitUpConvergence(numberOfMembers = 4) + awaitMembersUp(numberOfMembers = 4) } assertLeader(a1, a2, b1, b2) @@ -80,7 +80,7 @@ abstract class JoinTwoClustersSpec cluster.join(c1) } - awaitUpConvergence(numberOfMembers = 6) + awaitMembersUp(numberOfMembers = 6) assertLeader(a1, a2, b1, b2, c1, c2) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 1203ac2740..d5616d07ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -60,7 +60,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) } runOn(fourth) { @@ -70,7 +70,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(second, third) { enterBarrier("down-fourth-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) + awaitMembersUp(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) } enterBarrier("await-completion-1") @@ -90,7 +90,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) + awaitMembersUp(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) } runOn(second) { @@ -100,7 +100,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(third) { enterBarrier("down-second-node") - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30 seconds) + awaitMembersUp(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30 seconds) } enterBarrier("await-completion-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 13c6b7fae8..95f8f29250 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -102,7 +102,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig enterBarrier("after-unavailable" + n) enterBarrier("after-down" + n) - awaitUpConvergence(currentRoles.size - 1) + awaitMembersUp(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head clusterView.isLeader must be(myself == nextExpectedLeader) assertLeaderIn(remainingRoles) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 0f6ff1b904..1e52674e62 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -61,17 +61,13 @@ abstract class LeaderLeavingSpec } else { - val leavingLatch = TestLatch() val exitingLatch = TestLatch() cluster.subscribe(system.actorOf(Props(new Actor { def receive = { case state: CurrentClusterState ⇒ - if (state.members.exists(m ⇒ m.address == oldLeaderAddress && m.status == Leaving)) - leavingLatch.countDown() if (state.members.exists(m ⇒ m.address == oldLeaderAddress && m.status == Exiting)) exitingLatch.countDown() - case MemberLeft(m) if m.address == oldLeaderAddress ⇒ leavingLatch.countDown() case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown() case _ ⇒ // ignore } @@ -83,9 +79,6 @@ abstract class LeaderLeavingSpec val expectedAddresses = roles.toSet map address awaitCond(clusterView.members.map(_.address) == expectedAddresses) - // verify that the LEADER is LEAVING - leavingLatch.await - // verify that the LEADER is EXITING exitingLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index c858222b7f..5c3fd3d5f6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -83,7 +83,7 @@ abstract class MBeanSpec } enterBarrier("joined") - awaitUpConvergence(4) + awaitMembersUp(4) assertMembers(clusterView.members, roles.map(address(_)): _*) awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up") val expectedMembers = roles.sorted.map(address(_)).mkString(",") @@ -115,7 +115,7 @@ abstract class MBeanSpec enterBarrier("fourth-down") runOn(first, second, third) { - awaitUpConvergence(3, canNotBePartOfMemberRing = Set(fourthAddress)) + awaitMembersUp(3, canNotBePartOfMemberRing = Set(fourthAddress)) assertMembers(clusterView.members, first, second, third) awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == "") } @@ -129,7 +129,7 @@ abstract class MBeanSpec } enterBarrier("third-left") runOn(first, second) { - awaitUpConvergence(2) + awaitMembersUp(2) assertMembers(clusterView.members, first, second) val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",") awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala deleted file mode 100644 index 381780c810..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import scala.concurrent.duration._ -import akka.actor.Props -import akka.actor.Actor - -object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) -} - -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec - -abstract class MembershipChangeListenerJoinSpec - extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) - with MultiNodeClusterSpec { - - import MembershipChangeListenerJoinMultiJvmSpec._ - import ClusterEvent._ - - "A registered MembershipChangeListener" must { - "be notified when new node is JOINING" taggedAs LongRunningTest in { - - runOn(first) { - cluster.join(first) - val joinLatch = TestLatch() - val expectedAddresses = Set(first, second) map address - cluster.subscribe(system.actorOf(Props(new Actor { - var members = Set.empty[Member] - def receive = { - case state: CurrentClusterState ⇒ members = state.members - case MemberJoined(m) ⇒ - members = members - m + m - if (members.map(_.address) == expectedAddresses) - joinLatch.countDown() - case _ ⇒ // ignore - } - })), classOf[MemberEvent]) - enterBarrier("registered-listener") - - joinLatch.await - } - - runOn(second) { - enterBarrier("registered-listener") - cluster.join(first) - } - - awaitUpConvergence(2) - - enterBarrier("after") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala deleted file mode 100644 index 281ef451c1..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import org.scalatest.BeforeAndAfter -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import akka.actor.Address -import akka.actor.Props -import akka.actor.Actor -import akka.cluster.MemberStatus._ - -object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" - """)) - .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) -} - -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec - -abstract class MembershipChangeListenerLeavingSpec - extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) - with MultiNodeClusterSpec { - - import MembershipChangeListenerLeavingMultiJvmSpec._ - import ClusterEvent._ - - "A registered MembershipChangeListener" must { - "be notified when new node is LEAVING" taggedAs LongRunningTest in { - - awaitClusterUp(first, second, third) - - runOn(first) { - enterBarrier("registered-listener") - cluster.leave(second) - } - - runOn(second) { - enterBarrier("registered-listener") - } - - runOn(third) { - val latch = TestLatch() - val secondAddress = address(second) - cluster.subscribe(system.actorOf(Props(new Actor { - def receive = { - case state: CurrentClusterState ⇒ - if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Leaving)) - latch.countDown() - case MemberLeft(m) if m.address == secondAddress ⇒ - latch.countDown() - case _ ⇒ // ignore - } - })), classOf[MemberEvent]) - enterBarrier("registered-listener") - latch.await - } - - enterBarrier("finished") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala index 4cad603747..33ab5a7d4e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala @@ -20,9 +20,8 @@ object MinMembersBeforeUpMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - # turn off unreachable reaper - akka.cluster.min-nr-of-members = 3""")). + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( + "akka.cluster.min-nr-of-members = 3")). withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } @@ -44,8 +43,12 @@ abstract class MinMembersBeforeUpSpec cluster.registerOnMemberUp(onUpLatch.countDown()) runOn(first) { - startClusterNode() - awaitCond(clusterView.status == Joining) + cluster join myself + awaitCond { + val result = clusterView.status == Joining + clusterView.refreshCurrentState() + result + } } enterBarrier("first-started") @@ -56,7 +59,11 @@ abstract class MinMembersBeforeUpSpec } runOn(first, second) { val expectedAddresses = Set(first, second) map address - awaitCond(clusterView.members.map(_.address) == expectedAddresses) + awaitCond { + val result = clusterView.members.map(_.address) == expectedAddresses + clusterView.refreshCurrentState() + result + } clusterView.members.map(_.status) must be(Set(Joining)) // and it should not change 1 to 5 foreach { _ ⇒ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 9401013558..18dfc0d7ac 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -151,22 +151,13 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS clusterView.self } - /** - * Initialize the cluster with the specified member - * nodes (roles). First node will be started first - * and others will join the first. - */ - def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.to[immutable.Seq]) - /** * Initialize the cluster of the specified member * nodes (roles) and wait until all joined and `Up`. * First node will be started first and others will join * the first. */ - def awaitClusterUp(roles: RoleName*): Unit = awaitStartCluster(true, roles.to[immutable.Seq]) - - private def awaitStartCluster(upConvergence: Boolean = true, roles: immutable.Seq[RoleName]): Unit = { + def awaitClusterUp(roles: RoleName*): Unit = { runOn(roles.head) { // make sure that the node-to-join is started before other join startClusterNode() @@ -175,8 +166,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS if (roles.tail.contains(myself)) { cluster.join(roles.head) } - if (upConvergence && roles.contains(myself)) { - awaitUpConvergence(numberOfMembers = roles.length) + if (roles.contains(myself)) { + awaitMembersUp(numberOfMembers = roles.length) } enterBarrier(roles.map(_.name).mkString("-") + "-joined") } @@ -212,10 +203,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS } /** - * Wait until the expected number of members has status Up and convergence has been reached. + * Wait until the expected number of members has status Up has been reached. * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring. */ - def awaitUpConvergence( + def awaitMembersUp( numberOfMembers: Int, canNotBePartOfMemberRing: Set[Address] = Set.empty, timeout: FiniteDuration = 20.seconds): Unit = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index d38cc06e1f..341a8528bb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -45,16 +45,12 @@ abstract class NodeLeavingAndExitingSpec runOn(first, third) { val secondAddess = address(second) - val leavingLatch = TestLatch() val exitingLatch = TestLatch() cluster.subscribe(system.actorOf(Props(new Actor { def receive = { case state: CurrentClusterState ⇒ - if (state.members.exists(m ⇒ m.address == secondAddess && m.status == Leaving)) - leavingLatch.countDown() if (state.members.exists(m ⇒ m.address == secondAddess && m.status == Exiting)) exitingLatch.countDown() - case MemberLeft(m) if m.address == secondAddess ⇒ leavingLatch.countDown() case MemberExited(m) if m.address == secondAddess ⇒ exitingLatch.countDown() case MemberRemoved(m) ⇒ // not tested here @@ -70,9 +66,6 @@ abstract class NodeLeavingAndExitingSpec val expectedAddresses = roles.toSet map address awaitCond(clusterView.members.map(_.address) == expectedAddresses) - // Verify that 'second' node is set to LEAVING - leavingLatch.await - // Verify that 'second' node is set to EXITING exitingLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index 86e5450fe7..716a3cdcf8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -97,7 +97,7 @@ abstract class RestartFirstSeedNodeSpec } runOn(seed2, seed3) { cluster.joinSeedNodes(seedNodes) - awaitUpConvergence(3) + awaitMembersUp(3) } enterBarrier("started") @@ -107,7 +107,7 @@ abstract class RestartFirstSeedNodeSpec seed1System.awaitTermination(remaining) } runOn(seed2, seed3) { - awaitUpConvergence(2, canNotBePartOfMemberRing = Set(seedNodes.head)) + awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head)) awaitCond(clusterView.unreachableMembers.forall(_.address != seedNodes.head)) } enterBarrier("seed1-shutdown") @@ -119,7 +119,7 @@ abstract class RestartFirstSeedNodeSpec awaitCond(Cluster(restartedSeed1System).readView.members.forall(_.status == Up)) } runOn(seed2, seed3) { - awaitUpConvergence(3) + awaitMembersUp(3) } enterBarrier("seed1-restarted") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 5910f48ac3..8a20bc8efd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -45,7 +45,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo "A cluster of 2 nodes" must { "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { - awaitUpConvergence(1) + awaitMembersUp(1) clusterView.isSingletonCluster must be(true) enterBarrier("after-1") @@ -66,7 +66,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo markNodeAsUnavailable(secondAddress) - awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) + awaitMembersUp(numberOfMembers = 1, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) clusterView.isSingletonCluster must be(true) awaitCond(clusterView.isLeader) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index af860058af..a2398d8ce7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -88,13 +88,13 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) runOn(side1: _*) { // auto-down = on - awaitUpConvergence(side1.size, side2.toSet map address) + awaitMembersUp(side1.size, side2.toSet map address) assertLeader(side1: _*) } runOn(side2: _*) { // auto-down = on - awaitUpConvergence(side2.size, side1.toSet map address) + awaitMembersUp(side2.size, side1.toSet map address) assertLeader(side2: _*) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index b77e623682..a09a98c8f8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -721,7 +721,7 @@ abstract class StressSpec runOn(currentRoles.last) { cluster.join(roles.head) } - awaitUpConvergence(currentRoles.size, timeout = remaining) + awaitMembersUp(currentRoles.size, timeout = remaining) } } @@ -741,7 +741,7 @@ abstract class StressSpec if (toSeedNodes) cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) else cluster.join(roles.head) } - awaitUpConvergence(currentRoles.size, timeout = remaining) + awaitMembersUp(currentRoles.size, timeout = remaining) } } @@ -781,7 +781,7 @@ abstract class StressSpec testConductor.shutdown(removeRole, 0).await } } - awaitUpConvergence(currentRoles.size, timeout = remaining) + awaitMembersUp(currentRoles.size, timeout = remaining) } } @@ -814,7 +814,7 @@ abstract class StressSpec testConductor.shutdown(r, 0).await } } - awaitUpConvergence(currentRoles.size, timeout = remaining) + awaitMembersUp(currentRoles.size, timeout = remaining) } } awaitClusterResult @@ -860,7 +860,7 @@ abstract class StressSpec Some(sys) } else previousAS runOn(usedRoles: _*) { - awaitUpConvergence( + awaitMembersUp( nbrUsedRoles + activeRoles.size, canNotBePartOfMemberRing = allPreviousAddresses, timeout = remaining) @@ -884,7 +884,7 @@ abstract class StressSpec loop(1, None, Set.empty) foreach { _.shutdown } within(loopDuration) { runOn(usedRoles: _*) { - awaitUpConvergence(nbrUsedRoles, timeout = remaining) + awaitMembersUp(nbrUsedRoles, timeout = remaining) phiObserver ! Reset statsObserver ! Reset } @@ -989,7 +989,7 @@ abstract class StressSpec runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) { reportResult { cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) - awaitUpConvergence(size, timeout = remaining) + awaitMembersUp(size, timeout = remaining) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 7da0baf7e4..a7628d1460 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -73,7 +73,7 @@ abstract class SunnyWeatherSpec for (n ← 1 to 30) { enterBarrier("period-" + n) unexpected.get must be(SortedSet.empty) - awaitUpConvergence(roles.size) + awaitMembersUp(roles.size) assertLeaderIn(roles) if (n % 5 == 0) log.debug("Passed period [{}]", n) Thread.sleep(1000) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index c7c6bf685e..602d242b63 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -47,8 +47,7 @@ abstract class TransitionSpec val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } - statusOption must not be (None) - statusOption.get + statusOption.getOrElse(Removed) } def memberAddresses: Set[Address] = clusterView.members.map(_.address) @@ -62,11 +61,15 @@ abstract class TransitionSpec } def awaitMembers(addresses: Address*): Unit = awaitCond { - memberAddresses == addresses.toSet + val result = memberAddresses == addresses.toSet + clusterView.refreshCurrentState() + result } def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond { - memberStatus(address) == status + val result = memberStatus(address) == status + clusterView.refreshCurrentState() + result } def leaderActions(): Unit = @@ -111,11 +114,11 @@ abstract class TransitionSpec "start nodes as singleton clusters" taggedAs LongRunningTest in { runOn(first) { - startClusterNode() - awaitCond(clusterView.isSingletonCluster) + cluster join myself awaitMemberStatus(myself, Joining) leaderActions() awaitMemberStatus(myself, Up) + awaitCond(clusterView.isSingletonCluster) } enterBarrier("after-1") @@ -188,14 +191,14 @@ abstract class TransitionSpec leaderActions() awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) - awaitMemberStatus(third, Joining) + awaitMemberStatus(third, Up) } enterBarrier("leader-actions-3") // leader gossipTo first non-leader leader(first, second, third) gossipTo nonLeader(first, second, third).head runOn(nonLeader(first, second, third).head) { - awaitMemberStatus(third, Joining) + awaitMemberStatus(third, Up) awaitCond(seenLatestGossip == Set(leader(first, second, third), myself)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 686f1c644b..dc7786501a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -130,7 +130,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod } runOn(allBut(victim): _*) { - awaitUpConvergence(roles.size - 1, Set(victim)) + awaitMembersUp(roles.size - 1, Set(victim)) // eventually removed awaitCond(clusterView.unreachableMembers.isEmpty, 15 seconds) } @@ -152,7 +152,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod cluster join master } - awaitUpConvergence(roles.size) + awaitMembersUp(roles.size) endBarrier } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 4b7f711ff8..196308e2e6 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -60,11 +60,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "ClusterDomainEventPublisher" must { - "not publish MemberUp when there is no convergence" in { - publisher ! PublishChanges(g2) - } - - "publish MemberEvents when there is convergence" in { + "publish MemberUp" in { publisher ! PublishChanges(g2) publisher ! PublishChanges(g3) memberSubscriber.expectMsg(MemberUp(bUp)) @@ -73,12 +69,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish leader changed when new leader after convergence" in { publisher ! PublishChanges(g4) - memberSubscriber.expectNoMsg(1 second) - - publisher ! PublishChanges(g5) memberSubscriber.expectMsg(MemberUp(dUp)) memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberUp(cUp)) + memberSubscriber.expectNoMsg(1 second) + + publisher ! PublishChanges(g5) memberSubscriber.expectMsg(LeaderChanged(Some(dUp.address))) } @@ -99,20 +95,22 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! PublishChanges(g6) memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g7) + memberSubscriber.expectMsg(MemberExited(aExiting)) memberSubscriber.expectNoMsg(1 second) // at the removed member a an empty gossip is the last thing publisher ! PublishChanges(Gossip.empty) - memberSubscriber.expectMsg(MemberLeft(aLeaving)) - memberSubscriber.expectMsg(MemberExited(aExiting)) - memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) memberSubscriber.expectMsg(MemberRemoved(aRemoved)) memberSubscriber.expectMsg(MemberRemoved(bRemoved)) memberSubscriber.expectMsg(MemberRemoved(cRemoved)) + memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) memberSubscriber.expectMsg(LeaderChanged(None)) } "not publish leader changed when not convergence" in { publisher ! PublishChanges(g4) + memberSubscriber.expectMsg(MemberUp(dUp)) + memberSubscriber.expectMsg(MemberUp(bUp)) + memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectNoMsg(1 second) } @@ -133,7 +131,6 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "send CurrentClusterState when subscribe" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) - subscriber.expectMsgType[InstantClusterState] subscriber.expectMsgType[CurrentClusterState] // but only to the new subscriber memberSubscriber.expectNoMsg(1 second) @@ -154,11 +151,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish clean state when PublishStart" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) - subscriber.expectMsgType[InstantClusterState] subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(g3) - subscriber.expectMsg(InstantMemberUp(bUp)) - subscriber.expectMsg(InstantMemberUp(cUp)) subscriber.expectMsg(MemberUp(bUp)) subscriber.expectMsg(MemberUp(cUp)) subscriber.expectMsgType[SeenChanged] @@ -167,18 +161,6 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec subscriber.expectMsgType[CurrentClusterState] must be(CurrentClusterState()) } - "publish immediately when subscribing to InstantMemberEvent" in { - val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent]) - subscriber.expectMsgType[InstantClusterState] - publisher ! PublishChanges(g2) - subscriber.expectMsg(InstantMemberUp(bUp)) - subscriber.expectMsg(InstantMemberUp(cUp)) - subscriber.expectNoMsg(1 second) - publisher ! PublishChanges(g3) - subscriber.expectNoMsg(1 second) - } - "publish SeenChanged" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, classOf[SeenChanged]) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 76f0838a96..21e20d2b6f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -44,7 +44,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val (g1, _) = converge(Gossip(members = SortedSet(a1))) val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, e1))) - diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1))) + diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } @@ -53,7 +53,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val (g1, _) = converge(Gossip(members = SortedSet(a2, b1, c2))) val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, c1, e1))) - diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1))) + diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1))) diffUnreachable(g1, g2) must be(Seq.empty) diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } @@ -62,20 +62,10 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2))) val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3))) - diffMemberEvents(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3))) diffUnreachable(g1, g2) must be(Seq(UnreachableMember(b3))) diffSeen(g1, g2) must be(Seq.empty) } - "be produced for downed members" in { - val (g1, _) = converge(Gossip(members = SortedSet(a1, b1))) - val (g2, _) = converge(Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3)))) - - diffMemberEvents(g1, g2) must be(Seq(MemberDowned(e3))) - diffUnreachable(g1, g2) must be(Seq(UnreachableMember(e3))) - diffSeen(g1, g2) must be(Seq.empty) - } - "be produced for removed members" in { val (g1, _) = converge(Gossip(members = SortedSet(a1, d1))) val (g2, s2) = converge(Gossip(members = SortedSet(a1))) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 20a5023f4a..a2738ea9ef 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -63,12 +63,10 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { "initially become singleton cluster when joining itself and reach convergence" in { clusterView.members.size must be(0) // auto-join = off cluster.join(selfAddress) - Thread.sleep(5000) + leaderActions() // Joining -> Up awaitCond(clusterView.isSingletonCluster) clusterView.self.address must be(selfAddress) clusterView.members.map(_.address) must be(Set(selfAddress)) - clusterView.status must be(MemberStatus.Joining) - leaderActions() awaitCond(clusterView.status == MemberStatus.Up) } @@ -76,7 +74,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { try { cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent]) // first, is in response to the subscription - expectMsgClass(classOf[ClusterEvent.InstantClusterState]) expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) cluster.publishCurrentClusterState() diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index f042a0606b..ebb57512c1 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -237,8 +237,8 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess * `retryInterval` until the previous leader confirms that the hand * over has started, or this `maxHandOverRetries` limit has been * reached. If the retry limit is reached it takes the decision to be - * the new leader if previous leader is unknown (typically removed or - * downed), otherwise it initiates a new round by throwing + * the new leader if previous leader is unknown (typically removed), + * otherwise it initiates a new round by throwing * [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting * restart with fresh state. For a cluster with many members you might * need to increase this retry limit because it takes longer time to @@ -306,19 +306,13 @@ class ClusterSingletonManager( // Previous GetNext request delivered event and new GetNext is to be sent var leaderChangedReceived = true - // keep track of previously downed members - var downed = Map.empty[Address, Deadline] // keep track of previously removed members var removed = Map.empty[Address, Deadline] - def addDowned(address: Address): Unit = - downed += address -> (Deadline.now + 15.minutes) - def addRemoved(address: Address): Unit = removed += address -> (Deadline.now + 15.minutes) def cleanupOverdueNotMemberAnyMore(): Unit = { - downed = downed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } } @@ -336,7 +330,6 @@ class ClusterSingletonManager( require(!cluster.isTerminated, "Cluster node must not be terminated") // subscribe to cluster changes, re-subscribe when restart - cluster.subscribe(self, classOf[MemberDowned]) cluster.subscribe(self, classOf[MemberRemoved]) setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) @@ -385,8 +378,8 @@ class ClusterSingletonManager( if (leaderOption == selfAddressOption) { logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption) previousLeaderOption match { - case None ⇒ gotoLeader(None) - case Some(prev) if downed.contains(prev) ⇒ gotoLeader(None) + case None ⇒ gotoLeader(None) + case Some(prev) if removed.contains(prev) ⇒ gotoLeader(None) case Some(prev) ⇒ peer(prev) ! HandOverToMe goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption) @@ -397,9 +390,9 @@ class ClusterSingletonManager( stay using NonLeaderData(leaderOption) } - case Event(MemberDowned(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ - logInfo("Previous leader downed [{}]", m.address) - addDowned(m.address) + case Event(MemberRemoved(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ + logInfo("Previous leader removed [{}]", m.address) + addRemoved(m.address) // transition when LeaderChanged stay using NonLeaderData(None) @@ -426,9 +419,9 @@ class ClusterSingletonManager( stay } - case Event(MemberDowned(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ - logInfo("Previous leader [{}] downed", previousLeader) - addDowned(m.address) + case Event(MemberRemoved(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ + logInfo("Previous leader [{}] removed", previousLeader) + addRemoved(m.address) gotoLeader(None) case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒ @@ -471,7 +464,7 @@ class ClusterSingletonManager( case Some(a) if a == cluster.selfAddress ⇒ // already leader stay - case Some(a) if downed.contains(a) || removed.contains(a) ⇒ + case Some(a) if removed.contains(a) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, None) case Some(a) ⇒ // send TakeOver request in case the new leader doesn't know previous leader @@ -507,8 +500,8 @@ class ClusterSingletonManager( case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) - case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader ⇒ - addDowned(m.address) + case Event(MemberRemoved(m), WasLeaderData(singleton, singletonTerminated, handOverData, Some(newLeader))) if m.address == newLeader ⇒ + addRemoved(m.address) gotoHandingOver(singleton, singletonTerminated, handOverData, None) case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒ @@ -554,13 +547,8 @@ class ClusterSingletonManager( case Event(_: CurrentClusterState, _) ⇒ stay case Event(MemberRemoved(m), _) ⇒ logInfo("Member removed [{}]", m.address) - // if self removed, it will be stopped onTranstion to NonLeader addRemoved(m.address) stay - case Event(MemberDowned(m), _) ⇒ - logInfo("Member downed [{}]", m.address) - addDowned(m.address) - stay case Event(TakeOverFromMe, _) ⇒ logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender.path.address) stay @@ -587,7 +575,7 @@ class ClusterSingletonManager( } onTransition { - case _ -> NonLeader if removed.contains(cluster.selfAddress) || downed.contains(cluster.selfAddress) ⇒ + case _ -> NonLeader if removed.contains(cluster.selfAddress) ⇒ logInfo("Self removed, stopping ClusterSingletonManager") stop() } diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java index 40e595e653..b0ec99b2fa 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java @@ -3,7 +3,6 @@ package sample.cluster.simple.japi; import akka.actor.UntypedActor; import akka.cluster.ClusterEvent.ClusterDomainEvent; import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.MemberJoined; import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; @@ -18,10 +17,6 @@ public class SimpleClusterListener extends UntypedActor { CurrentClusterState state = (CurrentClusterState) message; log.info("Current members: {}", state.members()); - } else if (message instanceof MemberJoined) { - MemberJoined mJoined = (MemberJoined) message; - log.info("Member joined: {}", mJoined); - } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; log.info("Member is Up: {}", mUp.member()); diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala index 8da64adac6..38290f2176 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala @@ -8,7 +8,7 @@ object SimpleClusterApp { def main(args: Array[String]): Unit = { - // Override the configuration of the port + // Override the configuration of the port // when specified as program argument if (args.nonEmpty) System.setProperty("akka.remote.netty.tcp.port", args(0)) @@ -18,8 +18,6 @@ object SimpleClusterApp { def receive = { case state: CurrentClusterState ⇒ log.info("Current members: {}", state.members) - case MemberJoined(member) ⇒ - log.info("Member joined: {}", member) case MemberUp(member) ⇒ log.info("Member is Up: {}", member) case UnreachableMember(member) ⇒