diff --git a/akka-stm/src/main/scala/agent/Agent.scala b/akka-stm/src/main/scala/agent/Agent.scala new file mode 100644 index 0000000000..445b1c5002 --- /dev/null +++ b/akka-stm/src/main/scala/agent/Agent.scala @@ -0,0 +1,247 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.agent + +import akka.stm._ +import akka.actor.Actor +import akka.japi.{Function => JFunc, Procedure => JProc} +import akka.dispatch.Dispatchers + +/** + * Factory method for creating an Agent. + */ +object Agent { + private[akka] case class Update[T](function: T => T) + + def apply[T](initialValue: T) = new Agent(initialValue) +} + +/** + * The Agent class was inspired by agents in Clojure. + * + * Agents provide asynchronous change of individual locations. Agents + * are bound to a single storage location for their lifetime, and only + * allow mutation of that location (to a new state) to occur as a result + * of an action. Update actions are functions that are asynchronously + * applied to the Agent's state and whose return value becomes the + * Agent's new state. The state of an Agent should be immutable. + * + * While updates to Agents are asynchronous, the state of an Agent is + * always immediately available for reading by any thread (using ''get'' + * or ''apply'') without any messages. + * + * Agents are reactive. The update actions of all Agents get interleaved + * amongst threads in a thread pool. At any point in time, at most one + * ''send'' action for each Agent is being executed. Actions dispatched to + * an agent from another thread will occur in the order they were sent, + * potentially interleaved with actions dispatched to the same agent from + * other sources. + * + * If an Agent is used within an enclosing transaction, then it will + * participate in that transaction. Agents are integrated with the STM - + * any dispatches made in a transaction are held until that transaction + * commits, and are discarded if it is retried or aborted. + *

+ * + * Example of usage: + * {{{ + * val agent = Agent(5) + * + * agent send (_ * 2) + * + * ... + * + * val result = agent() + * // use result ... + * + * agent.close + * }}} + *
+ * + * Agent is also monadic, which means that you can compose operations using + * for-comprehensions. In monadic usage the original agents are not touched + * but new agents are created. So the old values (agents) are still available + * as-is. They are so-called 'persistent'. + *

+ * + * Example of monadic usage: + * {{{ + * val agent1 = Agent(3) + * val agent2 = Agent(5) + * + * for (value <- agent1) { + * result = value + 1 + * } + * + * val agent3 = for (value <- agent1) yield value + 1 + * + * val agent4 = for { + * value1 <- agent1 + * value2 <- agent2 + * } yield value1 + value2 + * + * agent1.close + * agent2.close + * agent3.close + * agent4.close + * }}} + */ +class Agent[T](initialValue: T) { + import Agent._ + + private[akka] val ref = Ref(initialValue) + private[akka] val updater = Actor.actorOf(new AgentUpdater(this)).start + + /** + * Read the internal state of the agent. + */ + def get() = ref.get + + /** + * Read the internal state of the agent. + */ + def apply() = get + + /** + * Dispatch a function to update the internal state. + */ + def send(f: T => T): Unit = { + def dispatch = updater ! Update(f) + if (Stm.activeTransaction) { get; deferred(dispatch) } + else dispatch + } + + /** + * Dispatch a new value for the internal state. Behaves the same + * as sending a fuction (x => newValue). + */ + def send(newValue: T): Unit = send(x => newValue) + + /** + * Dispatch a new value for the internal state. Behaves the same + * as sending a fuction (x => newValue). + */ + def update(newValue: T) = send(newValue) + + /** + * Dispatch a function to update the internal state but on its own thread. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `sendOff` or `send` will + * still be executed in order. + */ + def sendOff(f: T => T): Unit = send((value: T) => { + suspend + val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start + threadBased ! Update(f) + value + }) + + /** + * Map this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + */ + def map[B](f: T => B): Agent[B] = Agent(f(get)) + + /** + * Flatmap this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + */ + def flatMap[B](f: T => Agent[B]): Agent[B] = f(get) + + /** + * Applies the function to the internal state. Does not change the value of this agent. + */ + def foreach[U](f: T => U): Unit = f(get) + + /** + * Suspends processing of `send` actions for the agent. + */ + def suspend() = updater.dispatcher.suspend(updater) + + /** + * Resumes processing of `send` actions for the agent. + */ + def resume() = updater.dispatcher.resume(updater) + + /** + * Closes the agents and makes it eligable for garbage collection. + * A closed agent cannot accept any `send` actions. + */ + def close() = updater.stop + + // --------------------------------------------- + // Support for Java API Functions and Procedures + // --------------------------------------------- + + /** + * Java API: + * Dispatch a function to update the internal state. + */ + def send(f: JFunc[T, T]): Unit = send(x => f(x)) + + /** + * Java API: + * Dispatch a function to update the internal state but on its own thread. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `sendOff` or `send` will + * still be executed in order. + */ + def sendOff(f: JFunc[T, T]): Unit = sendOff(x => f(x)) + + /** + * Java API: + * Map this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + */ + def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get)) + + /** + * Java API: + * Flatmap this agent to a new agent, applying the function to the internal state. + * Does not change the value of this agent. + */ + def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = f(get) + + /** + * Java API: + * Applies the function to the internal state. Does not change the value of this agent. + */ + def foreach(f: JProc[T]): Unit = f(get) +} + +/** + * Agent updater actor. Used internally for `send` actions. + */ +class AgentUpdater[T](agent: Agent[T]) extends Actor { + import Agent._ + + val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false) + + def receive = { + case update: Update[T] => + atomic(txFactory) { agent.ref alter update.function } + case _ => () + } +} + +/** + * Thread-based agent updater actor. Used internally for `sendOff` actions. + */ +class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { + import Agent._ + + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + + val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false) + + def receive = { + case update: Update[T] => { + atomic(txFactory) { agent.ref alter update.function } + agent.resume + self.stop + } + case _ => self.stop + } +} diff --git a/akka-stm/src/test/scala/agent/AgentSpec.scala b/akka-stm/src/test/scala/agent/AgentSpec.scala new file mode 100644 index 0000000000..1f595d1cf8 --- /dev/null +++ b/akka-stm/src/test/scala/agent/AgentSpec.scala @@ -0,0 +1,164 @@ +package akka.agent.test + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +import akka.agent.Agent +import akka.stm._ +import akka.util.Duration +import akka.util.duration._ + +import java.util.concurrent.CountDownLatch + +class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { + val latch = new CountDownLatch(num) + def apply(a: A) = { latch.countDown; a } + def await(timeout: Duration) = latch.await(timeout.length, timeout.unit) +} + +class AgentSpec extends WordSpec with MustMatchers { + "Agent" should { + "update with send dispatches in order sent" in { + val countDown = new CountDownFunction[String] + + val agent = Agent("a") + agent send (_ + "b") + agent send (_ + "c") + agent send (_ + "d") + agent send countDown + + countDown.await(5 seconds) + agent() must be ("abcd") + + agent.close + } + + "maintain order between send and sendOff" in { + val countDown = new CountDownFunction[String] + + val agent = Agent("a") + agent send (_ + "b") + val longRunning = (s: String) => { Thread.sleep(2000); s + "c" } + agent sendOff longRunning + agent send (_ + "d") + agent send countDown + + countDown.await(5 seconds) + agent() must be ("abcd") + + agent.close + } + + "be immediately readable" in { + val countDown = new CountDownFunction[Int] + + val agent = Agent(5) + val f1 = (i: Int) => { Thread.sleep(2000); i + 5 } + agent send f1 + val read = agent() + agent send countDown + + countDown.await(5 seconds) + read must be (5) + agent() must be (10) + + agent.close + } + + "be readable within a transaction" in { + val agent = Agent(5) + val value = atomic { agent() } + value must be (5) + agent.close + } + + "dispatch sends in successful transactions" in { + val countDown = new CountDownFunction[Int] + + val agent = Agent(5) + atomic { + agent send (_ * 2) + } + agent send countDown + + countDown.await(5 seconds) + agent() must be (10) + + agent.close + } + + "not dispatch sends in aborted transactions" in { + val countDown = new CountDownFunction[Int] + + val agent = Agent(5) + + try { + atomic(DefaultTransactionFactory) { + agent send (_ * 2) + throw new RuntimeException("Expected failure") + } + } catch { case _ => } + + agent send countDown + + countDown.await(5 seconds) + agent() must be (5) + + agent.close + } + + "be able to be mapped" in { + val agent1 = Agent(5) + val agent2 = agent1 map (_ * 2) + + agent1() must be (5) + agent2() must be (10) + + agent1.close + agent2.close + } + + "be able to be used in a 'foreach' for comprehension" in { + val agent = Agent(3) + var result = 0 + + for (value <- agent) { + result += value + } + + result must be (3) + + agent.close + } + + "be able to be used in a 'map' for comprehension" in { + val agent1 = Agent(5) + val agent2 = for (value <- agent1) yield value * 2 + + agent1() must be (5) + agent2() must be (10) + + agent1.close + agent2.close + } + + "be able to be used in a 'flatMap' for comprehension" in { + val agent1 = Agent(1) + val agent2 = Agent(2) + + val agent3 = for { + value1 <- agent1 + value2 <- agent2 + } yield value1 + value2 + + agent1() must be (1) + agent2() must be (2) + agent3() must be (3) + + agent1.close + agent2.close + agent3.close + } + } +} +