Added FSM to the Node's ClusterCommandDaemon to manage the cluster command state as an FSM. Also added tests for all the FSM state changes.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-28 17:04:48 +01:00
parent 96ed8bdccf
commit e91af31fb9
6 changed files with 325 additions and 37 deletions

View file

@ -46,24 +46,48 @@ trait MetaDataChangeListener { // FIXME add management and notification for Meta
sealed trait ClusterMessage extends Serializable sealed trait ClusterMessage extends Serializable
/** /**
* Command to join the cluster. * Cluster commands sent by the USER.
*/ */
case class Join(node: Address) extends ClusterMessage object UserAction {
/** /**
* Command to join the cluster. Sent when a node (reprsesented by 'address')
* wants to join another node (the receiver).
*/
case class Join(address: Address) extends ClusterMessage
/**
* Command to leave the cluster. * Command to leave the cluster.
*/ */
case class Leave(node: Address) extends ClusterMessage case object Leave extends ClusterMessage
/** /**
* Command to mark node as temporay down. * Command to mark node as temporary down.
*/ */
case class Down(node: Address) extends ClusterMessage case object Down extends ClusterMessage
/**
* Command to mark a node to be removed from the cluster immediately.
*/
case object Exit extends ClusterMessage
}
/** /**
* Cluster commands sent by the LEADER.
* Node: Leader can also send UserActions but not vice versa.
*/
object LeaderAction {
/**
* Command to set a node to Up (from Joining).
*/
case object Up extends ClusterMessage
/**
* Command to remove a node from the cluster immediately. * Command to remove a node from the cluster immediately.
*/ */
case class Remove(node: Address) extends ClusterMessage case object Remove extends ClusterMessage
}
/** /**
* Represents the address and the current status of a cluster member node. * Represents the address and the current status of a cluster member node.
@ -87,6 +111,7 @@ object MemberStatus {
case object Leaving extends MemberStatus case object Leaving extends MemberStatus
case object Exiting extends MemberStatus case object Exiting extends MemberStatus
case object Down extends MemberStatus case object Down extends MemberStatus
case object Removed extends MemberStatus
} }
// sealed trait PartitioningStatus // sealed trait PartitioningStatus
@ -153,20 +178,92 @@ case class Gossip(
")" ")"
} }
// FIXME ClusterCommandDaemon with FSM trait
/** /**
* Single instance. FSM managing the different cluster nodes states. * FSM actor managing the different cluster nodes states.
* Serialized access to Node. * Single instance - e.g. serialized access to Node - message after message.
*/ */
final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor { final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor with FSM[MemberStatus, Unit] {
val log = Logging(system, "ClusterCommandDaemon")
def receive = { // start in JOINING
case Join(address) node.joining(address) startWith(MemberStatus.Joining, Unit)
case Leave(address) //node.leaving(address)
case Down(address) //node.downing(address) // ========================
case Remove(address) //node.removing(address) // === IN JOINING ===
case unknown log.error("Unknown message sent to cluster daemon [" + unknown + "]") when(MemberStatus.Joining) {
case Event(LeaderAction.Up, _)
node.up()
goto(MemberStatus.Up)
}
// ========================
// === IN UP ===
when(MemberStatus.Up) {
case Event(UserAction.Down, _)
node.downing()
goto(MemberStatus.Down)
case Event(UserAction.Leave, _)
node.leaving()
goto(MemberStatus.Leaving)
case Event(UserAction.Exit, _)
node.exiting()
goto(MemberStatus.Exiting)
case Event(LeaderAction.Remove, _)
node.removing()
goto(MemberStatus.Removed)
}
// ========================
// === IN LEAVING ===
when(MemberStatus.Leaving) {
case Event(UserAction.Down, _)
node.downing()
goto(MemberStatus.Down)
case Event(LeaderAction.Remove, _)
node.removing()
goto(MemberStatus.Removed)
}
// ========================
// === IN EXITING ===
when(MemberStatus.Exiting) {
case Event(LeaderAction.Remove, _)
node.removing()
goto(MemberStatus.Removed)
}
// ========================
// === IN DOWN ===
when(MemberStatus.Down) {
// FIXME How to transition from DOWN => JOINING when node comes back online. Can't just listen to Gossip message since it is received be another actor. How to fix this?
case Event(LeaderAction.Remove, _)
node.removing()
goto(MemberStatus.Removed)
}
// ========================
// === IN REMOVED ===
when(MemberStatus.Removed) {
case command
log.warning("Removed node [{}] received cluster command [{}]", system.name, command)
stay
}
// ========================
// === GENERIC AND UNHANDLED COMMANDS ===
whenUnhandled {
// should be able to handle Join in any state
case Event(UserAction.Join(address), _)
node.joining(address)
stay
case Event(command, _) {
log.warning("Unhandled command [{}] in state [{}]", command, stateName)
stay
}
} }
} }
@ -274,7 +371,7 @@ class Node(system: ActorSystemImpl) extends Extension {
log.info("Node [{}] - Starting cluster Node...", remoteAddress) log.info("Node [{}] - Starting cluster Node...", remoteAddress)
// try to join the node defined in the 'akka.cluster.node-to-join' option // try to join the node defined in the 'akka.cluster.node-to-join' option
join() autoJoin()
// start periodic gossip to random nodes in cluster // start periodic gossip to random nodes in cluster
private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) { private val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
@ -369,6 +466,7 @@ class Node(system: ActorSystemImpl) extends Extension {
// ======================================================== // ========================================================
/** /**
* State transition to JOINING.
* New node joining. * New node joining.
*/ */
@tailrec @tailrec
@ -397,6 +495,31 @@ class Node(system: ActorSystemImpl) extends Extension {
} }
} }
/**
* State transition to UP.
*/
private[cluster] final def up() {}
/**
* State transition to LEAVING.
*/
private[cluster] final def leaving() {}
/**
* State transition to EXITING.
*/
private[cluster] final def exiting() {}
/**
* State transition to REMOVED.
*/
private[cluster] final def removing() {}
/**
* State transition to DOWN.
*/
private[cluster] final def downing() {}
/** /**
* Receive new gossip. * Receive new gossip.
*/ */
@ -444,9 +567,9 @@ class Node(system: ActorSystemImpl) extends Extension {
/** /**
* Joins the pre-configured contact point and retrieves current gossip state. * Joins the pre-configured contact point and retrieves current gossip state.
*/ */
private def join() = nodeToJoin foreach { address private def autoJoin() = nodeToJoin foreach { address
val connection = clusterCommandConnectionFor(address) val connection = clusterCommandConnectionFor(address)
val command = Join(remoteAddress) val command = UserAction.Join(remoteAddress)
log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection) log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
connection ! command connection ! command
} }
@ -501,16 +624,18 @@ class Node(system: ActorSystemImpl) extends Extension {
} }
/** /**
* Switches the state in the FSM. * Switches the member status.
*
* @param newStatus the new member status
* @param oldState the state to change the member status in
* @return the updated new state with the new member status
*/ */
@tailrec private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
final private def switchStatusTo(newStatus: MemberStatus) {
log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus) log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
val localState = state.get val localSelf = state.self
val localSelf = localState.self
val localGossip = localState.latestGossip val localGossip = state.latestGossip
val localMembers = localGossip.members val localMembers = localGossip.members
val newSelf = localSelf copy (status = newStatus) val newSelf = localSelf copy (status = newStatus)
@ -526,9 +651,7 @@ class Node(system: ActorSystemImpl) extends Extension {
val versionedGossip = newGossip + vclockNode val versionedGossip = newGossip + vclockNode
val seenVersionedGossip = versionedGossip seen remoteAddress val seenVersionedGossip = versionedGossip seen remoteAddress
val newState = localState copy (self = newSelf, latestGossip = seenVersionedGossip) state copy (self = newSelf, latestGossip = seenVersionedGossip)
if (!state.compareAndSet(localState, newState)) switchStatusTo(newStatus) // recur if we failed update
} }
/** /**

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster package akka.cluster
import java.net.InetSocketAddress import java.net.InetSocketAddress

View file

@ -0,0 +1,156 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit._
import akka.actor.Address
class ClusterCommandDaemonFSMSpec extends AkkaSpec(
"""
akka {
actor {
provider = akka.remote.RemoteActorRefProvider
}
}
""") with ImplicitSender {
"A ClusterCommandDaemon FSM" must {
"start in Joining" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
}
"be able to switch from Joining to Up" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
}
"be able to switch from Up to Down" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Down
fsm.stateName must be(MemberStatus.Down)
}
"be able to switch from Up to Leaving" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Leave
fsm.stateName must be(MemberStatus.Leaving)
}
"be able to switch from Up to Exiting" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Exit
fsm.stateName must be(MemberStatus.Exiting)
}
"be able to switch from Up to Removed" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
}
"be able to switch from Leaving to Down" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Leave
fsm.stateName must be(MemberStatus.Leaving)
fsm ! UserAction.Down
fsm.stateName must be(MemberStatus.Down)
}
"be able to switch from Leaving to Removed" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Leave
fsm.stateName must be(MemberStatus.Leaving)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
}
"be able to switch from Exiting to Removed" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Exit
fsm.stateName must be(MemberStatus.Exiting)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
}
"be able to switch from Down to Removed" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Down
fsm.stateName must be(MemberStatus.Down)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
}
"not be able to switch from Removed to any other state" in {
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Removed)
fsm ! UserAction.Leave
fsm.stateName must be(MemberStatus.Removed)
fsm ! UserAction.Down
fsm.stateName must be(MemberStatus.Removed)
fsm ! UserAction.Exit
fsm.stateName must be(MemberStatus.Removed)
fsm ! LeaderAction.Remove
fsm.stateName must be(MemberStatus.Removed)
}
"remain in the same state when receiving a Join command" in {
val address = Address("akka", system.name)
val fsm = TestFSMRef(new ClusterCommandDaemon(system, system.node))
fsm.stateName must be(MemberStatus.Joining)
fsm ! UserAction.Join(address)
fsm.stateName must be(MemberStatus.Joining)
fsm ! LeaderAction.Up
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Join(address)
fsm.stateName must be(MemberStatus.Up)
fsm ! UserAction.Leave
fsm.stateName must be(MemberStatus.Leaving)
fsm ! UserAction.Join(address)
fsm.stateName must be(MemberStatus.Leaving)
fsm ! UserAction.Down
fsm.stateName must be(MemberStatus.Down)
fsm ! UserAction.Join(address)
fsm.stateName must be(MemberStatus.Down)
}
}
}

View file

@ -1,6 +1,7 @@
/** /**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.cluster package akka.cluster
import akka.testkit._ import akka.testkit._

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster package akka.cluster
import java.net.InetSocketAddress import java.net.InetSocketAddress

View file

@ -440,7 +440,7 @@ and in the following.
Event Tracing Event Tracing
------------- -------------
The setting ``akka.actor.debug.fsm`` in `:ref:`configuration` enables logging of an The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances:: event trace by :class:`LoggingFSM` instances::
class MyFSM extends Actor with LoggingFSM[X, Z] { class MyFSM extends Actor with LoggingFSM[X, Z] {