Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
a1ae775aa0
4 changed files with 293 additions and 1 deletions
59
akka-core/src/main/scala/actor/Fsm.scala
Normal file
59
akka-core/src/main/scala/actor/Fsm.scala
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
package actor
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Scheduler, Actor}
|
||||
import se.scalablesolutions.akka.stm.Ref
|
||||
import se.scalablesolutions.akka.stm.local._
|
||||
import java.util.concurrent.{ScheduledFuture, TimeUnit}
|
||||
|
||||
trait Fsm[S] {
|
||||
this: Actor =>
|
||||
|
||||
type StateFunction = scala.PartialFunction[Event, State]
|
||||
|
||||
var currentState: State = initialState
|
||||
var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None
|
||||
|
||||
def initialState: State
|
||||
|
||||
def handleEvent: StateFunction = {
|
||||
case event@Event(value, stateData) =>
|
||||
log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id)
|
||||
State(NextState, currentState.stateFunction, stateData, currentState.timeout)
|
||||
}
|
||||
|
||||
|
||||
override final protected def receive: Receive = {
|
||||
case value => {
|
||||
timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None}
|
||||
|
||||
val event = Event(value, currentState.stateData)
|
||||
val newState = (currentState.stateFunction orElse handleEvent).apply(event)
|
||||
|
||||
currentState = newState
|
||||
|
||||
newState match {
|
||||
case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue)
|
||||
case _ => () // ignore for now
|
||||
}
|
||||
|
||||
newState.timeout.foreach {
|
||||
timeout =>
|
||||
timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class State(stateEvent: StateEvent,
|
||||
stateFunction: StateFunction,
|
||||
stateData: S,
|
||||
timeout: Option[Int] = None,
|
||||
replyValue: Option[Any] = None)
|
||||
|
||||
case class Event(event: Any, stateData: S)
|
||||
|
||||
sealed trait StateEvent
|
||||
object NextState extends StateEvent
|
||||
object Reply extends StateEvent
|
||||
|
||||
object StateTimeout
|
||||
}
|
||||
|
|
@ -200,7 +200,7 @@ case class RemoteServerClientDisconnected(
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteServer extends Logging with ListenerManagement {
|
||||
val name = "RemoteServer@" + hostname + ":" + port
|
||||
def name = "RemoteServer@" + hostname + ":" + port
|
||||
|
||||
private[akka] var hostname = RemoteServer.HOSTNAME
|
||||
private[akka] var port = RemoteServer.PORT
|
||||
|
|
|
|||
81
akka-core/src/test/scala/actor/actor/FsmActorSpec.scala
Normal file
81
akka-core/src/test/scala/actor/actor/FsmActorSpec.scala
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.Test
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import actor.Fsm
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
object FsmActorSpec {
|
||||
|
||||
class Lock(code: String,
|
||||
timeout: Int,
|
||||
unlockedLatch: StandardLatch,
|
||||
lockedLatch: StandardLatch) extends Actor with Fsm[CodeState] {
|
||||
|
||||
def initialState = State(NextState, locked, CodeState("", code))
|
||||
|
||||
def locked: StateFunction = {
|
||||
case Event(digit: Char, CodeState(soFar, code)) => {
|
||||
soFar + digit match {
|
||||
case incomplete if incomplete.length < code.length =>
|
||||
State(NextState, locked, CodeState(incomplete, code))
|
||||
case codeTry if (codeTry == code) => {
|
||||
doUnlock
|
||||
State(NextState, open, CodeState("", code), Some(timeout))
|
||||
}
|
||||
case wrong => {
|
||||
log.error("Wrong code %s", wrong)
|
||||
State(NextState, locked, CodeState("", code))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def open: StateFunction = {
|
||||
case Event(StateTimeout, stateData) => {
|
||||
doLock
|
||||
State(NextState, locked, stateData)
|
||||
}
|
||||
}
|
||||
|
||||
private def doLock() {
|
||||
log.info("Locked")
|
||||
lockedLatch.open
|
||||
}
|
||||
|
||||
private def doUnlock = {
|
||||
log.info("Unlocked")
|
||||
unlockedLatch.open
|
||||
}
|
||||
}
|
||||
|
||||
case class CodeState(soFar: String, code: String)
|
||||
}
|
||||
|
||||
class FsmActorSpec extends JUnitSuite {
|
||||
import FsmActorSpec._
|
||||
|
||||
@Test
|
||||
def unlockTheLock = {
|
||||
val unlockedLatch = new StandardLatch
|
||||
val lockedLatch = new StandardLatch
|
||||
|
||||
// lock that locked after being open for 1 sec
|
||||
val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start
|
||||
|
||||
lock ! '3'
|
||||
lock ! '3'
|
||||
lock ! '2'
|
||||
lock ! '2'
|
||||
lock ! '1'
|
||||
|
||||
assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS))
|
||||
assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
package dining.hakkerz
|
||||
|
||||
import actor.Fsm
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, Actor}
|
||||
import Actor._
|
||||
|
||||
/*
|
||||
* Some messages for the chopstick
|
||||
*/
|
||||
sealed trait ChopstickMessage
|
||||
object Take extends ChopstickMessage
|
||||
object Put extends ChopstickMessage
|
||||
case class Taken(chopstick: ActorRef) extends ChopstickMessage
|
||||
case class Busy(chopstick: ActorRef) extends ChopstickMessage
|
||||
|
||||
/**
|
||||
* Some state container for the chopstick
|
||||
*/
|
||||
case class TakenBy(hakker: Option[ActorRef])
|
||||
|
||||
/*
|
||||
* A chopstick is an actor, it can be taken, and put back
|
||||
*/
|
||||
class Chopstick(name: String) extends Actor with Fsm[TakenBy] {
|
||||
self.id = name
|
||||
|
||||
// A chopstick begins its existence as available and taken by no one
|
||||
def initialState = State(NextState, available, TakenBy(None))
|
||||
|
||||
// When a chopstick is available, it can be taken by a some hakker
|
||||
def available: StateFunction = {
|
||||
case Event(Take, _) =>
|
||||
State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self)))
|
||||
}
|
||||
|
||||
// When a chopstick is taken by a hakker
|
||||
// It will refuse to be taken by other hakkers
|
||||
// But the owning hakker can put it back
|
||||
def taken: StateFunction = {
|
||||
case Event(Take, currentState) =>
|
||||
State(Reply, taken, currentState, replyValue = Some(Busy(self)))
|
||||
case Event(Put, TakenBy(hakker)) if self.sender == hakker =>
|
||||
State(NextState, available, TakenBy(None))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Some fsm hakker messages
|
||||
*/
|
||||
sealed trait FsmHakkerMessage
|
||||
object Think extends FsmHakkerMessage
|
||||
|
||||
/**
|
||||
* Some state container to keep track of which chopsticks we have
|
||||
*/
|
||||
case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef])
|
||||
|
||||
/*
|
||||
* A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-)
|
||||
*/
|
||||
class FsmHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with Fsm[TakenChopsticks] {
|
||||
self.id = name
|
||||
|
||||
//All hakkers start waiting
|
||||
def initialState = State(NextState, waiting, TakenChopsticks(None, None))
|
||||
|
||||
def waiting: StateFunction = {
|
||||
case Event(Think, _) =>
|
||||
log.info("%s starts to think", name)
|
||||
startThinking(5000)
|
||||
}
|
||||
|
||||
//When a hakker is thinking it can become hungry
|
||||
//and try to pick up its chopsticks and eat
|
||||
def thinking: StateFunction = {
|
||||
case Event(StateTimeout, current) =>
|
||||
left ! Take
|
||||
right ! Take
|
||||
State(NextState, hungry, current)
|
||||
}
|
||||
|
||||
// When a hakker is hungry it tries to pick up its chopsticks and eat
|
||||
// When it picks one up, it goes into wait for the other
|
||||
// If the hakkers first attempt at grabbing a chopstick fails,
|
||||
// it starts to wait for the response of the other grab
|
||||
def hungry: StateFunction = {
|
||||
case Event(Taken(`left`), _) =>
|
||||
State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None))
|
||||
case Event(Taken(`right`), _) =>
|
||||
State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right)))
|
||||
case Event(Busy(_), current) =>
|
||||
State(NextState, firstChopstickDenied, current)
|
||||
}
|
||||
|
||||
// When a hakker is waiting for the last chopstick it can either obtain it
|
||||
// and start eating, or the other chopstick was busy, and the hakker goes
|
||||
// back to think about how he should obtain his chopsticks :-)
|
||||
def waitForOtherChopstick: StateFunction = {
|
||||
case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right)
|
||||
case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right)
|
||||
case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) =>
|
||||
leftOption.foreach(_ ! Put)
|
||||
rightOption.foreach(_ ! Put)
|
||||
startThinking(10)
|
||||
}
|
||||
|
||||
private def startEating(left: ActorRef, right: ActorRef): State = {
|
||||
log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id)
|
||||
State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000))
|
||||
}
|
||||
|
||||
// When the results of the other grab comes back,
|
||||
// he needs to put it back if he got the other one.
|
||||
// Then go back and think and try to grab the chopsticks again
|
||||
def firstChopstickDenied: StateFunction = {
|
||||
case Event(Taken(secondChopstick), _) =>
|
||||
secondChopstick ! Put
|
||||
startThinking(10)
|
||||
case Event(Busy(chopstick), _) =>
|
||||
startThinking(10)
|
||||
}
|
||||
|
||||
// When a hakker is eating, he can decide to start to think,
|
||||
// then he puts down his chopsticks and starts to think
|
||||
def eating: StateFunction = {
|
||||
case Event(StateTimeout, _) =>
|
||||
log.info("%s puts down his chopsticks and starts to think", name)
|
||||
left ! Put
|
||||
right ! Put
|
||||
startThinking(5000)
|
||||
}
|
||||
|
||||
private def startThinking(period: Int): State = {
|
||||
State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period))
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Alright, here's our test-harness
|
||||
*/
|
||||
object DiningHakkersOnFsm {
|
||||
def run {
|
||||
// Create 5 chopsticks
|
||||
val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start
|
||||
// Create 5 awesome fsm hakkers and assign them their left and right chopstick
|
||||
val hakkers = for{
|
||||
(name, i) <- List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex
|
||||
} yield actorOf(new FsmHakker(name, chopsticks(i), chopsticks((i + 1) % 5))).start
|
||||
|
||||
hakkers.foreach(_ ! Think)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue