diff --git a/akka-actor/src/main/scala/actor/FSM.scala b/akka-actor/src/main/scala/actor/FSM.scala index 0bdc04fc48..eac861d358 100644 --- a/akka-actor/src/main/scala/actor/FSM.scala +++ b/akka-actor/src/main/scala/actor/FSM.scala @@ -4,58 +4,130 @@ package se.scalablesolutions.akka.actor -import se.scalablesolutions.akka.stm.Ref -import se.scalablesolutions.akka.stm.local._ - +import scala.collection.mutable import java.util.concurrent.{ScheduledFuture, TimeUnit} -trait FSM[S] { this: Actor => +trait FSM[S, D] { + this: Actor => type StateFunction = scala.PartialFunction[Event, State] - var currentState: State = initialState - var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + /** DSL */ + protected final def inState(stateName: S)(stateFunction: StateFunction) = { + register(stateName, stateFunction) + } - def initialState: State + protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = { + setState(State(stateName, stateData, timeout)) + } - def handleEvent: StateFunction = { - case event@Event(value, stateData) => - log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) - State(NextState, currentState.stateFunction, stateData, currentState.timeout) + protected final def goto(nextStateName: S): State = { + State(nextStateName, currentState.stateData) + } + + protected final def stay(): State = { + goto(currentState.stateName) + } + + protected final def stop(): State = { + stop(Normal) + } + + protected final def stop(reason: Reason): State = { + stop(reason, currentState.stateData) + } + + protected final def stop(reason: Reason, stateData: D): State = { + self ! Stop(reason, stateData) + stay + } + + def whenUnhandled(stateFunction: StateFunction) = { + handleEvent = stateFunction + } + + def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = { + terminateEvent = terminationHandler + } + + /** FSM State data and default handlers */ + private var currentState: State = _ + private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + private val transitions = mutable.Map[S, StateFunction]() + private def register(name: S, function: StateFunction) { + if (transitions contains name) { + transitions(name) = transitions(name) orElse function + } else { + transitions(name) = function + } + } + + private var handleEvent: StateFunction = { + case Event(value, stateData) => + log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName) + stay + } + + private var terminateEvent: PartialFunction[Reason, Unit] = { + case failure@Failure(_) => log.error("Stopping because of a %s", failure) + case reason => log.info("Stopping because of reason: %s", reason) } override final protected def receive: Receive = { + case Stop(reason, stateData) => + terminateEvent.apply(reason) + self.stop + case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) => + log.trace("Ignoring StateTimeout - ") + // state timeout when new message in queue, skip this timeout case value => { timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) + val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event) + setState(nextState) + } + } - currentState = newState - - newState match { - case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) - case _ => () // ignore for now - } - - newState.timeout.foreach { - timeout => - timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + private def setState(nextState: State) = { + if (!transitions.contains(nextState.stateName)) { + stop(Failure("Next state %s does not exist".format(nextState.stateName))) + } else { + currentState = nextState + currentState.timeout.foreach { + t => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS)) } } } - case class State(stateEvent: StateEvent, - stateFunction: StateFunction, - stateData: S, - timeout: Option[Int] = None, - replyValue: Option[Any] = None) + case class Event(event: Any, stateData: D) - case class Event(event: Any, stateData: S) + case class State(stateName: S, stateData: D, timeout: Option[Long] = None) { - sealed trait StateEvent - object NextState extends StateEvent - object Reply extends StateEvent + def until(timeout: Long): State = { + copy(timeout = Some(timeout)) + } - object StateTimeout + def replying(replyValue:Any): State = { + self.sender match { + case Some(sender) => sender ! replyValue + case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue) + } + this + } + + def using(nextStateDate: D): State = { + copy(stateData = nextStateDate) + } + } + + sealed trait Reason + case object Normal extends Reason + case object Shutdown extends Reason + case class Failure(cause: Any) extends Reason + + case object StateTimeout + + private case class Stop(reason: Reason, stateData: D) } diff --git a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala index e4515bd3da..dc6893c820 100644 --- a/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala +++ b/akka-actor/src/test/scala/actor/actor/FSMActorSpec.scala @@ -13,37 +13,57 @@ import java.util.concurrent.TimeUnit object FSMActorSpec { - class Lock(code: String, - timeout: Int, - unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends Actor with FSM[CodeState] { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + val unhandledLatch = new StandardLatch + val terminatedLatch = new StandardLatch - def initialState = State(NextState, locked, CodeState("", code)) + sealed trait LockState + case object Locked extends LockState + case object Open extends LockState - def locked: StateFunction = { + class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] { + + inState(Locked) { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) + stay using CodeState(incomplete, code) case codeTry if (codeTry == code) => { doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) + goto(Open) using CodeState("", code) until timeout } case wrong => { log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) + stay using CodeState("", code) } } } + case Event("hello", _) => stay replying "world" + case Event("bye", _) => stop(Shutdown) } - def open: StateFunction = { + inState(Open) { case Event(StateTimeout, stateData) => { doLock - State(NextState, locked, stateData) + goto(Locked) } } + setInitialState(Locked, CodeState("", code)) + + whenUnhandled { + case Event(_, stateData) => { + log.info("Unhandled") + unhandledLatch.open + stay + } + } + + onTermination { + case reason => terminatedLatch.open + } + private def doLock() { log.info("Locked") lockedLatch.open @@ -63,11 +83,9 @@ class FSMActorSpec extends JUnitSuite { @Test def unlockTheLock = { - val unlockedLatch = new StandardLatch - val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000)).start lock ! '3' lock ! '3' @@ -77,6 +95,25 @@ class FSMActorSpec extends JUnitSuite { assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + + lock ! "not_handled" + assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS)) + + val answerLatch = new StandardLatch + object Hello + object Bye + val tester = Actor.actorOf(new Actor { + protected def receive = { + case Hello => lock ! "hello" + case "world" => answerLatch.open + case Bye => lock ! "bye" + } + }).start + tester ! Hello + assert(answerLatch.tryAwait(2, TimeUnit.SECONDS)) + + tester ! Bye + assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS)) } } diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 8348de2134..ecb4d82ba0 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -12,6 +12,13 @@ object Put extends ChopstickMessage case class Taken(chopstick: ActorRef) extends ChopstickMessage case class Busy(chopstick: ActorRef) extends ChopstickMessage +/** + * Some states the chopstick can be in + */ +sealed trait ChopstickState +case object Available extends ChopstickState +case object Taken extends ChopstickState + /** * Some state container for the chopstick */ @@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Chopstick(name: String) extends Actor with FSM[TakenBy] { +class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] { self.id = name - // A chopstick begins its existence as available and taken by no one - def initialState = State(NextState, available, TakenBy(None)) - // When a chopstick is available, it can be taken by a some hakker - def available: StateFunction = { + inState(Available) { case Event(Take, _) => - State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + goto(Taken) using TakenBy(self.sender) replying Taken(self) } // When a chopstick is taken by a hakker // It will refuse to be taken by other hakkers // But the owning hakker can put it back - def taken: StateFunction = { + inState(Taken) { case Event(Take, currentState) => - State(Reply, taken, currentState, replyValue = Some(Busy(self))) + stay replying Busy(self) case Event(Put, TakenBy(hakker)) if self.sender == hakker => - State(NextState, available, TakenBy(None)) + goto(Available) using TakenBy(None) } + + // A chopstick begins its existence as available and taken by no one + setInitialState(Available, TakenBy(None)) } /** @@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[TakenBy] { sealed trait FSMHakkerMessage object Think extends FSMHakkerMessage +/** + * Some fsm hakker states + */ +sealed trait FSMHakkerState +case object Waiting extends FSMHakkerState +case object Thinking extends FSMHakkerState +case object Hungry extends FSMHakkerState +case object WaitForOtherChopstick extends FSMHakkerState +case object FirstChopstickDenied extends FSMHakkerState +case object Eating extends FSMHakkerState + /** * Some state container to keep track of which chopsticks we have */ @@ -57,13 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) /* * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) */ -class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] { +class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] { self.id = name - //All hakkers start waiting - def initialState = State(NextState, waiting, TakenChopsticks(None, None)) - - def waiting: StateFunction = { + inState(Waiting) { case Event(Think, _) => log.info("%s starts to think", name) startThinking(5000) @@ -71,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat - def thinking: StateFunction = { - case Event(StateTimeout, current) => + inState(Thinking) { + case Event(StateTimeout, _) => left ! Take right ! Take - State(NextState, hungry, current) + goto(Hungry) } // When a hakker is hungry it tries to pick up its chopsticks and eat // When it picks one up, it goes into wait for the other // If the hakkers first attempt at grabbing a chopstick fails, // it starts to wait for the response of the other grab - def hungry: StateFunction = { + inState(Hungry) { case Event(Taken(`left`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None) case Event(Taken(`right`), _) => - State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) - case Event(Busy(_), current) => - State(NextState, firstChopstickDenied, current) + goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right)) + case Event(Busy(_), _) => + goto(FirstChopstickDenied) } // When a hakker is waiting for the last chopstick it can either obtain it // and start eating, or the other chopstick was busy, and the hakker goes // back to think about how he should obtain his chopsticks :-) - def waitForOtherChopstick: StateFunction = { + inState(WaitForOtherChopstick) { case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => @@ -105,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit private def startEating(left: ActorRef, right: ActorRef): State = { log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) - State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000 } // When the results of the other grab comes back, // he needs to put it back if he got the other one. // Then go back and think and try to grab the chopsticks again - def firstChopstickDenied: StateFunction = { + inState(FirstChopstickDenied) { case Event(Taken(secondChopstick), _) => secondChopstick ! Put startThinking(10) @@ -121,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit // When a hakker is eating, he can decide to start to think, // then he puts down his chopsticks and starts to think - def eating: StateFunction = { + inState(Eating) { case Event(StateTimeout, _) => log.info("%s puts down his chopsticks and starts to think", name) left ! Put @@ -130,15 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit } private def startThinking(period: Int): State = { - State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + goto(Thinking) using TakenChopsticks(None, None) until period } + + //All hakkers start waiting + setInitialState(Waiting, TakenChopsticks(None, None)) } /* * Alright, here's our test-harness */ -object DiningHakkersOnFSM { - def run { +object DiningHakkersOnFsm { + + def run = { // Create 5 chopsticks val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick