diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index bcf288dfec..30e425dafd 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -40,9 +40,9 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s - # How often the current state (Gossip) should be published for reading from the outside. - # A value of 0 s can be used to always publish the state, when it happens. - publish-state-interval = 1s + # How often the current internal stats should be published. + # A value of 0 s can be used to always publish the stats, when it happens. + publish-stats-interval = 10s # A joining node stops sending heartbeats to the node to join if it hasn't become member # of the cluster within this deadline. diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index bf9c2945cf..c76b637164 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -69,13 +69,9 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { +class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { - /** - * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. - * All state is represented by this immutable case class and managed by an AtomicReference. - */ - private case class State(memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty) + import ClusterEvent._ if (!system.provider.isInstanceOf[RemoteActorRefProvider]) throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") @@ -92,23 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) log.info("Cluster Node [{}] - is starting up...", selfAddress) - private val state = new AtomicReference[State](State()) - - /** - * Read only view of cluster state, updated periodically by - * ClusterCoreDaemon. Access with `latestGossip`. - */ - @volatile - private[cluster] var _latestGossip: Gossip = Gossip() - - /** - * INTERNAL API - * Read only view of internal cluster stats, updated periodically by - * ClusterCoreDaemon. Access with `latestStats`. - */ - @volatile - private[cluster] var _latestStats = ClusterStats() - // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -175,6 +154,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) Await.result((clusterDaemons ? 
InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } + @volatile + private var readViewStarted = false + private[cluster] lazy val readView: ClusterReadView = { + val readView = new ClusterReadView(this) + readViewStarted = true + readView + } + system.registerOnTermination(shutdown()) private val clusterJmx = new ClusterJmx(this, log) @@ -186,87 +173,25 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== PUBLIC API ===================== // ====================================================== - def self: Member = latestGossip.member(selfAddress) - /** * Returns true if the cluster node is up and running, false if it is shut down. */ def isRunning: Boolean = _isRunning.get /** - * Latest gossip. + * Subscribe to cluster domain events. + * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] + * or subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] + * will also be sent to the subscriber. */ - def latestGossip: Gossip = _latestGossip + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = + clusterCore ! InternalClusterAction.Subscribe(subscriber, to) /** - * Member status for this node ([[akka.cluster.MemberStatus]]). - * - * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state - * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the - * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`. + * Unsubscribe to cluster domain events. */ - def status: MemberStatus = self.status - - /** - * Is this node the leader? - */ - def isLeader: Boolean = latestGossip.isLeader(selfAddress) - - /** - * Get the address of the current leader. - */ - def leader: Address = latestGossip.leader match { - case Some(x) ⇒ x - case None ⇒ throw new IllegalStateException("There is no leader in this cluster") - } - - /** - * Is this node a singleton cluster? - */ - def isSingletonCluster: Boolean = latestGossip.isSingletonCluster - - /** - * Checks if we have a cluster convergence. - * - * @return Some(convergedGossip) if convergence have been reached and None if not - */ - def convergence: Option[Gossip] = latestGossip match { - case gossip if gossip.convergence ⇒ Some(gossip) - case _ ⇒ None - } - - /** - * Returns true if the node is UP or JOINING. - */ - def isAvailable: Boolean = latestGossip.isAvailable(selfAddress) - - /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - def seedNodes: IndexedSeq[Address] = SeedNodes - - /** - * Registers a listener to subscribe to cluster membership changes. - */ - @tailrec - final def registerListener(listener: MembershipChangeListener): Unit = { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners + listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur - } - - /** - * Unsubscribes to cluster membership changes. 
- */ - @tailrec - final def unregisterListener(listener: MembershipChangeListener): Unit = { - val localState = state.get - val newListeners = localState.memberMembershipChangeListeners - listener - val newState = localState copy (memberMembershipChangeListeners = newListeners) - if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur - } + def unsubscribe(subscriber: ActorRef): Unit = + clusterCore ! InternalClusterAction.Unsubscribe(subscriber) /** * Try to join this cluster node with the node specified by 'address'. @@ -291,6 +216,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // ===================== INTERNAL API ===================== // ======================================================== + /** + * Make it possible to override/configure seedNodes from tests without + * specifying in config. Addresses are unknown before startup time. + */ + private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes + /** * INTERNAL API. * @@ -303,10 +234,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (_isRunning.compareAndSet(true, false)) { log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress) - // FIXME isTerminated check can be removed when ticket #2221 is fixed - // now it prevents logging if system is shutdown (or in progress of shutdown) - if (!clusterDaemons.isTerminated) - system.stop(clusterDaemons) + system.stop(clusterDaemons) + if (readViewStarted) readView.close() scheduler.close() @@ -316,41 +245,5 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - /** - * INTERNAL API - */ - private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = { - // FIXME run callbacks async (to not block the cluster) - state.get.memberMembershipChangeListeners foreach { _ notify members } - } - - /** - * INTERNAL API - */ - private[cluster] def latestStats: ClusterStats = _latestStats - - /** - * INTERNAL API - */ - private[cluster] def publishLatestGossip(gossip: Gossip): Unit = _latestGossip = gossip - - /** - * INTERNAL API - */ - private[cluster] def publishLatestStats(stats: ClusterStats): Unit = _latestStats = stats - } -/** - * Interface for membership change listener. - */ -trait MembershipChangeListener { - def notify(members: SortedSet[Member]): Unit -} - -/** - * Interface for meta data change listener. 
- */ -trait MetaDataChangeListener { - def notify(meta: Map[String, Array[Byte]]): Unit -} diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b23c0f2108..f3a6c1a730 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -6,12 +6,13 @@ package akka.cluster import scala.collection.immutable.SortedSet import scala.concurrent.util.{ Deadline, Duration } import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler } +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler } import akka.actor.Status.Failure -import akka.routing.ScatterGatherFirstCompletedRouter +import akka.event.EventStream import akka.util.Timeout -import akka.pattern.{ AskTimeoutException, ask, pipe } -import MemberStatus._ +import akka.cluster.MemberStatus._ +import akka.cluster.ClusterEvent._ +import language.existentials /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -72,15 +73,20 @@ private[cluster] object InternalClusterAction { */ case class InitJoinAck(address: Address) extends ClusterMessage - case object GossipTick + /** + * Marker interface for periodic tick messages + */ + sealed trait Tick - case object HeartbeatTick + case object GossipTick extends Tick - case object ReapUnreachableTick + case object HeartbeatTick extends Tick - case object LeaderActionsTick + case object ReapUnreachableTick extends Tick - case object PublishStateTick + case object LeaderActionsTick extends Tick + + case object PublishStatsTick extends Tick case class SendClusterMessage(to: Address, msg: ClusterMessage) @@ -88,6 +94,9 @@ private[cluster] object InternalClusterAction { case object GetClusterCoreRef + case class Subscribe(subscriber: ActorRef, to: Class[_]) + case class Unsubscribe(subscriber: ActorRef) + case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage @@ -124,9 +133,6 @@ private[cluster] trait ClusterEnvironment { private[cluster] def selfAddress: Address private[cluster] def scheduler: Scheduler private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit - private[cluster] def publishLatestGossip(gossip: Gossip): Unit - private[cluster] def publishLatestStats(stats: ClusterStats): Unit private[cluster] def shutdown(): Unit } @@ -206,13 +212,20 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // start periodic publish of current state private val publishStateTask: Option[Cancellable] = - if (PublishStateInterval == Duration.Zero) None - else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) { - self ! PublishStateTick + if (PublishStatsInterval == Duration.Zero) None + else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { + self ! PublishStatsTick }) override def preStart(): Unit = { - if (AutoJoin) self ! 
InternalClusterAction.JoinSeedNode + if (AutoJoin) { + // only the node which is named first in the list of seed nodes will join itself + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(environment)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") + } } override def postStop(): Unit = { @@ -223,18 +236,23 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) publishStateTask foreach { _.cancel() } } - def receive = { + def uninitialized: Actor.Receive = { + case InitJoin ⇒ // skip, not ready yet + case JoinTo(address) ⇒ join(address) + case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) + case _: Tick ⇒ // ignore periodic tasks until initialized + } + + def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() case HeartbeatTick ⇒ heartbeat() case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() - case PublishStateTick ⇒ publishState() - case JoinSeedNode ⇒ joinSeedNode() + case PublishStatsTick ⇒ publishInternalStats() case InitJoin ⇒ initJoin() - case InitJoinAck(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() case JoinTo(address) ⇒ join(address) case ClusterUserAction.Join(address) ⇒ joining(address) case ClusterUserAction.Down(address) ⇒ downing(address) @@ -242,25 +260,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case Exit(address) ⇒ exiting(address) case Remove(address) ⇒ removing(address) case SendGossipTo(address) ⇒ gossipTo(address) + case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber) case p: Ping ⇒ ping(p) } - def joinSeedNode(): Unit = { - val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } - if (seedRoutees.isEmpty) join(selfAddress) - else { - implicit val within = Timeout(SeedNodeTimeout) - val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration))) - seedRouter ! InitJoin - seedRouter ! PoisonPill - } - } + def receive = uninitialized def initJoin(): Unit = sender ! InitJoinAck(selfAddress) - def joinSeedNodeTimeout(): Unit = join(selfAddress) - /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -274,9 +283,13 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() - notifyListeners(localGossip) + publish(localGossip) - coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + context.become(initialized) + if (address == selfAddress) + joining(address) + else + coreSender ! 
SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) } /** @@ -316,7 +329,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) gossipTo(node) } - notifyListeners(localGossip) + publish(localGossip) } } @@ -335,7 +348,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) latestGossip = seenVersionedGossip log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address) - notifyListeners(localGossip) + publish(localGossip) } } @@ -362,7 +375,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // just cleaning up the gossip state latestGossip = Gossip() // make sure the final (removed) state is always published - notifyListeners(localGossip) + publish(localGossip) environment.shutdown() } @@ -413,7 +426,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) val versionedGossip = newGossip :+ vclockNode latestGossip = versionedGossip seen selfAddress - notifyListeners(localGossip) + publish(localGossip) } /** @@ -507,7 +520,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } stats = stats.incrementReceivedGossipCount - notifyListeners(localGossip) + publish(localGossip) if (envelope.conversation && (conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) { @@ -709,7 +722,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address) } - notifyListeners(localGossip) + publish(localGossip) } } } @@ -763,7 +776,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", ")) - notifyListeners(localGossip) + publish(localGossip) } } } @@ -803,23 +816,115 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) coreSender ! SendClusterMessage(address, gossipMsg) - def notifyListeners(oldGossip: Gossip): Unit = { - if (PublishStateInterval == Duration.Zero) publishState() - - val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status)) - val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status)) - if (newMembersStatus != oldMembersStatus) - environment notifyMembershipChangeListeners latestGossip.members + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { + subscriber ! 
CurrentClusterState( + members = latestGossip.members, + unreachable = latestGossip.overview.unreachable, + convergence = latestGossip.convergence, + seenBy = latestGossip.seenBy, + leader = latestGossip.leader) + eventStream.subscribe(subscriber, to) } - def publishState(): Unit = { - environment.publishLatestGossip(latestGossip) - environment.publishLatestStats(stats) + def unsubscribe(subscriber: ActorRef): Unit = + eventStream.unsubscribe(subscriber) + + def publish(oldGossip: Gossip): Unit = { + publishMembers(oldGossip) + publishUnreachableMembers(oldGossip) + publishLeader(oldGossip) + publishSeen(oldGossip) + if (PublishStatsInterval == Duration.Zero) publishInternalStats() } + def publishMembers(oldGossip: Gossip): Unit = { + if (!isSame(oldGossip.members, latestGossip.members)) + eventStream publish MembersChanged(latestGossip.members) + } + + def publishUnreachableMembers(oldGossip: Gossip): Unit = { + if (!isSame(oldGossip.overview.unreachable, latestGossip.overview.unreachable)) + eventStream publish UnreachableMembersChanged(latestGossip.overview.unreachable) + } + + def isSame(oldMembers: Set[Member], newMembers: Set[Member]): Boolean = { + def oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status)) + def newMembersStatus = newMembers.map(m ⇒ (m.address, m.status)) + (newMembers eq oldMembers) || ((newMembers.size == oldMembers.size) && (newMembersStatus == oldMembersStatus)) + } + + def publishLeader(oldGossip: Gossip): Unit = { + if (latestGossip.leader != oldGossip.leader || latestGossip.convergence != oldGossip.convergence) + eventStream publish LeaderChanged(latestGossip.leader, latestGossip.convergence) + } + + def publishSeen(oldGossip: Gossip): Unit = { + val oldConvergence = oldGossip.convergence + val newConvergence = latestGossip.convergence + val oldSeenBy = oldGossip.seenBy + val newSeenBy = latestGossip.seenBy + + if (newConvergence != oldConvergence || newSeenBy != oldSeenBy) { + eventStream publish SeenChanged(newConvergence, newSeenBy) + } + } + + def publishInternalStats(): Unit = { + eventStream publish CurrentInternalStats(stats) + } + + def eventStream: EventStream = context.system.eventStream + def ping(p: Ping): Unit = sender ! Pong(p) } +/** + * INTERNAL API. + * + * Sends InitJoinAck to all seed nodes (except itself) and expect + * InitJoinAck reply back. The seed node that replied first + * will be used, joined to. InitJoinAck replies received after the + * first one are ignored. + * + * Retries if no InitJoinAck replies are received within the + * SeedNodeTimeout. + * When at least one reply has been received it stops itself after + * an idle SeedNodeTimeout. + * + */ +private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { + import InternalClusterAction._ + + def selfAddress = environment.selfAddress + + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + throw new IllegalArgumentException("Join seed node should not be done") + + context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + + override def preStart(): Unit = self ! JoinSeedNode + + def receive = { + case JoinSeedNode ⇒ + // send InitJoin to all seed nodes (except myself) + environment.seedNodes.collect { + case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) + } foreach { _ ! InitJoin } + case InitJoinAck(address) ⇒ + // first InitJoinAck reply + context.parent ! 
JoinTo(address) + context.become(done) + case ReceiveTimeout ⇒ + // no InitJoinAck received, try again + self ! JoinSeedNode + } + + def done: Actor.Receive = { + case InitJoinAck(_) ⇒ // already received one, skip rest + case ReceiveTimeout ⇒ context.stop(self) + } +} + /** * INTERNAL API. */ @@ -839,6 +944,53 @@ private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Act } } +/** + * Domain events published to the event bus. + */ +object ClusterEvent { + /** + * Marker interface for cluster domain events. + */ + sealed trait ClusterDomainEvent + + /** + * Current snapshot state of the cluster. Sent to new subscriber. + */ + case class CurrentClusterState( + members: SortedSet[Member] = SortedSet.empty, + unreachable: Set[Member] = Set.empty, + convergence: Boolean = false, + seenBy: Set[Address] = Set.empty, + leader: Option[Address] = None) extends ClusterDomainEvent + + /** + * Set of cluster members or their status have changed. + */ + case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent + + /** + * Set of unreachable cluster members or their status have changed. + */ + case class UnreachableMembersChanged(unreachable: Set[Member]) extends ClusterDomainEvent + + /** + * Leader of the cluster members changed, and/or convergence status. + */ + case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent + + /** + * INTERNAL API + * The nodes that have seen current version of the Gossip. + */ + private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent + + /** + * INTERNAL API + */ + private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent + +} + /** * INTERNAL API */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 944d90079b..4eb27e836e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -33,10 +33,11 @@ trait ClusterNodeMBean { /** * Internal API */ -private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { +private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { private val mBeanServer = ManagementFactory.getPlatformMBeanServer private val clusterMBeanName = new ObjectName("akka:type=Cluster") + private def clusterView = cluster.readView /** * Creates the cluster JMX MBean and registers it in the MBean server. 
@@ -57,37 +58,34 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { * }}} */ def getClusterStatus: String = { - val gossip = clusterNode.latestGossip - val unreachable = gossip.overview.unreachable - val metaData = gossip.meta - "\nMembers:\n\t" + gossip.members.mkString("\n\t") + - { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } + - { if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" } + val unreachable = clusterView.unreachableMembers + "\nMembers:\n\t" + clusterView.members.mkString("\n\t") + + { if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } } - def getMemberStatus: String = clusterNode.status.toString + def getMemberStatus: String = clusterView.status.toString - def getLeader: String = clusterNode.leader.toString + def getLeader: String = clusterView.leader.toString - def isSingleton: Boolean = clusterNode.isSingletonCluster + def isSingleton: Boolean = clusterView.isSingletonCluster - def isConvergence: Boolean = clusterNode.convergence.isDefined + def isConvergence: Boolean = clusterView.convergence - def isAvailable: Boolean = clusterNode.isAvailable + def isAvailable: Boolean = clusterView.isAvailable - def isRunning: Boolean = clusterNode.isRunning + def isRunning: Boolean = clusterView.isRunning // JMX commands - def join(address: String) = clusterNode.join(AddressFromURIString(address)) + def join(address: String) = cluster.join(AddressFromURIString(address)) - def leave(address: String) = clusterNode.leave(AddressFromURIString(address)) + def leave(address: String) = cluster.leave(AddressFromURIString(address)) - def down(address: String) = clusterNode.down(AddressFromURIString(address)) + def down(address: String) = cluster.down(AddressFromURIString(address)) } try { mBeanServer.registerMBean(mbean, clusterMBeanName) - log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName) + log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName) } catch { case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing) } @@ -97,6 +95,7 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) { * Unregisters the cluster JMX MBean from MBean server. */ def unregisterMBean(): Unit = { + clusterView.close() try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala new file mode 100644 index 0000000000..4d3904cbb8 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import java.io.Closeable +import scala.collection.immutable.SortedSet + +import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } +import akka.cluster.ClusterEvent._ + +/** + * INTERNAL API + * + * Read view of cluster state, updated via subscription of + * cluster events published on the event bus. + */ +private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { + + /** + * Current state + */ + @volatile + private var state: CurrentClusterState = CurrentClusterState() + + /** + * Current internal cluster stats, updated periodically via event bus. 
+ */ + @volatile + private var _latestStats = ClusterStats() + + val selfAddress = cluster.selfAddress + + // create actor that subscribes to the cluster eventBus to update current read view state + private val eventBusListener: ActorRef = { + cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy) + case MembersChanged(members) ⇒ state = state.copy(members = members) + case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable) + case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case _ ⇒ // ignore, not interesting + } + }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") + } + + def self: Member = { + state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). + getOrElse(Member(selfAddress, MemberStatus.Removed)) + } + + /** + * Returns true if the cluster node is up and running, false if it is shut down. + */ + def isRunning: Boolean = cluster.isRunning + + /** + * Current cluster members, sorted with leader first. + */ + def members: SortedSet[Member] = state.members + + /** + * Members that has been detected as unreachable. + */ + def unreachableMembers: Set[Member] = state.unreachable + + /** + * Member status for this node ([[akka.cluster.MemberStatus]]). + * + * NOTE: If the node has been removed from the cluster (and shut down) then it's status is set to the 'REMOVED' tombstone state + * and is no longer present in the node ring or any other part of the gossiping state. However in order to maintain the + * model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`. + */ + def status: MemberStatus = self.status + + /** + * Is this node the leader? + */ + def isLeader: Boolean = leader == Some(selfAddress) + + /** + * Get the address of the current leader. + */ + def leader: Option[Address] = state.leader + + /** + * Is this node a singleton cluster? + */ + def isSingletonCluster: Boolean = members.size == 1 + + /** + * Checks if we have a cluster convergence. + */ + def convergence: Boolean = state.convergence + + /** + * Returns true if the node is UP or JOINING. + */ + def isAvailable: Boolean = { + val myself = self + !unreachableMembers.contains(myself) && !myself.status.isUnavailable + } + + /** + * INTERNAL API + * The nodes that has seen current version of the Gossip. + */ + private[cluster] def seenBy: Set[Address] = state.seenBy + + /** + * INTERNAL API + */ + private[cluster] def latestStats: ClusterStats = _latestStats + + /** + * Unsubscribe to cluster events. 
+ */ + def close(): Unit = { + cluster.system.stop(eventBusListener) + } + +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index b8f5463529..544b48870a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -32,7 +32,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) - final val PublishStateInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-state-interval"), MILLISECONDS) + final val PublishStatsInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS) final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 643d0fd6fd..b975034c66 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -8,12 +8,17 @@ import akka.actor.Address import scala.collection.immutable.SortedSet import MemberStatus._ -object Gossip { +/** + * Internal API + */ +private[cluster] object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty } /** - * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - + * INTERNAL API + * + * Represents the state of the cluster; cluster ring membership, ring convergence - * all versioned by a vector clock. * * When a node is joining the `Member`, with status `Joining`, is added to `members`. @@ -43,10 +48,9 @@ object Gossip { * `Removed` by removing it from the `members` set and sending a `Removed` command to the * removed node telling it to shut itself down. */ -case class Gossip( +private[cluster] case class Gossip( overview: GossipOverview = GossipOverview(), members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address - meta: Map[String, Array[Byte]] = Map.empty, version: VectorClock = VectorClock()) // vector clock version extends ClusterMessage // is a serializable cluster message with Versioned[Gossip] { @@ -97,7 +101,16 @@ case class Gossip( } /** - * Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories. + * The nodes that have seen current version of the Gossip. + */ + def seenBy: Set[Address] = { + overview.seen.collect { + case (address, vclock) if vclock == version ⇒ address + }.toSet + } + + /** + * Merges two Gossip instances including membership tables, and the VectorClock histories. */ def merge(that: Gossip): Gossip = { import Member.ordering @@ -105,20 +118,17 @@ case class Gossip( // 1. merge vector clocks val mergedVClock = this.version merge that.version - // 2. merge meta-data - val mergedMeta = this.meta ++ that.meta - - // 3. 
merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups + // 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) - // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, + // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - // 5. fresh seen table + // 4. fresh seen table val mergedSeen = Map.empty[Address, VectorClock] - Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock) + Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedVClock) } /** @@ -151,8 +161,7 @@ case class Gossip( !hasUnreachable && allMembersInSeen && seenSame } - def isLeader(address: Address): Boolean = - members.nonEmpty && (address == members.head.address) + def isLeader(address: Address): Boolean = leader == Some(address) def leader: Option[Address] = members.headOption.map(_.address) @@ -178,15 +187,15 @@ case class Gossip( "Gossip(" + "overview = " + overview + ", members = [" + members.mkString(", ") + - "], meta = [" + meta.mkString(", ") + "], version = " + version + ")" } /** + * INTERNAL API * Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes. */ -case class GossipOverview( +private[cluster] case class GossipOverview( seen: Map[Address, VectorClock] = Map.empty, unreachable: Set[Member] = Set.empty) { @@ -200,13 +209,15 @@ case class GossipOverview( } /** + * INTERNAL API * Envelope adding a sender address to the gossip. */ -case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage +private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage /** + * INTERNAL API * When conflicting versions of received and local [[akka.cluster.Gossip]] is detected * it's forwarded to the leader for conflict resolution. 
*/ -case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage +private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 8112aeab25..e0440394a7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec enterBarrier("down-third-node") awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) + clusterView.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 4c65e85054..82d90c81b5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -48,7 +48,7 @@ abstract class ClientDowningNodeThatIsUpSpec markNodeAsUnavailable(thirdAddress) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) - cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) + clusterView.members.exists(_.address == thirdAddress) must be(false) } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index f29bb97f49..1862b8ea40 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -65,15 +65,15 @@ abstract class ConvergenceSpec within(28 seconds) { // third becomes unreachable - awaitCond(cluster.latestGossip.overview.unreachable.size == 1) - awaitCond(cluster.latestGossip.members.size == 2) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(clusterView.unreachableMembers.size == 1) + awaitCond(clusterView.members.size == 2) + awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) awaitSeenSameState(first, second) // still one unreachable - cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(thirdAddress) + clusterView.unreachableMembers.size must be(1) + clusterView.unreachableMembers.head.address must be(thirdAddress) // and therefore no convergence - cluster.convergence.isDefined must be(false) + clusterView.convergence must be(false) } } @@ -88,18 +88,18 @@ abstract class ConvergenceSpec } def memberStatus(address: Address): Option[MemberStatus] = - cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } + clusterView.members.collectFirst { case m if m.address == address ⇒ m.status } def assertNotMovedUp: Unit = { within(20 seconds) { - awaitCond(cluster.latestGossip.members.size == 3) + awaitCond(clusterView.members.size == 3) awaitSeenSameState(first, second, fourth) memberStatus(first) must be(Some(MemberStatus.Up)) memberStatus(second) must be(Some(MemberStatus.Up)) // 
leader is not allowed to move the new node to Up memberStatus(fourth) must be(Some(MemberStatus.Joining)) // still no convergence - cluster.convergence.isDefined must be(false) + clusterView.convergence must be(false) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index f71ebe3cc3..10d98cd86b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -13,6 +13,7 @@ import scala.concurrent.util.duration._ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") val seed2 = role("seed2") + val seed3 = role("seed3") val ordinary1 = role("ordinary1") val ordinary2 = role("ordinary2") @@ -25,6 +26,7 @@ class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPup class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -32,37 +34,24 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2) + override def seedNodes = IndexedSeq(seed1, seed2, seed3) "A cluster with configured seed nodes" must { - "start the seed nodes sequentially" taggedAs LongRunningTest in { - // without looking up the addresses first there might be - // [akka://JoinSeedNodeSpec/user/TestConductorClient] cannot write GetAddress(RoleName(seed2)) while waiting for seed1 - roles foreach address + "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { runOn(seed1) { - startClusterNode() + // test that first seed doesn't have to be started first + Thread.sleep(3000) } - enterBarrier("seed1-started") - runOn(seed2) { - startClusterNode() - } - enterBarrier("seed2-started") - - runOn(seed1, seed2) { - awaitUpConvergence(2) + runOn(seed1, seed2, seed3) { + awaitUpConvergence(3) } enterBarrier("after-1") } "join the seed nodes at startup" taggedAs LongRunningTest in { - - startClusterNode() - enterBarrier("all-started") - - awaitUpConvergence(4) - + awaitUpConvergence(roles.size) enterBarrier("after-2") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index 0d26d5de60..99dd2633a4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -16,6 +16,8 @@ import scala.concurrent.Await import scala.concurrent.util.Duration import java.util.concurrent.TimeUnit import akka.remote.testconductor.RoleName +import akka.actor.Props +import akka.actor.Actor object LargeClusterMultiJvmSpec extends MultiNodeConfig { // each jvm simulates a datacenter with many nodes @@ -38,7 +40,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { auto-join = off auto-down = on failure-detector.acceptable-heartbeat-pause = 10s - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.loglevel = INFO akka.actor.default-dispatcher.fork-join-executor { @@ -78,6 +80,7 @@ 
abstract class LargeClusterSpec with MultiNodeClusterSpec { import LargeClusterMultiJvmSpec._ + import ClusterEvent._ var systems: IndexedSeq[ActorSystem] = IndexedSeq(system) val nodesPerDatacenter = system.settings.config.getInt( @@ -134,24 +137,25 @@ abstract class LargeClusterSpec val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet) val startGossipCounts = Map.empty[Cluster, Long] ++ - clusterNodes.map(c ⇒ (c -> c.latestStats.receivedGossipCount)) + clusterNodes.map(c ⇒ (c -> c.readView.latestStats.receivedGossipCount)) def gossipCount(c: Cluster): Long = { - c.latestStats.receivedGossipCount - startGossipCounts(c) + c.readView.latestStats.receivedGossipCount - startGossipCounts(c) } val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" val latch = TestLatch(clusterNodes.size) clusterNodes foreach { c ⇒ - c.registerListener(new MembershipChangeListener { - override def notify(members: SortedSet[Member]): Unit = { - if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { - log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", - totalNodes, c.selfAddress, tookMillis, gossipCount(c)) - latch.countDown() - } + c.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { + log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", + totalNodes, c.selfAddress, tookMillis, gossipCount(c)) + latch.countDown() + } } - }) + })), classOf[MembersChanged]) } runOn(from) { @@ -160,7 +164,7 @@ abstract class LargeClusterSpec Await.ready(latch, remaining) - awaitCond(clusterNodes.forall(_.convergence.isDefined)) + awaitCond(clusterNodes.forall(_.readView.convergence)) val counts = clusterNodes.map(gossipCount(_)) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", @@ -262,24 +266,33 @@ abstract class LargeClusterSpec within(30.seconds + (3.seconds * liveNodes)) { val startGossipCounts = Map.empty[Cluster, Long] ++ - systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).latestStats.receivedGossipCount)) + systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount)) def gossipCount(c: Cluster): Long = { - c.latestStats.receivedGossipCount - startGossipCounts(c) + c.readView.latestStats.receivedGossipCount - startGossipCounts(c) } val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ - Cluster(sys).registerListener(new MembershipChangeListener { - override def notify(members: SortedSet[Member]): Unit = { - if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { - log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", - unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) - latch.countDown() - } + Cluster(sys).subscribe(sys.actorOf(Props(new Actor { + var gotExpectedLiveNodes = false + var gotExpectedUnreachableNodes = false + def receive = { + case MembersChanged(members) if !latch.isOpen ⇒ + gotExpectedLiveNodes = members.size 
== liveNodes + checkDone() + case UnreachableMembersChanged(unreachable) if !latch.isOpen ⇒ + gotExpectedUnreachableNodes = unreachable.size == unreachableNodes + checkDone() + case _ ⇒ // not interesting } - }) + def checkDone(): Unit = if (gotExpectedLiveNodes && gotExpectedUnreachableNodes) { + log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", + unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + latch.countDown() + } + })), classOf[ClusterDomainEvent]) } runOn(firstDatacenter) { @@ -290,8 +303,8 @@ abstract class LargeClusterSpec runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) - awaitCond(systems.forall(Cluster(_).convergence.isDefined)) - val mergeCount = systems.map(sys ⇒ Cluster(sys).latestStats.mergeCount).sum + awaitCond(systems.forall(Cluster(_).readView.convergence)) + val mergeCount = systems.map(sys ⇒ Cluster(sys).readView.latestStats.mergeCount).sum val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times", diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 9ed8f27ad4..1a657b3da8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -46,7 +46,7 @@ abstract class LeaderElectionSpec awaitClusterUp(first, second, third, fourth) if (myself != controller) { - cluster.isLeader must be(myself == sortedRoles.head) + clusterView.isLeader must be(myself == sortedRoles.head) assertLeaderIn(sortedRoles) } @@ -87,7 +87,7 @@ abstract class LeaderElectionSpec awaitUpConvergence(currentRoles.size - 1) val nextExpectedLeader = remainingRoles.head - cluster.isLeader must be(myself == nextExpectedLeader) + clusterView.isLeader must be(myself == nextExpectedLeader) assertLeaderIn(remainingRoles) enterBarrier("completed") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 9e45b1529b..6c16a9550a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -32,6 +34,7 @@ abstract class LeaderLeavingSpec with MultiNodeClusterSpec { import LeaderLeavingMultiJvmSpec._ + import ClusterEvent._ val leaderHandoffWaitingTime = 30.seconds @@ -41,11 +44,11 @@ abstract class LeaderLeavingSpec awaitClusterUp(first, second, third) - val oldLeaderAddress = cluster.leader + val oldLeaderAddress = clusterView.leader.get within(leaderHandoffWaitingTime) { - if (cluster.isLeader) { + if (clusterView.isLeader) { enterBarrier("registered-listener") @@ -53,28 +56,29 @@ abstract class LeaderLeavingSpec enterBarrier("leader-left") // verify that a NEW LEADER have taken over - awaitCond(!cluster.isLeader) + 
awaitCond(!clusterView.isLeader) // verify that the LEADER is shut down awaitCond(!cluster.isRunning) // verify that the LEADER is REMOVED - awaitCond(cluster.status == MemberStatus.Removed) + awaitCond(clusterView.status == MemberStatus.Removed) } else { val leavingLatch = TestLatch() val exitingLatch = TestLatch() val expectedAddresses = roles.toSet map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - def check(status: MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") enterBarrier("leader-left") @@ -86,13 +90,13 @@ abstract class LeaderLeavingSpec exitingLatch.await // verify that the LEADER is no longer part of the 'members' set - awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress)) + awaitCond(clusterView.members.forall(_.address != oldLeaderAddress)) // verify that the LEADER is not part of the 'unreachable' set - awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress)) + awaitCond(clusterView.unreachableMembers.forall(_.address != oldLeaderAddress)) // verify that we have a new LEADER - awaitCond(cluster.leader != oldLeaderAddress) + awaitCond(clusterView.leader != oldLeaderAddress) } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 3bf49a538b..62ff1d1e3e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -36,6 +38,7 @@ abstract class MembershipChangeListenerExitingSpec with MultiNodeClusterSpec { import MembershipChangeListenerExitingMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is EXITING" taggedAs LongRunningTest in { @@ -53,12 +56,13 @@ abstract class MembershipChangeListenerExitingSpec runOn(third) { val exitingLatch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting)) - exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting)) + 
exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") exitingLatch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 441ecc4528..0eeee4334a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -17,7 +19,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP + .withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP .withFallback(MultiNodeClusterSpec.clusterConfig))) } @@ -29,6 +31,7 @@ abstract class MembershipChangeListenerJoinSpec with MultiNodeClusterSpec { import MembershipChangeListenerJoinMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is JOINING" taggedAs LongRunningTest in { @@ -36,12 +39,13 @@ abstract class MembershipChangeListenerJoinSpec runOn(first) { val joinLatch = TestLatch() val expectedAddresses = Set(first, second) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) - joinLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining)) + joinLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") joinLatch.await diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index e6430314d4..69ef096613 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -19,7 +21,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-interval = 5 s + akka.clusterView.leader-actions-interval = 5 s akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) @@ -34,6 +36,7 @@ abstract 
class MembershipChangeListenerLeavingSpec with MultiNodeClusterSpec { import MembershipChangeListenerLeavingMultiJvmSpec._ + import ClusterEvent._ "A registered MembershipChangeListener" must { "be notified when new node is LEAVING" taggedAs LongRunningTest in { @@ -52,13 +55,14 @@ abstract class MembershipChangeListenerLeavingSpec runOn(third) { val latch = TestLatch() val expectedAddresses = Set(first, second, third) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving)) + latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") latch.await } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 5638399b59..efb5ffd42f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -8,6 +8,8 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ +import akka.actor.Props +import akka.actor.Actor object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -26,6 +28,7 @@ abstract class MembershipChangeListenerUpSpec with MultiNodeClusterSpec { import MembershipChangeListenerUpMultiJvmSpec._ + import ClusterEvent._ "A set of connected cluster systems" must { @@ -36,12 +39,13 @@ abstract class MembershipChangeListenerUpSpec runOn(first, second) { val latch = TestLatch() val expectedAddresses = Set(first, second) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("listener-1-registered") cluster.join(first) latch.await @@ -58,12 +62,13 @@ abstract class MembershipChangeListenerUpSpec val latch = TestLatch() val expectedAddresses = Set(first, second, third) map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) - latch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up)) + latch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("listener-2-registered") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 
2362da8aef..93c1f921ae 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -22,14 +22,14 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-join = off + auto-join = on auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms leader-actions-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms periodic-tasks-initial-delay = 300 ms - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.test { single-expect-default = 5 s @@ -97,6 +97,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu } } + def clusterView: ClusterReadView = cluster.readView + /** * Get the cluster node to use. */ @@ -106,11 +108,11 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu * Use this method for the initial startup of the cluster node. */ def startClusterNode(): Unit = { - if (cluster.latestGossip.members.isEmpty) { + if (clusterView.members.isEmpty) { cluster join myself - awaitCond(cluster.latestGossip.members.exists(_.address == address(myself))) + awaitCond(clusterView.members.exists(_.address == address(myself))) } else - cluster.self + clusterView.self } /** @@ -168,8 +170,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { nodesInCluster.length must not be (0) val expectedLeader = roleOfLeader(nodesInCluster) - cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) - cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) + clusterView.isLeader must be(ifNode(expectedLeader)(true)(false)) + clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) } /** @@ -181,25 +183,20 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], timeout: Duration = 20.seconds): Unit = { within(timeout) { - awaitCond(cluster.latestGossip.members.size == numberOfMembers) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence.isDefined) + awaitCond(clusterView.members.size == numberOfMembers) + awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) + awaitCond(clusterView.convergence) if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set awaitCond( - canNotBePartOfMemberRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address)))) + canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address)))) } } /** * Wait until the specified nodes have seen the same gossip overview. 
*/ - def awaitSeenSameState(addresses: Address*): Unit = { - awaitCond { - val seen = cluster.latestGossip.overview.seen - val seenVectorClocks = addresses.flatMap(seen.get(_)) - seenVectorClocks.size == addresses.size && seenVectorClocks.toSet.size == 1 - } - } + def awaitSeenSameState(addresses: Address*): Unit = + awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 1a35af6411..de21d714bb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -16,7 +16,7 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval + .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval .withFallback(MultiNodeClusterSpec.clusterConfig))) } @@ -42,7 +42,7 @@ abstract class NodeJoinSpec cluster.join(first) } - awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) + awaitCond(clusterView.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining }) enterBarrier("after") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index d5c374ba64..752857316b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -43,16 +43,16 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec runOn(first, third) { // verify that the 'second' node is no longer part of the 'members' set - awaitCond(cluster.latestGossip.members.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(clusterView.members.forall(_.address != address(second)), reaperWaitingTime) // verify that the 'second' node is not part of the 'unreachable' set - awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != address(second)), reaperWaitingTime) + awaitCond(clusterView.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime) } runOn(second) { // verify that the second node is shut down and has status REMOVED awaitCond(!cluster.isRunning, reaperWaitingTime) - awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime) + awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime) } enterBarrier("finished") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 19c81ecb28..f1c0f5e97f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Props +import
akka.actor.Actor object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -32,6 +34,7 @@ abstract class NodeLeavingAndExitingSpec with MultiNodeClusterSpec { import NodeLeavingAndExitingMultiJvmSpec._ + import ClusterEvent._ "A node that is LEAVING a non-singleton cluster" must { @@ -44,15 +47,16 @@ abstract class NodeLeavingAndExitingSpec val leavingLatch = TestLatch() val exitingLatch = TestLatch() val expectedAddresses = roles.toSet map address - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - def check(status: MemberStatus): Boolean = - (members.map(_.address) == expectedAddresses && - members.exists(m ⇒ m.address == secondAddess && m.status == status)) - if (check(MemberStatus.Leaving)) leavingLatch.countDown() - if (check(MemberStatus.Exiting)) exitingLatch.countDown() + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + def check(status: MemberStatus): Boolean = + (members.map(_.address) == expectedAddresses && + members.exists(m ⇒ m.address == secondAddess && m.status == status)) + if (check(MemberStatus.Leaving)) leavingLatch.countDown() + if (check(MemberStatus.Exiting)) exitingLatch.countDown() } - }) + })), classOf[MembersChanged]) enterBarrier("registered-listener") runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 364edca08b..3df642afad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -38,12 +38,12 @@ abstract class NodeMembershipSpec runOn(first, second) { cluster.join(first) - awaitCond(cluster.latestGossip.members.size == 2) - assertMembers(cluster.latestGossip.members, first, second) + awaitCond(clusterView.members.size == 2) + assertMembers(clusterView.members, first, second) awaitCond { - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence.isDefined) + awaitCond(clusterView.convergence) } enterBarrier("after-1") @@ -55,12 +55,12 @@ abstract class NodeMembershipSpec cluster.join(first) } - awaitCond(cluster.latestGossip.members.size == 3) - assertMembers(cluster.latestGossip.members, first, second, third) + awaitCond(clusterView.members.size == 3) + assertMembers(clusterView.members, first, second, third) awaitCond { - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + clusterView.members.forall(_.status == MemberStatus.Up) } - awaitCond(cluster.convergence.isDefined) + awaitCond(clusterView.convergence) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index d4bdf2b748..8ee02ee197 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -11,6 +11,8 @@ import akka.testkit._ import scala.concurrent.util.duration._ import scala.collection.immutable.SortedSet import java.util.concurrent.atomic.AtomicReference +import akka.actor.Props +import akka.actor.Actor object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -27,6 +29,7 @@ abstract class NodeUpSpec with MultiNodeClusterSpec { import NodeUpMultiJvmSpec._ + import ClusterEvent._ "A cluster 
node that is joining another cluster" must { "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { @@ -39,12 +42,13 @@ abstract class NodeUpSpec "be unaffected when joining again" taggedAs LongRunningTest in { val unexpected = new AtomicReference[SortedSet[Member]](SortedSet.empty) - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) - unexpected.set(members) + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) + unexpected.set(members) } - }) + })), classOf[MembersChanged]) enterBarrier("listener-registered") runOn(second) { @@ -56,7 +60,7 @@ abstract class NodeUpSpec for (n ← 1 to 20) { Thread.sleep(100.millis.dilated.toMillis) unexpected.get must be(SortedSet.empty) - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true) + clusterView.members.forall(_.status == MemberStatus.Up) must be(true) } enterBarrier("after-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index dddeac8816..d044d90f72 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -42,14 +42,14 @@ abstract class SingletonClusterSpec "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { startClusterNode() awaitUpConvergence(1) - cluster.isSingletonCluster must be(true) + clusterView.isSingletonCluster must be(true) enterBarrier("after-1") } "not be singleton cluster when joined with other node" taggedAs LongRunningTest in { awaitClusterUp(first, second) - cluster.isSingletonCluster must be(false) + clusterView.isSingletonCluster must be(false) assertLeader(first, second) enterBarrier("after-2") @@ -63,7 +63,7 @@ abstract class SingletonClusterSpec markNodeAsUnavailable(secondAddress) awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) - cluster.isSingletonCluster must be(true) + clusterView.isSingletonCluster must be(true) assertLeader(first) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index c2c517fd16..d030734a71 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -78,10 +78,10 @@ abstract class SplitBrainSpec } runOn(side1: _*) { - awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds) } runOn(side2: _*) { - awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds) + awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds) } enterBarrier("after-2") @@ -91,16 +91,16 @@ abstract class SplitBrainSpec runOn(side1: _*) { // auto-down = on - awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address) + 
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address) awaitUpConvergence(side1.size, side2 map address) assertLeader(side1: _*) } runOn(side2: _*) { // auto-down = on - awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) - cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address) + awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address) awaitUpConvergence(side2.size, side1 map address) assertLeader(side2: _*) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 0937d1a8bf..1bbc890c11 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -11,6 +11,8 @@ import akka.testkit._ import scala.concurrent.util.duration._ import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.SortedSet +import akka.actor.Props +import akka.actor.Actor object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -40,6 +42,7 @@ abstract class SunnyWeatherSpec with MultiNodeClusterSpec { import SunnyWeatherMultiJvmSpec._ + import ClusterEvent._ "A normal cluster" must { "be healthy" taggedAs LongRunningTest in { @@ -55,12 +58,13 @@ abstract class SunnyWeatherSpec log.info("5 joined") val unexpected = new AtomicReference[SortedSet[Member]] - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - // we don't expected any changes to the cluster - unexpected.set(members) + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case MembersChanged(members) ⇒ + // we don't expected any changes to the cluster + unexpected.set(members) } - }) + })), classOf[MembersChanged]) for (n ← 1 to 30) { enterBarrier("period-" + n) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index c86622ce8b..0d5e60b7ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -42,23 +42,18 @@ abstract class TransitionSpec def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail def memberStatus(address: Address): MemberStatus = { - val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst { + val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } statusOption must not be (None) statusOption.get } - def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address) + def memberAddresses: Set[Address] = clusterView.members.map(_.address) def members: Set[RoleName] = memberAddresses.flatMap(roleName(_)) - def seenLatestGossip: Set[RoleName] = { - val gossip = cluster.latestGossip - gossip.overview.seen.collect { - case (address, v) if v == gossip.version ⇒ roleName(address) - }.flatten.toSet - } + def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName def awaitSeen(addresses: Address*): Unit = awaitCond { (seenLatestGossip map address) == 
addresses.toSet @@ -95,9 +90,11 @@ abstract class TransitionSpec def gossipTo(toRole: RoleName): Unit = { gossipBarrierCounter += 1 runOn(toRole) { - val g = cluster.latestGossip + val oldCount = clusterView.latestStats.receivedGossipCount enterBarrier("before-gossip-" + gossipBarrierCounter) - awaitCond(cluster.latestGossip != g) // received gossip + awaitCond { + clusterView.latestStats.receivedGossipCount != oldCount // received gossip + } // gossip chat will synchronize the views awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) @@ -123,11 +120,11 @@ abstract class TransitionSpec runOn(first) { startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) + clusterView.isSingletonCluster must be(true) + clusterView.status must be(Joining) + clusterView.convergence must be(true) leaderActions() - cluster.status must be(Up) + clusterView.status must be(Up) } enterBarrier("after-1") @@ -144,7 +141,7 @@ abstract class TransitionSpec memberStatus(first) must be(Up) memberStatus(second) must be(Joining) awaitCond(seenLatestGossip == Set(first, second)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } enterBarrier("convergence-joining-2") @@ -161,7 +158,7 @@ abstract class TransitionSpec awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) memberStatus(first) must be(Up) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } enterBarrier("after-2") @@ -177,7 +174,7 @@ abstract class TransitionSpec awaitMembers(first, second, third) memberStatus(third) must be(Joining) awaitCond(seenLatestGossip == Set(second, third)) - cluster.convergence.isDefined must be(false) + clusterView.convergence must be(false) } enterBarrier("third-joined-second") @@ -188,7 +185,7 @@ abstract class TransitionSpec memberStatus(third) must be(Joining) awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } first gossipTo third @@ -198,7 +195,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Joining) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } enterBarrier("convergence-joining-3") @@ -216,7 +213,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(leader(first, second, third), myself)) - cluster.convergence.isDefined must be(false) + clusterView.convergence must be(false) } // first non-leader gossipTo the other non-leader @@ -228,7 +225,7 @@ abstract class TransitionSpec runOn(nonLeader(first, second, third).tail.head) { memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } // first non-leader gossipTo the leader @@ -238,7 +235,7 @@ abstract class TransitionSpec memberStatus(second) must be(Up) memberStatus(third) must be(Up) seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } enterBarrier("after-3") @@ -248,7 +245,7 @@ abstract class TransitionSpec runOn(third) { markNodeAsUnavailable(second) reapUnreachable() - 
cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) + clusterView.unreachableMembers must contain(Member(second, Up)) seenLatestGossip must be(Set(third)) } @@ -257,8 +254,8 @@ abstract class TransitionSpec third gossipTo first runOn(first, third) { - cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) - cluster.convergence.isDefined must be(false) + clusterView.unreachableMembers must contain(Member(second, Up)) + clusterView.convergence must be(false) } runOn(first) { @@ -271,10 +268,10 @@ abstract class TransitionSpec first gossipTo third runOn(first, third) { - cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) + clusterView.unreachableMembers must contain(Member(second, Down)) memberStatus(second) must be(Down) seenLatestGossip must be(Set(first, third)) - cluster.convergence.isDefined must be(true) + clusterView.convergence must be(true) } enterBarrier("after-6") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 0ef9dd603f..ca5c8f3265 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -80,13 +80,13 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes all alone awaitCond({ - val gossip = cluster.latestGossip - gossip.overview.unreachable.size == (roles.size - 1) && - gossip.members.size == 1 && - gossip.members.forall(_.status == MemberStatus.Up) + val members = clusterView.members + clusterView.unreachableMembers.size == (roles.size - 1) && + members.size == 1 && + members.forall(_.status == MemberStatus.Up) }) - cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet) - cluster.convergence.isDefined must be(false) + clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) + clusterView.convergence must be(false) } } @@ -95,17 +95,17 @@ abstract class UnreachableNodeRejoinsClusterSpec within(30 seconds) { // victim becomes unreachable awaitCond({ - val gossip = cluster.latestGossip - gossip.overview.unreachable.size == 1 && - gossip.members.size == (roles.size - 1) && - gossip.members.forall(_.status == MemberStatus.Up) + val members = clusterView.members + clusterView.unreachableMembers.size == 1 && + members.size == (roles.size - 1) && + members.forall(_.status == MemberStatus.Up) }) awaitSeenSameState(allButVictim map address: _*) // still one unreachable - cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address) + clusterView.unreachableMembers.size must be(1) + clusterView.unreachableMembers.head.address must be(node(victim).address) // and therefore no convergence - cluster.convergence.isDefined must be(false) + clusterView.convergence must be(false) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 8d420dc021..2d7565f5f5 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -31,7 +31,7 @@ class ClusterConfigSpec extends AkkaSpec { HeartbeatInterval must be(1 second) LeaderActionsInterval must be(1 second) 
UnreachableNodesReaperInterval must be(1 second) - PublishStateInterval must be(1 second) + PublishStatsInterval must be(10 seconds) JoinTimeout must be(60 seconds) AutoJoin must be(true) AutoDown must be(false) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index dd3fe83de9..73364b853e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -25,7 +25,7 @@ object ClusterSpec { auto-join = off auto-down = off periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks - publish-state-interval = 0 s # always, when it happens + publish-stats-interval = 0 s # always, when it happens } akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote.netty.port = 0 @@ -44,6 +44,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val failureDetector = new FailureDetectorPuppet(system) val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + def clusterView = cluster.readView def leaderActions(): Unit = { cluster.clusterCore ! LeaderActionsTick @@ -70,15 +71,16 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } "initially become singleton cluster when joining itself and reach convergence" in { - cluster.isSingletonCluster must be(false) // auto-join = off + clusterView.members.size must be(0) // auto-join = off cluster.join(selfAddress) - awaitCond(cluster.isSingletonCluster) - cluster.self.address must be(selfAddress) - cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) - cluster.status must be(MemberStatus.Joining) - cluster.convergence.isDefined must be(true) + Thread.sleep(5000) + awaitCond(clusterView.isSingletonCluster) + clusterView.self.address must be(selfAddress) + clusterView.members.map(_.address) must be(Set(selfAddress)) + clusterView.status must be(MemberStatus.Joining) + clusterView.convergence must be(true) leaderActions() - cluster.status must be(MemberStatus.Up) + clusterView.status must be(MemberStatus.Up) } } diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst new file mode 100644 index 0000000000..b258b8eb8e --- /dev/null +++ b/akka-docs/cluster/cluster-usage.rst @@ -0,0 +1,84 @@ + +.. _cluster_usage: + +######### + Cluster +######### + +.. note:: This document describes how to use the features implemented so far of the +new clustering that is coming in Akka Coltrane and is not available in the latest stable release. +The API might change before it is released. + +For an introduction to the Akka Cluster concepts please see + +Preparing your ActorSystem for Clustering +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" % "akka-cluster" % "2.1-SNAPSHOT" + +It can be difficult to find the correct versions and repositories at the moment.
The following sbt 0.11.3 build +file illustrates what to use with Scala 2.10.0-M6 and Akka 2.1-SNAPSHOT:: + + import sbt._ + import sbt.Keys._ + + object ProjectBuild extends Build { + + lazy val root = Project( + id = "root", + base = file("."), + settings = Project.defaultSettings ++ Seq( + name := "Akka Cluster Example", + organization := "org.test", + version := "0.1-SNAPSHOT", + scalaVersion := "2.10.0-M6", + + resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/", + resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", + resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases", + resolvers += "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/", + + + libraryDependencies ++= Seq( + "com.typesafe.akka" % "akka-cluster" % "2.1-20120816-000904", + "com.typesafe.akka" % "akka-testkit" % "2.1-20120816-000904" % "test", + "junit" % "junit" % "4.5" % "test", + "org.scalatest" %% "scalatest" % "1.9-2.10.0-M6-B2" % "test") + ) + ) + } + +Pick a timestamped Akka version from ``_. + +To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala` +settings and the ``cluster seed-nodes`` to your ``application.conf`` file: + +.. literalinclude:: ../../akka-samples/akka-sample-remote/src/main/resources/common.conf + :language: none + +The seed nodes are configured contact points for the initial join of the cluster. +When a new node is started it sends a message to all seed nodes and +then sends a join command to the one that answers first. + +A Simple Cluster Example +^^^^^^^^^^^^^^^^^^^^^^^^ + + + + +Configuration +^^^^^^^^^^^^^ + +There are many more properties related to clustering in Akka. We refer to the following +reference file for more information: + + +.. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf + :language: none + + + + + diff --git a/akka-docs/cluster/index.rst b/akka-docs/cluster/index.rst index 35c4b2250a..dac3a558d9 100644 --- a/akka-docs/cluster/index.rst +++ b/akka-docs/cluster/index.rst @@ -5,3 +5,4 @@ Cluster :maxdepth: 2 cluster + cluster-usage diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index 82a736973f..4aa62a63be 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -43,6 +43,11 @@ As you can see in the example above there are four things you need to add to get communicate across the network. * Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically +.. note:: + The port number needs to be unique for each actor system on the same machine even if the actor + systems have different names. This is because each actor system has its own network subsystem + listening for connections and handling messages so as not to interfere with other actor systems. + The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information: diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index ab49765fad..a165272ddb 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -40,6 +40,11 @@ As you can see in the example above there are four things you need to add to get communicate across the network.
* Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically +.. note:: + The port number needs to be unique for each actor system on the same machine even if the actor + systems have different names. This is because each actor system has its own network subsystem + listening for connections and handling messages so as not to interfere with other actor systems. + The example above only illustrates the bare minimum of properties you have to add to enable remoting. There are lots of more properties that are related to remoting in Akka. We refer to the following reference file for more information: diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 7ae3219c5d..3064f05ed2 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -107,6 +107,7 @@ akka { # (I) The default remote server port clients should connect to. # Default is 2552 (AKKA), use 0 if you want a random available port + # This port needs to be unique for each actor system on the same machine. port = 2552 # (O) The address of a local network interface (IP Address) to bind to when creating diff --git a/akka-samples/akka-sample-cluster/README.rst b/akka-samples/akka-sample-cluster/README.rst new file mode 100644 index 0000000000..5f9a9fb6c6 --- /dev/null +++ b/akka-samples/akka-sample-cluster/README.rst @@ -0,0 +1,143 @@ +REMOTE CALCULATOR +================= + +Requirements +------------ + +To build and run the remote calculator you need [Simple Build Tool][sbt] (sbt). + +The Sample Explained +-------------------- + +In order to showcase the remote capabilities of Akka 2.0 we thought a remote calculator could do the trick. + +There are two implementations of the sample; one in Scala and one in Java. +The explanation below is for Scala, but everything is similar in Java except that the class names begin with a ``J``, +e.g. ``JCalcApp`` instead of ``CalcApp``, and that the Java classes reside in another package structure. + +There are three actor systems used in the sample: + +* CalculatorApplication : the actor system performing the number crunching +* LookupApplication : illustrates how to look up an actor on a remote node and how to communicate with that actor +* CreationApplication : illustrates how to create an actor on a remote node and how to communicate with that actor + +The CalculatorApplication contains an actor, SimpleCalculatorActor, which can handle simple math operations such as +addition and subtraction. This actor is looked up and used from the LookupApplication. + +The CreationApplication wants to use more "advanced" mathematical operations, such as multiplication and division, +but as the CalculatorApplication does not have any actor that can perform those types of calculations the +CreationApplication has to remote deploy an actor that can (which in our case is AdvancedCalculatorActor). +So this actor is deployed, over the network, onto the CalculatorApplication actor system and thereafter the +CreationApplication will send messages to it. + +It is important to point out that as the actor systems run on different ports it is possible to run all three in parallel. +See the next section for more information on how to run the sample application. + +Running +------- + +In order to run all three actor systems you have to start SBT in three different terminal windows.
+ +We start off by running the CalculatorApplication: + +First type 'sbt' to start SBT interactively, then run 'update' and 'run': +> cd $AKKA_HOME + +> sbt + +> project akka-sample-remote + +> run + +Select to run "sample.remote.calculator.CalcApp" which in the case below is number 3:: + + Multiple main classes detected, select one to run: + + [1] sample.remote.calculator.LookupApp + [2] sample.remote.calculator.CreationApp + [3] sample.remote.calculator.CalcApp + + Enter number: 3 + +You should see something similar to this:: + + [info] Running sample.remote.calculator.CalcApp + [INFO] [12/22/2011 14:21:51.631] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://CalculatorApplication@127.0.0.1:2552 + [INFO] [12/22/2011 14:21:51.632] [run-main] [Remote] Starting remote server on [akka://CalculatorApplication@127.0.0.1:2552] + Started Calculator Application - waiting for messages + [INFO] [12/22/2011 14:22:39.894] [New I/O server worker #1-1] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2553 + +Open up a new terminal window and run SBT once more: + +> sbt + +> project akka-sample-remote + +> run + +Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1:: + + Multiple main classes detected, select one to run: + + [1] sample.remote.calculator.LookupApp + [2] sample.remote.calculator.CreationApp + [3] sample.remote.calculator.CalcApp + + Enter number: 1 + +Now you should see something like this:: + + [info] Running sample.remote.calculator.LookupApp + [INFO] [12/22/2011 14:54:38.630] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://LookupApplication@127.0.0.1:2553 + [INFO] [12/22/2011 14:54:38.632] [run-main] [Remote] Starting remote server on [akka://LookupApplication@127.0.0.1:2553] + Started Lookup Application + [INFO] [12/22/2011 14:54:38.801] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552 + Sub result: 4 - 30 = -26 + Add result: 17 + 1 = 18 + Add result: 37 + 43 = 80 + Add result: 68 + 66 = 134 + +Congrats! You have now successfully looked up a remote actor and communicated with it. +The next step is to have an actor deployed on a remote node. +Once more you should open a new terminal window and run SBT: + +> sbt + +> project akka-sample-remote + +> run + +Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2:: + + Multiple main classes detected, select one to run: + + [1] sample.remote.calculator.LookupApp + [2] sample.remote.calculator.CreationApp + [3] sample.remote.calculator.CalcApp + + Enter number: 2 + +Now you should see something like this:: + + [info] Running sample.remote.calculator.CreationApp + [INFO] [12/22/2011 14:57:02.150] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://RemoteCreation@127.0.0.1:2554 + [INFO] [12/22/2011 14:57:02.151] [run-main] [Remote] Starting remote server on [akka://RemoteCreation@127.0.0.1:2554] + [INFO] [12/22/2011 14:57:02.267] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552 + Started Creation Application + Mul result: 14 * 17 = 238 + Div result: 3764 / 80 = 47.00 + Mul result: 16 * 5 = 80 + Mul result: 1 * 18 = 18 + Mul result: 8 * 13 = 104 + +That's it! + +Notice +------ + +The sample application is just that, i.e. a sample. Parts of it are not the way you would do a "real" application. +Some improvements are to remove all hard coded addresses from the code as they reduce the flexibility of how and +where the application can be run.
We leave this to the astute reader to refine the sample into a real-world app. + +* `Akka `_ +* `SBT `_ diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf new file mode 100644 index 0000000000..779b3825cd --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -0,0 +1,18 @@ +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + transport = "akka.remote.netty.NettyRemoteTransport" + netty { + hostname = "127.0.0.1" + port = 0 + } + } + + extensions = ["akka.cluster.Cluster$"] + + cluster { + seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552"] + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala new file mode 100644 index 0000000000..e61af1db6a --- /dev/null +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/ClusterApp.scala @@ -0,0 +1,28 @@ +package sample.cluster + +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ + +object ClusterApp { + + def main(args: Array[String]): Unit = { + + if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0)) + + // Create an Akka system + val system = ActorSystem("ClusterSystem") + val clusterListener = system.actorOf(Props(new Actor { + def receive = { + case state: CurrentClusterState ⇒ + println("Current members: " + state.members) + case MembersChanged(members) ⇒ + println("Current members: " + members) + + } + })) + + Cluster(system).subscribe(clusterListener, classOf[MembersChanged]) + } + +} \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 9c34f0f68d..47f58bdce6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -282,7 +282,7 @@ object AkkaBuild extends Build { id = "akka-samples", base = file("akka-samples"), settings = parentSettings, - aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample) + aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, clusterSample) ) lazy val camelSample = Project( @@ -322,6 +322,13 @@ object AkkaBuild extends Build { settings = defaultSettings ) + lazy val clusterSample = Project( + id = "akka-sample-cluster", + base = file("akka-samples/akka-sample-cluster"), + dependencies = Seq(cluster), + settings = defaultSettings + ) + lazy val tutorials = Project( id = "akka-tutorials", base = file("akka-tutorials"),
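For reference, application code that previously implemented ``MembershipChangeListener`` can be ported to the subscription style used throughout the updated specs and the ``ClusterApp`` sample above. The following is only a minimal sketch, assuming the 2.1-SNAPSHOT API exercised in this diff (``Cluster(system).subscribe``, ``ClusterEvent.MembersChanged`` and the ``CurrentClusterState`` snapshot); the ``SimpleClusterListener`` and ``ListenerApp`` names are made up for the example::

    package sample.cluster

    import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent._

    // Receives cluster domain events instead of implementing the
    // removed MembershipChangeListener interface.
    class SimpleClusterListener extends Actor with ActorLogging {
      def receive = {
        case state: CurrentClusterState ⇒
          // snapshot of the current membership (also handled in the ClusterApp sample)
          log.info("Current members: {}", state.members)
        case MembersChanged(members) ⇒
          // published when the membership set changes
          log.info("Members changed: {}", members)
      }
    }

    object ListenerApp {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("ClusterSystem")
        val listener = system.actorOf(Props[SimpleClusterListener], name = "clusterListener")
        // register the actor as subscriber of MembersChanged cluster domain events
        Cluster(system).subscribe(listener, classOf[MembersChanged])
      }
    }

Keeping the handler in a dedicated actor ties the subscription to the actor system lifecycle, which mirrors how the multi-jvm specs above register their anonymous subscriber actors.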