From b0d0b27604dc2ce29c2f8e7cee0a5911fb987510 Mon Sep 17 00:00:00 2001 From: imn Date: Tue, 26 Oct 2010 19:58:14 +0200 Subject: [PATCH 1/3] refactoring the FSM part --- akka-actor/src/main/scala/actor/FSM.scala | 135 +++++++++++++----- .../test/scala/actor/actor/FSMActorSpec.scala | 51 +++++-- .../src/main/scala/DiningHakkersOnFsm.scala | 57 ++++---- 3 files changed, 167 insertions(+), 76 deletions(-) diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index 0bdc04fc48..c5eb00a6fd 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -4,58 +4,125 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.stm.Ref -import se.scalablesolutions.akka.stm.local._ - +import scala.collection.mutable import java.util.concurrent.{ScheduledFuture, TimeUnit} -trait FSM[S] { this: Actor => +trait FSM[S, D] { + this: Actor => type StateFunction = scala.PartialFunction[Event, State] - var currentState: State = initialState - var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None - def initialState: State + private val transitions = mutable.Map[S, StateFunction]() - def handleEvent: StateFunction = { - case event@Event(value, stateData) => - log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) - State(NextState, currentState.stateFunction, stateData, currentState.timeout) + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + + protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { + setState(State(stateName, stateData, timeout)) + } + + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) + } + + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } + + protected final def stay(): State = { + goto(currentState.stateName) + } + + protected final def reply(replyValue: Any): State = { + self.sender.foreach(_ ! replyValue) + stay() + } + + /** + * Stop + */ + protected final def stop(): State = { + stop(Normal) + } + + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + protected final def stop(reason: Reason, stateData: D): State = { + log.info("Stopped because of reason: %s", reason) + terminate(reason, currentState.stateName, stateData) + self.stop + State(currentState.stateName, stateData) + } + + def terminate(reason: Reason, stateName: S, stateData: D) = () + + def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction + } + + private var handleEvent: StateFunction = { + case Event(value, stateData) => + log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData) + currentState } override final protected def receive: Receive = { + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => () + // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) - - currentState = newState - - newState match { - case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) - case _ => () // ignore for now - } - - newState.timeout.foreach { - timeout => - timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) + if (self.isRunning) { + setState(nextState) } } } - case class State(stateEvent: StateEvent, - stateFunction: StateFunction, - stateData: S, - timeout: Option[Int] = None, - replyValue: Option[Any] = None) + private def setState(nextState: State) = { + if (!transitions.contains(nextState.stateName)) { + stop(Failure("Next state %s not available".format(nextState.stateName))) + } else { + currentState = nextState + currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))} + } + } - case class Event(event: Any, stateData: S) + case class Event(event: Any, stateData: D) - sealed trait StateEvent - object NextState extends StateEvent - object Reply extends StateEvent + case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { + def until(timeout: Long): State = { + copy(timeout = Some(timeout)) + } - object StateTimeout + def then(nextStateName: S): State = { + copy(stateName = nextStateName) + } + + def replying(replyValue:Any): State = { + self.sender.foreach(_ ! replyValue) + this + } + + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + } + + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason + + case object StateTimeout } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index e4515bd3da..496d9e9e01 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -13,34 +13,44 @@ import java.util.concurrent.TimeUnit object FSMActorSpec { - class Lock(code: String, - timeout: Int, - unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends Actor with FSM[CodeState] { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + val unhandledLatch = new StandardLatch - def initialState = State(NextState, locked, CodeState("", code)) + class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] { - def locked: StateFunction = { + inState("locked") { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) + stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) + goto("open") using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) + stay using CodeState("", code) } } } + case Event("hello", _) => stay replying "world" } - def open: StateFunction = { + inState("open") { case Event(StateTimeout, stateData) => { doLock - State(NextState, locked, stateData) + goto("locked") + } + } + + setInitialState("locked", CodeState("", code)) + + whenUnhandled { + case Event(_, stateData) => { + log.info("Unhandled") + unhandledLatch.open + stay } } @@ -63,11 +73,9 @@ class FSMActorSpec extends JUnitSuite { @Test def unlockTheLock = { - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000)).start lock ! '3' lock ! '3' @@ -77,6 +85,21 @@ class FSMActorSpec extends JUnitSuite { assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + + lock ! "not_handled" + assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + + val answerLatch = new StandardLatch + object Go + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Go => lock ! "hello" + case "world" => answerLatch.open + + } + }).start + tester ! Go + assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 8348de2134..9ab27d4fbb 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -20,27 +20,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[TakenBy] { +class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { self.id = name - // A chopstick begins its existence as available and taken by no one - def initialState = State(NextState, available, TakenBy(None)) - // When a chopstick is available, it can be taken by a some hakker - def available: StateFunction = { + inState("available") { case Event(Take, _) => - State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + goto("taken") using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - def taken: StateFunction = { + inState("taken") { case Event(Take, currentState) => - State(Reply, taken, currentState, replyValue = Some(Busy(self))) + stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - State(NextState, available, TakenBy(None)) + goto("available") using TakenBy(None) } + + // A chopstick begins its existence as available and taken by no one + setInitialState("available", TakenBy(None)) } /** @@ -57,13 +57,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] { self.id = name - //All hakkers start waiting - def initialState = State(NextState, waiting, TakenChopsticks(None, None)) - - def waiting: StateFunction = { + inState("waiting") { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -71,30 +68,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - def thinking: StateFunction = { - case Event(StateTimeout, current) => + inState("thinking") { + case Event(StateTimeout, _) => left ! Take right ! Take - State(NextState, hungry, current) + goto("hungry") } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - def hungry: StateFunction = { + inState("hungry") { case Event(Taken(`left`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) - case Event(Busy(_), current) => - State(NextState, firstChopstickDenied, current) + goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right)) + case Event(Busy(_), _) => + goto("firstChopstickDenied") } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - def waitForOtherChopstick: StateFunction = { + inState("waitForOtherChopstick") { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -105,13 +102,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - def firstChopstickDenied: StateFunction = { + inState("firstChopstickDenied") { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -121,7 +118,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - def eating: StateFunction = { + inState("eating") { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -130,15 +127,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + goto("thinking") using TakenChopsticks(None, None) until period } + + //All hakkers start waiting + setInitialState("waiting", TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ object DiningHakkersOnFSM { - def run { + def main(args: Array[String]) { + // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick From efb99fc3a7e2d723d01722165608c82df15563e0 Mon Sep 17 00:00:00 2001 From: imn Date: Wed, 27 Oct 2010 11:41:35 +0200 Subject: [PATCH 2/3] use nice case objects for the states :-) --- .../test/scala/actor/actor/FSMActorSpec.scala | 16 +++-- .../src/main/scala/DiningHakkersOnFsm.scala | 62 ++++++++++++------- 2 files changed, 50 insertions(+), 28 deletions(-) diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index 496d9e9e01..8646dd5561 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -17,16 +17,20 @@ object FSMActorSpec { val lockedLatch = new StandardLatch val unhandledLatch = new StandardLatch - class Lock(code: String, timeout: Int) extends Actor with FSM[String, CodeState] { + sealed trait LockState + case object Locked extends LockState + case object Open extends LockState - inState("locked") { + class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] { + + inState(Locked) { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - goto("open") using CodeState("", code) until timeout + goto(Open) using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) @@ -37,14 +41,14 @@ object FSMActorSpec { case Event("hello", _) => stay replying "world" } - inState("open") { + inState(Open) { case Event(StateTimeout, stateData) => { doLock - goto("locked") + goto(Locked) } } - setInitialState("locked", CodeState("", code)) + setInitialState(Locked, CodeState("", code)) whenUnhandled { case Event(_, stateData) => { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 9ab27d4fbb..ecb4d82ba0 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -12,6 +12,13 @@ object Put extends ChopstickMessage case class Taken(chopstick: ActorRef) extends ChopstickMessage case class Busy(chopstick: ActorRef) extends ChopstickMessage +/** + * Some states the chopstick can be in + */ +sealed trait ChopstickState +case object Available extends ChopstickState +case object Taken extends ChopstickState + /** * Some state container for the chopstick */ @@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { +class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { self.id = name // When a chopstick is available, it can be taken by a some hakker - inState("available") { + inState(Available) { case Event(Take, _) => - goto("taken") using TakenBy(self.sender) replying Taken(self) + goto(Taken) using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - inState("taken") { + inState(Taken) { case Event(Take, currentState) => stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - goto("available") using TakenBy(None) + goto(Available) using TakenBy(None) } // A chopstick begins its existence as available and taken by no one - setInitialState("available", TakenBy(None)) + setInitialState(Available, TakenBy(None)) } /** @@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[String, TakenBy] { sealed trait FSMHakkerMessage object Think extends FSMHakkerMessage +/** + * Some fsm hakker states + */ +sealed trait FSMHakkerState +case object Waiting extends FSMHakkerState +case object Thinking extends FSMHakkerState +case object Hungry extends FSMHakkerState +case object WaitForOtherChopstick extends FSMHakkerState +case object FirstChopstickDenied extends FSMHakkerState +case object Eating extends FSMHakkerState + /** * Some state container to keep track of which chopsticks we have */ @@ -57,10 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[String, TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { self.id = name - inState("waiting") { + inState(Waiting) { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -68,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - inState("thinking") { + inState(Thinking) { case Event(StateTimeout, _) => left ! Take right ! Take - goto("hungry") + goto(Hungry) } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - inState("hungry") { + inState(Hungry) { case Event(Taken(`left`), _) => - goto("waitForOtherChopstick") using TakenChopsticks(Some(left), None) + goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - goto("waitForOtherChopstick") using TakenChopsticks(None, Some(right)) + goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right)) case Event(Busy(_), _) => - goto("firstChopstickDenied") + goto(FirstChopstickDenied) } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - inState("waitForOtherChopstick") { + inState(WaitForOtherChopstick) { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -102,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - goto("eating") using TakenChopsticks(Some(left), Some(right)) until 5000 + goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - inState("firstChopstickDenied") { + inState(FirstChopstickDenied) { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -118,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - inState("eating") { + inState(Eating) { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -127,19 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - goto("thinking") using TakenChopsticks(None, None) until period + goto(Thinking) using TakenChopsticks(None, None) until period } //All hakkers start waiting - setInitialState("waiting", TakenChopsticks(None, None)) + setInitialState(Waiting, TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ -object DiningHakkersOnFSM { - def main(args: Array[String]) { +object DiningHakkersOnFsm { + def run = { // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick From 847e0c7b044ddefdeba2e625912980d7ac8d6811 Mon Sep 17 00:00:00 2001 From: imn Date: Wed, 27 Oct 2010 15:45:30 +0200 Subject: [PATCH 3/3] polishing up code --- akka-actor/src/main/scala/actor/FSM.scala | 89 ++++++++++--------- .../test/scala/actor/actor/FSMActorSpec.scala | 18 +++- 2 files changed, 61 insertions(+), 46 deletions(-) diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index c5eb00a6fd..eac861d358 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -12,27 +12,15 @@ trait FSM[S, D] { type StateFunction = scala.PartialFunction[Event, State] - private var currentState: State = _ - private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None - - private val transitions = mutable.Map[S, StateFunction]() - - private def register(name: S, function: StateFunction) { - if (transitions contains name) { - transitions(name) = transitions(name) orElse function - } else { - transitions(name) = function - } + /** DSL */ + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) } protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { setState(State(stateName, stateData, timeout)) } - protected final def inState(stateName: S)(stateFunction: StateFunction) = { - register(stateName, stateFunction) - } - protected final def goto(nextStateName: S): State = { State(nextStateName, currentState.stateData) } @@ -41,14 +29,6 @@ trait FSM[S, D] { goto(currentState.stateName) } - protected final def reply(replyValue: Any): State = { - self.sender.foreach(_ ! replyValue) - stay() - } - - /** - * Stop - */ protected final def stop(): State = { stop(Normal) } @@ -58,59 +38,82 @@ trait FSM[S, D] { } protected final def stop(reason: Reason, stateData: D): State = { - log.info("Stopped because of reason: %s", reason) - terminate(reason, currentState.stateName, stateData) - self.stop - State(currentState.stateName, stateData) + self ! Stop(reason, stateData) + stay } - def terminate(reason: Reason, stateName: S, stateData: D) = () - def whenUnhandled(stateFunction: StateFunction) = { handleEvent = stateFunction } + def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = { + terminateEvent = terminationHandler + } + + /** FSM State data and default handlers */ + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + private val transitions = mutable.Map[S, StateFunction]() + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + private var handleEvent: StateFunction = { case Event(value, stateData) => - log.warning("Event %s not handled in state %s - keeping current state with data %s", value, currentState.stateName, stateData) - currentState + log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName) + stay + } + + private var terminateEvent: PartialFunction[Reason, Unit] = { + case failure@Failure(_) => log.error("Stopping because of a %s", failure) + case reason => log.info("Stopping because of reason: %s", reason) } override final protected def receive: Receive = { - case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => () + case Stop(reason, stateData) => + terminateEvent.apply(reason) + self.stop + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => + log.trace("Ignoring StateTimeout - ") // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} val event = Event(value, currentState.stateData) val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) - if (self.isRunning) { - setState(nextState) - } + setState(nextState) } } private def setState(nextState: State) = { if (!transitions.contains(nextState.stateName)) { - stop(Failure("Next state %s not available".format(nextState.stateName))) + stop(Failure("Next state %s does not exist".format(nextState.stateName))) } else { currentState = nextState - currentState.timeout.foreach {t => timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))} + currentState.timeout.foreach { + t => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS)) + } } } case class Event(event: Any, stateData: D) case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { + def until(timeout: Long): State = { copy(timeout = Some(timeout)) } - def then(nextStateName: S): State = { - copy(stateName = nextStateName) - } - def replying(replyValue:Any): State = { - self.sender.foreach(_ ! replyValue) + self.sender match { + case Some(sender) => sender ! replyValue + case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue) + } this } @@ -125,4 +128,6 @@ trait FSM[S, D] { case class Failure(cause: Any) extends Reason case object StateTimeout + + private case class Stop(reason: Reason, stateData: D) } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index 8646dd5561..dc6893c820 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -16,6 +16,7 @@ object FSMActorSpec { val unlockedLatch = new StandardLatch val lockedLatch = new StandardLatch val unhandledLatch = new StandardLatch + val terminatedLatch = new StandardLatch sealed trait LockState case object Locked extends LockState @@ -39,6 +40,7 @@ object FSMActorSpec { } } case Event("hello", _) => stay replying "world" + case Event("bye", _) => stop(Shutdown) } inState(Open) { @@ -58,6 +60,10 @@ object FSMActorSpec { } } + onTermination { + case reason => terminatedLatch.open + } + private def doLock() { log.info("Locked") lockedLatch.open @@ -94,16 +100,20 @@ class FSMActorSpec extends JUnitSuite { assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) val answerLatch = new StandardLatch - object Go + object Hello + object Bye val tester = Actor.actorOf(new Actor { protected def receive = { - case Go => lock ! "hello" + case Hello => lock ! "hello" case "world" => answerLatch.open - + case Bye => lock ! "bye" } }).start - tester ! Go + tester ! Hello assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + + tester ! Bye + assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) } }