Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-10-27 18:03:14 +02:00
commit f6547b1b3b
3 changed files with 204 additions and 76 deletions

View file

@ -4,58 +4,130 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm.Ref
import se.scalablesolutions.akka.stm.local._
import scala.collection.mutable
import java.util.concurrent.{ScheduledFuture, TimeUnit}
trait FSM[S] { this: Actor =>
trait FSM[S, D] {
this: Actor =>
type StateFunction = scala.PartialFunction[Event, State]
var currentState: State = initialState
var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
/** DSL */
protected final def inState(stateName: S)(stateFunction: StateFunction) = {
register(stateName, stateFunction)
}
def initialState: State
protected final def setInitialState(stateName: S, stateData: D, timeout: Option[Long] = None) = {
setState(State(stateName, stateData, timeout))
}
def handleEvent: StateFunction = {
case event@Event(value, stateData) =>
log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id)
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
protected final def goto(nextStateName: S): State = {
State(nextStateName, currentState.stateData)
}
protected final def stay(): State = {
goto(currentState.stateName)
}
protected final def stop(): State = {
stop(Normal)
}
protected final def stop(reason: Reason): State = {
stop(reason, currentState.stateData)
}
protected final def stop(reason: Reason, stateData: D): State = {
self ! Stop(reason, stateData)
stay
}
def whenUnhandled(stateFunction: StateFunction) = {
handleEvent = stateFunction
}
def onTermination(terminationHandler: PartialFunction[Reason, Unit]) = {
terminateEvent = terminationHandler
}
/** FSM State data and default handlers */
private var currentState: State = _
private var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
private val transitions = mutable.Map[S, StateFunction]()
private def register(name: S, function: StateFunction) {
if (transitions contains name) {
transitions(name) = transitions(name) orElse function
} else {
transitions(name) = function
}
}
private var handleEvent: StateFunction = {
case Event(value, stateData) =>
log.warning("Event %s not handled in state %s, staying at current state", value, currentState.stateName)
stay
}
private var terminateEvent: PartialFunction[Reason, Unit] = {
case failure@Failure(_) => log.error("Stopping because of a %s", failure)
case reason => log.info("Stopping because of reason: %s", reason)
}
override final protected def receive: Receive = {
case Stop(reason, stateData) =>
terminateEvent.apply(reason)
self.stop
case StateTimeout if (self.dispatcher.mailboxSize(self) > 0) =>
log.trace("Ignoring StateTimeout - ")
// state timeout when new message in queue, skip this timeout
case value => {
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}
val event = Event(value, currentState.stateData)
val newState = (currentState.stateFunction orElse handleEvent).apply(event)
val nextState = (transitions(currentState.stateName) orElse handleEvent).apply(event)
setState(nextState)
}
}
currentState = newState
newState match {
case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue)
case _ => () // ignore for now
}
newState.timeout.foreach {
timeout =>
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS))
private def setState(nextState: State) = {
if (!transitions.contains(nextState.stateName)) {
stop(Failure("Next state %s does not exist".format(nextState.stateName)))
} else {
currentState = nextState
currentState.timeout.foreach {
t =>
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, t, TimeUnit.MILLISECONDS))
}
}
}
case class State(stateEvent: StateEvent,
stateFunction: StateFunction,
stateData: S,
timeout: Option[Int] = None,
replyValue: Option[Any] = None)
case class Event(event: Any, stateData: D)
case class Event(event: Any, stateData: S)
case class State(stateName: S, stateData: D, timeout: Option[Long] = None) {
sealed trait StateEvent
object NextState extends StateEvent
object Reply extends StateEvent
def until(timeout: Long): State = {
copy(timeout = Some(timeout))
}
object StateTimeout
def replying(replyValue:Any): State = {
self.sender match {
case Some(sender) => sender ! replyValue
case None => log.error("Unable to send reply value %s, no sender reference to reply to", replyValue)
}
this
}
def using(nextStateDate: D): State = {
copy(stateData = nextStateDate)
}
}
sealed trait Reason
case object Normal extends Reason
case object Shutdown extends Reason
case class Failure(cause: Any) extends Reason
case object StateTimeout
private case class Stop(reason: Reason, stateData: D)
}

