diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala new file mode 100644 index 0000000000..df02a7b1d2 --- /dev/null +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -0,0 +1,59 @@ +package actor + +import se.scalablesolutions.akka.actor.{Scheduler, Actor} +import se.scalablesolutions.akka.stm.Ref +import se.scalablesolutions.akka.stm.local._ +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +trait Fsm[S] { + this: Actor => + + type StateFunction = scala.PartialFunction[Event, State] + + var currentState: State = initialState + var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None + + def initialState: State + + def handleEvent: StateFunction = { + case event@Event(value, stateData) => + log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) + State(NextState, currentState.stateFunction, stateData, currentState.timeout) + } + + + override final protected def receive: Receive = { + case value => { + timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} + + val event = Event(value, currentState.stateData) + val newState = (currentState.stateFunction orElse handleEvent).apply(event) + + currentState = newState + + newState match { + case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) + case _ => () // ignore for now + } + + newState.timeout.foreach { + timeout => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } + + case class State(stateEvent: StateEvent, + stateFunction: StateFunction, + stateData: S, + timeout: Option[Int] = None, + replyValue: Option[Any] = None) + + case class Event(event: Any, stateData: S) + + sealed trait StateEvent + object NextState extends StateEvent + object Reply extends StateEvent + + object StateTimeout +} diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 18937862b7..67ec0b282a 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -200,7 +200,7 @@ case class RemoteServerClientDisconnected( * @author Jonas Bonér */ class RemoteServer extends Logging with ListenerManagement { - val name = "RemoteServer@" + hostname + ":" + port + def name = "RemoteServer@" + hostname + ":" + port private[akka] var hostname = RemoteServer.HOSTNAME private[akka] var port = RemoteServer.PORT diff --git a/akka-core/src/test/scala/actor/actor/FsmActorSpec.scala b/akka-core/src/test/scala/actor/actor/FsmActorSpec.scala new file mode 100644 index 0000000000..7960d66247 --- /dev/null +++ b/akka-core/src/test/scala/actor/actor/FsmActorSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import actor.Fsm +import java.util.concurrent.TimeUnit + +object FsmActorSpec { + + class Lock(code: String, + timeout: Int, + unlockedLatch: StandardLatch, + lockedLatch: StandardLatch) extends Actor with Fsm[CodeState] { + + def initialState = State(NextState, locked, CodeState("", code)) + + def locked: StateFunction = { + case Event(digit: Char, CodeState(soFar, code)) => { + soFar + digit match { + case incomplete if incomplete.length < code.length => + State(NextState, locked, CodeState(incomplete, code)) + case codeTry if (codeTry == code) => { + doUnlock + State(NextState, open, CodeState("", code), Some(timeout)) + } + case wrong => { + log.error("Wrong code %s", wrong) + State(NextState, locked, CodeState("", code)) + } + } + } + } + + def open: StateFunction = { + case Event(StateTimeout, stateData) => { + doLock + State(NextState, locked, stateData) + } + } + + private def doLock() { + log.info("Locked") + lockedLatch.open + } + + private def doUnlock = { + log.info("Unlocked") + unlockedLatch.open + } + } + + case class CodeState(soFar: String, code: String) +} + +class FsmActorSpec extends JUnitSuite { + import FsmActorSpec._ + + @Test + def unlockTheLock = { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + + // lock that locked after being open for 1 sec + val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + + lock ! '3' + lock ! '3' + lock ! '2' + lock ! '2' + lock ! '1' + + assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + } +} + diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala new file mode 100644 index 0000000000..e4ccd219ed --- /dev/null +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -0,0 +1,152 @@ +package dining.hakkerz + +import actor.Fsm +import se.scalablesolutions.akka.actor.{ActorRef, Actor} +import Actor._ + +/* + * Some messages for the chopstick + */ +sealed trait ChopstickMessage +object Take extends ChopstickMessage +object Put extends ChopstickMessage +case class Taken(chopstick: ActorRef) extends ChopstickMessage +case class Busy(chopstick: ActorRef) extends ChopstickMessage + +/** + * Some state container for the chopstick + */ +case class TakenBy(hakker: Option[ActorRef]) + +/* + * A chopstick is an actor, it can be taken, and put back + */ +class Chopstick(name: String) extends Actor with Fsm[TakenBy] { + self.id = name + + // A chopstick begins its existence as available and taken by no one + def initialState = State(NextState, available, TakenBy(None)) + + // When a chopstick is available, it can be taken by a some hakker + def available: StateFunction = { + case Event(Take, _) => + State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + } + + // When a chopstick is taken by a hakker + // It will refuse to be taken by other hakkers + // But the owning hakker can put it back + def taken: StateFunction = { + case Event(Take, currentState) => + State(Reply, taken, currentState, replyValue = Some(Busy(self))) + case Event(Put, TakenBy(hakker)) if self.sender == hakker => + State(NextState, available, TakenBy(None)) + } +} + +/** + * Some fsm hakker messages + */ +sealed trait FsmHakkerMessage +object Think extends FsmHakkerMessage + +/** + * Some state container to keep track of which chopsticks we have + */ +case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) + +/* + * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) + */ +class FsmHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with Fsm[TakenChopsticks] { + self.id = name + + //All hakkers start waiting + def initialState = State(NextState, waiting, TakenChopsticks(None, None)) + + def waiting: StateFunction = { + case Event(Think, _) => + log.info("%s starts to think", name) + startThinking(5000) + } + + //When a hakker is thinking it can become hungry + //and try to pick up its chopsticks and eat + def thinking: StateFunction = { + case Event(StateTimeout, current) => + left ! Take + right ! Take + State(NextState, hungry, current) + } + + // When a hakker is hungry it tries to pick up its chopsticks and eat + // When it picks one up, it goes into wait for the other + // If the hakkers first attempt at grabbing a chopstick fails, + // it starts to wait for the response of the other grab + def hungry: StateFunction = { + case Event(Taken(`left`), _) => + State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + case Event(Taken(`right`), _) => + State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) + case Event(Busy(_), current) => + State(NextState, firstChopstickDenied, current) + } + + // When a hakker is waiting for the last chopstick it can either obtain it + // and start eating, or the other chopstick was busy, and the hakker goes + // back to think about how he should obtain his chopsticks :-) + def waitForOtherChopstick: StateFunction = { + case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) + case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) + case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => + leftOption.foreach(_ ! Put) + rightOption.foreach(_ ! Put) + startThinking(10) + } + + private def startEating(left: ActorRef, right: ActorRef): State = { + log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) + State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + } + + // When the results of the other grab comes back, + // he needs to put it back if he got the other one. + // Then go back and think and try to grab the chopsticks again + def firstChopstickDenied: StateFunction = { + case Event(Taken(secondChopstick), _) => + secondChopstick ! Put + startThinking(10) + case Event(Busy(chopstick), _) => + startThinking(10) + } + + // When a hakker is eating, he can decide to start to think, + // then he puts down his chopsticks and starts to think + def eating: StateFunction = { + case Event(StateTimeout, _) => + log.info("%s puts down his chopsticks and starts to think", name) + left ! Put + right ! Put + startThinking(5000) + } + + private def startThinking(period: Int): State = { + State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + } +} + +/* + * Alright, here's our test-harness + */ +object DiningHakkersOnFsm { + def run { + // Create 5 chopsticks + val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start + // Create 5 awesome fsm hakkers and assign them their left and right chopstick + val hakkers = for{ + (name, i) <- List("Ghosh", "BonĂ©r", "Klang", "Krasser", "Manie").zipWithIndex + } yield actorOf(new FsmHakker(name, chopsticks(i), chopsticks((i + 1) % 5))).start + + hakkers.foreach(_ ! Think) + } +} \ No newline at end of file