diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index bf9c2945cf..b655a1ab21 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -71,11 +71,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { */ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { - /** - * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. - * All state is represented by this immutable case class and managed by an AtomicReference. - */ - private case class State(memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty) + import ClusterEvent._ if (!system.provider.isInstanceOf[RemoteActorRefProvider]) throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") @@ -92,8 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Cluster Node [{}] - is starting up...", selfAddress) - private val state = new AtomicReference[State](State()) - /** * Read only view of cluster state, updated periodically by * ClusterCoreDaemon. Access with `latestGossip`. 
@@ -109,6 +103,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @volatile private[cluster] var _latestStats = ClusterStats() + private[cluster] val eventBus: ClusterEventBus = new ClusterEventBus + // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -161,6 +157,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + // create actor that subscribes to the cluster eventBus to update current read view state + private val eventBusListener: ActorRef = { + val listener = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + def receive = { + case MembershipGossipChanged(gossip) ⇒ _latestGossip = gossip + case InternalStatsChanged(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting + } + }).withDispatcher(UseDispatcher), name = "clusterEventBusListener") + + eventBus.subscribe(listener, classOf[ClusterDomainEvent]) + listener + } + // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). @@ -247,26 +257,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) def seedNodes: IndexedSeq[Address] = SeedNodes /** - * Registers a listener to subscribe to cluster membership changes. + * Subscribe to cluster domain events. + * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] + * or subclass. 
*/ - @tailrec - final def registerListener(listener: MembershipChangeListener): Unit = { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners + listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur - } - - /** - * Unsubscribes to cluster membership changes. - */ - @tailrec - final def unregisterListener(listener: MembershipChangeListener): Unit = { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners - listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur - } + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = eventBus.subscribe(subscriber, to) /** * Try to join this cluster node with the node specified by 'address'. @@ -303,10 +298,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (_isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) - // FIXME isTerminated check can be removed when ticket #2221 is fixed - // now it prevents logging if system is shutdown (or in progress of shutdown) - if (!clusterDaemons.isTerminated) - system.stop(clusterDaemons) + system.stop(clusterDaemons) + eventBus.unsubscribe(eventBusListener) + system.stop(eventBusListener) scheduler.close() @@ -316,41 +310,32 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - /** - * INTERNAL API - */ - private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = { - // FIXME run callbacks async (to not block the cluster) - state.get.memberMembershipChangeListeners foreach { _ notify members } - } - /** * INTERNAL API */ private[cluster] def latestStats: ClusterStats = _latestStats - /** - 
* INTERNAL API - */ - private[cluster] def publishLatestGossip(gossip: Gossip): Unit = _latestGossip = gossip - - /** - * INTERNAL API - */ - private[cluster] def publishLatestStats(stats: ClusterStats): Unit = _latestStats = stats - } /** - * Interface for membership change listener. + * Domain events published to the cluster event bus. */ -trait MembershipChangeListener { - def notify(members: SortedSet[Member]): Unit +object ClusterEvent { + /** + * Marker interface for cluster domain events. + */ + trait ClusterDomainEvent + + /** + * Set of cluster members, or their status has changed. + */ + case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent + + case class MembershipGossipChanged(gossip: Gossip) extends ClusterDomainEvent + /** + * INTERNAL API + */ + private[cluster] case class InternalStatsChanged(stats: ClusterStats) extends ClusterDomainEvent + } -/** - * Interface for meta data change listener. - */ -trait MetaDataChangeListener { - def notify(meta: Map[String, Array[Byte]]): Unit -} diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b23c0f2108..3414c19faf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -11,7 +11,8 @@ import akka.actor.Status.Failure import akka.routing.ScatterGatherFirstCompletedRouter import akka.util.Timeout import akka.pattern.{ AskTimeoutException, ask, pipe } -import MemberStatus._ +import akka.cluster.MemberStatus._ +import akka.cluster.ClusterEvent._ /** * Base trait for all cluster messages. All ClusterMessage's are serializable. 
@@ -124,9 +125,7 @@ private[cluster] trait ClusterEnvironment { private[cluster] def selfAddress: Address private[cluster] def scheduler: Scheduler private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit - private[cluster] def publishLatestGossip(gossip: Gossip): Unit - private[cluster] def publishLatestStats(stats: ClusterStats): Unit + private[cluster] def eventBus: ClusterEventBus private[cluster] def shutdown(): Unit } @@ -274,7 +273,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() - notifyListeners(localGossip) + publish(localGossip) coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) } @@ -316,7 +315,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) gossipTo(node) } - notifyListeners(localGossip) + publish(localGossip) } } @@ -335,7 +334,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) latestGossip = seenVersionedGossip log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) - notifyListeners(localGossip) + publish(localGossip) } } @@ -362,7 +361,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // just cleaning up the gossip state latestGossip = Gossip() // make sure the final (removed) state is always published - notifyListeners(localGossip) + publish(localGossip) environment.shutdown() } @@ -413,7 +412,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val versionedGossip = newGossip :+ vclockNode latestGossip = versionedGossip seen selfAddress - notifyListeners(localGossip) + publish(localGossip) } /** @@ -507,7 +506,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } stats = 
stats.incrementReceivedGossipCount - notifyListeners(localGossip) + publish(localGossip) if (envelope.conversation && (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { @@ -709,7 +708,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) } - notifyListeners(localGossip) + publish(localGossip) } } } @@ -763,7 +762,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) - notifyListeners(localGossip) + publish(localGossip) } } } @@ -803,18 +802,21 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) coreSender ! SendClusterMessage(address, gossipMsg) - def notifyListeners(oldGossip: Gossip): Unit = { + def publish(oldGossip: Gossip): Unit = { if (PublishStateInterval == Duration.Zero) publishState() - - val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status)) - val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) - if (newMembersStatus != oldMembersStatus) - environment notifyMembershipChangeListeners latestGossip.members + publishMembers(oldGossip.members) } def publishState(): Unit = { - environment.publishLatestGossip(latestGossip) - environment.publishLatestStats(stats) + environment.eventBus publish MembershipGossipChanged(latestGossip) + environment.eventBus publish InternalStatsChanged(stats) + } + + def publishMembers(oldMembers: SortedSet[Member]): Unit = { + val oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) + val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) + if (newMembersStatus != oldMembersStatus) + environment.eventBus publish 
MembersChanged(latestGossip.members) } def ping(p: Ping): Unit = sender ! Pong(p) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala new file mode 100644 index 0000000000..d5a7c7ee6f --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEventBus.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.event.ActorEventBus +import akka.event.SubchannelClassification +import akka.actor.ActorRef +import akka.util.Subclassification + +/** + * Changes to the Cluster are published to this local event bus + * as [[akka.cluster.ClusterEvent.ClusterDomainEvent]] subclasses. + */ +class ClusterEventBus extends ActorEventBus with SubchannelClassification { + + type Event = AnyRef + type Classifier = Class[_] + + protected implicit val subclassification = new Subclassification[Class[_]] { + def isEqual(x: Class[_], y: Class[_]) = x == y + def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x + } + + protected def classify(event: AnyRef): Class[_] = event.getClass + + protected def publish(event: AnyRef, subscriber: ActorRef) = { + if (subscriber.isTerminated) unsubscribe(subscriber) + else subscriber ! 
event + } + +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 944d90079b..6a01057e7d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -59,10 +59,8 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { def getClusterStatus: String = { val gossip = clusterNode.latestGossip val unreachable = gossip.overview.unreachable - val metaData = gossip.meta "\nMembers:\n\t" + gossip.members.mkString("\n\t") + - { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + - { if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" } + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } } def getMemberStatus: String = clusterNode.status.toString diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 643d0fd6fd..800fa6b584 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -13,7 +13,7 @@ object Gossip { } /** - * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - + * Represents the state of the cluster; cluster ring membership, ring convergence - * all versioned by a vector clock. * * When a node is joining the `Member`, with status `Joining`, is added to `members`. 
@@ -46,7 +46,6 @@ object Gossip { case class Gossip( overview: GossipOverview = GossipOverview(), members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address - meta: Map[String, Array[Byte]] = Map.empty, version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { @@ -97,7 +96,7 @@ case class Gossip( } /** - * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories. + * Merges two Gossip instances including membership tables, and the VectorClock histories. */ def merge(that: Gossip): Gossip = { import Member.ordering @@ -105,20 +104,17 @@ case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. merge meta-data - val mergedMeta = this.meta ++ that.meta - - // 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups + // 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) - // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, + // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - // 5. fresh seen table + // 4. 
fresh seen table val mergedSeen = Map.empty[Address, VectorClock] - Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) + Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedVClock) } /** @@ -178,7 +174,6 @@ case class Gossip( "Gossip(" + "overview = " + overview + ", members = [" + members.mkString(", ") + - "], meta = [" + meta.mkString(", ") + "], version = " + version + ")" } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 0d26d5de60..6bc8ba0de8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -16,6 +16,8 @@ import scala.concurrent.Await import scala.concurrent.util.Duration import java.util.concurrent.TimeUnit import akka.remote.testconductor.RoleName +import akka.actor.Props +import akka.actor.Actor object LargeClusterMultiJvmSpec extends MultiNodeConfig { // each jvm simulates a datacenter with many nodes @@ -78,6 +80,7 @@ abstract class LargeClusterSpec with MultiNodeClusterSpec { import LargeClusterMultiJvmSpec._ + import ClusterEvent._ var systems: IndexedSeq[ActorSystem] = IndexedSeq(system) val nodesPerDatacenter = system.settings.config.getInt( @@ -143,15 +146,16 @@ abstract class LargeClusterSpec val latch = TestLatch(clusterNodes.size) clusterNodes foreach { c ⇒ - c.registerListener(new MembershipChangeListener { - override def notify(members: SortedSet[Member]): Unit = { - if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { - log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", - totalNodes, c.selfAddress, tookMillis, gossipCount(c)) - latch.countDown() - } + c.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (!latch.isOpen && members.size == 
totalNodes && members.forall(_.status == MemberStatus.Up)) { + log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", + totalNodes, c.selfAddress, tookMillis, gossipCount(c)) + latch.countDown() + } } - }) + })), classOf[MembersChanged]) } runOn(from) { @@ -271,15 +275,16 @@ abstract class LargeClusterSpec val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ - Cluster(sys).registerListener(new MembershipChangeListener { - override def notify(members: SortedSet[Member]): Unit = { - if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { - log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", - unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) - latch.countDown() - } + Cluster(sys).subscribe(sys.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { + log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", + unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + latch.countDown() + } } - }) + })), classOf[MembersChanged]) } runOn(firstDatacenter) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 9e45b1529b..844f1be226 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val first = 
role("first") @@ -32,6 +34,7 @@ abstract class LeaderLeavingSpec with MultiNodeClusterSpec { import LeaderLeavingMultiJvmSpec._ + import ClusterEvent._ val leaderHandoffWaitingTime = 30.seconds @@ -66,15 +69,16 @@ abstract class LeaderLeavingSpec val leavingLatch = TestLatch() val exitingLatch = TestLatch() val expectedAddresses = roles.toSet map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - def check(status: MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") enterBarrier("leader-left") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 3bf49a538b..62ff1d1e3e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -36,6 +38,7 @@ 
abstract class MembershipChangeListenerExitingSpec with MultiNodeClusterSpec { import MembershipChangeListenerExitingMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is EXITING" taggedAs LongRunningTest in { @@ -53,12 +56,13 @@ abstract class MembershipChangeListenerExitingSpec runOn(third) { val exitingLatch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting)) - exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting)) + exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") exitingLatch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 441ecc4528..f48cb941fb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -29,6 +31,7 @@ abstract class MembershipChangeListenerJoinSpec with MultiNodeClusterSpec { import MembershipChangeListenerJoinMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is JOINING" taggedAs LongRunningTest in { @@ -36,12 +39,13 @@ abstract 
class MembershipChangeListenerJoinSpec runOn(first) { val joinLatch = TestLatch() val expectedAddresses = Set(first, second) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) - joinLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) + joinLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") joinLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index e6430314d4..20e6a6132c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -34,6 +36,7 @@ abstract class MembershipChangeListenerLeavingSpec with MultiNodeClusterSpec { import MembershipChangeListenerLeavingMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is LEAVING" taggedAs LongRunningTest in { @@ -52,13 +55,14 @@ abstract class MembershipChangeListenerLeavingSpec runOn(third) { val latch = TestLatch() val expectedAddresses = Set(first, second, third) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == 
expectedAddresses && - members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving)) + latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") latch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 5638399b59..efb5ffd42f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -8,6 +8,8 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -26,6 +28,7 @@ abstract class MembershipChangeListenerUpSpec with MultiNodeClusterSpec { import MembershipChangeListenerUpMultiJvmSpec._ + import ClusterEvent._ "A set of connected cluster systems" must { @@ -36,12 +39,13 @@ abstract class MembershipChangeListenerUpSpec runOn(first, second) { val latch = TestLatch() val expectedAddresses = Set(first, second) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + 
latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("listener-1-registered") cluster.join(first) latch.await @@ -58,12 +62,13 @@ abstract class MembershipChangeListenerUpSpec val latch = TestLatch() val expectedAddresses = Set(first, second, third) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("listener-2-registered") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 19c81ecb28..f1c0f5e97f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -32,6 +34,7 @@ abstract class NodeLeavingAndExitingSpec with MultiNodeClusterSpec { import NodeLeavingAndExitingMultiJvmSpec._ + import ClusterEvent._ "A node that is LEAVING a non-singleton cluster" must { @@ -44,15 +47,16 @@ abstract class NodeLeavingAndExitingSpec val leavingLatch = TestLatch() val exitingLatch = TestLatch() val expectedAddresses = roles.toSet map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - def check(status: 
MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == secondAddess && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == secondAddess && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index d4bdf2b748..1510663784 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -11,6 +11,8 @@ import akka.testkit._ import scala.concurrent.util.duration._ import scala.collection.immutable.SortedSet import java.util.concurrent.atomic.AtomicReference +import akka.actor.Props +import akka.actor.Actor object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -27,6 +29,7 @@ abstract class NodeUpSpec with MultiNodeClusterSpec { import NodeUpMultiJvmSpec._ + import ClusterEvent._ "A cluster node that is joining another cluster" must { "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { @@ -39,12 +42,13 @@ abstract class NodeUpSpec "be unaffected when joining again" taggedAs LongRunningTest in { val unexpected = new AtomicReference[SortedSet[Member]](SortedSet.empty) - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) - 
unexpected.set(members) + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) + unexpected.set(members) } - }) + })), classOf[MembersChanged]) enterBarrier("listener-registered") runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 0937d1a8bf..1bbc890c11 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -11,6 +11,8 @@ import akka.testkit._ import scala.concurrent.util.duration._ import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.SortedSet +import akka.actor.Props +import akka.actor.Actor object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -40,6 +42,7 @@ abstract class SunnyWeatherSpec with MultiNodeClusterSpec { import SunnyWeatherMultiJvmSpec._ + import ClusterEvent._ "A normal cluster" must { "be healthy" taggedAs LongRunningTest in { @@ -55,12 +58,13 @@ abstract class SunnyWeatherSpec log.info("5 joined") val unexpected = new AtomicReference[SortedSet[Member]] - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - // we don't expected any changes to the cluster - unexpected.set(members) + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + // we don't expect any changes to the cluster + unexpected.set(members) } - }) + })), classOf[MembersChanged]) for (n ← 1 to 30) { enterBarrier("period-" + n)