diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 2d6dac2568..50644e431c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -120,10 +120,12 @@ private[cluster] object InternalClusterAction { */ case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage - case class PublishChanges(oldGossip: Gossip, newGossip: Gossip) - case class PublishEvent(event: ClusterDomainEvent) - case object PublishDone - + sealed trait PublishMessage + case class PublishChanges(newGossip: Gossip) extends PublishMessage + case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage + case object PublishStart extends PublishMessage + case object PublishDone extends PublishMessage + case object PublishDoneFinished extends PublishMessage } /** @@ -280,14 +282,14 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto */ def join(address: Address): Unit = { if (!latestGossip.members.exists(_.address == address)) { - val localGossip = latestGossip // wipe our state since a node that joins a cluster must be empty latestGossip = Gossip() - // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() + // wipe the publisher since we are starting fresh + publisher ! PublishStart - publish(localGossip) + publish(latestGossip) heartbeatSender ! JoinInProgress(address, Deadline.now + JoinTimeout) context.become(initialized) @@ -302,18 +304,16 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * State transition to JOINING - new node joining. */ def joining(node: Address): Unit = { - val localGossip = latestGossip - val localMembers = localGossip.members - val localUnreachable = localGossip.overview.unreachable + val localMembers = latestGossip.members + val localUnreachable = latestGossip.overview.unreachable val alreadyMember = localMembers.exists(_.address == node) - val isUnreachable = localGossip.overview.isNonDownUnreachable(node) + val isUnreachable = latestGossip.overview.isNonDownUnreachable(node) if (!alreadyMember && !isUnreachable) { - // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } - val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) + val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers) // remove the node from the failure detector if it is a DOWN node that is rejoining cluster if (rejoiningMember.nonEmpty) failureDetector.remove(node) @@ -321,7 +321,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) - val newGossip = localGossip copy (overview = newOverview, members = newMembers) + val newGossip = latestGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress @@ -335,7 +335,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto gossipTo(node) } - publish(localGossip) + publish(latestGossip) } } @@ -343,10 +343,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * State transition to LEAVING. */ def leaving(address: Address): Unit = { - val localGossip = latestGossip - if (localGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) - val newMembers = localGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING - val newGossip = localGossip copy (members = newMembers) + if (latestGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) + val newMembers = latestGossip.members map { member ⇒ if (member.address == address) Member(address, Leaving) else member } // mark node as LEAVING + val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode val seenVersionedGossip = versionedGossip seen selfAddress @@ -354,7 +353,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto latestGossip = seenVersionedGossip log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) - publish(localGossip) + publish(latestGossip) } } @@ -377,10 +376,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto */ def removing(address: Address): Unit = { log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) - val localGossip = latestGossip // just cleaning up the gossip state latestGossip = Gossip() - publish(localGossip) + publish(latestGossip) context.become(removed) // make sure the final (removed) state is published // before shutting down @@ -435,7 +433,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val versionedGossip = newGossip :+ vclockNode latestGossip = versionedGossip seen selfAddress - publish(localGossip) + publish(latestGossip) } /** @@ -524,7 +522,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } stats = stats.incrementReceivedGossipCount - publish(localGossip) + publish(latestGossip) if (envelope.conversation && (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { @@ -730,7 +728,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) } - publish(localGossip) + publish(latestGossip) } } } @@ -767,7 +765,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) - publish(localGossip) + publish(latestGossip) } } } @@ -805,8 +803,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) coreSender ! SendClusterMessage(address, gossipMsg) - def publish(oldGossip: Gossip): Unit = { - publisher ! PublishChanges(oldGossip, latestGossip) + def publish(newGossip: Gossip): Unit = { + publisher ! PublishChanges(newGossip) if (PublishStatsInterval == Duration.Zero) publishInternalStats() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index bb90764837..c896e721cc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,6 +5,7 @@ package akka.cluster import language.postfixOps import scala.collection.immutable +import scala.collection.immutable.{ VectorBuilder, SortedSet } import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ @@ -33,7 +34,6 @@ object ClusterEvent { case class CurrentClusterState( members: immutable.SortedSet[Member] = immutable.SortedSet.empty, unreachable: Set[Member] = Set.empty, - convergence: Boolean = false, seenBy: Set[Address] = Set.empty, leader: Option[Address] = None) extends ClusterDomainEvent { @@ -75,57 +75,47 @@ object ClusterEvent { } /** - * A new member joined the cluster. + * A new member joined the cluster. Only published after convergence. */ case class MemberJoined(member: Member) extends MemberEvent { if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) } /** - * Member status changed to Up + * Member status changed to Up. Only published after convergence. */ case class MemberUp(member: Member) extends MemberEvent { if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) } /** - * Member status changed to Leaving + * Member status changed to Leaving. Only published after convergence. */ case class MemberLeft(member: Member) extends MemberEvent { if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) } /** - * Member status changed to Exiting + * Member status changed to Exiting. Only published after convergence. */ case class MemberExited(member: Member) extends MemberEvent { if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) } /** - * A member is considered as unreachable by the failure detector. - */ - case class MemberUnreachable(member: Member) extends MemberEvent - - /** - * Member status changed to Down + * Member status changed to Down. Only published after convergence. */ case class MemberDowned(member: Member) extends MemberEvent { if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) } /** - * Member completely removed from the cluster + * Member completely removed from the cluster. Only published after convergence. */ case class MemberRemoved(member: Member) extends MemberEvent { if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) } - /** - * Cluster convergence state changed. - */ - case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent - /** * Leader of the cluster members changed. Only published after convergence. */ @@ -138,6 +128,12 @@ object ClusterEvent { } /** + * A member is considered as unreachable by the failure detector. + */ + case class UnreachableMember(member: Member) extends ClusterDomainEvent + + /** + * INTERNAL API * * Current snapshot of cluster node metrics. Published to subscribers. */ @@ -163,56 +159,75 @@ object ClusterEvent { /** * INTERNAL API */ - private[cluster] def diff(oldGossip: Gossip, newGossip: Gossip): immutable.IndexedSeq[ClusterDomainEvent] = { - val newMembers = newGossip.members -- oldGossip.members + private[cluster] def diffUnreachable(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[UnreachableMember] = + if (newGossip eq oldGossip) Nil + else { + val newUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable + val unreachableEvents = newUnreachable map UnreachableMember - val membersGroupedByAddress = (newGossip.members.toList ++ oldGossip.members.toList).groupBy(_.address) - val changedMembers = membersGroupedByAddress collect { - case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember + immutable.Seq.empty ++ unreachableEvents } - val memberEvents = (newMembers ++ changedMembers) map { m ⇒ - if (m.status == Joining) MemberJoined(m) - else if (m.status == Up) MemberUp(m) - else if (m.status == Leaving) MemberLeft(m) - else if (m.status == Exiting) MemberExited(m) - else throw new IllegalStateException("Unexpected member status: " + m) + /** + * INTERNAL API. + */ + private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] = + if (newGossip eq oldGossip) Nil + else { + val newMembers = newGossip.members -- oldGossip.members + val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.address) + val changedMembers = membersGroupedByAddress collect { + case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember + } + val memberEvents = (newMembers ++ changedMembers) map { m ⇒ + m.status match { + case Joining ⇒ MemberJoined(m) + case Up ⇒ MemberUp(m) + case Leaving ⇒ MemberLeft(m) + case Exiting ⇒ MemberExited(m) + case _ ⇒ throw new IllegalStateException("Unexpected member status: " + m) + } + } + + val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable + val newDowned = allNewUnreachable filter { _.status == Down } + val downedEvents = newDowned map MemberDowned + + val unreachableGroupedByAddress = + List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.address) + val unreachableDownMembers = unreachableGroupedByAddress collect { + case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ + newMember + } + val unreachableDownedEvents = unreachableDownMembers map MemberDowned + + val removedEvents = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) map { m ⇒ + MemberRemoved(m.copy(status = Removed)) + } + + (new VectorBuilder[MemberEvent]() ++= memberEvents ++= downedEvents ++= unreachableDownedEvents + ++= removedEvents).result() } - val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable - val (newDowned, newUnreachable) = allNewUnreachable partition { _.status == Down } - val downedEvents = newDowned map MemberDowned - val unreachableEvents = newUnreachable map MemberUnreachable + /** + * INTERNAL API + */ + private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = + if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader)) + else Nil - val unreachableGroupedByAddress = - (newGossip.overview.unreachable.toList ++ oldGossip.overview.unreachable.toList).groupBy(_.address) - val unreachableDownMembers = unreachableGroupedByAddress collect { - case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ - newMember + /** + * INTERNAL API + */ + private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[SeenChanged] = + if (newGossip eq oldGossip) Nil + else { + val newConvergence = newGossip.convergence + val newSeenBy = newGossip.seenBy + if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy) + List(SeenChanged(newConvergence, newSeenBy)) + else Nil } - val unreachableDownedEvents = unreachableDownMembers map MemberDowned - - val removedEvents = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) map { m ⇒ - MemberRemoved(m.copy(status = Removed)) - } - - val newConvergence = newGossip.convergence - val convergenceChanged = newConvergence != oldGossip.convergence - val convergenceEvents = if (convergenceChanged) List(ConvergenceChanged(newConvergence)) else EmptyImmutableSeq - - val leaderEvents = - if (newGossip.leader != oldGossip.leader) List(LeaderChanged(newGossip.leader)) - else EmptyImmutableSeq - - val newSeenBy = newGossip.seenBy - val seenEvents = - if (convergenceChanged || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy)) - else EmptyImmutableSeq - - memberEvents.toVector ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++ - leaderEvents ++ convergenceEvents ++ seenEvents - } - } /** @@ -224,34 +239,30 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto import InternalClusterAction._ var latestGossip: Gossip = Gossip() - - // Keep track of LeaderChanged event. Should not be published until - // convergence, and it should only be published when leader actually - // changed to another node. 3 states: - // - None: No LeaderChanged detected yet, nothing published yet - // - Some(Left): Stashed LeaderChanged to be published later, when convergence - // - Some(Right): Latest published LeaderChanged - var leaderChangedState: Option[Either[LeaderChanged, LeaderChanged]] = None + var latestConvergedGossip: Gossip = Gossip() + var memberEvents: immutable.Seq[MemberEvent] = immutable.Seq.empty def receive = { - case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) + case PublishChanges(newGossip) ⇒ publishChanges(newGossip) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver) case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) case PublishEvent(event) ⇒ publish(event) - case PublishDone ⇒ sender ! PublishDone + case PublishStart ⇒ publishStart() + case PublishDone ⇒ publishDone(sender) } def eventStream: EventStream = context.system.eventStream def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { + // The state is a mix of converged and latest gossip to mimic what you + // would have seen if you where listening to the events. val state = CurrentClusterState( - members = latestGossip.members, + members = latestConvergedGossip.members, unreachable = latestGossip.overview.unreachable, - convergence = latestGossip.convergence, seenBy = latestGossip.seenBy, - leader = latestGossip.leader) + leader = latestConvergedGossip.leader) receiver match { case Some(ref) ⇒ ref ! state case None ⇒ publish(state) @@ -268,54 +279,43 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case Some(c) ⇒ eventStream.unsubscribe(subscriber, c) } - def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { + def publishChanges(newGossip: Gossip): Unit = { + val oldGossip = latestGossip // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - diff(oldGossip, newGossip) foreach { event ⇒ - event match { - case x @ LeaderChanged(_) if leaderChangedState == Some(Right(x)) ⇒ - // skip, this leader has already been published - - case x @ LeaderChanged(_) if oldGossip.convergence && newGossip.convergence ⇒ - // leader changed and immediate convergence - leaderChangedState = Some(Right(x)) - publish(x) - - case x: LeaderChanged ⇒ - // publish later, when convergence - leaderChangedState = Some(Left(x)) - - case ConvergenceChanged(true) ⇒ - // now it's convergence, publish eventual stashed LeaderChanged event - leaderChangedState match { - case Some(Left(x)) ⇒ - leaderChangedState = Some(Right(x)) - publish(x) - - case _ ⇒ // nothing stashed - } - publish(event) - - case MemberDowned(m) ⇒ - // TODO this case might be collapsed with MemberRemoved, see ticket #2788 - // but right now we don't change Downed to Removed - publish(event) - // notify DeathWatch about downed node - publish(AddressTerminated(m.address)) - - case MemberRemoved(m) ⇒ - publish(event) - // notify DeathWatch about removed node - publish(AddressTerminated(m.address)) - - case _ ⇒ - // all other events - publish(event) - } + // first publish the diffUnreachable between the last two gossips + diffUnreachable(oldGossip, newGossip) foreach { event ⇒ + publish(event) + // notify DeathWatch about unreachable node + publish(AddressTerminated(event.member.address)) } + // buffer up the MemberEvents waiting for convergence + memberEvents ++= diffMemberEvents(oldGossip, newGossip) + // if we have convergence then publish the MemberEvents and possibly a LeaderChanged + if (newGossip.convergence) { + val previousConvergedGossip = latestConvergedGossip + latestConvergedGossip = newGossip + memberEvents foreach publish + memberEvents = immutable.Seq.empty + diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish + } + // publish internal SeenState for testing purposes + diffSeen(oldGossip, newGossip) foreach publish } def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) def publish(event: AnyRef): Unit = eventStream publish event + + def publishStart(): Unit = clearState() + + def publishDone(receiver: ActorRef): Unit = { + clearState() + receiver ! PublishDoneFinished + } + + def clearState(): Unit = { + latestGossip = Gossip() + latestConvergedGossip = Gossip() + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 4ea4382b5a..3ada580bb2 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -96,7 +96,10 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg val heartbeatTask = scheduler.schedule(PeriodicTasksInitialDelay max HeartbeatInterval, HeartbeatInterval, self, HeartbeatTick) - override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[UnreachableMember]) + } override def postStop(): Unit = { heartbeatTask.cancel() @@ -112,7 +115,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def receive = { case HeartbeatTick ⇒ heartbeat() case s: CurrentClusterState ⇒ reset(s) - case MemberUnreachable(m) ⇒ removeMember(m) + case UnreachableMember(m) ⇒ removeMember(m) case MemberRemoved(m) ⇒ removeMember(m) case e: MemberEvent ⇒ addMember(e.member) case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 271ad1d29a..2a7951a667 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -77,6 +77,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[UnreachableMember]) log.info("Metrics collection has started successfully on node [{}]", selfAddress) } @@ -85,7 +86,8 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto case MetricsTick ⇒ collect() case state: CurrentClusterState ⇒ receiveState(state) case MemberUp(m) ⇒ addMember(m) - case e: MemberEvent ⇒ removeMember(e) + case e: MemberEvent ⇒ removeMember(e.member) + case UnreachableMember(m) ⇒ removeMember(m) case msg: MetricsGossipEnvelope ⇒ receiveGossip(msg) } @@ -104,9 +106,9 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto /** * Removes a member from the member node ring. */ - def removeMember(event: MemberEvent): Unit = { - nodes -= event.member.address - latestGossip = latestGossip remove event.member.address + def removeMember(member: Member): Unit = { + nodes -= member.address + latestGossip = latestGossip remove member.address publish() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 5f80cfd044..831d72f9c8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -45,25 +45,25 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case SeenChanged(convergence, seenBy) ⇒ - state = state.copy(convergence = convergence, seenBy = seenBy) - case MemberRemoved(member) ⇒ - state = state.copy(members = state.members - member, unreachable = state.unreachable - member) - case MemberUnreachable(member) ⇒ - // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) - case MemberDowned(member) ⇒ - // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) - case event: MemberEvent ⇒ - // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - event.member + event.member) - case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) - case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes - case _ ⇒ // ignore, not interesting + case e: ClusterDomainEvent ⇒ e match { + case SeenChanged(convergence, seenBy) ⇒ + state = state.copy(seenBy = seenBy) + case MemberRemoved(member) ⇒ + state = state.copy(members = state.members - member, unreachable = state.unreachable - member) + case UnreachableMember(member) ⇒ + // replace current member with new member (might have different status, only address is used in equals) + state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) + case MemberDowned(member) ⇒ + // replace current member with new member (might have different status, only address is used in equals) + state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) + case event: MemberEvent ⇒ + // replace current member with new member (might have different status, only address is used in equals) + state = state.copy(members = state.members - event.member + event.member) + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes + } } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") } @@ -112,11 +112,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { */ def isSingletonCluster: Boolean = members.size == 1 - /** - * Checks if we have a cluster convergence. - */ - def convergence: Boolean = state.convergence - /** * Returns true if the node is not unreachable and not `Down` * and not `Removed`. @@ -151,4 +146,4 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { eventBusListener ! PoisonPill } -} \ No newline at end of file +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 81618f4a68..1f96434995 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -135,7 +135,7 @@ private[cluster] case class Gossip( * Checks if we have a cluster convergence. If there are any unreachable nodes then we can't have a convergence - * waiting for user to act (issuing DOWN) or leader to act (issuing DOWN through auto-down). * - * @return Some(convergedGossip) if convergence have been reached and None if not + * @return true if convergence have been reached and false if not */ def convergence: Boolean = { val unreachable = overview.unreachable @@ -151,8 +151,10 @@ private[cluster] case class Gossip( def allMembersInSeen = members.forall(m ⇒ seen.contains(m.address)) def seenSame: Boolean = - if (seen.isEmpty) false - else { + if (seen.isEmpty) { + // if both seen and members are empty, then every(no)body has seen the same thing + members.isEmpty + } else { val values = seen.values val seenHead = values.head values.forall(_ == seenHead) diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index ed6724058f..baef66f26c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -160,7 +160,7 @@ case class VectorClock( * Compare two vector clocks. The outcomes will be one of the following: *
* {{{ - * 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c(2) and there does not exist a j such that c1(j) > c2(j). + * 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c2(i) and there does not exist a j such that c1(j) > c2(j). * 2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). * 3. Clock 1 is AFTER (<) Clock 2 otherwise. * }}} diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 906b3d154f..59c88c9fee 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -248,9 +248,11 @@ private[akka] class ClusterRouteeProvider( */ private[akka] class ClusterRouterActor extends Router { - // subscribe to cluster changes, MemberEvent // re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[UnreachableMember]) + } override def postStop(): Unit = cluster.unsubscribe(self) // lazy to not interfere with RoutedActorCell initialization @@ -264,6 +266,19 @@ private[akka] class ClusterRouterActor extends Router { def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef) + def unregisterRoutees(member: Member) = { + val address = member.address + routeeProvider.nodes -= address + + // unregister routees that live on that node + val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address) + routeeProvider.unregisterRoutees(affectedRoutes) + + // createRoutees will not create more than createRoutees and maxInstancesPerNode + // this is useful when totalInstances < upNodes.size + routeeProvider.createRoutees() + } + override def routerReceive: Receive = { case s: CurrentClusterState ⇒ import Member.addressOrdering @@ -278,17 +293,10 @@ private[akka] class ClusterRouterActor extends Router { case other: MemberEvent ⇒ // other events means that it is no longer interesting, such as - // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved - val address = other.member.address - routeeProvider.nodes -= address - - // unregister routees that live on that node - val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address) - routeeProvider.unregisterRoutees(affectedRoutes) - - // createRoutees will not create more than createRoutees and maxInstancesPerNode - // this is useful when totalInstances < upNodes.size - routeeProvider.createRoutees() + // MemberJoined, MemberLeft, MemberExited, MemberRemoved + unregisterRoutees(other.member) + case UnreachableMember(m) ⇒ + unregisterRoutees(m) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index b2a9453035..ae5dea869e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -76,8 +76,6 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) // still one unreachable clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.head.address must be(thirdAddress) - // and therefore no convergence - clusterView.convergence must be(false) } } @@ -94,23 +92,33 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) def memberStatus(address: Address): Option[MemberStatus] = clusterView.members.collectFirst { case m if m.address == address ⇒ m.status } - def assertNotMovedUp: Unit = { + def assertNotMovedUp(joining: Boolean): Unit = { within(20 seconds) { - awaitCond(clusterView.members.size == 3) + if (joining) awaitCond(clusterView.members.size == 0) + else awaitCond(clusterView.members.size == 2) awaitSeenSameState(first, second, fourth) - memberStatus(first) must be(Some(MemberStatus.Up)) - memberStatus(second) must be(Some(MemberStatus.Up)) + if (joining) memberStatus(first) must be(None) + else memberStatus(first) must be(Some(MemberStatus.Up)) + if (joining) memberStatus(second) must be(None) + else memberStatus(second) must be(Some(MemberStatus.Up)) // leader is not allowed to move the new node to Up - memberStatus(fourth) must be(Some(MemberStatus.Joining)) - // still no convergence - clusterView.convergence must be(false) + memberStatus(fourth) must be(None) } } - runOn(first, second, fourth) { + enterBarrier("after-join") + + runOn(first, second) { for (n ← 1 to 5) { - log.debug("assertNotMovedUp#" + n) - assertNotMovedUp + assertNotMovedUp(joining = false) + // wait and then check again + Thread.sleep(1.second.dilated.toMillis) + } + } + + runOn(fourth) { + for (n ← 1 to 5) { + assertNotMovedUp(joining = true) // wait and then check again Thread.sleep(1.second.dilated.toMillis) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index a5d2ceb58d..f3b933f493 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -181,7 +181,6 @@ abstract class LargeClusterSpec Await.ready(latch, remaining) - awaitCond(clusterNodes.forall(_.readView.convergence)) val counts = clusterNodes.map(gossipCount(_)) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", @@ -278,7 +277,7 @@ abstract class LargeClusterSpec } "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest in { - val unreachableNodes = nodesPerDatacenter + val downedNodes = nodesPerDatacenter val liveNodes = nodesPerDatacenter * 4 within(30.seconds + 3.seconds * liveNodes) { @@ -293,22 +292,19 @@ abstract class LargeClusterSpec val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ Cluster(sys).subscribe(sys.actorOf(Props(new Actor { - var gotUnreachable = Set.empty[Member] + var gotDowned = Set.empty[Member] def receive = { case state: CurrentClusterState ⇒ - gotUnreachable = state.unreachable - checkDone() - case MemberUnreachable(m) if !latch.isOpen ⇒ - gotUnreachable = gotUnreachable + m + gotDowned = gotDowned ++ state.unreachable.filter(_.status == Down) checkDone() case MemberDowned(m) if !latch.isOpen ⇒ - gotUnreachable = gotUnreachable + m + gotDowned = gotDowned + m checkDone() case _ ⇒ // not interesting } - def checkDone(): Unit = if (gotUnreachable.size == unreachableNodes) { - log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", - unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + def checkDone(): Unit = if (gotDowned.size == downedNodes) { + log.info("Detected [{}] downed nodes in [{}], it took [{}], received [{}] gossip messages", + downedNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) latch.countDown() } })), classOf[ClusterDomainEvent]) @@ -322,7 +318,6 @@ abstract class LargeClusterSpec runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) - awaitCond(systems.forall(Cluster(_).readView.convergence)) val mergeCount = systems.map(sys ⇒ Cluster(sys).readView.latestStats.mergeCount).sum val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 43af47b70f..fd2714005b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -223,7 +223,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS within(timeout) { awaitCond(clusterView.members.size == numberOfMembers) awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) - awaitCond(clusterView.convergence) // clusterView.leader is updated by LeaderChanged, await that to be updated also val expectedLeader = clusterView.members.headOption.map(_.address) awaitCond(clusterView.leader == expectedLeader) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 86e5dd71e9..336acc2769 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -43,7 +43,6 @@ abstract class NodeMembershipSpec awaitCond { clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(clusterView.convergence) } enterBarrier("after-1") @@ -60,7 +59,6 @@ abstract class NodeMembershipSpec awaitCond { clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(clusterView.convergence) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 3fc7432f98..4c9054d3d1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -114,7 +114,6 @@ abstract class TransitionSpec startClusterNode() awaitCond(clusterView.isSingletonCluster) awaitMemberStatus(myself, Joining) - awaitCond(clusterView.convergence) leaderActions() awaitMemberStatus(myself, Up) } @@ -133,14 +132,13 @@ abstract class TransitionSpec awaitMemberStatus(first, Up) awaitMemberStatus(second, Joining) awaitCond(seenLatestGossip == Set(first, second)) - clusterView.convergence must be(true) } enterBarrier("convergence-joining-2") runOn(leader(first, second)) { leaderActions() awaitMemberStatus(first, Up) - awaitMemberStatus(second, Up) + awaitMemberStatus(second, Joining) } enterBarrier("leader-actions-2") @@ -150,7 +148,6 @@ abstract class TransitionSpec awaitMemberStatus(second, Up) awaitCond(seenLatestGossip == Set(first, second)) awaitMemberStatus(first, Up) - clusterView.convergence must be(true) } enterBarrier("after-2") @@ -163,10 +160,7 @@ abstract class TransitionSpec } runOn(second, third) { // gossip chat from the join will synchronize the views - awaitMembers(first, second, third) - awaitMemberStatus(third, Joining) awaitCond(seenLatestGossip == Set(second, third)) - clusterView.convergence must be(false) } enterBarrier("third-joined-second") @@ -177,7 +171,6 @@ abstract class TransitionSpec awaitMemberStatus(third, Joining) awaitMemberStatus(second, Up) awaitCond(seenLatestGossip == Set(first, second, third)) - clusterView.convergence must be(true) } first gossipTo third @@ -187,7 +180,6 @@ abstract class TransitionSpec awaitMemberStatus(second, Up) awaitMemberStatus(third, Joining) awaitCond(seenLatestGossip == Set(first, second, third)) - clusterView.convergence must be(true) } enterBarrier("convergence-joining-3") @@ -196,16 +188,15 @@ abstract class TransitionSpec leaderActions() awaitMemberStatus(first, Up) awaitMemberStatus(second, Up) - awaitMemberStatus(third, Up) + awaitMemberStatus(third, Joining) } enterBarrier("leader-actions-3") // leader gossipTo first non-leader leader(first, second, third) gossipTo nonLeader(first, second, third).head runOn(nonLeader(first, second, third).head) { - awaitMemberStatus(third, Up) + awaitMemberStatus(third, Joining) awaitCond(seenLatestGossip == Set(leader(first, second, third), myself)) - clusterView.convergence must be(false) } // first non-leader gossipTo the other non-leader @@ -217,7 +208,6 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).tail.head) { awaitMemberStatus(third, Up) awaitCond(seenLatestGossip == Set(first, second, third)) - clusterView.convergence must be(true) } // first non-leader gossipTo the leader @@ -227,7 +217,6 @@ abstract class TransitionSpec awaitMemberStatus(second, Up) awaitMemberStatus(third, Up) awaitCond(seenLatestGossip == Set(first, second, third)) - clusterView.convergence must be(true) } enterBarrier("after-3") @@ -247,12 +236,10 @@ abstract class TransitionSpec runOn(first, third) { awaitCond(clusterView.unreachableMembers.contains(Member(second, Up))) - awaitCond(!clusterView.convergence) } runOn(first) { cluster.down(second) - awaitMemberStatus(second, Down) } enterBarrier("after-second-down") @@ -263,7 +250,6 @@ abstract class TransitionSpec awaitCond(clusterView.unreachableMembers.contains(Member(second, Down))) awaitMemberStatus(second, Down) awaitCond(seenLatestGossip == Set(first, third)) - clusterView.convergence must be(true) } enterBarrier("after-6") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index c78cbf904d..1b2082b0e1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -94,7 +94,6 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod members.forall(_.status == MemberStatus.Up) }) clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) - clusterView.convergence must be(false) } } @@ -112,8 +111,6 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod // still one unreachable clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.head.address must be(node(victim).address) - // and therefore no convergence - clusterView.convergence must be(false) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 59252ba599..602ffadd8b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -42,6 +42,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec override def beforeEach(): Unit = { publisher = system.actorOf(Props[ClusterDomainEventPublisher]) + publisher ! PublishChanges(g0) + expectMsg(MemberUp(a1)) + expectMsg(LeaderChanged(Some(a1.address))) + expectMsgType[SeenChanged] } override def afterEach(): Unit = { @@ -50,59 +54,63 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "ClusterDomainEventPublisher" must { - "publish MemberUp when member status changed to Up" in { - publisher ! PublishChanges(g1, g2) - expectMsg(MemberUp(c2)) - expectMsg(ConvergenceChanged(false)) + "not publish MemberUp when there is no convergence" in { + publisher ! PublishChanges(g2) expectMsgType[SeenChanged] } - "publish convergence true when all seen it" in { - publisher ! PublishChanges(g2, g3) - expectMsg(ConvergenceChanged(true)) + "publish MemberEvents when there is convergence" in { + publisher ! PublishChanges(g2) + expectMsgType[SeenChanged] + publisher ! PublishChanges(g3) + expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) expectMsgType[SeenChanged] } "publish leader changed when new leader after convergence" in { - publisher ! PublishChanges(g3, g4) - expectMsg(MemberUp(d1)) - expectMsg(ConvergenceChanged(false)) + publisher ! PublishChanges(g4) expectMsgType[SeenChanged] expectNoMsg(1 second) - publisher ! PublishChanges(g4, g5) + publisher ! PublishChanges(g5) + expectMsg(MemberUp(d1)) + expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) expectMsg(LeaderChanged(Some(d1.address))) - expectMsg(ConvergenceChanged(true)) expectMsgType[SeenChanged] } "publish leader changed when new leader and convergence both before and after" in { // convergence both before and after - publisher ! PublishChanges(g3, g5) + publisher ! PublishChanges(g3) + expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) + expectMsgType[SeenChanged] + publisher ! PublishChanges(g5) expectMsg(MemberUp(d1)) expectMsg(LeaderChanged(Some(d1.address))) expectMsgType[SeenChanged] } "not publish leader changed when not convergence" in { - publisher ! PublishChanges(g2, g4) - expectMsg(MemberUp(d1)) + publisher ! PublishChanges(g4) + expectMsgType[SeenChanged] expectNoMsg(1 second) } "not publish leader changed when changed convergence but still same leader" in { - publisher ! PublishChanges(g2, g5) + publisher ! PublishChanges(g5) expectMsg(MemberUp(d1)) + expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) expectMsg(LeaderChanged(Some(d1.address))) - expectMsg(ConvergenceChanged(true)) expectMsgType[SeenChanged] - publisher ! PublishChanges(g5, g4) - expectMsg(ConvergenceChanged(false)) + publisher ! PublishChanges(g4) expectMsgType[SeenChanged] - publisher ! PublishChanges(g4, g5) - expectMsg(ConvergenceChanged(true)) + publisher ! PublishChanges(g5) expectMsgType[SeenChanged] } @@ -119,12 +127,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) subscriber.expectMsgType[CurrentClusterState] publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent])) - publisher ! PublishChanges(Gossip(members = SortedSet(a1)), Gossip(members = SortedSet(a1, b1))) + publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) // but testActor is still subscriber expectMsg(MemberUp(b1)) + expectMsg(MemberUp(c2)) + expectMsgType[SeenChanged] } - } - } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index 3a4e3ee3a4..8be81496df 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -17,6 +17,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val a1 = Member(Address("akka", "sys", "a", 2552), Up) val a2 = Member(Address("akka", "sys", "a", 2552), Joining) + val a3 = Member(Address("akka", "sys", "a", 2552), Removed) val b1 = Member(Address("akka", "sys", "b", 2552), Up) val b2 = Member(Address("akka", "sys", "b", 2552), Removed) val b3 = Member(Address("akka", "sys", "b", 2552), Down) @@ -28,61 +29,82 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { val e2 = Member(Address("akka", "sys", "e", 2552), Up) val e3 = Member(Address("akka", "sys", "e", 2552), Down) + def converge(gossip: Gossip): (Gossip, Set[Address]) = + ((gossip, Set.empty[Address]) /: gossip.members) { (gs, m) ⇒ (gs._1.seen(m.address), gs._2 + m.address) } + "Domain events" must { - "be produced for new members" in { + "be empty for the same gossip" in { val g1 = Gossip(members = SortedSet(a1)) - val g2 = Gossip(members = SortedSet(a1, b1, e1)) - diff(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1))) + diffUnreachable(g1, g1) must be(Seq.empty) + } + + "be produced for new members" in { + val (g1, _) = converge(Gossip(members = SortedSet(a1))) + val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, e1))) + + diffMemberEvents(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1))) + diffUnreachable(g1, g2) must be(Seq.empty) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } "be produced for changed status of members" in { - val g1 = Gossip(members = SortedSet(a2, b1, c2)) - val g2 = Gossip(members = SortedSet(a1, b1, c1, e1)) + val (g1, _) = converge(Gossip(members = SortedSet(a2, b1, c2))) + val (g2, s2) = converge(Gossip(members = SortedSet(a1, b1, c1, e1))) - diff(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1))) + diffMemberEvents(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1))) + diffUnreachable(g1, g2) must be(Seq.empty) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } - "be produced for unreachable members" in { - val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2))) - val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(b1, c2))) - - diff(g1, g2) must be(Seq(MemberUnreachable(b1))) - } - - "be produced for downed members" in { + "be produced for members in unreachable" in { val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2))) val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3))) - diff(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3))) + diffMemberEvents(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3))) + diffUnreachable(g1, g2) must be(Seq(UnreachableMember(b3))) + diffSeen(g1, g2) must be(Seq.empty) + } + + "be produced for downed members" in { + val (g1, _) = converge(Gossip(members = SortedSet(a1, b1))) + val (g2, _) = converge(Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3)))) + + diffMemberEvents(g1, g2) must be(Seq(MemberDowned(e3))) + diffUnreachable(g1, g2) must be(Seq(UnreachableMember(e3))) + diffSeen(g1, g2) must be(Seq.empty) } "be produced for removed members" in { - val g1 = Gossip(members = SortedSet(a1, d1), overview = GossipOverview(unreachable = Set(c2))) - val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2))) + val (g1, _) = converge(Gossip(members = SortedSet(a1, d1))) + val (g2, s2) = converge(Gossip(members = SortedSet(a1))) - diff(g1, g2) must be(Seq(MemberRemoved(d2))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(d2))) + diffUnreachable(g1, g2) must be(Seq.empty) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) } "be produced for convergence changes" in { val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) - diff(g1, g2) must be(Seq(ConvergenceChanged(false), - SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) - diff(g2, g1) must be(Seq(ConvergenceChanged(true), - SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) + diffMemberEvents(g1, g2) must be(Seq.empty) + diffUnreachable(g1, g2) must be(Seq.empty) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) + diffMemberEvents(g2, g1) must be(Seq.empty) + diffUnreachable(g2, g1) must be(Seq.empty) + diffSeen(g2, g1) must be(Seq(SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) } "be produced for leader changes" in { - val g1 = Gossip(members = SortedSet(a1, b1, e1)) - val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) - val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address) + val (g1, _) = converge(Gossip(members = SortedSet(a1, b1, e1))) + val (g2, s2) = converge(Gossip(members = SortedSet(b1, e1))) - diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address)))) - diff(g2, g3) must be(Seq(ConvergenceChanged(true), - SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) + diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(a3))) + diffUnreachable(g1, g2) must be(Seq.empty) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) + diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(b1.address)))) } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index a659abf313..efdee268ca 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -67,7 +67,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { clusterView.self.address must be(selfAddress) clusterView.members.map(_.address) must be(Set(selfAddress)) clusterView.status must be(MemberStatus.Joining) - clusterView.convergence must be(true) leaderActions() awaitCond(clusterView.status == MemberStatus.Up) } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index f873f5f252..f1da0dcf0e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -28,8 +28,11 @@ class GossipSpec extends WordSpec with MustMatchers { "A Gossip" must { - "merge members by status priority" in { + "reach convergence when it's empty" in { + Gossip().convergence must be(true) + } + "merge members by status priority" in { val g1 = Gossip(members = SortedSet(a1, c1, e1)) val g2 = Gossip(members = SortedSet(a2, c2, e2)) @@ -44,7 +47,6 @@ class GossipSpec extends WordSpec with MustMatchers { } "merge unreachable by status priority" in { - val g1 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a1, b1, c1, d1))) val g2 = Gossip(members = Gossip.emptyMembers, overview = GossipOverview(unreachable = Set(a2, b2, c2, d2))) diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java index 24770318c1..40e595e653 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/simple/japi/SimpleClusterListener.java @@ -4,8 +4,8 @@ import akka.actor.UntypedActor; import akka.cluster.ClusterEvent.ClusterDomainEvent; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberJoined; -import akka.cluster.ClusterEvent.MemberUnreachable; import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.UnreachableMember; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -26,8 +26,8 @@ public class SimpleClusterListener extends UntypedActor { MemberUp mUp = (MemberUp) message; log.info("Member is Up: {}", mUp.member()); - } else if (message instanceof MemberUnreachable) { - MemberUnreachable mUnreachable = (MemberUnreachable) message; + } else if (message instanceof UnreachableMember) { + UnreachableMember mUnreachable = (UnreachableMember) message; log.info("Member detected as unreachable: {}", mUnreachable.member()); } else if (message instanceof ClusterDomainEvent) { diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala index bdf669eb99..6bdec7cfba 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/simple/SimpleClusterApp.scala @@ -22,7 +22,7 @@ object SimpleClusterApp { log.info("Member joined: {}", member) case MemberUp(member) ⇒ log.info("Member is Up: {}", member) - case MemberUnreachable(member) ⇒ + case UnreachableMember(member) ⇒ log.info("Member detected as unreachable: {}", member) case _: ClusterDomainEvent ⇒ // ignore diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index a1cab85069..de7eff456c 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -15,10 +15,7 @@ import akka.actor.ReceiveTimeout import akka.actor.RelativeActorPath import akka.actor.RootActorPath import akka.cluster.Cluster -import akka.cluster.ClusterEvent.CurrentClusterState -import akka.cluster.ClusterEvent.LeaderChanged -import akka.cluster.ClusterEvent.MemberEvent -import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus import akka.routing.FromConfig import akka.routing.ConsistentHashingRouter.ConsistentHashableEnvelope @@ -219,7 +216,10 @@ class StatsSampleClient(servicePath: String) extends Actor { var nodes = Set.empty[Address] - override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + cluster.subscribe(self, classOf[UnreachableMember]) + } override def postStop(): Unit = { cluster.unsubscribe(self) tickTask.cancel() @@ -237,8 +237,9 @@ class StatsSampleClient(servicePath: String) extends Actor { println(failed) case state: CurrentClusterState ⇒ nodes = state.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } - case MemberUp(m) ⇒ nodes += m.address - case other: MemberEvent ⇒ nodes -= other.member.address + case MemberUp(m) ⇒ nodes += m.address + case other: MemberEvent ⇒ nodes -= other.member.address + case UnreachableMember(m) ⇒ nodes -= m.address } }