From b5bf62ded95eac1c005efc9e0cc0f0259c9d2d11 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 19 Jul 2010 16:08:25 +0200 Subject: [PATCH 01/12] initial idea for FSM --- akka-core/src/main/scala/actor/Actor.scala | 34 +++++++++ akka-core/src/test/scala/FsmActorSpec.scala | 81 +++++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 akka-core/src/test/scala/FsmActorSpec.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 76adf9c729..5630fe3a24 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -25,6 +25,40 @@ trait Transactor extends Actor { self.makeTransactionRequired } +trait FsmActor[S] extends Actor { + + type State = scala.PartialFunction[Event, NextState] + + @volatile var currentState: NextState = initialState + @volatile var timeoutActor: Option[ActorRef] = None + + def initialState: NextState + + def handleEvent: State = { + case event@Event(value,stateData) => + log.warning("No state for event with value %s - keeping current state %s", value, stateData) + NextState(currentState.state, stateData, currentState.timeout) + } + + protected def receive = { + case value => { + timeoutActor.foreach{ref => Scheduler.unschedule(ref); timeoutActor = None } + + val event = Event(value, currentState.stateData) + currentState = (currentState.state orElse handleEvent).apply(event) + + currentState.timeout.foreach{timeout => + timeoutActor = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } + + case class NextState(state: State, stateData: S, timeout: Option[Int] = None) + case class Event(event: Any, stateData: S) + object StateTimeout +} + + /** * Extend this abstract class to create a remote actor. *

diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala new file mode 100644 index 0000000000..3a3a60162e --- /dev/null +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import org.multiverse.api.latches.StandardLatch +import Actor._ +import java.util.concurrent.TimeUnit + +object FsmActorSpec { + + class Lock(code: String, + timeout: Int, + unlockedLatch: StandardLatch, + lockedLatch: StandardLatch) extends FsmActor[CodeState] { + + def initialState = NextState(locked, CodeState("", "33221")) + + def locked: State = { + case Event(digit: Char, CodeState(soFar, code)) => { + soFar + digit match { + case incomplete if incomplete.length < code.length => + NextState(locked, CodeState(incomplete, code)) + case codeTry if (codeTry == code) => { + doUnlock + NextState(open, CodeState("", code), Some(timeout)) + } + case wrong => { + log.error("Wrong code %s", wrong) + NextState(locked, CodeState("", code)) + } + } + } + } + + def open: State = { + case Event(StateTimeout, stateData) => { + doLock + NextState(locked, stateData) + } + } + + private def doLock() { + log.info("Locked") + lockedLatch.open + } + + private def doUnlock = { + log.info("Unlocked") + unlockedLatch.open + } + } + + case class CodeState(soFar: String, code: String) +} + +class FsmActorSpec extends JUnitSuite { + import FsmActorSpec._ + + @Test + def unlockTheLock = { + val unlockedLatch = new StandardLatch + val lockedLatch = new StandardLatch + + // lock that locked after being open for 1 sec + val lock = actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + + lock ! '3' + lock ! '3' + lock ! '2' + lock ! '2' + lock ! '1' + + assert(unlockedLatch.tryAwait(1, TimeUnit.SECONDS)) + assert(lockedLatch.tryAwait(2, TimeUnit.SECONDS)) + } +} + From 2af5fda73edcc8b52fbbba53cb33b32bdf4942a8 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 19 Jul 2010 16:28:35 +0200 Subject: [PATCH 02/12] refactor fsm --- akka-core/src/main/scala/actor/Actor.scala | 34 ------------------ akka-core/src/main/scala/actor/Fsm.scala | 38 +++++++++++++++++++++ akka-core/src/test/scala/FsmActorSpec.scala | 8 ++--- 3 files changed, 42 insertions(+), 38 deletions(-) create mode 100644 akka-core/src/main/scala/actor/Fsm.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 5630fe3a24..76adf9c729 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -25,40 +25,6 @@ trait Transactor extends Actor { self.makeTransactionRequired } -trait FsmActor[S] extends Actor { - - type State = scala.PartialFunction[Event, NextState] - - @volatile var currentState: NextState = initialState - @volatile var timeoutActor: Option[ActorRef] = None - - def initialState: NextState - - def handleEvent: State = { - case event@Event(value,stateData) => - log.warning("No state for event with value %s - keeping current state %s", value, stateData) - NextState(currentState.state, stateData, currentState.timeout) - } - - protected def receive = { - case value => { - timeoutActor.foreach{ref => Scheduler.unschedule(ref); timeoutActor = None } - - val event = Event(value, currentState.stateData) - currentState = (currentState.state orElse handleEvent).apply(event) - - currentState.timeout.foreach{timeout => - timeoutActor = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) - } - } - } - - case class NextState(state: State, stateData: S, timeout: Option[Int] = None) - case class Event(event: Any, stateData: S) - object StateTimeout -} - - /** * Extend this abstract class to create a remote actor. *

diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala new file mode 100644 index 0000000000..b7cc457c53 --- /dev/null +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -0,0 +1,38 @@ +package actor + +import java.util.concurrent.TimeUnit +import se.scalablesolutions.akka.actor.{ActorRef, Scheduler, Actor} + +trait Fsm[S] { self: Actor => + + type State = scala.PartialFunction[Event, NextState] + + @volatile var currentState: NextState = initialState + @volatile var timeoutActor: Option[ActorRef] = None + + def initialState: NextState + + def handleEvent: State = { + case event@Event(value,stateData) => + log.warning("No state for event with value %s - keeping current state %s", value, stateData) + NextState(currentState.state, stateData, currentState.timeout) + } + + + override protected def receive: Receive = { + case value => { + timeoutActor.foreach{ref => Scheduler.unschedule(ref); timeoutActor = None } + + val event = Event(value, currentState.stateData) + currentState = (currentState.state orElse handleEvent).apply(event) + + currentState.timeout.foreach{timeout => + timeoutActor = Some(Scheduler.scheduleOnce(this.self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + } + } + } + + case class NextState(state: State, stateData: S, timeout: Option[Int] = None) + case class Event(event: Any, stateData: S) +} +object StateTimeout diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala index 3a3a60162e..06bf4d301b 100644 --- a/akka-core/src/test/scala/FsmActorSpec.scala +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import org.multiverse.api.latches.StandardLatch -import Actor._ +import actor.{StateTimeout, Fsm} import java.util.concurrent.TimeUnit object FsmActorSpec { @@ -15,7 +15,7 @@ object FsmActorSpec { class Lock(code: String, timeout: Int, unlockedLatch: StandardLatch, - lockedLatch: StandardLatch) extends FsmActor[CodeState] { + lockedLatch: StandardLatch) extends Actor with Fsm[CodeState] { def initialState = NextState(locked, CodeState("", "33221")) @@ -26,7 +26,7 @@ object FsmActorSpec { NextState(locked, CodeState(incomplete, code)) case codeTry if (codeTry == code) => { doUnlock - NextState(open, CodeState("", code), Some(timeout)) + new NextState(open, CodeState("", code), Some(timeout)) } case wrong => { log.error("Wrong code %s", wrong) @@ -66,7 +66,7 @@ class FsmActorSpec extends JUnitSuite { val lockedLatch = new StandardLatch // lock that locked after being open for 1 sec - val lock = actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start + val lock = Actor.actorOf(new Lock("33221", 1000, unlockedLatch, lockedLatch)).start lock ! '3' lock ! '3' From 8a750652aea8478e22c8451c5ae1c145760b7a2a Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 19 Jul 2010 18:01:35 +0200 Subject: [PATCH 03/12] foreach -> flatMap --- akka-core/src/main/scala/actor/Fsm.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index b7cc457c53..e493758c0b 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -21,7 +21,7 @@ trait Fsm[S] { self: Actor => override protected def receive: Receive = { case value => { - timeoutActor.foreach{ref => Scheduler.unschedule(ref); timeoutActor = None } + timeoutActor = timeoutActor flatMap { ref => Scheduler.unschedule(ref); None } val event = Event(value, currentState.stateData) currentState = (currentState.state orElse handleEvent).apply(event) From dcd182e22cd7ddb46362b39cb4d8d89abf8f5269 Mon Sep 17 00:00:00 2001 From: momania Date: Mon, 19 Jul 2010 18:22:15 +0200 Subject: [PATCH 04/12] move StateTimeout into Fsm --- akka-core/src/main/scala/actor/Fsm.scala | 2 +- akka-core/src/test/scala/FsmActorSpec.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index e493758c0b..beba759843 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -34,5 +34,5 @@ trait Fsm[S] { self: Actor => case class NextState(state: State, stateData: S, timeout: Option[Int] = None) case class Event(event: Any, stateData: S) + object StateTimeout } -object StateTimeout diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala index 06bf4d301b..12f83607a8 100644 --- a/akka-core/src/test/scala/FsmActorSpec.scala +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import org.multiverse.api.latches.StandardLatch -import actor.{StateTimeout, Fsm} +import actor.Fsm import java.util.concurrent.TimeUnit object FsmActorSpec { From 770f74fc59bd3db7b2836b06122ef0b25a47487e Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 20 Jul 2010 09:53:57 +0200 Subject: [PATCH 05/12] State refactor --- akka-core/src/main/scala/actor/Fsm.scala | 29 ++++++++++++++++----- akka-core/src/test/scala/FsmActorSpec.scala | 14 +++++----- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index beba759843..a36eca3be9 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -5,17 +5,17 @@ import se.scalablesolutions.akka.actor.{ActorRef, Scheduler, Actor} trait Fsm[S] { self: Actor => - type State = scala.PartialFunction[Event, NextState] + type StateFunction = scala.PartialFunction[Event, State] - @volatile var currentState: NextState = initialState + @volatile var currentState: State = initialState @volatile var timeoutActor: Option[ActorRef] = None - def initialState: NextState + def initialState: State - def handleEvent: State = { + def handleEvent: StateFunction = { case event@Event(value,stateData) => log.warning("No state for event with value %s - keeping current state %s", value, stateData) - NextState(currentState.state, stateData, currentState.timeout) + State(NextState, currentState.stateFunction, stateData, currentState.timeout) } @@ -24,15 +24,30 @@ trait Fsm[S] { self: Actor => timeoutActor = timeoutActor flatMap { ref => Scheduler.unschedule(ref); None } val event = Event(value, currentState.stateData) - currentState = (currentState.state orElse handleEvent).apply(event) + currentState = (currentState.stateFunction orElse handleEvent).apply(event) currentState.timeout.foreach{timeout => timeoutActor = Some(Scheduler.scheduleOnce(this.self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) } + + currentState match { + case State(Reply, _, _, _, reply) => reply.foreach(this.self.reply) + } } } - case class NextState(state: State, stateData: S, timeout: Option[Int] = None) + + case class State(stateEvent: StateEvent, + stateFunction: StateFunction, + stateData: S, + timeout: Option[Int] = None, + reply: Option[Any] =None) + case class Event(event: Any, stateData: S) + + sealed trait StateEvent + object NextState extends StateEvent + object Reply extends StateEvent + object StateTimeout } diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala index 12f83607a8..1742dc8bad 100644 --- a/akka-core/src/test/scala/FsmActorSpec.scala +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -17,29 +17,29 @@ object FsmActorSpec { unlockedLatch: StandardLatch, lockedLatch: StandardLatch) extends Actor with Fsm[CodeState] { - def initialState = NextState(locked, CodeState("", "33221")) + def initialState = State(NextState, locked, CodeState("", "33221")) - def locked: State = { + def locked: StateFunction = { case Event(digit: Char, CodeState(soFar, code)) => { soFar + digit match { case incomplete if incomplete.length < code.length => - NextState(locked, CodeState(incomplete, code)) + State(NextState, locked, CodeState(incomplete, code)) case codeTry if (codeTry == code) => { doUnlock - new NextState(open, CodeState("", code), Some(timeout)) + new State(NextState, open, CodeState("", code), Some(timeout)) } case wrong => { log.error("Wrong code %s", wrong) - NextState(locked, CodeState("", code)) + State(NextState, locked, CodeState("", code)) } } } } - def open: State = { + def open: StateFunction = { case Event(StateTimeout, stateData) => { doLock - NextState(locked, stateData) + State(NextState, locked, stateData) } } From c52c549e865b22179ea916fc58088a9b4ba249cb Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 20 Jul 2010 16:08:31 +0200 Subject: [PATCH 06/12] State refactor --- akka-core/src/main/scala/actor/Fsm.scala | 1 + akka-core/src/test/scala/FsmActorSpec.scala | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index a36eca3be9..f82a1c8035 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -32,6 +32,7 @@ trait Fsm[S] { self: Actor => currentState match { case State(Reply, _, _, _, reply) => reply.foreach(this.self.reply) + case _ => () // ignore for now } } } diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala index 1742dc8bad..bca358b2e1 100644 --- a/akka-core/src/test/scala/FsmActorSpec.scala +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -17,7 +17,7 @@ object FsmActorSpec { unlockedLatch: StandardLatch, lockedLatch: StandardLatch) extends Actor with Fsm[CodeState] { - def initialState = State(NextState, locked, CodeState("", "33221")) + def initialState = State(NextState, locked, CodeState("", code)) def locked: StateFunction = { case Event(digit: Char, CodeState(soFar, code)) => { @@ -26,7 +26,7 @@ object FsmActorSpec { State(NextState, locked, CodeState(incomplete, code)) case codeTry if (codeTry == code) => { doUnlock - new State(NextState, open, CodeState("", code), Some(timeout)) + State(NextState, open, CodeState("", code), Some(timeout)) } case wrong => { log.error("Wrong code %s", wrong) From 73106f6b32e03f5df245ebaa2777b53bc1b42f21 Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 20 Jul 2010 16:27:21 +0200 Subject: [PATCH 07/12] use ref for state- makes sense? --- akka-core/src/main/scala/actor/Fsm.scala | 30 +++++++++++++-------- akka-core/src/test/scala/FsmActorSpec.scala | 24 ++++++++--------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index f82a1c8035..b58d2237b1 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -2,12 +2,14 @@ package actor import java.util.concurrent.TimeUnit import se.scalablesolutions.akka.actor.{ActorRef, Scheduler, Actor} +import se.scalablesolutions.akka.stm.Ref +import se.scalablesolutions.akka.stm.local._ trait Fsm[S] { self: Actor => type StateFunction = scala.PartialFunction[Event, State] - @volatile var currentState: State = initialState + val stateRef: Ref[State] = Ref(initialState) @volatile var timeoutActor: Option[ActorRef] = None def initialState: State @@ -15,24 +17,30 @@ trait Fsm[S] { self: Actor => def handleEvent: StateFunction = { case event@Event(value,stateData) => log.warning("No state for event with value %s - keeping current state %s", value, stateData) + val currentState: State = stateRef.getOrElse(initialState) State(NextState, currentState.stateFunction, stateData, currentState.timeout) } override protected def receive: Receive = { case value => { - timeoutActor = timeoutActor flatMap { ref => Scheduler.unschedule(ref); None } + atomic { + timeoutActor = timeoutActor.flatMap{ref => Scheduler.unschedule(ref); None} - val event = Event(value, currentState.stateData) - currentState = (currentState.stateFunction orElse handleEvent).apply(event) + val currentState: State = stateRef.getOrElse(initialState) + val event = Event(value, currentState.stateData) + val newState = (currentState.stateFunction orElse handleEvent).apply(event) - currentState.timeout.foreach{timeout => - timeoutActor = Some(Scheduler.scheduleOnce(this.self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) - } + newState match { + case State(Reply, _, _, _, replyValue) => replyValue.foreach(this.self.reply) + case _ => () // ignore for now + } - currentState match { - case State(Reply, _, _, _, reply) => reply.foreach(this.self.reply) - case _ => () // ignore for now + newState.timeout.foreach{timeout => + timeoutActor = Some(Scheduler.scheduleOnce(this.self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) + } + + stateRef.swap(newState) } } } @@ -42,7 +50,7 @@ trait Fsm[S] { self: Actor => stateFunction: StateFunction, stateData: S, timeout: Option[Int] = None, - reply: Option[Any] =None) + replyValue: Option[Any] =None) case class Event(event: Any, stateData: S) diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/FsmActorSpec.scala index bca358b2e1..7960d66247 100644 --- a/akka-core/src/test/scala/FsmActorSpec.scala +++ b/akka-core/src/test/scala/FsmActorSpec.scala @@ -21,18 +21,18 @@ object FsmActorSpec { def locked: StateFunction = { case Event(digit: Char, CodeState(soFar, code)) => { - soFar + digit match { - case incomplete if incomplete.length < code.length => - State(NextState, locked, CodeState(incomplete, code)) - case codeTry if (codeTry == code) => { - doUnlock - State(NextState, open, CodeState("", code), Some(timeout)) - } - case wrong => { - log.error("Wrong code %s", wrong) - State(NextState, locked, CodeState("", code)) - } - } + soFar + digit match { + case incomplete if incomplete.length < code.length => + State(NextState, locked, CodeState(incomplete, code)) + case codeTry if (codeTry == code) => { + doUnlock + State(NextState, open, CodeState("", code), Some(timeout)) + } + case wrong => { + log.error("Wrong code %s", wrong) + State(NextState, locked, CodeState("", code)) + } + } } } From 17f37dffdd765e36479a8585cea0b26b6054dd20 Mon Sep 17 00:00:00 2001 From: momania Date: Tue, 20 Jul 2010 16:28:03 +0200 Subject: [PATCH 08/12] better matching reply value --- akka-core/src/main/scala/actor/Fsm.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index b58d2237b1..3554e41f89 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -32,7 +32,7 @@ trait Fsm[S] { self: Actor => val newState = (currentState.stateFunction orElse handleEvent).apply(event) newState match { - case State(Reply, _, _, _, replyValue) => replyValue.foreach(this.self.reply) + case State(Reply, _, _, _, Some(replyValue)) => this.self.reply(replyValue) case _ => () // ignore for now } From 0f26ccd42724ebeeb8398d7e1486dc95f5cd9592 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 19 Aug 2010 17:31:59 +0200 Subject: [PATCH 09/12] Dining hakkers on fsm --- akka-core/src/main/scala/actor/Fsm.scala | 47 +++--- .../src/main/scala/DiningHakkersOnFsm.scala | 152 ++++++++++++++++++ 2 files changed, 174 insertions(+), 25 deletions(-) create mode 100644 akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala diff --git a/akka-core/src/main/scala/actor/Fsm.scala b/akka-core/src/main/scala/actor/Fsm.scala index 3554e41f89..df02a7b1d2 100644 --- a/akka-core/src/main/scala/actor/Fsm.scala +++ b/akka-core/src/main/scala/actor/Fsm.scala @@ -1,56 +1,53 @@ package actor -import java.util.concurrent.TimeUnit -import se.scalablesolutions.akka.actor.{ActorRef, Scheduler, Actor} +import se.scalablesolutions.akka.actor.{Scheduler, Actor} import se.scalablesolutions.akka.stm.Ref import se.scalablesolutions.akka.stm.local._ +import java.util.concurrent.{ScheduledFuture, TimeUnit} + +trait Fsm[S] { + this: Actor => -trait Fsm[S] { self: Actor => - type StateFunction = scala.PartialFunction[Event, State] - val stateRef: Ref[State] = Ref(initialState) - @volatile var timeoutActor: Option[ActorRef] = None + var currentState: State = initialState + var timeoutFuture: Option[ScheduledFuture[AnyRef]] = None def initialState: State def handleEvent: StateFunction = { - case event@Event(value,stateData) => - log.warning("No state for event with value %s - keeping current state %s", value, stateData) - val currentState: State = stateRef.getOrElse(initialState) + case event@Event(value, stateData) => + log.warning("No state for event with value %s - keeping current state %s at %s", value, stateData, self.id) State(NextState, currentState.stateFunction, stateData, currentState.timeout) } - override protected def receive: Receive = { + override final protected def receive: Receive = { case value => { - atomic { - timeoutActor = timeoutActor.flatMap{ref => Scheduler.unschedule(ref); None} + timeoutFuture = timeoutFuture.flatMap {ref => ref.cancel(true); None} - val currentState: State = stateRef.getOrElse(initialState) - val event = Event(value, currentState.stateData) - val newState = (currentState.stateFunction orElse handleEvent).apply(event) + val event = Event(value, currentState.stateData) + val newState = (currentState.stateFunction orElse handleEvent).apply(event) - newState match { - case State(Reply, _, _, _, Some(replyValue)) => this.self.reply(replyValue) - case _ => () // ignore for now - } + currentState = newState - newState.timeout.foreach{timeout => - timeoutActor = Some(Scheduler.scheduleOnce(this.self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) - } + newState match { + case State(Reply, _, _, _, Some(replyValue)) => self.sender.foreach(_ ! replyValue) + case _ => () // ignore for now + } - stateRef.swap(newState) + newState.timeout.foreach { + timeout => + timeoutFuture = Some(Scheduler.scheduleOnce(self, StateTimeout, timeout, TimeUnit.MILLISECONDS)) } } } - case class State(stateEvent: StateEvent, stateFunction: StateFunction, stateData: S, timeout: Option[Int] = None, - replyValue: Option[Any] =None) + replyValue: Option[Any] = None) case class Event(event: Any, stateData: S) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala new file mode 100644 index 0000000000..45a2d6eac5 --- /dev/null +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -0,0 +1,152 @@ +package dining.hakkerz + +import actor.Fsm +import se.scalablesolutions.akka.actor.{ActorRef, Actor} +import Actor._ + +/* + * Some messages for the chopstick + */ +sealed trait ChopstickMessage +object Take extends ChopstickMessage +object Put extends ChopstickMessage +case class Taken(chopstick: ActorRef) extends ChopstickMessage +case class Busy(chopstick: ActorRef) extends ChopstickMessage + +/** + * Some state container for the chopstick + */ +case class TakenBy(hakker: Option[ActorRef]) + +/* + * A chopstick is an actor, it can be taken, and put back + */ +class Blade(name: String) extends Actor with Fsm[TakenBy] { + self.id = name + + // A chopstick begins its existence as available and taken by no one + def initialState = State(NextState, available, TakenBy(None)) + + // When a chopstick is available, it can be taken by a some hakker + def available: StateFunction = { + case Event(Take, _) => + State(Reply, taken, TakenBy(self.sender), replyValue = Some(Taken(self))) + } + + // When a chopstick is taken by a hakker + // It will refuse to be taken by other hakkers + // But the owning hakker can put it back + def taken: StateFunction = { + case Event(Take, currentState) => + State(Reply, taken, currentState, replyValue = Some(Busy(self))) + case Event(Put, TakenBy(hakker)) if self.sender == hakker => + State(NextState, available, TakenBy(None)) + } +} + +/** + * Some fsm hakker messages + */ +sealed trait FsmHakkerMessage +object Think extends FsmHakkerMessage + +/** + * Some state container to keep track of which chopsticks we have + */ +case class TakenChopsticks(left: Option[ActorRef], right: Option[ActorRef]) + +/* + * A fsm hakker is an awesome dude or dudette who either thinks about hacking or has to eat ;-) + */ +class FsmHakker(name: String, left: ActorRef, right: ActorRef) extends Actor with Fsm[TakenChopsticks] { + self.id = name + + //All hakkers start waiting + def initialState = State(NextState, waiting, TakenChopsticks(None, None)) + + def waiting: StateFunction = { + case Event(Think, _) => + log.info("%s starts to think", name) + startThinking(5000) + } + + //When a hakker is thinking it can become hungry + //and try to pick up its chopsticks and eat + def thinking: StateFunction = { + case Event(StateTimeout, current) => + left ! Take + right ! Take + State(NextState, hungry, current) + } + + // When a hakker is hungry it tries to pick up its chopsticks and eat + // When it picks one up, it goes into wait for the other + // If the hakkers first attempt at grabbing a chopstick fails, + // it starts to wait for the response of the other grab + def hungry: StateFunction = { + case Event(Taken(`left`), _) => + State(NextState, waitForOtherChopstick, TakenChopsticks(Some(left), None)) + case Event(Taken(`right`), _) => + State(NextState, waitForOtherChopstick, TakenChopsticks(None, Some(right))) + case Event(Busy(_), current) => + State(NextState, firstChopstickDenied, current) + } + + // When a hakker is waiting for the last chopstick it can either obtain it + // and start eating, or the other chopstick was busy, and the hakker goes + // back to think about how he should obtain his chopsticks :-) + def waitForOtherChopstick: StateFunction = { + case Event(Taken(`left`), TakenChopsticks(None, Some(right))) => startEating(left, right) + case Event(Taken(`right`), TakenChopsticks(Some(left), None)) => startEating(left, right) + case Event(Busy(chopstick), TakenChopsticks(leftOption, rightOption)) => + leftOption.foreach(_ ! Put) + rightOption.foreach(_ ! Put) + startThinking(10) + } + + private def startEating(left: ActorRef, right: ActorRef): State = { + log.info("%s has picked up %s and %s, and starts to eat", name, left.id, right.id) + State(NextState, eating, TakenChopsticks(Some(left), Some(right)), timeout = Some(5000)) + } + + // When the results of the other grab comes back, + // he needs to put it back if he got the other one. + // Then go back and think and try to grab the chopsticks again + def firstChopstickDenied: StateFunction = { + case Event(Taken(secondChopstick), _) => + secondChopstick ! Put + startThinking(10) + case Event(Busy(chopstick), _) => + startThinking(10) + } + + // When a hakker is eating, he can decide to start to think, + // then he puts down his chopsticks and starts to think + def eating: StateFunction = { + case Event(StateTimeout, _) => + log.info("%s puts down his chopsticks and starts to think", name) + left ! Put + right ! Put + startThinking(5000) + } + + private def startThinking(period: Int): State = { + State(NextState, thinking, TakenChopsticks(None, None), timeout = Some(period)) + } +} + +/* + * Alright, here's our test-harness + */ +object DiningHakkersOnFsm { + def run { + // Create 5 chopsticks + val chopsticks = for (i <- 1 to 5) yield actorOf(new Blade("Chopstick " + i)).start + // Create 5 awesome fsm hakkers and assign them their left and right chopstick + val hakkers = for{ + (name, i) <- List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex + } yield actorOf(new FsmHakker(name, chopsticks(i), chopsticks((i + 1) % 5))).start + + hakkers.foreach(_ ! Think) + } +} \ No newline at end of file From 5cd7a7b7ba4f4752840872bf74b2fa821bb3bef9 Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 19 Aug 2010 18:13:28 +0200 Subject: [PATCH 10/12] moved fsm spec to correct location --- akka-core/src/test/scala/{ => actor/actor}/FsmActorSpec.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename akka-core/src/test/scala/{ => actor/actor}/FsmActorSpec.scala (100%) diff --git a/akka-core/src/test/scala/FsmActorSpec.scala b/akka-core/src/test/scala/actor/actor/FsmActorSpec.scala similarity index 100% rename from akka-core/src/test/scala/FsmActorSpec.scala rename to akka-core/src/test/scala/actor/actor/FsmActorSpec.scala From e26914606b71e7b6c007a4acfdcd820d3cc2ecfc Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 19 Aug 2010 18:15:30 +0200 Subject: [PATCH 11/12] blade -> chopstick --- .../akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index 45a2d6eac5..e4ccd219ed 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -21,7 +21,7 @@ case class TakenBy(hakker: Option[ActorRef]) /* * A chopstick is an actor, it can be taken, and put back */ -class Blade(name: String) extends Actor with Fsm[TakenBy] { +class Chopstick(name: String) extends Actor with Fsm[TakenBy] { self.id = name // A chopstick begins its existence as available and taken by no one @@ -141,7 +141,7 @@ class FsmHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit object DiningHakkersOnFsm { def run { // Create 5 chopsticks - val chopsticks = for (i <- 1 to 5) yield actorOf(new Blade("Chopstick " + i)).start + val chopsticks = for (i <- 1 to 5) yield actorOf(new Chopstick("Chopstick " + i)).start // Create 5 awesome fsm hakkers and assign them their left and right chopstick val hakkers = for{ (name, i) <- List("Ghosh", "Bonér", "Klang", "Krasser", "Manie").zipWithIndex From 7b5f07806f8f22d304a3f99129f371b558e727c2 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Thu, 19 Aug 2010 20:55:28 +0200 Subject: [PATCH 12/12] fixed remote server name --- akka-core/src/main/scala/remote/RemoteServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index d816aba580..c90f596991 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -200,7 +200,7 @@ case class RemoteServerClientDisconnected( * @author Jonas Bonér */ class RemoteServer extends Logging with ListenerManagement { - val name = "RemoteServer@" + hostname + ":" + port + def name = "RemoteServer@" + hostname + ":" + port private[akka] var hostname = RemoteServer.HOSTNAME private[akka] var port = RemoteServer.PORT