Merge branch 'master' into wip-2385-revise-handling-of-IE-ban

This commit is contained in:
Björn Antonsson 2012-08-17 11:11:58 +02:00
commit 2ce8c49e6d
40 changed files with 940 additions and 434 deletions

View file

@ -40,9 +40,9 @@ akka {
# How often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
unreachable-nodes-reaper-interval = 1s
# How often the current state (Gossip) should be published for reading from the outside.
# A value of 0 s can be used to publish the state immediately, whenever it changes.
publish-state-interval = 1s
# How often the current internal stats should be published.
# A value of 0 s can be used to publish the stats immediately, whenever they change.
publish-stats-interval = 10s
# A joining node stops sending heartbeats to the node to join if it hasn't become a member
# of the cluster within this deadline.
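
For illustration, a minimal sketch of overriding this interval from code, assuming the standard Typesafe Config API (the system name is made up):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// A value of 0 s disables the periodic task; stats are then published
// immediately whenever they change.
val config = ConfigFactory.parseString("akka.cluster.publish-stats-interval = 0 s")
  .withFallback(ConfigFactory.load())
val system = ActorSystem("ClusterSystem", config)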

View file

@ -69,13 +69,9 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
* All state is represented by this immutable case class and managed by an AtomicReference.
*/
private case class State(memberMembershipChangeListeners: Set[MembershipChangeListener] = Set.empty)
import ClusterEvent._
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
@ -92,23 +88,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
log.info("Cluster Node [{}] - is starting up...", selfAddress)
private val state = new AtomicReference[State](State())
/**
* Read only view of cluster state, updated periodically by
* ClusterCoreDaemon. Access with `latestGossip`.
*/
@volatile
private[cluster] var _latestGossip: Gossip = Gossip()
/**
* INTERNAL API
* Read only view of internal cluster stats, updated periodically by
* ClusterCoreDaemon. Access with `latestStats`.
*/
@volatile
private[cluster] var _latestStats = ClusterStats()
// ========================================================
// ===================== WORK DAEMONS =====================
// ========================================================
@ -175,6 +154,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration)
}
@volatile
private var readViewStarted = false
private[cluster] lazy val readView: ClusterReadView = {
val readView = new ClusterReadView(this)
readViewStarted = true
readView
}
system.registerOnTermination(shutdown())
private val clusterJmx = new ClusterJmx(this, log)
@ -186,87 +173,25 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== PUBLIC API =====================
// ======================================================
def self: Member = latestGossip.member(selfAddress)
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = _isRunning.get
/**
* Latest gossip.
* Subscribe to cluster domain events.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or a subclass. A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will also be sent to the subscriber.
*/
def latestGossip: Gossip = _latestGossip
def subscribe(subscriber: ActorRef, to: Class[_]): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, to)
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and is no longer present in the node ring or any other part of the gossiping state. However, in order to maintain the
* model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`.
* Unsubscribe from cluster domain events.
*/
def status: MemberStatus = self.status
/**
* Is this node the leader?
*/
def isLeader: Boolean = latestGossip.isLeader(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Address = latestGossip.leader match {
case Some(x) ⇒ x
case None ⇒ throw new IllegalStateException("There is no leader in this cluster")
}
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = latestGossip.isSingletonCluster
/**
* Checks if we have a cluster convergence.
*
* @return Some(convergedGossip) if convergence has been reached and None if not
*/
def convergence: Option[Gossip] = latestGossip match {
case gossip if gossip.convergence ⇒ Some(gossip)
case _ ⇒ None
}
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = latestGossip.isAvailable(selfAddress)
/**
* Make it possible to override/configure seedNodes from tests without
* specifying them in config. Addresses are unknown before startup time.
*/
def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* Registers a listener to subscribe to cluster membership changes.
*/
@tailrec
final def registerListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners + listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) registerListener(listener) // recur
}
/**
* Unsubscribes from cluster membership changes.
*/
@tailrec
final def unregisterListener(listener: MembershipChangeListener): Unit = {
val localState = state.get
val newListeners = localState.memberMembershipChangeListeners - listener
val newState = localState copy (memberMembershipChangeListeners = newListeners)
if (!state.compareAndSet(localState, newState)) unregisterListener(listener) // recur
}
def unsubscribe(subscriber: ActorRef): Unit =
clusterCore ! InternalClusterAction.Unsubscribe(subscriber)
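
A usage sketch of the new subscribe/unsubscribe API; the listener actor is illustrative and not part of this commit:

import akka.actor.{ Actor, Props }
import akka.cluster.ClusterEvent._

class MemberListener extends Actor {
  def receive = {
    case state: CurrentClusterState ⇒ // snapshot, sent once when subscribing
    case MembersChanged(members) ⇒ // membership or member status changed
  }
}
// given an ActorSystem named system:
// val listener = system.actorOf(Props[MemberListener])
// Cluster(system).subscribe(listener, classOf[MembersChanged])
// Cluster(system).unsubscribe(listener)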
/**
* Try to join this cluster node with the node specified by 'address'.
@ -291,6 +216,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// ===================== INTERNAL API =====================
// ========================================================
/**
* Make it possible to override/configure seedNodes from tests without
* specifying them in config. Addresses are unknown before startup time.
*/
private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes
/**
* INTERNAL API.
*
@ -303,10 +234,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (_isRunning.compareAndSet(true, false)) {
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", selfAddress)
// FIXME isTerminated check can be removed when ticket #2221 is fixed
// now it prevents logging if system is shutdown (or in progress of shutdown)
if (!clusterDaemons.isTerminated)
system.stop(clusterDaemons)
system.stop(clusterDaemons)
if (readViewStarted) readView.close()
scheduler.close()
@ -316,41 +245,5 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
}
}
/**
* INTERNAL API
*/
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit = {
// FIXME run callbacks async (to not block the cluster)
state.get.memberMembershipChangeListeners foreach { _ notify members }
}
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
/**
* INTERNAL API
*/
private[cluster] def publishLatestGossip(gossip: Gossip): Unit = _latestGossip = gossip
/**
* INTERNAL API
*/
private[cluster] def publishLatestStats(stats: ClusterStats): Unit = _latestStats = stats
}
/**
* Interface for membership change listener.
*/
trait MembershipChangeListener {
def notify(members: SortedSet[Member]): Unit
}
/**
* Interface for meta data change listener.
*/
trait MetaDataChangeListener {
def notify(meta: Map[String, Array[Byte]]): Unit
}

View file

@ -6,12 +6,13 @@ package akka.cluster
import scala.collection.immutable.SortedSet
import scala.concurrent.util.{ Deadline, Duration }
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, RootActorPath, PoisonPill, Scheduler }
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, PoisonPill, Scheduler }
import akka.actor.Status.Failure
import akka.routing.ScatterGatherFirstCompletedRouter
import akka.event.EventStream
import akka.util.Timeout
import akka.pattern.{ AskTimeoutException, ask, pipe }
import MemberStatus._
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import language.existentials
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -72,15 +73,20 @@ private[cluster] object InternalClusterAction {
*/
case class InitJoinAck(address: Address) extends ClusterMessage
case object GossipTick
/**
* Marker interface for periodic tick messages
*/
sealed trait Tick
case object HeartbeatTick
case object GossipTick extends Tick
case object ReapUnreachableTick
case object HeartbeatTick extends Tick
case object LeaderActionsTick
case object ReapUnreachableTick extends Tick
case object PublishStateTick
case object LeaderActionsTick extends Tick
case object PublishStatsTick extends Tick
case class SendClusterMessage(to: Address, msg: ClusterMessage)
@ -88,6 +94,9 @@ private[cluster] object InternalClusterAction {
case object GetClusterCoreRef
case class Subscribe(subscriber: ActorRef, to: Class[_])
case class Unsubscribe(subscriber: ActorRef)
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
@ -124,9 +133,6 @@ private[cluster] trait ClusterEnvironment {
private[cluster] def selfAddress: Address
private[cluster] def scheduler: Scheduler
private[cluster] def seedNodes: IndexedSeq[Address]
private[cluster] def notifyMembershipChangeListeners(members: SortedSet[Member]): Unit
private[cluster] def publishLatestGossip(gossip: Gossip): Unit
private[cluster] def publishLatestStats(stats: ClusterStats): Unit
private[cluster] def shutdown(): Unit
}
@ -206,13 +212,20 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
// start periodic publish of current state
private val publishStateTask: Option[Cancellable] =
if (PublishStateInterval == Duration.Zero) None
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStateInterval), PublishStateInterval) {
self ! PublishStateTick
if (PublishStatsInterval == Duration.Zero) None
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
self ! PublishStatsTick
})
override def preStart(): Unit = {
if (AutoJoin) self ! InternalClusterAction.JoinSeedNode
if (AutoJoin) {
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
else
context.actorOf(Props(new JoinSeedNodeProcess(environment)).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
}
}
override def postStop(): Unit = {
@ -223,18 +236,23 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
publishStateTask foreach { _.cancel() }
}
def receive = {
def uninitialized: Actor.Receive = {
case InitJoin ⇒ // skip, not ready yet
case JoinTo(address) ⇒ join(address)
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
case _: Tick ⇒ // ignore periodic tasks until initialized
}
def initialized: Actor.Receive = {
case msg: GossipEnvelope ⇒ receiveGossip(msg)
case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg)
case GossipTick ⇒ gossip()
case HeartbeatTick ⇒ heartbeat()
case ReapUnreachableTick ⇒ reapUnreachableMembers()
case LeaderActionsTick ⇒ leaderActions()
case PublishStateTick ⇒ publishState()
case JoinSeedNode ⇒ joinSeedNode()
case PublishStatsTick ⇒ publishInternalStats()
case InitJoin ⇒ initJoin()
case InitJoinAck(address) ⇒ join(address)
case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout()
case JoinTo(address) ⇒ join(address)
case ClusterUserAction.Join(address) ⇒ joining(address)
case ClusterUserAction.Down(address) ⇒ downing(address)
@ -242,25 +260,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
case Exit(address) ⇒ exiting(address)
case Remove(address) ⇒ removing(address)
case SendGossipTo(address) ⇒ gossipTo(address)
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
case p: Ping ⇒ ping(p)
}
def joinSeedNode(): Unit = {
val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) }
if (seedRoutees.isEmpty) join(selfAddress)
else {
implicit val within = Timeout(SeedNodeTimeout)
val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration)))
seedRouter ! InitJoin
seedRouter ! PoisonPill
}
}
def receive = uninitialized
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodeTimeout(): Unit = join(selfAddress)
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
@ -274,9 +283,13 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
notifyListeners(localGossip)
publish(localGossip)
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
context.become(initialized)
if (address == selfAddress)
joining(address)
else
coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress))
}
/**
@ -316,7 +329,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
gossipTo(node)
}
notifyListeners(localGossip)
publish(localGossip)
}
}
@ -335,7 +348,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
latestGossip = seenVersionedGossip
log.info("Cluster Node [{}] - Marked address [{}] as LEAVING", selfAddress, address)
notifyListeners(localGossip)
publish(localGossip)
}
}
@ -362,7 +375,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
// just cleaning up the gossip state
latestGossip = Gossip()
// make sure the final (removed) state is always published
notifyListeners(localGossip)
publish(localGossip)
environment.shutdown()
}
@ -413,7 +426,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
val versionedGossip = newGossip :+ vclockNode
latestGossip = versionedGossip seen selfAddress
notifyListeners(localGossip)
publish(localGossip)
}
/**
@ -507,7 +520,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
stats = stats.incrementReceivedGossipCount
notifyListeners(localGossip)
publish(localGossip)
if (envelope.conversation &&
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
@ -709,7 +722,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", selfAddress, member.address)
}
notifyListeners(localGossip)
publish(localGossip)
}
}
}
@ -763,7 +776,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
log.error("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", selfAddress, newlyDetectedUnreachableMembers.mkString(", "))
notifyListeners(localGossip)
publish(localGossip)
}
}
}
@ -803,23 +816,115 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg)
def notifyListeners(oldGossip: Gossip): Unit = {
if (PublishStateInterval == Duration.Zero) publishState()
val oldMembersStatus = oldGossip.members.map(m ⇒ (m.address, m.status))
val newMembersStatus = latestGossip.members.map(m ⇒ (m.address, m.status))
if (newMembersStatus != oldMembersStatus)
environment notifyMembershipChangeListeners latestGossip.members
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
subscriber ! CurrentClusterState(
members = latestGossip.members,
unreachable = latestGossip.overview.unreachable,
convergence = latestGossip.convergence,
seenBy = latestGossip.seenBy,
leader = latestGossip.leader)
eventStream.subscribe(subscriber, to)
}
def publishState(): Unit = {
environment.publishLatestGossip(latestGossip)
environment.publishLatestStats(stats)
def unsubscribe(subscriber: ActorRef): Unit =
eventStream.unsubscribe(subscriber)
def publish(oldGossip: Gossip): Unit = {
publishMembers(oldGossip)
publishUnreachableMembers(oldGossip)
publishLeader(oldGossip)
publishSeen(oldGossip)
if (PublishStatsInterval == Duration.Zero) publishInternalStats()
}
def publishMembers(oldGossip: Gossip): Unit = {
if (!isSame(oldGossip.members, latestGossip.members))
eventStream publish MembersChanged(latestGossip.members)
}
def publishUnreachableMembers(oldGossip: Gossip): Unit = {
if (!isSame(oldGossip.overview.unreachable, latestGossip.overview.unreachable))
eventStream publish UnreachableMembersChanged(latestGossip.overview.unreachable)
}
def isSame(oldMembers: Set[Member], newMembers: Set[Member]): Boolean = {
def oldMembersStatus = oldMembers.map(m ⇒ (m.address, m.status))
def newMembersStatus = newMembers.map(m ⇒ (m.address, m.status))
(newMembers eq oldMembers) || ((newMembers.size == oldMembers.size) && (newMembersStatus == oldMembersStatus))
}
def publishLeader(oldGossip: Gossip): Unit = {
if (latestGossip.leader != oldGossip.leader || latestGossip.convergence != oldGossip.convergence)
eventStream publish LeaderChanged(latestGossip.leader, latestGossip.convergence)
}
def publishSeen(oldGossip: Gossip): Unit = {
val oldConvergence = oldGossip.convergence
val newConvergence = latestGossip.convergence
val oldSeenBy = oldGossip.seenBy
val newSeenBy = latestGossip.seenBy
if (newConvergence != oldConvergence || newSeenBy != oldSeenBy) {
eventStream publish SeenChanged(newConvergence, newSeenBy)
}
}
def publishInternalStats(): Unit = {
eventStream publish CurrentInternalStats(stats)
}
def eventStream: EventStream = context.system.eventStream
def ping(p: Ping): Unit = sender ! Pong(p)
}
/**
* INTERNAL API.
*
* Sends InitJoin to all seed nodes (except itself) and expects
* InitJoinAck replies back. The seed node that replied first
* is the one that will be joined. InitJoinAck replies received after the
* first one are ignored.
*
* Retries if no InitJoinAck replies are received within the
* SeedNodeTimeout.
* When at least one reply has been received, it stops itself after
* an idle SeedNodeTimeout.
*
*/
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
import InternalClusterAction._
def selfAddress = environment.selfAddress
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
throw new IllegalArgumentException("Join seed node should not be done")
context.setReceiveTimeout(environment.settings.SeedNodeTimeout)
override def preStart(): Unit = self ! JoinSeedNode
def receive = {
case JoinSeedNode ⇒
// send InitJoin to all seed nodes (except myself)
environment.seedNodes.collect {
case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a))
} foreach { _ ! InitJoin }
case InitJoinAck(address) ⇒
// first InitJoinAck reply
context.parent ! JoinTo(address)
context.become(done)
case ReceiveTimeout ⇒
// no InitJoinAck received, try again
self ! JoinSeedNode
}
def done: Actor.Receive = {
case InitJoinAck(_) ⇒ // already received one, skip rest
case ReceiveTimeout ⇒ context.stop(self)
}
}
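
For context, the seed nodes consulted by JoinSeedNodeProcess come from the `akka.cluster.seed-nodes` configuration; a hedged example with made-up addresses:

akka.cluster.seed-nodes = [
  "akka://ClusterSystem@127.0.0.1:2551",
  "akka://ClusterSystem@127.0.0.1:2552"]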
/**
* INTERNAL API.
*/
@ -839,6 +944,53 @@ private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Act
}
}
/**
* Domain events published to the event bus.
*/
object ClusterEvent {
/**
* Marker interface for cluster domain events.
*/
sealed trait ClusterDomainEvent
/**
* Current snapshot state of the cluster. Sent to a new subscriber.
*/
case class CurrentClusterState(
members: SortedSet[Member] = SortedSet.empty,
unreachable: Set[Member] = Set.empty,
convergence: Boolean = false,
seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None) extends ClusterDomainEvent
/**
* The set of cluster members, or the status of some member, has changed.
*/
case class MembersChanged(members: SortedSet[Member]) extends ClusterDomainEvent
/**
* The set of unreachable cluster members, or the status of some member, has changed.
*/
case class UnreachableMembersChanged(unreachable: Set[Member]) extends ClusterDomainEvent
/**
* The leader of the cluster changed, and/or the convergence status changed.
*/
case class LeaderChanged(leader: Option[Address], convergence: Boolean) extends ClusterDomainEvent
/**
* INTERNAL API
* The nodes that have seen the current version of the Gossip.
*/
private[cluster] case class SeenChanged(convergence: Boolean, seenBy: Set[Address]) extends ClusterDomainEvent
/**
* INTERNAL API
*/
private[cluster] case class CurrentInternalStats(stats: ClusterStats) extends ClusterDomainEvent
}
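
A sketch of handling these events in a subscriber; the handler bodies are illustrative only:

import akka.actor.Actor
import akka.cluster.ClusterEvent._

class LeaderWatcher extends Actor {
  def receive = {
    case LeaderChanged(leader, convergence) ⇒
      println("leader is now " + leader + ", convergence=" + convergence)
    case UnreachableMembersChanged(unreachable) ⇒
      println("unreachable: " + unreachable.map(_.address))
    case _: ClusterDomainEvent ⇒ // not interesting here
  }
}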
/**
* INTERNAL API
*/

View file

@ -33,10 +33,11 @@ trait ClusterNodeMBean {
/**
* Internal API
*/
private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {
private val mBeanServer = ManagementFactory.getPlatformMBeanServer
private val clusterMBeanName = new ObjectName("akka:type=Cluster")
private def clusterView = cluster.readView
/**
* Creates the cluster JMX MBean and registers it in the MBean server.
@ -57,37 +58,34 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
* }}}
*/
def getClusterStatus: String = {
val gossip = clusterNode.latestGossip
val unreachable = gossip.overview.unreachable
val metaData = gossip.meta
"\nMembers:\n\t" + gossip.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" } +
{ if (metaData.nonEmpty) "\nMeta Data:\t" + metaData.toString else "" }
val unreachable = clusterView.unreachableMembers
"\nMembers:\n\t" + clusterView.members.mkString("\n\t") +
{ if (unreachable.nonEmpty) "\nUnreachable:\n\t" + unreachable.mkString("\n\t") else "" }
}
def getMemberStatus: String = clusterNode.status.toString
def getMemberStatus: String = clusterView.status.toString
def getLeader: String = clusterNode.leader.toString
def getLeader: String = clusterView.leader.toString
def isSingleton: Boolean = clusterNode.isSingletonCluster
def isSingleton: Boolean = clusterView.isSingletonCluster
def isConvergence: Boolean = clusterNode.convergence.isDefined
def isConvergence: Boolean = clusterView.convergence
def isAvailable: Boolean = clusterNode.isAvailable
def isAvailable: Boolean = clusterView.isAvailable
def isRunning: Boolean = clusterNode.isRunning
def isRunning: Boolean = clusterView.isRunning
// JMX commands
def join(address: String) = clusterNode.join(AddressFromURIString(address))
def join(address: String) = cluster.join(AddressFromURIString(address))
def leave(address: String) = clusterNode.leave(AddressFromURIString(address))
def leave(address: String) = cluster.leave(AddressFromURIString(address))
def down(address: String) = clusterNode.down(AddressFromURIString(address))
def down(address: String) = cluster.down(AddressFromURIString(address))
}
try {
mBeanServer.registerMBean(mbean, clusterMBeanName)
log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterNode.selfAddress, clusterMBeanName)
log.info("Cluster Node [{}] - registered cluster JMX MBean [{}]", clusterView.selfAddress, clusterMBeanName)
} catch {
case e: InstanceAlreadyExistsException ⇒ // ignore - we are running multiple cluster nodes in the same JVM (probably for testing)
}
@ -97,6 +95,7 @@ private[akka] class ClusterJmx(clusterNode: Cluster, log: LoggingAdapter) {
* Unregisters the cluster JMX MBean from MBean server.
*/
def unregisterMBean(): Unit = {
clusterView.close()
try {
mBeanServer.unregisterMBean(clusterMBeanName)
} catch {

View file

@ -0,0 +1,128 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import java.io.Closeable
import scala.collection.immutable.SortedSet
import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props }
import akka.cluster.ClusterEvent._
/**
* INTERNAL API
*
* Read view of cluster state, updated via subscription of
* cluster events published on the event bus.
*/
private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
/**
* Current state
*/
@volatile
private var state: CurrentClusterState = CurrentClusterState()
/**
* Current internal cluster stats, updated periodically via event bus.
*/
@volatile
private var _latestStats = ClusterStats()
val selfAddress = cluster.selfAddress
// create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = {
cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case SeenChanged(convergence, seenBy) ⇒ state = state.copy(convergence = convergence, seenBy = seenBy)
case MembersChanged(members) ⇒ state = state.copy(members = members)
case UnreachableMembersChanged(unreachable) ⇒ state = state.copy(unreachable = unreachable)
case LeaderChanged(leader, convergence) ⇒ state = state.copy(leader = leader, convergence = convergence)
case s: CurrentClusterState ⇒ state = s
case CurrentInternalStats(stats) ⇒ _latestStats = stats
case _ ⇒ // ignore, not interesting
}
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")
}
def self: Member = {
state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)).
getOrElse(Member(selfAddress, MemberStatus.Removed))
}
/**
* Returns true if the cluster node is up and running, false if it is shut down.
*/
def isRunning: Boolean = cluster.isRunning
/**
* Current cluster members, sorted with leader first.
*/
def members: SortedSet[Member] = state.members
/**
* Members that have been detected as unreachable.
*/
def unreachableMembers: Set[Member] = state.unreachable
/**
* Member status for this node ([[akka.cluster.MemberStatus]]).
*
* NOTE: If the node has been removed from the cluster (and shut down) then its status is set to the 'REMOVED' tombstone state
* and is no longer present in the node ring or any other part of the gossiping state. However, in order to maintain the
* model and the semantics the user would expect, this method will in this situation return `MemberStatus.Removed`.
*/
def status: MemberStatus = self.status
/**
* Is this node the leader?
*/
def isLeader: Boolean = leader == Some(selfAddress)
/**
* Get the address of the current leader.
*/
def leader: Option[Address] = state.leader
/**
* Is this node a singleton cluster?
*/
def isSingletonCluster: Boolean = members.size == 1
/**
* Checks if we have a cluster convergence.
*/
def convergence: Boolean = state.convergence
/**
* Returns true if the node is UP or JOINING.
*/
def isAvailable: Boolean = {
val myself = self
!unreachableMembers.contains(myself) && !myself.status.isUnavailable
}
/**
* INTERNAL API
* The nodes that have seen the current version of the Gossip.
*/
private[cluster] def seenBy: Set[Address] = state.seenBy
/**
* INTERNAL API
*/
private[cluster] def latestStats: ClusterStats = _latestStats
/**
* Unsubscribe from cluster events.
*/
def close(): Unit = {
cluster.system.stop(eventBusListener)
}
}
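
A short usage sketch of the read view, assuming an ActorSystem named system:

val view = Cluster(system).readView
if (view.isLeader)
  println("leader among " + view.members.size + " members, convergence=" + view.convergence)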

View file

@ -32,7 +32,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
final val LeaderActionsInterval: Duration = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val PublishStateInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-state-interval"), MILLISECONDS)
final val PublishStatsInterval: Duration = Duration(getMilliseconds("akka.cluster.publish-stats-interval"), MILLISECONDS)
final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
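
For illustration, how these settings resolve, assuming the reference defaults from the first hunk above:

import com.typesafe.config.ConfigFactory

val settings = new ClusterSettings(ConfigFactory.load(), "ClusterSystem")
// publish-stats-interval = 10s in reference.conf, so:
settings.PublishStatsInterval // Duration(10000, MILLISECONDS)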

View file

@ -8,12 +8,17 @@ import akka.actor.Address
import scala.collection.immutable.SortedSet
import MemberStatus._
object Gossip {
/**
* Internal API
*/
private[cluster] object Gossip {
val emptyMembers: SortedSet[Member] = SortedSet.empty
}
/**
* Represents the state of the cluster; cluster ring membership, ring convergence, meta data -
* INTERNAL API
*
* Represents the state of the cluster; cluster ring membership, ring convergence -
* all versioned by a vector clock.
*
* When a node is joining, the `Member`, with status `Joining`, is added to `members`.
@ -43,10 +48,9 @@ object Gossip {
* `Removed` by removing it from the `members` set and sending a `Removed` command to the
* removed node telling it to shut itself down.
*/
case class Gossip(
private[cluster] case class Gossip(
overview: GossipOverview = GossipOverview(),
members: SortedSet[Member] = Gossip.emptyMembers, // sorted set of members with their status, sorted by address
meta: Map[String, Array[Byte]] = Map.empty,
version: VectorClock = VectorClock()) // vector clock version
extends ClusterMessage // is a serializable cluster message
with Versioned[Gossip] {
@ -97,7 +101,16 @@ case class Gossip(
}
/**
* Merges two Gossip instances including membership tables, meta-data tables and the VectorClock histories.
* The nodes that have seen current version of the Gossip.
*/
def seenBy: Set[Address] = {
overview.seen.collect {
case (address, vclock) if vclock == version ⇒ address
}.toSet
}
/**
* Merges two Gossip instances including membership tables, and the VectorClock histories.
*/
def merge(that: Gossip): Gossip = {
import Member.ordering
@ -105,20 +118,17 @@ case class Gossip(
// 1. merge vector clocks
val mergedVClock = this.version merge that.version
// 2. merge meta-data
val mergedMeta = this.meta ++ that.meta
// 3. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
// 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 5. fresh seen table
// 4. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedMeta, mergedVClock)
Gossip(GossipOverview(mergedSeen, mergedUnreachable), mergedMembers, mergedVClock)
}
/**
@ -151,8 +161,7 @@ case class Gossip(
!hasUnreachable && allMembersInSeen && seenSame
}
def isLeader(address: Address): Boolean =
members.nonEmpty && (address == members.head.address)
def isLeader(address: Address): Boolean = leader == Some(address)
def leader: Option[Address] = members.headOption.map(_.address)
@ -178,15 +187,15 @@ case class Gossip(
"Gossip(" +
"overview = " + overview +
", members = [" + members.mkString(", ") +
"], meta = [" + meta.mkString(", ") +
"], version = " + version +
")"
}
/**
* INTERNAL API
* Represents the overview of the cluster, holds the cluster convergence table and set with unreachable nodes.
*/
case class GossipOverview(
private[cluster] case class GossipOverview(
seen: Map[Address, VectorClock] = Map.empty,
unreachable: Set[Member] = Set.empty) {
@ -200,13 +209,15 @@ case class GossipOverview(
}
/**
* INTERNAL API
* Envelope adding a sender address to the gossip.
*/
case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
/**
* INTERNAL API
* When conflicting versions of received and local [[akka.cluster.Gossip]] is detected
* it's forwarded to the leader for conflict resolution.
*/
case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage
private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage

View file

@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec
enterBarrier("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
runOn(third) {

View file

@ -48,7 +48,7 @@ abstract class ClientDowningNodeThatIsUpSpec
markNodeAsUnavailable(thirdAddress)
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
clusterView.members.exists(_.address == thirdAddress) must be(false)
}
runOn(third) {

View file

@ -65,15 +65,15 @@ abstract class ConvergenceSpec
within(28 seconds) {
// third becomes unreachable
awaitCond(cluster.latestGossip.overview.unreachable.size == 1)
awaitCond(cluster.latestGossip.members.size == 2)
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
awaitCond(clusterView.unreachableMembers.size == 1)
awaitCond(clusterView.members.size == 2)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
awaitSeenSameState(first, second)
// still one unreachable
cluster.latestGossip.overview.unreachable.size must be(1)
cluster.latestGossip.overview.unreachable.head.address must be(thirdAddress)
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(thirdAddress)
// and therefore no convergence
cluster.convergence.isDefined must be(false)
clusterView.convergence must be(false)
}
}
@ -88,18 +88,18 @@ abstract class ConvergenceSpec
}
def memberStatus(address: Address): Option[MemberStatus] =
cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status }
clusterView.members.collectFirst { case m if m.address == address ⇒ m.status }
def assertNotMovedUp: Unit = {
within(20 seconds) {
awaitCond(cluster.latestGossip.members.size == 3)
awaitCond(clusterView.members.size == 3)
awaitSeenSameState(first, second, fourth)
memberStatus(first) must be(Some(MemberStatus.Up))
memberStatus(second) must be(Some(MemberStatus.Up))
// leader is not allowed to move the new node to Up
memberStatus(fourth) must be(Some(MemberStatus.Joining))
// still no convergence
cluster.convergence.isDefined must be(false)
clusterView.convergence must be(false)
}
}

View file

@ -13,6 +13,7 @@ import scala.concurrent.util.duration._
object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
val seed1 = role("seed1")
val seed2 = role("seed2")
val seed3 = role("seed3")
val ordinary1 = role("ordinary1")
val ordinary2 = role("ordinary2")
@ -25,6 +26,7 @@ class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPup
class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
abstract class JoinSeedNodeSpec
extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
@ -32,37 +34,24 @@ abstract class JoinSeedNodeSpec
import JoinSeedNodeMultiJvmSpec._
override def seedNodes = IndexedSeq(seed1, seed2)
override def seedNodes = IndexedSeq(seed1, seed2, seed3)
"A cluster with configured seed nodes" must {
"start the seed nodes sequentially" taggedAs LongRunningTest in {
// without looking up the addresses first there might be
// [akka://JoinSeedNodeSpec/user/TestConductorClient] cannot write GetAddress(RoleName(seed2)) while waiting for seed1
roles foreach address
"be able to start the seed nodes concurrently" taggedAs LongRunningTest in {
runOn(seed1) {
startClusterNode()
// test that first seed doesn't have to be started first
Thread.sleep(3000)
}
enterBarrier("seed1-started")
runOn(seed2) {
startClusterNode()
}
enterBarrier("seed2-started")
runOn(seed1, seed2) {
awaitUpConvergence(2)
runOn(seed1, seed2, seed3) {
awaitUpConvergence(3)
}
enterBarrier("after-1")
}
"join the seed nodes at startup" taggedAs LongRunningTest in {
startClusterNode()
enterBarrier("all-started")
awaitUpConvergence(4)
awaitUpConvergence(roles.size)
enterBarrier("after-2")
}
}

View file

@ -16,6 +16,8 @@ import scala.concurrent.Await
import scala.concurrent.util.Duration
import java.util.concurrent.TimeUnit
import akka.remote.testconductor.RoleName
import akka.actor.Props
import akka.actor.Actor
object LargeClusterMultiJvmSpec extends MultiNodeConfig {
// each jvm simulates a datacenter with many nodes
@ -38,7 +40,7 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig {
auto-join = off
auto-down = on
failure-detector.acceptable-heartbeat-pause = 10s
publish-state-interval = 0 s # publish immediately, whenever it changes
publish-stats-interval = 0 s # publish immediately, whenever they change
}
akka.loglevel = INFO
akka.actor.default-dispatcher.fork-join-executor {
@ -78,6 +80,7 @@ abstract class LargeClusterSpec
with MultiNodeClusterSpec {
import LargeClusterMultiJvmSpec._
import ClusterEvent._
var systems: IndexedSeq[ActorSystem] = IndexedSeq(system)
val nodesPerDatacenter = system.settings.config.getInt(
@ -134,24 +137,25 @@ abstract class LargeClusterSpec
val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet)
val startGossipCounts = Map.empty[Cluster, Long] ++
clusterNodes.map(c ⇒ (c -> c.latestStats.receivedGossipCount))
clusterNodes.map(c ⇒ (c -> c.readView.latestStats.receivedGossipCount))
def gossipCount(c: Cluster): Long = {
c.latestStats.receivedGossipCount - startGossipCounts(c)
c.readView.latestStats.receivedGossipCount - startGossipCounts(c)
}
val startTime = System.nanoTime
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
val latch = TestLatch(clusterNodes.size)
clusterNodes foreach { c ⇒
c.registerListener(new MembershipChangeListener {
override def notify(members: SortedSet[Member]): Unit = {
if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) {
log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages",
totalNodes, c.selfAddress, tookMillis, gossipCount(c))
latch.countDown()
}
c.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) {
log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages",
totalNodes, c.selfAddress, tookMillis, gossipCount(c))
latch.countDown()
}
}
})
})), classOf[MembersChanged])
}
runOn(from) {
@ -160,7 +164,7 @@ abstract class LargeClusterSpec
Await.ready(latch, remaining)
awaitCond(clusterNodes.forall(_.convergence.isDefined))
awaitCond(clusterNodes.forall(_.readView.convergence))
val counts = clusterNodes.map(gossipCount(_))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max)
log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node",
@ -262,24 +266,33 @@ abstract class LargeClusterSpec
within(30.seconds + (3.seconds * liveNodes)) {
val startGossipCounts = Map.empty[Cluster, Long] ++
systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).latestStats.receivedGossipCount))
systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).readView.latestStats.receivedGossipCount))
def gossipCount(c: Cluster): Long = {
c.latestStats.receivedGossipCount - startGossipCounts(c)
c.readView.latestStats.receivedGossipCount - startGossipCounts(c)
}
val startTime = System.nanoTime
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
val latch = TestLatch(nodesPerDatacenter)
systems foreach { sys ⇒
Cluster(sys).registerListener(new MembershipChangeListener {
override def notify(members: SortedSet[Member]): Unit = {
if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) {
log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages",
unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys)))
latch.countDown()
}
Cluster(sys).subscribe(sys.actorOf(Props(new Actor {
var gotExpectedLiveNodes = false
var gotExpectedUnreachableNodes = false
def receive = {
case MembersChanged(members) if !latch.isOpen ⇒
gotExpectedLiveNodes = members.size == liveNodes
checkDone()
case UnreachableMembersChanged(unreachable) if !latch.isOpen ⇒
gotExpectedUnreachableNodes = unreachable.size == unreachableNodes
checkDone()
case _ ⇒ // not interesting
}
})
def checkDone(): Unit = if (gotExpectedLiveNodes && gotExpectedUnreachableNodes) {
log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages",
unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys)))
latch.countDown()
}
})), classOf[ClusterDomainEvent])
}
runOn(firstDatacenter) {
@ -290,8 +303,8 @@ abstract class LargeClusterSpec
runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
Await.ready(latch, remaining)
awaitCond(systems.forall(Cluster(_).convergence.isDefined))
val mergeCount = systems.map(sys ⇒ Cluster(sys).latestStats.mergeCount).sum
awaitCond(systems.forall(Cluster(_).readView.convergence))
val mergeCount = systems.map(sys ⇒ Cluster(sys).readView.latestStats.mergeCount).sum
val counts = systems.map(sys ⇒ gossipCount(Cluster(sys)))
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)
log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node, merged [{}] times",

View file

@ -46,7 +46,7 @@ abstract class LeaderElectionSpec
awaitClusterUp(first, second, third, fourth)
if (myself != controller) {
cluster.isLeader must be(myself == sortedRoles.head)
clusterView.isLeader must be(myself == sortedRoles.head)
assertLeaderIn(sortedRoles)
}
@ -87,7 +87,7 @@ abstract class LeaderElectionSpec
awaitUpConvergence(currentRoles.size - 1)
val nextExpectedLeader = remainingRoles.head
cluster.isLeader must be(myself == nextExpectedLeader)
clusterView.isLeader must be(myself == nextExpectedLeader)
assertLeaderIn(remainingRoles)
enterBarrier("completed")

View file

@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.util.duration._
import akka.actor.Props
import akka.actor.Actor
object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -32,6 +34,7 @@ abstract class LeaderLeavingSpec
with MultiNodeClusterSpec {
import LeaderLeavingMultiJvmSpec._
import ClusterEvent._
val leaderHandoffWaitingTime = 30.seconds
@ -41,11 +44,11 @@ abstract class LeaderLeavingSpec
awaitClusterUp(first, second, third)
val oldLeaderAddress = cluster.leader
val oldLeaderAddress = clusterView.leader.get
within(leaderHandoffWaitingTime) {
if (cluster.isLeader) {
if (clusterView.isLeader) {
enterBarrier("registered-listener")
@ -53,28 +56,29 @@ abstract class LeaderLeavingSpec
enterBarrier("leader-left")
// verify that a NEW LEADER has taken over
awaitCond(!cluster.isLeader)
awaitCond(!clusterView.isLeader)
// verify that the LEADER is shut down
awaitCond(!cluster.isRunning)
// verify that the LEADER is REMOVED
awaitCond(cluster.status == MemberStatus.Removed)
awaitCond(clusterView.status == MemberStatus.Removed)
} else {
val leavingLatch = TestLatch()
val exitingLatch = TestLatch()
val expectedAddresses = roles.toSet map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
def check(status: MemberStatus): Boolean =
(members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status))
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
def check(status: MemberStatus): Boolean =
(members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status))
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("registered-listener")
enterBarrier("leader-left")
@ -86,13 +90,13 @@ abstract class LeaderLeavingSpec
exitingLatch.await
// verify that the LEADER is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))
awaitCond(clusterView.members.forall(_.address != oldLeaderAddress))
// verify that the LEADER is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))
awaitCond(clusterView.unreachableMembers.forall(_.address != oldLeaderAddress))
// verify that we have a new LEADER
awaitCond(cluster.leader != oldLeaderAddress)
awaitCond(clusterView.leader != Some(oldLeaderAddress))
}
enterBarrier("finished")

View file

@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.util.duration._
import akka.actor.Props
import akka.actor.Actor
object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -36,6 +38,7 @@ abstract class MembershipChangeListenerExitingSpec
with MultiNodeClusterSpec {
import MembershipChangeListenerExitingMultiJvmSpec._
import ClusterEvent._
"A registered MembershipChangeListener" must {
"be notified when new node is EXITING" taggedAs LongRunningTest in {
@ -53,12 +56,13 @@ abstract class MembershipChangeListenerExitingSpec
runOn(third) {
val exitingLatch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting))
exitingLatch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.size == 3 && members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Exiting))
exitingLatch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("registered-listener")
exitingLatch.await
}

View file

@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.util.duration._
import akka.actor.Props
import akka.actor.Actor
object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -17,7 +19,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
.withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
@ -29,6 +31,7 @@ abstract class MembershipChangeListenerJoinSpec
with MultiNodeClusterSpec {
import MembershipChangeListenerJoinMultiJvmSpec._
import ClusterEvent._
"A registered MembershipChangeListener" must {
"be notified when new node is JOINING" taggedAs LongRunningTest in {
@ -36,12 +39,13 @@ abstract class MembershipChangeListenerJoinSpec
runOn(first) {
val joinLatch = TestLatch()
val expectedAddresses = Set(first, second) map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining))
joinLatch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.map(_.address) == expectedAddresses && members.exists(_.status == MemberStatus.Joining))
joinLatch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("registered-listener")
joinLatch.await

View file

@ -10,6 +10,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Address
import akka.actor.Props
import akka.actor.Actor
object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -19,7 +21,7 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.leader-actions-interval = 5 s
akka.cluster.leader-actions-interval = 5 s
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))
@ -34,6 +36,7 @@ abstract class MembershipChangeListenerLeavingSpec
with MultiNodeClusterSpec {
import MembershipChangeListenerLeavingMultiJvmSpec._
import ClusterEvent._
"A registered MembershipChangeListener" must {
"be notified when new node is LEAVING" taggedAs LongRunningTest in {
@ -52,13 +55,14 @@ abstract class MembershipChangeListenerLeavingSpec
runOn(third) {
val latch = TestLatch()
val expectedAddresses = Set(first, second, third) map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving))
latch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == address(second) && m.status == MemberStatus.Leaving))
latch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("registered-listener")
latch.await
}

View file

@ -8,6 +8,8 @@ import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Props
import akka.actor.Actor
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -26,6 +28,7 @@ abstract class MembershipChangeListenerUpSpec
with MultiNodeClusterSpec {
import MembershipChangeListenerUpMultiJvmSpec._
import ClusterEvent._
"A set of connected cluster systems" must {
@ -36,12 +39,13 @@ abstract class MembershipChangeListenerUpSpec
runOn(first, second) {
val latch = TestLatch()
val expectedAddresses = Set(first, second) map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up))
latch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("listener-1-registered")
cluster.join(first)
latch.await
@ -58,12 +62,13 @@ abstract class MembershipChangeListenerUpSpec
val latch = TestLatch()
val expectedAddresses = Set(first, second, third) map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up))
latch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.map(_.address) == expectedAddresses && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("listener-2-registered")
runOn(third) {

View file

@ -22,14 +22,14 @@ import akka.actor.RootActorPath
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
akka.cluster {
auto-join = off
auto-join = on
auto-down = off
gossip-interval = 200 ms
heartbeat-interval = 400 ms
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms
publish-state-interval = 0 s # publish immediately, whenever it changes
publish-stats-interval = 0 s # publish immediately, whenever they change
}
akka.test {
single-expect-default = 5 s
@ -97,6 +97,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
}
}
def clusterView: ClusterReadView = cluster.readView
/**
* Get the cluster node to use.
*/
@ -106,11 +108,11 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
* Use this method for the initial startup of the cluster node.
*/
def startClusterNode(): Unit = {
if (cluster.latestGossip.members.isEmpty) {
if (clusterView.members.isEmpty) {
cluster join myself
awaitCond(cluster.latestGossip.members.exists(_.address == address(myself)))
awaitCond(clusterView.members.exists(_.address == address(myself)))
} else
cluster.self
clusterView.self
}
/**
@ -168,8 +170,8 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) {
nodesInCluster.length must not be (0)
val expectedLeader = roleOfLeader(nodesInCluster)
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
clusterView.isLeader must be(ifNode(expectedLeader)(true)(false))
clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
}
/**
@ -181,25 +183,20 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address],
timeout: Duration = 20.seconds): Unit = {
within(timeout) {
awaitCond(cluster.latestGossip.members.size == numberOfMembers)
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
awaitCond(cluster.convergence.isDefined)
awaitCond(clusterView.members.size == numberOfMembers)
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
awaitCond(clusterView.convergence)
if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
awaitCond(
canNotBePartOfMemberRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address))))
canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address))))
}
}
/**
* Wait until the specified nodes have seen the same gossip overview.
*/
def awaitSeenSameState(addresses: Address*): Unit = {
awaitCond {
val seen = cluster.latestGossip.overview.seen
val seenVectorClocks = addresses.flatMap(seen.get(_))
seenVectorClocks.size == addresses.size && seenVectorClocks.toSet.size == 1
}
}
def awaitSeenSameState(addresses: Address*): Unit =
awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty)
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
nodesInCluster.length must not be (0)

View file

@ -16,7 +16,7 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval
.withFallback(ConfigFactory.parseString("akka.clusterView.leader-actions-interval = 5 s") // increase the leader action task interval
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
@ -42,7 +42,7 @@ abstract class NodeJoinSpec
cluster.join(first)
}
awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining })
awaitCond(clusterView.members.exists { member ⇒ member.address == address(second) && member.status == MemberStatus.Joining })
enterBarrier("after")
}

View file

@ -43,16 +43,16 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
runOn(first, third) {
// verify that the 'second' node is no longer part of the 'members' set
awaitCond(cluster.latestGossip.members.forall(_.address != address(second)), reaperWaitingTime)
awaitCond(clusterView.members.forall(_.address != address(second)), reaperWaitingTime)
// verify that the 'second' node is not part of the 'unreachable' set
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != address(second)), reaperWaitingTime)
awaitCond(clusterView.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime)
}
runOn(second) {
// verify that the second node is shut down and has status REMOVED
awaitCond(!cluster.isRunning, reaperWaitingTime)
awaitCond(cluster.status == MemberStatus.Removed, reaperWaitingTime)
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
}
enterBarrier("finished")

View file

@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.util.duration._
import akka.actor.Props
import akka.actor.Actor
object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -32,6 +34,7 @@ abstract class NodeLeavingAndExitingSpec
with MultiNodeClusterSpec {
import NodeLeavingAndExitingMultiJvmSpec._
import ClusterEvent._
"A node that is LEAVING a non-singleton cluster" must {
@ -44,15 +47,16 @@ abstract class NodeLeavingAndExitingSpec
val leavingLatch = TestLatch()
val exitingLatch = TestLatch()
val expectedAddresses = roles.toSet map address
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
def check(status: MemberStatus): Boolean =
(members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == secondAddess && m.status == status))
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
def check(status: MemberStatus): Boolean =
(members.map(_.address) == expectedAddresses &&
members.exists(m ⇒ m.address == secondAddess && m.status == status))
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
}
})
})), classOf[MembersChanged])
enterBarrier("registered-listener")
runOn(third) {

View file

@ -38,12 +38,12 @@ abstract class NodeMembershipSpec
runOn(first, second) {
cluster.join(first)
awaitCond(cluster.latestGossip.members.size == 2)
assertMembers(cluster.latestGossip.members, first, second)
awaitCond(clusterView.members.size == 2)
assertMembers(clusterView.members, first, second)
awaitCond {
cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
clusterView.members.forall(_.status == MemberStatus.Up)
}
awaitCond(cluster.convergence.isDefined)
awaitCond(clusterView.convergence)
}
enterBarrier("after-1")
@ -55,12 +55,12 @@ abstract class NodeMembershipSpec
cluster.join(first)
}
awaitCond(cluster.latestGossip.members.size == 3)
assertMembers(cluster.latestGossip.members, first, second, third)
awaitCond(clusterView.members.size == 3)
assertMembers(clusterView.members, first, second, third)
awaitCond {
cluster.latestGossip.members.forall(_.status == MemberStatus.Up)
clusterView.members.forall(_.status == MemberStatus.Up)
}
awaitCond(cluster.convergence.isDefined)
awaitCond(clusterView.convergence)
enterBarrier("after-2")
}

View file

@ -11,6 +11,8 @@ import akka.testkit._
import scala.concurrent.util.duration._
import scala.collection.immutable.SortedSet
import java.util.concurrent.atomic.AtomicReference
import akka.actor.Props
import akka.actor.Actor
object NodeUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -27,6 +29,7 @@ abstract class NodeUpSpec
with MultiNodeClusterSpec {
import NodeUpMultiJvmSpec._
import ClusterEvent._
"A cluster node that is joining another cluster" must {
"be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
@ -39,12 +42,13 @@ abstract class NodeUpSpec
"be unaffected when joining again" taggedAs LongRunningTest in {
val unexpected = new AtomicReference[SortedSet[Member]](SortedSet.empty)
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size != 2 || members.exists(_.status != MemberStatus.Up))
unexpected.set(members)
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
if (members.size != 2 || members.exists(_.status != MemberStatus.Up))
unexpected.set(members)
}
})
})), classOf[MembersChanged])
enterBarrier("listener-registered")
runOn(second) {
@ -56,7 +60,7 @@ abstract class NodeUpSpec
for (n ← 1 to 20) {
Thread.sleep(100.millis.dilated.toMillis)
unexpected.get must be(SortedSet.empty)
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true)
clusterView.members.forall(_.status == MemberStatus.Up) must be(true)
}
enterBarrier("after-2")

View file

@ -42,14 +42,14 @@ abstract class SingletonClusterSpec
"become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in {
startClusterNode()
awaitUpConvergence(1)
cluster.isSingletonCluster must be(true)
clusterView.isSingletonCluster must be(true)
enterBarrier("after-1")
}
"not be singleton cluster when joined with other node" taggedAs LongRunningTest in {
awaitClusterUp(first, second)
cluster.isSingletonCluster must be(false)
clusterView.isSingletonCluster must be(false)
assertLeader(first, second)
enterBarrier("after-2")
@ -63,7 +63,7 @@ abstract class SingletonClusterSpec
markNodeAsUnavailable(secondAddress)
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
cluster.isSingletonCluster must be(true)
clusterView.isSingletonCluster must be(true)
assertLeader(first)
}

View file

@ -78,10 +78,10 @@ abstract class SplitBrainSpec
}
runOn(side1: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side2.toSet map address), 20 seconds)
}
runOn(side2: _*) {
awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds)
awaitCond(clusterView.unreachableMembers.map(_.address) == (side1.toSet map address), 20 seconds)
}
enterBarrier("after-2")
@ -91,16 +91,16 @@ abstract class SplitBrainSpec
runOn(side1: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address)
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address)
awaitUpConvergence(side1.size, side2 map address)
assertLeader(side1: _*)
}
runOn(side2: _*) {
// auto-down = on
awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address)
awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address)
awaitUpConvergence(side2.size, side1 map address)
assertLeader(side2: _*)
}

View file

@ -11,6 +11,8 @@ import akka.testkit._
import scala.concurrent.util.duration._
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.SortedSet
import akka.actor.Props
import akka.actor.Actor
object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
@ -40,6 +42,7 @@ abstract class SunnyWeatherSpec
with MultiNodeClusterSpec {
import SunnyWeatherMultiJvmSpec._
import ClusterEvent._
"A normal cluster" must {
"be healthy" taggedAs LongRunningTest in {
@ -55,12 +58,13 @@ abstract class SunnyWeatherSpec
log.info("5 joined")
val unexpected = new AtomicReference[SortedSet[Member]]
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
// we don't expect any changes to the cluster
unexpected.set(members)
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case MembersChanged(members) ⇒
// we don't expect any changes to the cluster
unexpected.set(members)
}
})
})), classOf[MembersChanged])
for (n ← 1 to 30) {
enterBarrier("period-" + n)

View file

@ -42,23 +42,18 @@ abstract class TransitionSpec
def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail
def memberStatus(address: Address): MemberStatus = {
val statusOption = (cluster.latestGossip.members ++ cluster.latestGossip.overview.unreachable).collectFirst {
val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst {
case m if m.address == address ⇒ m.status
}
statusOption must not be (None)
statusOption.get
}
def memberAddresses: Set[Address] = cluster.latestGossip.members.map(_.address)
def memberAddresses: Set[Address] = clusterView.members.map(_.address)
def members: Set[RoleName] = memberAddresses.flatMap(roleName(_))
def seenLatestGossip: Set[RoleName] = {
val gossip = cluster.latestGossip
gossip.overview.seen.collect {
case (address, v) if v == gossip.version ⇒ roleName(address)
}.flatten.toSet
}
def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName
def awaitSeen(addresses: Address*): Unit = awaitCond {
(seenLatestGossip map address) == addresses.toSet
@ -95,9 +90,11 @@ abstract class TransitionSpec
def gossipTo(toRole: RoleName): Unit = {
gossipBarrierCounter += 1
runOn(toRole) {
val g = cluster.latestGossip
val oldCount = clusterView.latestStats.receivedGossipCount
enterBarrier("before-gossip-" + gossipBarrierCounter)
awaitCond(cluster.latestGossip != g) // received gossip
awaitCond {
clusterView.latestStats.receivedGossipCount != oldCount // received gossip
}
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
enterBarrier("after-gossip-" + gossipBarrierCounter)
@ -123,11 +120,11 @@ abstract class TransitionSpec
runOn(first) {
startClusterNode()
cluster.isSingletonCluster must be(true)
cluster.status must be(Joining)
cluster.convergence.isDefined must be(true)
clusterView.isSingletonCluster must be(true)
clusterView.status must be(Joining)
clusterView.convergence must be(true)
leaderActions()
cluster.status must be(Up)
clusterView.status must be(Up)
}
enterBarrier("after-1")
@ -144,7 +141,7 @@ abstract class TransitionSpec
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
awaitCond(seenLatestGossip == Set(first, second))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
enterBarrier("convergence-joining-2")
@ -161,7 +158,7 @@ abstract class TransitionSpec
awaitCond(memberStatus(second) == Up)
seenLatestGossip must be(Set(first, second))
memberStatus(first) must be(Up)
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-2")
@ -177,7 +174,7 @@ abstract class TransitionSpec
awaitMembers(first, second, third)
memberStatus(third) must be(Joining)
awaitCond(seenLatestGossip == Set(second, third))
cluster.convergence.isDefined must be(false)
clusterView.convergence must be(false)
}
enterBarrier("third-joined-second")
@ -188,7 +185,7 @@ abstract class TransitionSpec
memberStatus(third) must be(Joining)
awaitCond(memberStatus(second) == Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
first gossipTo third
@ -198,7 +195,7 @@ abstract class TransitionSpec
memberStatus(second) must be(Up)
memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
enterBarrier("convergence-joining-3")
@ -216,7 +213,7 @@ abstract class TransitionSpec
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(leader(first, second, third), myself))
cluster.convergence.isDefined must be(false)
clusterView.convergence must be(false)
}
// first non-leader gossipTo the other non-leader
@ -228,7 +225,7 @@ abstract class TransitionSpec
runOn(nonLeader(first, second, third).tail.head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
// first non-leader gossipTo the leader
@ -238,7 +235,7 @@ abstract class TransitionSpec
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-3")
@ -248,7 +245,7 @@ abstract class TransitionSpec
runOn(third) {
markNodeAsUnavailable(second)
reapUnreachable()
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
clusterView.unreachableMembers must contain(Member(second, Up))
seenLatestGossip must be(Set(third))
}
@ -257,8 +254,8 @@ abstract class TransitionSpec
third gossipTo first
runOn(first, third) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
cluster.convergence.isDefined must be(false)
clusterView.unreachableMembers must contain(Member(second, Up))
clusterView.convergence must be(false)
}
runOn(first) {
@ -271,10 +268,10 @@ abstract class TransitionSpec
first gossipTo third
runOn(first, third) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Down))
clusterView.unreachableMembers must contain(Member(second, Down))
memberStatus(second) must be(Down)
seenLatestGossip must be(Set(first, third))
cluster.convergence.isDefined must be(true)
clusterView.convergence must be(true)
}
enterBarrier("after-6")

View file

@ -80,13 +80,13 @@ abstract class UnreachableNodeRejoinsClusterSpec
within(30 seconds) {
// victim becomes all alone
awaitCond({
val gossip = cluster.latestGossip
gossip.overview.unreachable.size == (roles.size - 1) &&
gossip.members.size == 1 &&
gossip.members.forall(_.status == MemberStatus.Up)
val members = clusterView.members
clusterView.unreachableMembers.size == (roles.size - 1) &&
members.size == 1 &&
members.forall(_.status == MemberStatus.Up)
})
cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet)
cluster.convergence.isDefined must be(false)
clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
clusterView.convergence must be(false)
}
}
@ -95,17 +95,17 @@ abstract class UnreachableNodeRejoinsClusterSpec
within(30 seconds) {
// victim becomes unreachable
awaitCond({
val gossip = cluster.latestGossip
gossip.overview.unreachable.size == 1 &&
gossip.members.size == (roles.size - 1) &&
gossip.members.forall(_.status == MemberStatus.Up)
val members = clusterView.members
clusterView.unreachableMembers.size == 1 &&
members.size == (roles.size - 1) &&
members.forall(_.status == MemberStatus.Up)
})
awaitSeenSameState(allButVictim map address: _*)
// still one unreachable
cluster.latestGossip.overview.unreachable.size must be(1)
cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address)
clusterView.unreachableMembers.size must be(1)
clusterView.unreachableMembers.head.address must be(node(victim).address)
// and therefore no convergence
cluster.convergence.isDefined must be(false)
clusterView.convergence must be(false)
}
}

View file

@ -31,7 +31,7 @@ class ClusterConfigSpec extends AkkaSpec {
HeartbeatInterval must be(1 second)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
PublishStateInterval must be(1 second)
PublishStatsInterval must be(10 second)
JoinTimeout must be(60 seconds)
AutoJoin must be(true)
AutoDown must be(false)

View file

@ -25,7 +25,7 @@ object ClusterSpec {
auto-join = off
auto-down = off
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
publish-state-interval = 0 s # always, when it happens
publish-stats-interval = 0 s # always, when it happens
}
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
akka.remote.netty.port = 0
@ -44,6 +44,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
val failureDetector = new FailureDetectorPuppet(system)
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
def clusterView = cluster.readView
def leaderActions(): Unit = {
cluster.clusterCore ! LeaderActionsTick
@ -70,15 +71,16 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
}
"initially become singleton cluster when joining itself and reach convergence" in {
cluster.isSingletonCluster must be(false) // auto-join = off
clusterView.members.size must be(0) // auto-join = off
cluster.join(selfAddress)
awaitCond(cluster.isSingletonCluster)
cluster.self.address must be(selfAddress)
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
cluster.status must be(MemberStatus.Joining)
cluster.convergence.isDefined must be(true)
Thread.sleep(5000)
awaitCond(clusterView.isSingletonCluster)
clusterView.self.address must be(selfAddress)
clusterView.members.map(_.address) must be(Set(selfAddress))
clusterView.status must be(MemberStatus.Joining)
clusterView.convergence must be(true)
leaderActions()
cluster.status must be(MemberStatus.Up)
clusterView.status must be(MemberStatus.Up)
}
}

View file

@ -0,0 +1,84 @@
.. _cluster_usage:
#########
Cluster
#########
.. note:: *This document describes how to use the features, implemented so far, of the
   new clustering coming in Akka Coltrane; it is not yet available in the latest stable release.
   The API might change before it is released. For an introduction to the Akka Cluster
   concepts please see* :ref:`cluster`.
Preparing your ActorSystem for Clustering
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project::
"com.typesafe.akka" % "akka-cluster" % "2.1-SNAPSHOT"
It can be difficult to find the correct versions and repositories at the moment. The following sbt 0.11.3 build
file illustrates what to use with Scala 2.10.0-M6 and Akka 2.1-SNAPSHOT::
import sbt._
import sbt.Keys._
object ProjectBuild extends Build {
lazy val root = Project(
id = "root",
base = file("."),
settings = Project.defaultSettings ++ Seq(
name := "Akka Cluster Example",
organization := "org.test",
version := "0.1-SNAPSHOT",
scalaVersion := "2.10.0-M6",
resolvers += "Sonatype Releases Repo" at "https://oss.sonatype.org/content/repositories/releases/",
resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/",
resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases",
resolvers += "Typesafe Snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-cluster" % "2.1-20120816-000904",
"com.typesafe.akka" % "akka-testkit" % "2.1-20120816-000904" % "test",
"junit" % "junit" % "4.5" % "test",
"org.scalatest" %% "scalatest" % "1.9-2.10.0-M6-B2" % "test")
)
)
}
Pick a timestamped Akka version from `<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster/>`_.
To enable cluster capabilities in your Akka project you should, at a minimum, add the :ref:`remoting-scala`
settings and the cluster ``seed-nodes`` to your ``application.conf`` file:
.. literalinclude:: ../../akka-samples/akka-sample-remote/src/main/resources/common.conf
:language: none
The seed nodes are configured contact points for the initial join of the cluster.
When a new node is started it sends a message to all seed nodes and
then sends a join command to the one that answers first.
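Joining can also be triggered programmatically instead of relying on the configured seed nodes;
a minimal sketch (the system name and seed address are assumptions, adjust them to your setup)::

    import akka.actor.{ ActorSystem, Address }
    import akka.cluster.Cluster

    object ManualJoin extends App {
      val system = ActorSystem("ClusterSystem")
      // join an already running cluster node on the given address
      Cluster(system).join(Address("akka", "ClusterSystem", "127.0.0.1", 2551))
    }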
A Simple Cluster Example
^^^^^^^^^^^^^^^^^^^^^^^^
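A minimal sketch, mirroring the ``ClusterApp`` sample added in this commit (the object name is
illustrative), that subscribes to membership changes and prints the current members::

    import akka.actor._
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent._

    object SimpleClusterApp {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("ClusterSystem")
        // actor that prints the members on every cluster membership change
        val clusterListener = system.actorOf(Props(new Actor {
          def receive = {
            case state: CurrentClusterState ⇒
              println("Current members: " + state.members)
            case MembersChanged(members) ⇒
              println("Current members: " + members)
          }
        }))
        // the subscriber receives the current state first, then MembersChanged events
        Cluster(system).subscribe(clusterListener, classOf[MembersChanged])
      }
    }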
Configuration
^^^^^^^^^^^^^
There are many more properties related to clustering in Akka. We refer to the following
reference file for more information:
.. literalinclude:: ../../akka-cluster/src/main/resources/reference.conf
:language: none

View file

@ -5,3 +5,4 @@ Cluster
:maxdepth: 2
cluster
cluster-usage

View file

@ -43,6 +43,11 @@ As you can see in the example above there are four things you need to add to get
communicate across the network.
* Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically
.. note::
The port number needs to be unique for each actor system on the same machine even if the actor
systems have different names. This is because each actor system has its own network subsystem
listening for connections and handling messages, so as not to interfere with other actor systems.
The example above only illustrates the bare minimum of properties you have to add to enable remoting.
There are many more properties related to remoting in Akka. We refer to the following
reference file for more information:

View file

@ -40,6 +40,11 @@ As you can see in the example above there are four things you need to add to get
communicate across the network.
* Add port number - the port the actor system should listen on, set to 0 to have it chosen automatically
.. note::
The port number needs to be unique for each actor system on the same machine even if the actor
systems have different names. This is because each actor system has its own network subsystem
listening for connections and handling messages, so as not to interfere with other actor systems.
The example above only illustrates the bare minimum of properties you have to add to enable remoting.
There are many more properties related to remoting in Akka. We refer to the following
reference file for more information:

View file

@ -107,6 +107,7 @@ akka {
# (I) The default remote server port clients should connect to.
# Default is 2552 (AKKA), use 0 if you want a random available port
# This port needs to be unique for each actor system on the same machine.
port = 2552
# (O) The address of a local network interface (IP Address) to bind to when creating

View file

@ -0,0 +1,143 @@
REMOTE CALCULATOR
=================
Requirements
------------
To build and run the remote calculator you need [Simple Build Tool][sbt] (sbt).
The Sample Explained
--------------------
In order to showcase the remote capabilities of Akka 2.0 we thought a remote calculator could do the trick.
There are two implementations of the sample; one in Scala and one in Java.
The explanation below is for Scala, but everything is similar in Java except that the class names begin with a ``J``,
e.g. ``JCalcApp`` instead of ``CalcApp``, and that the Java classes reside in another package structure.
There are three actor systems used in the sample:
* CalculatorApplication : the actor system performing the number crunching
* LookupApplication : illustrates how to look up an actor on a remote node and how to communicate with that actor
* CreationApplication : illustrates how to create an actor on a remote node and how to communicate with that actor
The CalculatorApplication contains an actor, SimpleCalculatorActor, which can handle simple math operations such as
addition and subtraction. This actor is looked up and used from the LookupApplication.
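The lookup itself boils down to resolving the remote actor's path; a sketch (the actor name and
the message type are assumptions based on the sample's output)::

    val calculator = system.actorFor(
      "akka://CalculatorApplication@127.0.0.1:2552/user/simpleCalculator")
    calculator ! Add(17, 1) // printed as "Add result: 17 + 1 = 18"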
The CreationApplication wants to use more "advanced" mathematical operations, such as multiplication and division,
but as the CalculatorApplication does not have any actor that can perform those types of calculations the
CreationApplication has to remote deploy an actor that can (which in our case is AdvancedCalculatorActor).
So this actor is deployed, over the network, onto the CalculatorApplication actor system and thereafter the
CreationApplication will send messages to it.
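The remote deployment is driven by configuration; a sketch of such a deployment section (the
actor path and the target address are assumptions matching the sample's defaults)::

    akka.actor.deployment {
      /advancedCalculator {
        remote = "akka://CalculatorApplication@127.0.0.1:2552"
      }
    }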
It is important to point out that as the actor systems run on different ports it is possible to run all three in parallel.
See the next section for more information on how to run the sample application.
Running
-------
In order to run all three actor systems you have to start SBT in three different terminal windows.
We start off by running the CalculatorApplication:
First type 'sbt' to start SBT interactively, then run 'update' and 'run':
> cd $AKKA_HOME
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.CalcApp" which in the case below is number 3:
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 3
You should see something similar to this::
[info] Running sample.remote.calculator.CalcApp
[INFO] [12/22/2011 14:21:51.631] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://CalculatorApplication@127.0.0.1:2552
[INFO] [12/22/2011 14:21:51.632] [run-main] [Remote] Starting remote server on [akka://CalculatorApplication@127.0.0.1:2552]
Started Calculator Application - waiting for messages
[INFO] [12/22/2011 14:22:39.894] [New I/O server worker #1-1] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2553
Open up a new terminal window and run SBT once more:
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.LookupApp" which in the case below is number 1::
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 1
Now you should see something like this::
[info] Running sample.remote.calculator.LookupApp
[INFO] [12/22/2011 14:54:38.630] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://LookupApplication@127.0.0.1:2553
[INFO] [12/22/2011 14:54:38.632] [run-main] [Remote] Starting remote server on [akka://LookupApplication@127.0.0.1:2553]
Started Lookup Application
[INFO] [12/22/2011 14:54:38.801] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
Sub result: 4 - 30 = -26
Add result: 17 + 1 = 18
Add result: 37 + 43 = 80
Add result: 68 + 66 = 134
Congrats! You have now successfully looked up a remote actor and communicated with it.
The next step is to have an actor deployed on a remote node.
Once more you should open a new terminal window and run SBT:
> sbt
> project akka-sample-remote
> run
Select to run "sample.remote.calculator.CreationApp" which in the case below is number 2::
Multiple main classes detected, select one to run:
[1] sample.remote.calculator.LookupApp
[2] sample.remote.calculator.CreationApp
[3] sample.remote.calculator.CalcApp
Enter number: 2
Now you should see something like this::
[info] Running sample.remote.calculator.CreationApp
[INFO] [12/22/2011 14:57:02.150] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://RemoteCreation@127.0.0.1:2554
[INFO] [12/22/2011 14:57:02.151] [run-main] [Remote] Starting remote server on [akka://RemoteCreation@127.0.0.1:2554]
[INFO] [12/22/2011 14:57:02.267] [default-dispatcher-21] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
Started Creation Application
Mul result: 14 * 17 = 238
Div result: 3764 / 80 = 47.00
Mul result: 16 * 5 = 80
Mul result: 1 * 18 = 18
Mul result: 8 * 13 = 104
That's it!
Notice
------
The sample application is just that, i.e. a sample. Parts of it are not the way you would do a "real" application.
One improvement would be to remove all hard-coded addresses from the code, as they reduce the flexibility of how and
where the application can be run. We leave it to the astute reader to refine the sample into a real-world application.
* `Akka <http://akka.io/>`_
* `SBT <https://github.com/harrah/xsbt/wiki/>`_

View file

@ -0,0 +1,18 @@
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
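# port 0 means a random available port is picked at startup; seed nodes must instead
# be started on the fixed ports listed in seed-nodes below (e.g. via a system property)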
port = 0
}
}
extensions = ["akka.cluster.Cluster$"]
cluster {
seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551", "akka://ClusterSystem@127.0.0.1:2552"]
}
}

View file

@ -0,0 +1,28 @@
package sample.cluster
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
object ClusterApp {
def main(args: Array[String]): Unit = {
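// a port given as the first program argument overrides the configured netty port,
// so that e.g. the two configured seed nodes can be started on ports 2551 and 2552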
if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))
// Create an Akka system
val system = ActorSystem("ClusterSystem")
val clusterListener = system.actorOf(Props(new Actor {
def receive = {
case state: CurrentClusterState ⇒
println("Current members: " + state.members)
case MembersChanged(members) ⇒
println("Current members: " + members)
}
}))
Cluster(system).subscribe(clusterListener, classOf[MembersChanged])
}
}

View file

@ -282,7 +282,7 @@ object AkkaBuild extends Build {
id = "akka-samples",
base = file("akka-samples"),
settings = parentSettings,
aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample)
aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, clusterSample)
)
lazy val camelSample = Project(
@ -322,6 +322,13 @@ object AkkaBuild extends Build {
settings = defaultSettings
)
lazy val clusterSample = Project(
id = "akka-sample-cluster",
base = file("akka-samples/akka-sample-cluster"),
dependencies = Seq(cluster),
settings = defaultSettings
)
lazy val tutorials = Project(
id = "akka-tutorials",
base = file("akka-tutorials"),