Add reworked Agent

This commit is contained in:
Peter Vlugter 2010-11-09 13:16:44 +13:00
parent d59806c10f
commit c17cbf76db
2 changed files with 411 additions and 0 deletions

View file

@ -0,0 +1,247 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.agent
import akka.stm._
import akka.actor.Actor
import akka.japi.{Function => JFunc, Procedure => JProc}
import akka.dispatch.Dispatchers
/**
* Factory method for creating an Agent.
*/
object Agent {
private[akka] case class Update[T](function: T => T)
def apply[T](initialValue: T) = new Agent(initialValue)
}
/**
* The Agent class was inspired by agents in Clojure.
*
* Agents provide asynchronous change of individual locations. Agents
* are bound to a single storage location for their lifetime, and only
* allow mutation of that location (to a new state) to occur as a result
* of an action. Update actions are functions that are asynchronously
* applied to the Agent's state and whose return value becomes the
* Agent's new state. The state of an Agent should be immutable.
*
* While updates to Agents are asynchronous, the state of an Agent is
* always immediately available for reading by any thread (using ''get''
* or ''apply'') without any messages.
*
* Agents are reactive. The update actions of all Agents get interleaved
* amongst threads in a thread pool. At any point in time, at most one
* ''send'' action for each Agent is being executed. Actions dispatched to
* an agent from another thread will occur in the order they were sent,
* potentially interleaved with actions dispatched to the same agent from
* other sources.
*
* If an Agent is used within an enclosing transaction, then it will
* participate in that transaction. Agents are integrated with the STM -
* any dispatches made in a transaction are held until that transaction
* commits, and are discarded if it is retried or aborted.
* <br/><br/>
*
* Example of usage:
* {{{
* val agent = Agent(5)
*
* agent send (_ * 2)
*
* ...
*
* val result = agent()
* // use result ...
*
* agent.close
* }}}
* <br/>
*
* Agent is also monadic, which means that you can compose operations using
* for-comprehensions. In monadic usage the original agents are not touched
* but new agents are created. So the old values (agents) are still available
* as-is. They are so-called 'persistent'.
* <br/><br/>
*
* Example of monadic usage:
* {{{
* val agent1 = Agent(3)
* val agent2 = Agent(5)
*
* for (value <- agent1) {
* result = value + 1
* }
*
* val agent3 = for (value <- agent1) yield value + 1
*
* val agent4 = for {
* value1 <- agent1
* value2 <- agent2
* } yield value1 + value2
*
* agent1.close
* agent2.close
* agent3.close
* agent4.close
* }}}
*/
class Agent[T](initialValue: T) {
import Agent._
private[akka] val ref = Ref(initialValue)
private[akka] val updater = Actor.actorOf(new AgentUpdater(this)).start
/**
* Read the internal state of the agent.
*/
def get() = ref.get
/**
* Read the internal state of the agent.
*/
def apply() = get
/**
* Dispatch a function to update the internal state.
*/
def send(f: T => T): Unit = {
def dispatch = updater ! Update(f)
if (Stm.activeTransaction) { get; deferred(dispatch) }
else dispatch
}
/**
* Dispatch a new value for the internal state. Behaves the same
* as sending a fuction (x => newValue).
*/
def send(newValue: T): Unit = send(x => newValue)
/**
* Dispatch a new value for the internal state. Behaves the same
* as sending a fuction (x => newValue).
*/
def update(newValue: T) = send(newValue)
/**
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: T => T): Unit = send((value: T) => {
suspend
val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start
threadBased ! Update(f)
value
})
/**
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def map[B](f: T => B): Agent[B] = Agent(f(get))
/**
* Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def flatMap[B](f: T => Agent[B]): Agent[B] = f(get)
/**
* Applies the function to the internal state. Does not change the value of this agent.
*/
def foreach[U](f: T => U): Unit = f(get)
/**
* Suspends processing of `send` actions for the agent.
*/
def suspend() = updater.dispatcher.suspend(updater)
/**
* Resumes processing of `send` actions for the agent.
*/
def resume() = updater.dispatcher.resume(updater)
/**
* Closes the agents and makes it eligable for garbage collection.
* A closed agent cannot accept any `send` actions.
*/
def close() = updater.stop
// ---------------------------------------------
// Support for Java API Functions and Procedures
// ---------------------------------------------
/**
* Java API:
* Dispatch a function to update the internal state.
*/
def send(f: JFunc[T, T]): Unit = send(x => f(x))
/**
* Java API:
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: JFunc[T, T]): Unit = sendOff(x => f(x))
/**
* Java API:
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))
/**
* Java API:
* Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = f(get)
/**
* Java API:
* Applies the function to the internal state. Does not change the value of this agent.
*/
def foreach(f: JProc[T]): Unit = f(get)
}
/**
* Agent updater actor. Used internally for `send` actions.
*/
class AgentUpdater[T](agent: Agent[T]) extends Actor {
import Agent._
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
def receive = {
case update: Update[T] =>
atomic(txFactory) { agent.ref alter update.function }
case _ => ()
}
}
/**
* Thread-based agent updater actor. Used internally for `sendOff` actions.
*/
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
import Agent._
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
def receive = {
case update: Update[T] => {
atomic(txFactory) { agent.ref alter update.function }
agent.resume
self.stop
}
case _ => self.stop
}
}