View file

@ -13,37 +13,57 @@ import java.util.concurrent.TimeUnit
object FSMActorSpec {
class Lock(code: String,
timeout: Int,
unlockedLatch: StandardLatch,
lockedLatch: StandardLatch) extends Actor with FSM[CodeState] {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch
val unhandledLatch = new StandardLatch
val terminatedLatch = new StandardLatch
def initialState = State(NextState, locked, CodeState("", code))
sealed trait LockState
case object Locked extends LockState
case object Open extends LockState
def locked: StateFunction = {
class Lock(code: String, timeout: Int) extends Actor with FSM[LockState, CodeState] {
inState(Locked) {
case Event(digit: Char, CodeState(soFar, code)) => {
soFar + digit match {
case incomplete if incomplete.length < code.length =>
State(NextState, locked, CodeState(incomplete, code))
stay using CodeState(incomplete, code)
case codeTry if (codeTry == code) => {
doUnlock
State(NextState, open, CodeState("", code), Some(timeout))
goto(Open) using CodeState("", code) until timeout
}
case wrong => {
log.error("Wrong code %s", wrong)
State(NextState, locked, CodeState("", code))
stay using CodeState("", code)
}
}
}
case Event("hello", _) => stay replying "world"
case Event("bye", _) => stop(Shutdown)
}
def open: StateFunction = {
inState(Open) {
case Event(StateTimeout, stateData) => {
doLock
State(NextState, locked, stateData)
goto(Locked)
}
}
setInitialState(Locked, CodeState("", code))
whenUnhandled {
case Event(_, stateData) => {
log.info("Unhandled")
unhandledLatch.open
stay
}
}
onTermination {
case reason => terminatedLatch.open
}
private def doLock() {
log.info("Locked")
lockedLatch.open
@ -63,11 +83,9 @@ class FSMActorSpec extends JUnitSuite {
@Test
def unlockTheLock = {
val unlockedLatch = new StandardLatch
val lockedLatch = new StandardLatch
// lock that locked after being open for 1 sec
val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start
val lock = Actor.actorOf(new Lock("33221", 1000)).start
lock ! '3'
lock ! '3'
@ -77,6 +95,25 @@ class FSMActorSpec extends JUnitSuite {
assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
lock ! "not_handled"
assert(unhandledLatch.tryAwait(2, TimeUnit.SECONDS))
val answerLatch = new StandardLatch
object Hello
object Bye
val tester = Actor.actorOf(new Actor {
protected def receive = {
case Hello => lock ! "hello"
case "world" => answerLatch.open
case Bye => lock ! "bye"
}
}).start
tester ! Hello
assert(answerLatch.tryAwait(2, TimeUnit.SECONDS))
tester ! Bye
assert(terminatedLatch.tryAwait(2, TimeUnit.SECONDS))
}
}

View file

@ -12,6 +12,13 @@ object Put extends ChopstickMessage
case class Taken(chopstick: ActorRef) extends ChopstickMessage
case class Busy(chopstick: ActorRef) extends ChopstickMessage
/**
* Some states the chopstick can be in
*/
sealed trait ChopstickState
case object Available extends ChopstickState
case object Taken extends ChopstickState
/**
* Some state container for the chopstick
*/
@ -20,27 +27,27 @@ case class TakenBy(hakker: Option[ActorRef])
/*
* A chopstick is an actor, it can be taken, and put back
*/
class Chopstick(name: String) extends Actor with FSM[TakenBy] {
class Chopstick(name: String) extends Actor with FSM[ChopstickState, TakenBy] {
self.id = name
// A chopstick begins its existence as available and taken by no one
def initialState = State(NextState, available, TakenBy(None))
// When a chopstick is available, it can be taken by a some hakker
def available: StateFunction = {
inState(Available) {
case Event(Take, _) =>
State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self)))
goto(Taken) using TakenBy(self.sender) replying Taken(self)
}
// When a chopstick is taken by a hakker
// It will refuse to be taken by other hakkers
// But the owning hakker can put it back
def taken: StateFunction = {
inState(Taken) {
case Event(Take, currentState) =>
State(Reply, taken, currentState, replyValue = Some(Busy(self)))
stay replying Busy(self)
case Event(Put, TakenBy(hakker)) if self.sender == hakker =>
State(NextState, available, TakenBy(None))
goto(Available) using TakenBy(None)
}
// A chopstick begins its existence as available and taken by no one
setInitialState(Available, TakenBy(None))
}
/**
@ -49,6 +56,17 @@ class Chopstick(name: String) extends Actor with FSM[TakenBy] {
sealed trait FSMHakkerMessage
object Think extends FSMHakkerMessage
/**
* Some fsm hakker states
*/
sealed trait FSMHakkerState
case object Waiting extends FSMHakkerState
case object Thinking extends FSMHakkerState
case object Hungry extends FSMHakkerState
case object WaitForOtherChopstick extends FSMHakkerState
case object FirstChopstickDenied extends FSMHakkerState
case object Eating extends FSMHakkerState
/**
* Some state container to keep track of which chopsticks we have
*/
@ -57,13 +75,10 @@ case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef])
/*
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
*/
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[TakenChopsticks] {
class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with FSM[FSMHakkerState, TakenChopsticks] {
self.id = name
//All hakkers start waiting
def initialState = State(NextState, waiting, TakenChopsticks(None, None))
def waiting: StateFunction = {
inState(Waiting) {
case Event(Think, _) =>
log.info("%s starts to think", name)
startThinking(5000)
@ -71,30 +86,30 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
//When a hakker is thinking it can become hungry
//and try to pick up its chopsticks and eat
def thinking: StateFunction = {
case Event(StateTimeout, current) =>
inState(Thinking) {
case Event(StateTimeout, _) =>
left ! Take
right ! Take
State(NextState, hungry, current)
goto(Hungry)
}
// When a hakker is hungry it tries to pick up its chopsticks and eat
// When it picks one up, it goes into wait for the other
// If the hakkers first attempt at grabbing a chopstick fails,
// it starts to wait for the response of the other grab
def hungry: StateFunction = {
inState(Hungry) {
case Event(Taken(`left`), _) =>
State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None))
goto(WaitForOtherChopstick) using TakenChopsticks(Some(left), None)
case Event(Taken(`right`), _) =>
State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right)))
case Event(Busy(_), current) =>
State(NextState, firstChopstickDenied, current)
goto(WaitForOtherChopstick) using TakenChopsticks(None, Some(right))
case Event(Busy(_), _) =>
goto(FirstChopstickDenied)
}
// When a hakker is waiting for the last chopstick it can either obtain it
// and start eating, or the other chopstick was busy, and the hakker goes
// back to think about how he should obtain his chopsticks :-)
def waitForOtherChopstick: StateFunction = {
inState(WaitForOtherChopstick) {
case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right)
case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right)
case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) =>
@ -105,13 +120,13 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
private def startEating(left: ActorRef, right: ActorRef): State = {
log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000))
goto(Eating) using TakenChopsticks(Some(left), Some(right)) until 5000
}
// When the results of the other grab comes back,
// he needs to put it back if he got the other one.
// Then go back and think and try to grab the chopsticks again
def firstChopstickDenied: StateFunction = {
inState(FirstChopstickDenied) {
case Event(Taken(secondChopstick), _) =>
secondChopstick ! Put
startThinking(10)
@ -121,7 +136,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
// When a hakker is eating, he can decide to start to think,
// then he puts down his chopsticks and starts to think
def eating: StateFunction = {
inState(Eating) {
case Event(StateTimeout, _) =>
log.info("%s puts down his chopsticks and starts to think", name)
left ! Put
@ -130,15 +145,19 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
}
private def startThinking(period: Int): State = {
State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period))
goto(Thinking) using TakenChopsticks(None, None) until period
}
//All hakkers start waiting
setInitialState(Waiting, TakenChopsticks(None, None))
}
/*
* Alright, here's our test-harness
*/
object DiningHakkersOnFSM {
def run {
object DiningHakkersOnFsm {
def run = {
// Create 5 chopsticks
val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start
// Create 5 awesome fsm hakkers and assign them their left and right chopstick