diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 92801313ec..c7c9083af8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -5,14 +5,17 @@ package akka.cluster import scala.collection.immutable.SortedSet import scala.concurrent.util.{ Deadline, Duration } +import scala.concurrent.util.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler } import akka.actor.Status.Failure import akka.event.EventStream +import akka.pattern.ask import akka.util.Timeout import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import language.existentials +import language.postfixOps /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -94,8 +97,12 @@ private[cluster] object InternalClusterAction { case object GetClusterCoreRef - case class Subscribe(subscriber: ActorRef, to: Class[_]) - case class Unsubscribe(subscriber: ActorRef) + sealed trait SubscriptionMessage + case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage + case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage + + case class PublishChanges(oldGossip: Gossip, newGossip: Gossip) + case object PublishDone case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage @@ -183,6 +190,8 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) withDispatcher(UseDispatcher), name = "heartbeatSender") val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). withDispatcher(UseDispatcher), name = "coreSender") + val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)). + withDispatcher(UseDispatcher), name = "publisher") import context.dispatcher @@ -237,11 +246,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } def uninitialized: Actor.Receive = { - case InitJoin ⇒ // skip, not ready yet - case JoinTo(address) ⇒ join(address) - case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) - case _: Tick ⇒ // ignore periodic tasks until initialized + case InitJoin ⇒ // skip, not ready yet + case JoinTo(address) ⇒ join(address) + case msg: SubscriptionMessage ⇒ publisher forward msg + case _: Tick ⇒ // ignore periodic tasks until initialized } def initialized: Actor.Receive = { @@ -260,12 +268,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case Exit(address) ⇒ exiting(address) case Remove(address) ⇒ removing(address) case SendGossipTo(address) ⇒ gossipTo(address) - case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) - case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case msg: SubscriptionMessage ⇒ publisher forward msg case p: Ping ⇒ ping(p) } + def removed: Actor.Receive = { + case msg: SubscriptionMessage ⇒ publisher forward msg + case _: Tick ⇒ // ignore periodic tasks + } + def receive = uninitialized def initJoin(): Unit = sender ! InitJoinAck(selfAddress) @@ -275,21 +287,23 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) * A 'Join(thisNodeAddress)' command is sent to the node to join. */ def join(address: Address): Unit = { - val localGossip = latestGossip - // wipe our state since a node that joins a cluster must be empty - latestGossip = Gossip() - joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) + if (!latestGossip.members.exists(_.address == address)) { + val localGossip = latestGossip + // wipe our state since a node that joins a cluster must be empty + latestGossip = Gossip() + joinInProgress = Map(address -> (Deadline.now + JoinTimeout)) - // wipe the failure detector since we are starting fresh and shouldn't care about the past - failureDetector.reset() + // wipe the failure detector since we are starting fresh and shouldn't care about the past + failureDetector.reset() - publish(localGossip) + publish(localGossip) - context.become(initialized) - if (address == selfAddress) - joining(address) - else - coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + context.become(initialized) + if (address == selfAddress) + joining(address) + else + coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + } } /** @@ -374,9 +388,12 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val localGossip = latestGossip // just cleaning up the gossip state latestGossip = Gossip() - // make sure the final (removed) state is always published publish(localGossip) - environment.shutdown() + context.become(removed) + // make sure the final (removed) state is published + // before shutting down + implicit val timeout = Timeout(5 seconds) + publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() } } /** @@ -591,12 +608,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // 3. Non-exiting remain -- When all partition handoff has completed // 4. Move EXITING => REMOVED -- When all nodes have seen that the node is EXITING (convergence) - remove the nodes from the node ring and seen table // 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader - // 5. Store away all stuff needed for the side-effecting processing in 10. // 6. Updating the vclock version for the changes // 7. Updating the 'seen' table // 8. Try to update the state with the new gossip - // 9. If failure - retry - // 10. If success - run all the side-effecting processing + // 9. If success - run all the side-effecting processing val ( newGossip: Gossip, @@ -816,64 +831,12 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) coreSender ! SendClusterMessage(address, gossipMsg) - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - subscriber ! CurrentClusterState( - members = latestGossip.members, - unreachable = latestGossip.overview.unreachable, - convergence = latestGossip.convergence, - seenBy = latestGossip.seenBy, - leader = latestGossip.leader) - eventStream.subscribe(subscriber, to) - } - - def unsubscribe(subscriber: ActorRef): Unit = - eventStream.unsubscribe(subscriber) - def publish(oldGossip: Gossip): Unit = { - publishMembers(oldGossip) - publishUnreachableMembers(oldGossip) - publishLeader(oldGossip) - publishSeen(oldGossip) + publisher ! PublishChanges(oldGossip, latestGossip) if (PublishStatsInterval == Duration.Zero) publishInternalStats() } - def publishMembers(oldGossip: Gossip): Unit = { - if (!isSame(oldGossip.members, latestGossip.members)) - eventStream publish MembersChanged(latestGossip.members) - } - - def publishUnreachableMembers(oldGossip: Gossip): Unit = { - if (!isSame(oldGossip.overview.unreachable, latestGossip.overview.unreachable)) - eventStream publish UnreachableMembersChanged(latestGossip.overview.unreachable) - } - - def isSame(oldMembers: Set[Member], newMembers: Set[Member]): Boolean = { - def oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) - def newMembersStatus = newMembers.map(m ⇒ (m.address, m.status)) - (newMembers eq oldMembers) || ((newMembers.size == oldMembers.size) && (newMembersStatus == oldMembersStatus)) - } - - def publishLeader(oldGossip: Gossip): Unit = { - if (latestGossip.leader != oldGossip.leader || latestGossip.convergence != oldGossip.convergence) - eventStream publish LeaderChanged(latestGossip.leader, latestGossip.convergence) - } - - def publishSeen(oldGossip: Gossip): Unit = { - val oldConvergence = oldGossip.convergence - val newConvergence = latestGossip.convergence - val oldSeenBy = oldGossip.seenBy - val newSeenBy = latestGossip.seenBy - - if (newConvergence != oldConvergence || newSeenBy != oldSeenBy) { - eventStream publish SeenChanged(newConvergence, newSeenBy) - } - } - - def publishInternalStats(): Unit = { - eventStream publish CurrentInternalStats(stats) - } - - def eventStream: EventStream = context.system.eventStream + def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats) def ping(p: Ping): Unit = sender ! Pong(p) } @@ -944,53 +907,6 @@ private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Act } } -/** - * Domain events published to the event bus. - */ -object ClusterEvent { - /** - * Marker interface for cluster domain events. - */ - sealed trait ClusterDomainEvent - - /** - * Current snapshot state of the cluster. Sent to new subscriber. - */ - case class CurrentClusterState( - members: SortedSet[Member] = SortedSet.empty, - unreachable: Set[Member] = Set.empty, - convergence: Boolean = false, - seenBy: Set[Address] = Set.empty, - leader: Option[Address] = None) extends ClusterDomainEvent - - /** - * Set of cluster members or their status have changed. - */ - case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent - - /** - * Set of unreachable cluster members or their status have changed. - */ - case class UnreachableMembersChanged(unreachable: Set[Member]) extends ClusterDomainEvent - - /** - * Leader of the cluster members changed, and/or convergence status. - */ - case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent - - /** - * INTERNAL API - * The nodes that have seen current version of the Gossip. - */ - private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent - - /** - * INTERNAL API - */ - private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent - -} - /** * INTERNAL API */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala new file mode 100644 index 0000000000..cbcee78cc9 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import language.postfixOps +import scala.collection.immutable.SortedSet + +import akka.actor.{ Actor, ActorLogging, ActorRef, Address } +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus._ +import akka.event.EventStream + +/** + * Domain events published to the event bus. + * Subscribe with: + * {{{ + * Cluster(system).subscribe(actorRef, classOf[ClusterDomainEvent]) + * }}} + */ +object ClusterEvent { + /** + * Marker interface for cluster domain events. + */ + sealed trait ClusterDomainEvent + + /** + * Current snapshot state of the cluster. Sent to new subscriber. + */ + case class CurrentClusterState( + members: SortedSet[Member] = SortedSet.empty, + unreachable: Set[Member] = Set.empty, + convergence: Boolean = false, + seenBy: Set[Address] = Set.empty, + leader: Option[Address] = None) extends ClusterDomainEvent + + /** + * Marker interface for member related events. + */ + sealed trait MemberEvent extends ClusterDomainEvent { + def member: Member + } + + /** + * A new member joined the cluster. + */ + case class MemberJoined(member: Member) extends MemberEvent { + if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) + } + + /** + * Member status changed to Up + */ + case class MemberUp(member: Member) extends MemberEvent { + if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) + } + + /** + * Member status changed to Leaving + */ + case class MemberLeft(member: Member) extends MemberEvent { + if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) + } + + /** + * Member status changed to Exiting + */ + case class MemberExited(member: Member) extends MemberEvent { + if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) + } + + /** + * A member is considered as unreachable by the failure detector. + */ + case class MemberUnreachable(member: Member) extends MemberEvent + + /** + * Member status changed to Down + */ + case class MemberDowned(member: Member) extends MemberEvent { + if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) + } + + /** + * Member completely removed from the cluster + */ + case class MemberRemoved(member: Member) extends MemberEvent { + if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) + } + + /** + * Cluster convergence state changed. + */ + case class ConvergenceChanged(convergence: Boolean) extends ClusterDomainEvent + + /** + * Leader of the cluster members changed, and/or convergence status. + */ + case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent + + /** + * INTERNAL API + * The nodes that have seen current version of the Gossip. + */ + private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent + + /** + * INTERNAL API + */ + private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent + + /** + * INTERNAL API + */ + private[cluster] def diff(oldGossip: Gossip, newGossip: Gossip): IndexedSeq[ClusterDomainEvent] = { + val newMembers = newGossip.members -- oldGossip.members + + val membersGroupedByAddress = (newGossip.members.toList ++ oldGossip.members.toList).groupBy(_.address) + val changedMembers = membersGroupedByAddress collect { + case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember + } + + val memberEvents = (newMembers ++ changedMembers) map { m ⇒ + if (m.status == Joining) MemberJoined(m) + else if (m.status == Up) MemberUp(m) + else if (m.status == Leaving) MemberLeft(m) + else if (m.status == Exiting) MemberExited(m) + else throw new IllegalStateException("Unexpected member status: " + m) + } + + val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable + val (newDowned, newUnreachable) = allNewUnreachable partition { _.status == Down } + val downedEvents = newDowned map MemberDowned + val unreachableEvents = newUnreachable map MemberUnreachable + + val unreachableGroupedByAddress = + (newGossip.overview.unreachable.toList ++ oldGossip.overview.unreachable.toList).groupBy(_.address) + val unreachableDownMembers = unreachableGroupedByAddress collect { + case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ + newMember + } + val unreachableDownedEvents = unreachableDownMembers map MemberDowned + + val removedEvents = (oldGossip.members -- newGossip.members -- newGossip.overview.unreachable) map { m ⇒ + MemberRemoved(m.copy(status = Removed)) + } + + val newConvergence = newGossip.convergence + val convergenceChanged = newConvergence != oldGossip.convergence + val convergenceEvents = if (convergenceChanged) Seq(ConvergenceChanged(newConvergence)) else Seq.empty + + val leaderEvents = + if (convergenceChanged || newGossip.leader != oldGossip.leader) Seq(LeaderChanged(newGossip.leader, newConvergence)) + else Seq.empty + + val newSeenBy = newGossip.seenBy + val seenEvents = + if (convergenceChanged || newSeenBy != oldGossip.seenBy) Seq(SeenChanged(newConvergence, newSeenBy)) + else Seq.empty + + memberEvents.toIndexedSeq ++ unreachableEvents ++ downedEvents ++ unreachableDownedEvents ++ removedEvents ++ + convergenceEvents ++ leaderEvents ++ seenEvents + } + +} + +/** + * INTERNAL API. + * Responsible for domain event subscriptions and publishing of + * domain events to event bus. + */ +private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnvironment) extends Actor with ActorLogging { + import InternalClusterAction._ + + var latestGossip: Gossip = Gossip() + + def receive = { + case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip) + case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) + case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case PublishDone ⇒ sender ! PublishDone + } + + def eventStream: EventStream = context.system.eventStream + + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { + subscriber ! CurrentClusterState( + members = latestGossip.members, + unreachable = latestGossip.overview.unreachable, + convergence = latestGossip.convergence, + seenBy = latestGossip.seenBy, + leader = latestGossip.leader) + eventStream.subscribe(subscriber, to) + } + + def unsubscribe(subscriber: ActorRef): Unit = + eventStream.unsubscribe(subscriber) + + def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { + // keep the latestGossip to be sent to new subscribers + latestGossip = newGossip + diff(oldGossip, newGossip) foreach { eventStream publish } + } + + def publishInternalStats(currentStats: CurrentInternalStats): Unit = { + eventStream publish currentStats + } +} diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 39ee3888d5..5218480e5e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -6,9 +6,9 @@ package akka.cluster import java.io.Closeable import scala.collection.immutable.SortedSet - import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } import akka.cluster.ClusterEvent._ +import akka.actor.PoisonPill /** * INTERNAL API @@ -39,13 +39,20 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy) - case MembersChanged(members) ⇒ state = state.copy(members = members) - case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable) - case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case _ ⇒ // ignore, not interesting + case SeenChanged(convergence, seenBy) ⇒ + state = state.copy(convergence = convergence, seenBy = seenBy) + case MemberRemoved(member) ⇒ + state = state.copy(members = state.members - member, unreachable = state.unreachable - member) + case MemberUnreachable(member) ⇒ + state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) + case MemberDowned(member) ⇒ + state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) + case event: MemberEvent ⇒ state = state.copy(members = state.members - event.member + event.member) + case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) + case ConvergenceChanged(convergence) ⇒ state = state.copy(convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") } @@ -121,8 +128,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { /** * Unsubscribe to cluster events. */ - def close(): Unit = { - cluster.system.stop(eventBusListener) + def close(): Unit = if (!eventBusListener.isTerminated) { + eventBusListener ! PoisonPill } } \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 99dd2633a4..dd880a76d8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit import akka.remote.testconductor.RoleName import akka.actor.Props import akka.actor.Actor +import akka.cluster.MemberStatus._ object LargeClusterMultiJvmSpec extends MultiNodeConfig { // each jvm simulates a datacenter with many nodes @@ -147,15 +148,20 @@ abstract class LargeClusterSpec val latch = TestLatch(clusterNodes.size) clusterNodes foreach { c ⇒ c.subscribe(system.actorOf(Props(new Actor { + var upCount = 0 def receive = { - case MembersChanged(members) ⇒ - if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { + case state: CurrentClusterState ⇒ + upCount = state.members.count(_.status == Up) + case MemberUp(_) if !latch.isOpen ⇒ + upCount += 1 + if (upCount == totalNodes) { log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", totalNodes, c.selfAddress, tookMillis, gossipCount(c)) latch.countDown() } + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) } runOn(from) { @@ -276,18 +282,20 @@ abstract class LargeClusterSpec val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ Cluster(sys).subscribe(sys.actorOf(Props(new Actor { - var gotExpectedLiveNodes = false - var gotExpectedUnreachableNodes = false + var gotUnreachable = Set.empty[Member] def receive = { - case MembersChanged(members) if !latch.isOpen ⇒ - gotExpectedLiveNodes = members.size == liveNodes + case state: CurrentClusterState ⇒ + gotUnreachable = state.unreachable checkDone() - case UnreachableMembersChanged(unreachable) if !latch.isOpen ⇒ - gotExpectedUnreachableNodes = unreachable.size == unreachableNodes + case MemberUnreachable(m) if !latch.isOpen ⇒ + gotUnreachable = gotUnreachable + m + checkDone() + case MemberDowned(m) if !latch.isOpen ⇒ + gotUnreachable = gotUnreachable + m checkDone() case _ ⇒ // not interesting } - def checkDone(): Unit = if (gotExpectedLiveNodes && gotExpectedUnreachableNodes) { + def checkDone(): Unit = if (gotUnreachable.size == unreachableNodes) { log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) latch.countDown() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 6c16a9550a..b509341ee6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -68,21 +68,21 @@ abstract class LeaderLeavingSpec val leavingLatch = TestLatch() val exitingLatch = TestLatch() - val expectedAddresses = roles.toSet map address + cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ - def check(status: MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + case MemberLeft(m) if m.address == oldLeaderAddress ⇒ leavingLatch.countDown() + case MemberExited(m) if m.address == oldLeaderAddress ⇒ exitingLatch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("registered-listener") enterBarrier("leader-left") + val expectedAddresses = roles.toSet map address + awaitCond(clusterView.members.map(_.address) == expectedAddresses) + // verify that the LEADER is LEAVING leavingLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 62ff1d1e3e..649b57ee59 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -58,11 +58,11 @@ abstract class MembershipChangeListenerExitingSpec val exitingLatch = TestLatch() cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ - if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting)) - exitingLatch.countDown() + case MemberExited(m) if m.address == address(second) ⇒ + exitingLatch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("registered-listener") exitingLatch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 0eeee4334a..9999f02078 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -40,12 +40,16 @@ abstract class MembershipChangeListenerJoinSpec val joinLatch = TestLatch() val expectedAddresses = Set(first, second) map address cluster.subscribe(system.actorOf(Props(new Actor { + var members = Set.empty[Member] def receive = { - case MembersChanged(members) ⇒ - if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) + case state: CurrentClusterState ⇒ members = state.members + case MemberJoined(m) ⇒ + members = members - m + m + if (members.map(_.address) == expectedAddresses) joinLatch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("registered-listener") joinLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 69ef096613..8e33497d00 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -54,15 +54,13 @@ abstract class MembershipChangeListenerLeavingSpec runOn(third) { val latch = TestLatch() - val expectedAddresses = Set(first, second, third) map address cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ - if (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving)) - latch.countDown() + case MemberLeft(m) if m.address == address(second) ⇒ + latch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("registered-listener") latch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index efb5ffd42f..c44e61df57 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -40,12 +40,16 @@ abstract class MembershipChangeListenerUpSpec val latch = TestLatch() val expectedAddresses = Set(first, second) map address cluster.subscribe(system.actorOf(Props(new Actor { + var members = Set.empty[Member] def receive = { - case MembersChanged(members) ⇒ - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + case state: CurrentClusterState ⇒ members = state.members + case MemberUp(m) ⇒ + members = members - m + m + if (members.map(_.address) == expectedAddresses) latch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("listener-1-registered") cluster.join(first) latch.await @@ -63,12 +67,16 @@ abstract class MembershipChangeListenerUpSpec val latch = TestLatch() val expectedAddresses = Set(first, second, third) map address cluster.subscribe(system.actorOf(Props(new Actor { + var members = Set.empty[Member] def receive = { - case MembersChanged(members) ⇒ - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + case state: CurrentClusterState ⇒ members = state.members + case MemberUp(m) ⇒ + members = members - m + m + if (members.map(_.address) == expectedAddresses) latch.countDown() + case _ ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("listener-2-registered") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index f1c0f5e97f..5073e17aa1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -46,17 +46,13 @@ abstract class NodeLeavingAndExitingSpec val secondAddess = address(second) val leavingLatch = TestLatch() val exitingLatch = TestLatch() - val expectedAddresses = roles.toSet map address cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ - def check(status: MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == secondAddess && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + case MemberLeft(m) if m.address == secondAddess ⇒ leavingLatch.countDown() + case MemberExited(m) if m.address == secondAddess ⇒ exitingLatch.countDown() + } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("registered-listener") runOn(third) { @@ -64,6 +60,9 @@ abstract class NodeLeavingAndExitingSpec } enterBarrier("second-left") + val expectedAddresses = roles.toSet map address + awaitCond(clusterView.members.map(_.address) == expectedAddresses) + // Verify that 'second' node is set to LEAVING leavingLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 8ee02ee197..4318d0e79d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -44,11 +44,11 @@ abstract class NodeUpSpec val unexpected = new AtomicReference[SortedSet[Member]](SortedSet.empty) cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ - if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) - unexpected.set(members) + case event: MemberEvent ⇒ + unexpected.set(unexpected.get + event.member) + case _: CurrentClusterState ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) enterBarrier("listener-registered") runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 1bbc890c11..e1b3571bd2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -60,11 +60,12 @@ abstract class SunnyWeatherSpec val unexpected = new AtomicReference[SortedSet[Member]] cluster.subscribe(system.actorOf(Props(new Actor { def receive = { - case MembersChanged(members) ⇒ + case event: MemberEvent ⇒ // we don't expected any changes to the cluster - unexpected.set(members) + unexpected.set(unexpected.get + event.member) + case _: CurrentClusterState ⇒ // ignore } - })), classOf[MembersChanged]) + })), classOf[MemberEvent]) for (n ← 1 to 30) { enterBarrier("period-" + n) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 0d5e60b7ad..7ad3497d84 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -124,7 +124,7 @@ abstract class TransitionSpec clusterView.status must be(Joining) clusterView.convergence must be(true) leaderActions() - clusterView.status must be(Up) + awaitCond(clusterView.status == Up) } enterBarrier("after-1") @@ -138,8 +138,8 @@ abstract class TransitionSpec runOn(first, second) { // gossip chat from the join will synchronize the views awaitMembers(first, second) - memberStatus(first) must be(Up) - memberStatus(second) must be(Joining) + awaitMemberStatus(first, Up) + awaitMemberStatus(second, Joining) awaitCond(seenLatestGossip == Set(first, second)) clusterView.convergence must be(true) } @@ -147,17 +147,17 @@ abstract class TransitionSpec runOn(leader(first, second)) { leaderActions() - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) + awaitMemberStatus(first, Up) + awaitMemberStatus(second, Up) } enterBarrier("leader-actions-2") leader(first, second) gossipTo nonLeader(first, second).head runOn(first, second) { // gossip chat will synchronize the views - awaitCond(memberStatus(second) == Up) + awaitMemberStatus(second, Up) seenLatestGossip must be(Set(first, second)) - memberStatus(first) must be(Up) + awaitMemberStatus(first, Up) clusterView.convergence must be(true) } @@ -172,7 +172,7 @@ abstract class TransitionSpec runOn(second, third) { // gossip chat from the join will synchronize the views awaitMembers(first, second, third) - memberStatus(third) must be(Joining) + awaitMemberStatus(third, Joining) awaitCond(seenLatestGossip == Set(second, third)) clusterView.convergence must be(false) } @@ -182,8 +182,8 @@ abstract class TransitionSpec runOn(first, second) { // gossip chat will synchronize the views awaitMembers(first, second, third) - memberStatus(third) must be(Joining) - awaitCond(memberStatus(second) == Up) + awaitMemberStatus(third, Joining) + awaitMemberStatus(second, Up) seenLatestGossip must be(Set(first, second, third)) clusterView.convergence must be(true) } @@ -191,9 +191,9 @@ abstract class TransitionSpec first gossipTo third runOn(first, second, third) { members must be(Set(first, second, third)) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Joining) + awaitMemberStatus(first, Up) + awaitMemberStatus(second, Up) + awaitMemberStatus(third, Joining) seenLatestGossip must be(Set(first, second, third)) clusterView.convergence must be(true) } @@ -202,16 +202,16 @@ abstract class TransitionSpec runOn(leader(first, second, third)) { leaderActions() - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) + awaitMemberStatus(first, Up) + awaitMemberStatus(second, Up) + awaitMemberStatus(third, Up) } enterBarrier("leader-actions-3") // leader gossipTo first non-leader leader(first, second, third) gossipTo nonLeader(first, second, third).head runOn(nonLeader(first, second, third).head) { - memberStatus(third) must be(Up) + awaitMemberStatus(third, Up) seenLatestGossip must be(Set(leader(first, second, third), myself)) clusterView.convergence must be(false) } @@ -223,7 +223,7 @@ abstract class TransitionSpec cluster.clusterCore ! InternalClusterAction.SendGossipTo(nonLeader(first, second, third).tail.head) } runOn(nonLeader(first, second, third).tail.head) { - memberStatus(third) must be(Up) + awaitMemberStatus(third, Up) seenLatestGossip must be(Set(first, second, third)) clusterView.convergence must be(true) } @@ -231,9 +231,9 @@ abstract class TransitionSpec // first non-leader gossipTo the leader nonLeader(first, second, third).head gossipTo leader(first, second, third) runOn(first, second, third) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) + awaitMemberStatus(first, Up) + awaitMemberStatus(second, Up) + awaitMemberStatus(third, Up) seenLatestGossip must be(Set(first, second, third)) clusterView.convergence must be(true) } @@ -269,7 +269,7 @@ abstract class TransitionSpec runOn(first, third) { clusterView.unreachableMembers must contain(Member(second, Down)) - memberStatus(second) must be(Down) + awaitMemberStatus(second, Down) seenLatestGossip must be(Set(first, third)) clusterView.convergence must be(true) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala new file mode 100644 index 0000000000..1bbffca3c2 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import akka.actor.Address +import scala.collection.immutable.SortedSet + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ClusterDomainEventSpec extends WordSpec with MustMatchers { + + import MemberStatus._ + import ClusterEvent._ + + val a1 = Member(Address("akka", "sys", "a", 2552), Up) + val a2 = Member(Address("akka", "sys", "a", 2552), Joining) + val b1 = Member(Address("akka", "sys", "b", 2552), Up) + val b2 = Member(Address("akka", "sys", "b", 2552), Removed) + val b3 = Member(Address("akka", "sys", "b", 2552), Down) + val c1 = Member(Address("akka", "sys", "c", 2552), Leaving) + val c2 = Member(Address("akka", "sys", "c", 2552), Up) + val d1 = Member(Address("akka", "sys", "d", 2552), Leaving) + val d2 = Member(Address("akka", "sys", "d", 2552), Removed) + val e1 = Member(Address("akka", "sys", "e", 2552), Joining) + val e2 = Member(Address("akka", "sys", "e", 2552), Up) + val e3 = Member(Address("akka", "sys", "e", 2552), Down) + + "Domain events" must { + + "be produced for new members" in { + val g1 = Gossip(members = SortedSet(a1)) + val g2 = Gossip(members = SortedSet(a1, b1, e1)) + + diff(g1, g2) must be(Seq(MemberUp(b1), MemberJoined(e1))) + } + + "be produced for changed status of members" in { + val g1 = Gossip(members = SortedSet(a2, b1, c2)) + val g2 = Gossip(members = SortedSet(a1, b1, c1, e1)) + + diff(g1, g2) must be(Seq(MemberUp(a1), MemberLeft(c1), MemberJoined(e1))) + } + + "be produced for unreachable members" in { + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2))) + val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(b1, c2))) + + diff(g1, g2) must be(Seq(MemberUnreachable(b1))) + } + + "be produced for downed members" in { + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(c2, e2))) + val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2, b3, e3))) + + diff(g1, g2) must be(Seq(MemberDowned(b3), MemberDowned(e3))) + } + + "be produced for removed members" in { + val g1 = Gossip(members = SortedSet(a1, d1), overview = GossipOverview(unreachable = Set(c2))) + val g2 = Gossip(members = SortedSet(a1), overview = GossipOverview(unreachable = Set(c2))) + + diff(g1, g2) must be(Seq(MemberRemoved(d2))) + } + + "be produced for convergence changes" in { + val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address).seen(e1.address) + val g2 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.address).seen(b1.address) + + // LeaderChanged is also published when convergence changed + diff(g1, g2) must be(Seq(ConvergenceChanged(false), LeaderChanged(Some(a1.address), convergence = false), + SeenChanged(convergence = false, seenBy = Set(a1.address, b1.address)))) + diff(g2, g1) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(a1.address), convergence = true), + SeenChanged(convergence = true, seenBy = Set(a1.address, b1.address, e1.address)))) + } + + "be produced for leader changes" in { + val g1 = Gossip(members = SortedSet(a1, b1, e1)) + val g2 = Gossip(members = SortedSet(b1, e1), overview = GossipOverview(unreachable = Set(a1))) + val g3 = g2.copy(overview = GossipOverview()).seen(b1.address).seen(e1.address) + + diff(g1, g2) must be(Seq(MemberUnreachable(a1), LeaderChanged(Some(b1.address), convergence = false))) + diff(g2, g3) must be(Seq(ConvergenceChanged(true), LeaderChanged(Some(b1.address), convergence = true), + SeenChanged(convergence = true, seenBy = Set(b1.address, e1.address)))) + } + } +} diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala index e61af1db6a..521176d142 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala @@ -16,13 +16,18 @@ object ClusterApp { def receive = { case state: CurrentClusterState ⇒ println("Current members: " + state.members) - case MembersChanged(members) ⇒ - println("Current members: " + members) + case MemberJoined(member) ⇒ + println("Member joined: " + member) + case MemberUp(member) ⇒ + println("Member is Up: " + member) + case MemberUnreachable(member) ⇒ + println("Member detected as unreachable: " + member) + case _ ⇒ // ignore } })) - Cluster(system).subscribe(clusterListener, classOf[MembersChanged]) + Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent]) } } \ No newline at end of file