View file

@ -0,0 +1,164 @@
package akka.agent.test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.agent.Agent
import akka.stm._
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.CountDownLatch
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
def apply(a: A) = { latch.countDown; a }
def await(timeout: Duration) = latch.await(timeout.length, timeout.unit)
}
class AgentSpec extends WordSpec with MustMatchers {
"Agent" should {
"update with send dispatches in order sent" in {
val countDown = new CountDownFunction[String]
val agent = Agent("a")
agent send (_ + "b")
agent send (_ + "c")
agent send (_ + "d")
agent send countDown
countDown.await(5 seconds)
agent() must be ("abcd")
agent.close
}
"maintain order between send and sendOff" in {
val countDown = new CountDownFunction[String]
val agent = Agent("a")
agent send (_ + "b")
val longRunning = (s: String) => { Thread.sleep(2000); s + "c" }
agent sendOff longRunning
agent send (_ + "d")
agent send countDown
countDown.await(5 seconds)
agent() must be ("abcd")
agent.close
}
"be immediately readable" in {
val countDown = new CountDownFunction[Int]
val agent = Agent(5)
val f1 = (i: Int) => { Thread.sleep(2000); i + 5 }
agent send f1
val read = agent()
agent send countDown
countDown.await(5 seconds)
read must be (5)
agent() must be (10)
agent.close
}
"be readable within a transaction" in {
val agent = Agent(5)
val value = atomic { agent() }
value must be (5)
agent.close
}
"dispatch sends in successful transactions" in {
val countDown = new CountDownFunction[Int]
val agent = Agent(5)
atomic {
agent send (_ * 2)
}
agent send countDown
countDown.await(5 seconds)
agent() must be (10)
agent.close
}
"not dispatch sends in aborted transactions" in {
val countDown = new CountDownFunction[Int]
val agent = Agent(5)
try {
atomic(DefaultTransactionFactory) {
agent send (_ * 2)
throw new RuntimeException("Expected failure")
}
} catch { case _ => }
agent send countDown
countDown.await(5 seconds)
agent() must be (5)
agent.close
}
"be able to be mapped" in {
val agent1 = Agent(5)
val agent2 = agent1 map (_ * 2)
agent1() must be (5)
agent2() must be (10)
agent1.close
agent2.close
}
"be able to be used in a 'foreach' for comprehension" in {
val agent = Agent(3)
var result = 0
for (value <- agent) {
result += value
}
result must be (3)
agent.close
}
"be able to be used in a 'map' for comprehension" in {
val agent1 = Agent(5)
val agent2 = for (value <- agent1) yield value * 2
agent1() must be (5)
agent2() must be (10)
agent1.close
agent2.close
}
"be able to be used in a 'flatMap' for comprehension" in {
val agent1 = Agent(1)
val agent2 = Agent(2)
val agent3 = for {
value1 <- agent1
value2 <- agent2
} yield value1 + value2
agent1() must be (1)
agent2() must be (2)
agent3() must be (3)
agent1.close
agent2.close
agent3.close
}
}
}