From e91af31fb99c184e84becd364bb36a57a07c4f37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Tue, 28 Feb 2012 17:04:48 +0100 Subject: [PATCH] Added FSM to the Node's ClusterCommandDaemon to manage the cluster command state as an FSM. Also added tests for all the FSM state changes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Node.scala | 193 ++++++++++++++---- .../cluster/AccrualFailureDetectorSpec.scala | 4 + .../cluster/ClusterCommandDaemonFSMSpec.scala | 156 ++++++++++++++ .../akka/cluster/LeaderElectionSpec.scala | 1 + .../scala/akka/cluster/VectorClockSpec.scala | 4 + akka-docs/scala/fsm.rst | 4 +- 6 files changed, 325 insertions(+), 37 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index 0f324936f3..cea128d027 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -46,24 +46,48 @@ trait MetaDataChangeListener { // FIXME add management and notification for Meta sealed trait ClusterMessage extends Serializable /** - * Command to join the cluster. + * Cluster commands sent by the USER. */ -case class Join(node: Address) extends ClusterMessage +object UserAction { + + /** + * Command to join the cluster. Sent when a node (reprsesented by 'address') + * wants to join another node (the receiver). + */ + case class Join(address: Address) extends ClusterMessage + + /** + * Command to leave the cluster. + */ + case object Leave extends ClusterMessage + + /** + * Command to mark node as temporary down. + */ + case object Down extends ClusterMessage + + /** + * Command to mark a node to be removed from the cluster immediately. + */ + case object Exit extends ClusterMessage +} /** - * Command to leave the cluster. + * Cluster commands sent by the LEADER. + * Node: Leader can also send UserActions but not vice versa. */ -case class Leave(node: Address) extends ClusterMessage +object LeaderAction { -/** - * Command to mark node as temporay down. - */ -case class Down(node: Address) extends ClusterMessage + /** + * Command to set a node to Up (from Joining). + */ + case object Up extends ClusterMessage -/** - * Command to remove a node from the cluster immediately. - */ -case class Remove(node: Address) extends ClusterMessage + /** + * Command to remove a node from the cluster immediately. + */ + case object Remove extends ClusterMessage +} /** * Represents the address and the current status of a cluster member node. @@ -87,6 +111,7 @@ object MemberStatus { case object Leaving extends MemberStatus case object Exiting extends MemberStatus case object Down extends MemberStatus + case object Removed extends MemberStatus } // sealed trait PartitioningStatus @@ -153,20 +178,92 @@ case class Gossip( ")" } -// FIXME ClusterCommandDaemon with FSM trait /** - * Single instance. FSM managing the different cluster nodes states. - * Serialized access to Node. + * FSM actor managing the different cluster nodes states. + * Single instance - e.g. serialized access to Node - message after message. */ -final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor { - val log = Logging(system, "ClusterCommandDaemon") +final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor with FSM[MemberStatus, Unit] { - def receive = { - case Join(address) ⇒ node.joining(address) - case Leave(address) ⇒ //node.leaving(address) - case Down(address) ⇒ //node.downing(address) - case Remove(address) ⇒ //node.removing(address) - case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]") + // start in JOINING + startWith(MemberStatus.Joining, Unit) + + // ======================== + // === IN JOINING === + when(MemberStatus.Joining) { + case Event(LeaderAction.Up, _) ⇒ + node.up() + goto(MemberStatus.Up) + } + + // ======================== + // === IN UP === + when(MemberStatus.Up) { + case Event(UserAction.Down, _) ⇒ + node.downing() + goto(MemberStatus.Down) + + case Event(UserAction.Leave, _) ⇒ + node.leaving() + goto(MemberStatus.Leaving) + + case Event(UserAction.Exit, _) ⇒ + node.exiting() + goto(MemberStatus.Exiting) + + case Event(LeaderAction.Remove, _) ⇒ + node.removing() + goto(MemberStatus.Removed) + } + + // ======================== + // === IN LEAVING === + when(MemberStatus.Leaving) { + case Event(UserAction.Down, _) ⇒ + node.downing() + goto(MemberStatus.Down) + + case Event(LeaderAction.Remove, _) ⇒ + node.removing() + goto(MemberStatus.Removed) + } + + // ======================== + // === IN EXITING === + when(MemberStatus.Exiting) { + case Event(LeaderAction.Remove, _) ⇒ + node.removing() + goto(MemberStatus.Removed) + } + + // ======================== + // === IN DOWN === + when(MemberStatus.Down) { + // FIXME How to transition from DOWN => JOINING when node comes back online. Can't just listen to Gossip message since it is received be another actor. How to fix this? + case Event(LeaderAction.Remove, _) ⇒ + node.removing() + goto(MemberStatus.Removed) + } + + // ======================== + // === IN REMOVED === + when(MemberStatus.Removed) { + case command ⇒ + log.warning("Removed node [{}] received cluster command [{}]", system.name, command) + stay + } + + // ======================== + // === GENERIC AND UNHANDLED COMMANDS === + whenUnhandled { + // should be able to handle Join in any state + case Event(UserAction.Join(address), _) ⇒ + node.joining(address) + stay + + case Event(command, _) ⇒ { + log.warning("Unhandled command [{}] in state [{}]", command, stateName) + stay + } } } @@ -274,7 +371,7 @@ class Node(system: ActorSystemImpl) extends Extension { log.info("Node [{}] - Starting cluster Node...", remoteAddress) // try to join the node defined in the 'akka.cluster.node-to-join' option - join() + autoJoin() // start periodic gossip to random nodes in cluster private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { @@ -369,6 +466,7 @@ class Node(system: ActorSystemImpl) extends Extension { // ======================================================== /** + * State transition to JOINING. * New node joining. */ @tailrec @@ -397,6 +495,31 @@ class Node(system: ActorSystemImpl) extends Extension { } } + /** + * State transition to UP. + */ + private[cluster] final def up() {} + + /** + * State transition to LEAVING. + */ + private[cluster] final def leaving() {} + + /** + * State transition to EXITING. + */ + private[cluster] final def exiting() {} + + /** + * State transition to REMOVED. + */ + private[cluster] final def removing() {} + + /** + * State transition to DOWN. + */ + private[cluster] final def downing() {} + /** * Receive new gossip. */ @@ -444,9 +567,9 @@ class Node(system: ActorSystemImpl) extends Extension { /** * Joins the pre-configured contact point and retrieves current gossip state. */ - private def join() = nodeToJoin foreach { address ⇒ + private def autoJoin() = nodeToJoin foreach { address ⇒ val connection = clusterCommandConnectionFor(address) - val command = Join(remoteAddress) + val command = UserAction.Join(remoteAddress) log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection) connection ! command } @@ -501,16 +624,18 @@ class Node(system: ActorSystemImpl) extends Extension { } /** - * Switches the state in the FSM. + * Switches the member status. + * + * @param newStatus the new member status + * @param oldState the state to change the member status in + * @return the updated new state with the new member status */ - @tailrec - final private def switchStatusTo(newStatus: MemberStatus) { + private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = { log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus) - val localState = state.get - val localSelf = localState.self + val localSelf = state.self - val localGossip = localState.latestGossip + val localGossip = state.latestGossip val localMembers = localGossip.members val newSelf = localSelf copy (status = newStatus) @@ -526,9 +651,7 @@ class Node(system: ActorSystemImpl) extends Extension { val versionedGossip = newGossip + vclockNode val seenVersionedGossip = versionedGossip seen remoteAddress - val newState = localState copy (self = newSelf, latestGossip = seenVersionedGossip) - - if (!state.compareAndSet(localState, newState)) switchStatusTo(newStatus) // recur if we failed update + state copy (self = newSelf, latestGossip = seenVersionedGossip) } /** diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index e867bc834b..275cd32c75 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + package akka.cluster import java.net.InetSocketAddress diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala new file mode 100644 index 0000000000..69512f3ad9 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterCommandDaemonFSMSpec.scala @@ -0,0 +1,156 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.testkit._ +import akka.actor.Address + +class ClusterCommandDaemonFSMSpec extends AkkaSpec( + """ + akka { + actor { + provider = akka.remote.RemoteActorRefProvider + } + } + """) with ImplicitSender { + + "A ClusterCommandDaemon FSM" must { + + "start in Joining" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + } + + "be able to switch from Joining to Up" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + } + + "be able to switch from Up to Down" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Down + fsm.stateName must be(MemberStatus.Down) + } + + "be able to switch from Up to Leaving" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Leave + fsm.stateName must be(MemberStatus.Leaving) + } + + "be able to switch from Up to Exiting" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Exit + fsm.stateName must be(MemberStatus.Exiting) + } + + "be able to switch from Up to Removed" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + } + + "be able to switch from Leaving to Down" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Leave + fsm.stateName must be(MemberStatus.Leaving) + fsm ! UserAction.Down + fsm.stateName must be(MemberStatus.Down) + } + + "be able to switch from Leaving to Removed" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Leave + fsm.stateName must be(MemberStatus.Leaving) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + } + + "be able to switch from Exiting to Removed" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Exit + fsm.stateName must be(MemberStatus.Exiting) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + } + + "be able to switch from Down to Removed" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Down + fsm.stateName must be(MemberStatus.Down) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + } + + "not be able to switch from Removed to any other state" in { + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Removed) + fsm ! UserAction.Leave + fsm.stateName must be(MemberStatus.Removed) + fsm ! UserAction.Down + fsm.stateName must be(MemberStatus.Removed) + fsm ! UserAction.Exit + fsm.stateName must be(MemberStatus.Removed) + fsm ! LeaderAction.Remove + fsm.stateName must be(MemberStatus.Removed) + } + + "remain in the same state when receiving a Join command" in { + val address = Address("akka", system.name) + + val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node)) + fsm.stateName must be(MemberStatus.Joining) + fsm ! UserAction.Join(address) + fsm.stateName must be(MemberStatus.Joining) + + fsm ! LeaderAction.Up + fsm.stateName must be(MemberStatus.Up) + fsm ! UserAction.Join(address) + fsm.stateName must be(MemberStatus.Up) + + fsm ! UserAction.Leave + fsm.stateName must be(MemberStatus.Leaving) + fsm ! UserAction.Join(address) + fsm.stateName must be(MemberStatus.Leaving) + + fsm ! UserAction.Down + fsm.stateName must be(MemberStatus.Down) + fsm ! UserAction.Join(address) + fsm.stateName must be(MemberStatus.Down) + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala index 85587f8780..a9a42ef26c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala @@ -1,6 +1,7 @@ /** * Copyright (C) 2009-2012 Typesafe Inc. */ + package akka.cluster import akka.testkit._ diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index 65f2aa1d75..d0e4c8da13 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + package akka.cluster import java.net.InetSocketAddress diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst index f0f2758e89..e44d9cc132 100644 --- a/akka-docs/scala/fsm.rst +++ b/akka-docs/scala/fsm.rst @@ -157,7 +157,7 @@ Defining States A state is defined by one or more invocations of the method :func:`when([, stateTimeout = ])(stateFunction)`. - + The given name must be an object which is type-compatible with the first type parameter given to the :class:`FSM` trait. This object is used as a hash key, so you must ensure that it properly implements :meth:`equals` and @@ -440,7 +440,7 @@ and in the following. Event Tracing ------------- -The setting ``akka.actor.debug.fsm`` in `:ref:`configuration` enables logging of an +The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an event trace by :class:`LoggingFSM` instances:: class MyFSM extends Actor with LoggingFSM[X, Z] {