Merge pull request #1051 from akka/wip-agent-rework-√

Migrating Agents to greener pastures
This commit is contained in:
Viktor Klang (√) 2013-01-28 07:16:41 -08:00
commit d6addd9f07
9 changed files with 490 additions and 463 deletions

View file

@ -4,6 +4,7 @@ import java.util.concurrent.{ ExecutorService, Executor, Executors }
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._ import scala.concurrent._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
import akka.util.SerializedSuspendableExecutionContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -81,4 +82,82 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
Await.ready(latch, timeout.duration) Await.ready(latch, timeout.duration)
} }
} }
"A SerializedSuspendableExecutionContext" must {
"be suspendable and resumable" in {
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
val counter = new AtomicInteger(0)
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
perform(_ + 1)
perform(x { sec.suspend(); x * 2 })
awaitCond(counter.get == 2)
perform(_ + 4)
perform(_ * 2)
sec.size must be === 2
Thread.sleep(500)
sec.size must be === 2
counter.get must be === 2
sec.resume()
awaitCond(counter.get == 12)
perform(_ * 2)
awaitCond(counter.get == 24)
sec.isEmpty must be === true
}
"execute 'throughput' number of tasks per sweep" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
}
val throughput = 25
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
sec.suspend()
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
val total = 1000
1 to total foreach { _ perform(_ + 1) }
sec.size() must be === total
sec.resume()
awaitCond(counter.get == total)
submissions.get must be === (total / throughput)
sec.isEmpty must be === true
}
"execute tasks in serial" in {
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
val total = 10000
val counter = new AtomicInteger(0)
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
1 to total foreach { i perform(c if (c == (i - 1)) c + 1 else c) }
awaitCond(counter.get == total)
sec.isEmpty must be === true
}
"should relinquish thread when suspended" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
}
val throughput = 25
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
sec.suspend()
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
perform(_ + 1)
1 to 10 foreach { _ perform(identity) }
perform(x { sec.suspend(); x * 2 })
perform(_ + 8)
sec.size must be === 13
sec.resume()
awaitCond(counter.get == 2)
sec.resume()
awaitCond(counter.get == 10)
sec.isEmpty must be === true
submissions.get must be === 2
}
}
} }

View file

@ -0,0 +1,81 @@
package akka.util
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.annotation.{ tailrec, switch }
private[akka] object SerializedSuspendableExecutionContext {
final val Off = 0
final val On = 1
final val Suspended = 2
def apply(batchSize: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext =
new SerializedSuspendableExecutionContext(batchSize)(context match {
case s: SerializedSuspendableExecutionContext s.context
case other other
})
}
/**
* This `ExecutionContext` allows to wrap an underlying `ExecutionContext` and provide guaranteed serial execution
* of tasks submitted to it. On top of that it also allows for *suspending* and *resuming* processing of tasks.
*
* WARNING: This type must never leak into User code as anything but `ExecutionContext`
*
* @param throughput maximum number of tasks to be executed in serial before relinquishing the executing thread.
* @param context the underlying context which will be used to actually execute the submitted tasks
*/
private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext)
extends ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext {
import SerializedSuspendableExecutionContext._
require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput")
private final val state = new AtomicInteger(Off)
@tailrec private final def addState(newState: Int): Boolean = {
val c = state.get
state.compareAndSet(c, c | newState) || addState(newState)
}
@tailrec private final def remState(oldState: Int) {
val c = state.get
if (state.compareAndSet(c, c & ~oldState)) attach() else remState(oldState)
}
/**
* Resumes execution of tasks until `suspend` is called,
* if it isn't currently suspended, it is a no-op.
* This operation is idempotent.
*/
final def resume(): Unit = remState(Suspended)
/**
* Suspends execution of tasks until `resume` is called,
* this operation is idempotent.
*/
final def suspend(): Unit = addState(Suspended)
final def run(): Unit = {
@tailrec def run(done: Int): Unit =
if (done < throughput && state.get == On) {
poll() match {
case null ()
case some
try some.run() catch { case NonFatal(t) context reportFailure t }
run(done + 1)
}
}
try run(0) finally remState(On)
}
final def attach(): Unit = if (!isEmpty && state.compareAndSet(Off, On)) context execute this
override final def execute(task: Runnable): Unit = try add(task) finally attach()
override final def reportFailure(t: Throwable): Unit = context reportFailure t
override final def toString: String = (state.get: @switch) match {
case 0 "Off"
case 1 "On"
case 2 "Off & Suspended"
case 3 "On & Suspended"
}
}

View file

@ -4,26 +4,15 @@
package akka.agent package akka.agent
import akka.actor._
import akka.japi.{ Function JFunc, Procedure JProc }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.stm._ import scala.concurrent.stm._
import scala.concurrent.{ ExecutionContext, Future, Promise, Await } import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration.{ FiniteDuration, Duration } import akka.util.{ SerializedSuspendableExecutionContext }
/**
* Used internally to send functions.
*/
private[akka] case class Update[T](function: T T)
private[akka] case class Alter[T](function: T T)
private[akka] case object Get
/** /**
* Factory method for creating an Agent. * Factory method for creating an Agent.
*/ */
object Agent { object Agent {
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system, system) def apply[T](initialValue: T)(implicit context: ExecutionContext) = new Agent(initialValue, context)
} }
/** /**
@ -96,14 +85,13 @@ object Agent {
* agent4.close * agent4.close
* }}} * }}}
*/ */
class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem) { class Agent[T](initialValue: T, context: ExecutionContext) {
private val ref = Ref(initialValue) private val ref = Ref(initialValue)
private val updater = refFactory.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow? private val updater = SerializedSuspendableExecutionContext(10)(context)
def this(initialValue: T, system: ActorSystem) = this(initialValue, system, system)
/** /**
* Read the internal state of the agent. * Read the internal state of the agent.
* Java API
*/ */
def get(): T = ref.single.get def get(): T = ref.single.get
@ -113,187 +101,104 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
def apply(): T = get def apply(): T = get
/** /**
* Dispatch a function to update the internal state. * Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/ */
def send(f: T T): Unit = { def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })
def dispatch = updater ! Update(f)
/**
* Dispatch a function to update the internal state.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def send(f: T T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })
/**
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit = withinTransaction(
new Runnable {
def run =
try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() })
})
/**
* Dispatch an update to the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })
/**
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(f: T T): Future[T] = doAlter(ref.single.transformAndGet(f))
/**
* Dispatch a function to update the internal state but on its own thread,
* and return a Future where that new state can be obtained.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alterOff(f: T T)(implicit ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
withinTransaction(new Runnable {
def run = {
updater.suspend()
result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume())
}
})
result.future
}
private final def withinTransaction(run: Runnable): Unit = {
def dispatch = updater.execute(run)
Txn.findCurrent match { Txn.findCurrent match {
case Some(txn) Txn.afterCommit(status dispatch)(txn) case Some(txn) Txn.afterCommit(status dispatch)(txn)
case _ dispatch case _ dispatch
} }
} }
/** private final def doAlter(f: T): Future[T] = {
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained within the given timeout.
*/
def alter(f: T T)(implicit timeout: Timeout): Future[T] = {
def dispatch = ask(updater, Alter(f)).asInstanceOf[Future[T]]
Txn.findCurrent match { Txn.findCurrent match {
case Some(txn) case Some(txn)
val result = Promise[T]() val result = Promise[T]()
Txn.afterCommit(status result completeWith dispatch)(txn) Txn.afterCommit(status result completeWith Future(f)(updater))(txn)
result.future result.future
case _ dispatch case _ Future(f)(updater)
} }
} }
/**
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def send(newValue: T): Unit = send(_ newValue)
/**
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def update(newValue: T): Unit = send(newValue)
/**
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit = {
send((value: T) {
suspend()
Future(ref.single.transformAndGet(f)).andThen({ case _ resume() })
value
})
}
/**
* Dispatch a function to update the internal state but on its own thread,
* and return a Future where that new state can be obtained within the given timeout.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: T T)(implicit timeout: Timeout, ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
send((value: T) {
suspend()
result completeWith Future(ref.single.transformAndGet(f)).andThen({ case _ resume() })
value
})
result.future
}
/** /**
* A future to the current value that will be completed after any currently * A future to the current value that will be completed after any currently
* queued updates. * queued updates.
*/ */
def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] //Known to be safe def future(): Future[T] = Future(ref.single.get)(updater)
/**
* Gets this agent's value after all currently queued updates have completed.
*/
def await(implicit timeout: Timeout): T = Await.result(future, timeout.duration)
/** /**
* Map this agent to a new agent, applying the function to the internal state. * Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent. * Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/ */
def map[B](f: T B): Agent[B] = Agent(f(get))(system) def map[B](f: T B): Agent[B] = Agent(f(get))(updater)
/** /**
* Flatmap this agent to a new agent, applying the function to the internal state. * Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent. * Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/ */
def flatMap[B](f: T Agent[B]): Agent[B] = f(get) def flatMap[B](f: T Agent[B]): Agent[B] = f(get)
/** /**
* Applies the function to the internal state. Does not change the value of this agent. * Applies the function to the internal state. Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Foreach`.
*/ */
def foreach[U](f: T U): Unit = f(get) def foreach[U](f: T U): Unit = f(get)
/**
* Suspends processing of `send` actions for the agent.
*/
def suspend(): Unit = updater.suspend()
/**
* Resumes processing of `send` actions for the agent.
*/
def resume(): Unit = updater.resume(causedByFailure = null)
/**
* Closes the agents and makes it eligible for garbage collection.
* A closed agent cannot accept any `send` actions.
*/
def close(): Unit = updater.stop()
// ---------------------------------------------
// Support for Java API Functions and Procedures
// ---------------------------------------------
/**
* Java API:
* Dispatch a function to update the internal state.
*/
def send(f: JFunc[T, T]): Unit = send(x f(x))
/**
* Java API
* Dispatch a function to update the internal state, and return a Future where that new state can be obtained
* within the given timeout
*/
def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x f(x))(timeout)
/**
* Java API:
* Dispatch a function to update the internal state but on its own thread.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: JFunc[T, T], ec: ExecutionContext): Unit = sendOff(x f(x))(ec)
/**
* Java API:
* Dispatch a function to update the internal state but on its own thread,
* and return a Future where that new state can be obtained within the given timeout.
* This does not use the reactive thread pool and can be used for long-running
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)
/**
* Java API:
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(system)
/**
* Java API:
* Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
*/
def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = f(get)
/**
* Java API:
* Applies the function to the internal state. Does not change the value of this agent.
*/
def foreach(f: JProc[T]): Unit = f(get)
}
/**
* Agent updater actor. Used internally for `send` actions.
*
* INTERNAL API
*/
private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor {
def receive = {
case u: Update[_] update(u.function.asInstanceOf[T T])
case a: Alter[_] sender ! update(a.function.asInstanceOf[T T])
case Get sender ! agent.get
case _
}
def update(function: T T): T = ref.single.transformAndGet(function)
} }

View file

@ -18,7 +18,7 @@ class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
class AgentSpec extends AkkaSpec { class AgentSpec extends AkkaSpec {
implicit val timeout = Timeout(5.seconds.dilated) implicit val timeout = Timeout(5.seconds.dilated)
import system.dispatcher
"Agent" must { "Agent" must {
"update with send dispatches in order sent" in { "update with send dispatches in order sent" in {
val countDown = new CountDownFunction[String] val countDown = new CountDownFunction[String]
@ -31,36 +31,29 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds) countDown.await(5 seconds)
agent() must be("abcd") agent() must be("abcd")
agent.close()
} }
"maintain order between send and sendOff" in { "maintain order between send and sendOff" in {
val countDown = new CountDownFunction[String] val countDown = new CountDownFunction[String]
val l1, l2 = new CountDownLatch(1) val l1, l2 = new TestLatch(1)
import system.dispatcher
val agent = Agent("a") val agent = Agent("a")
agent send (_ + "b") agent send (_ + "b")
agent.sendOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) agent.sendOff((s: String) { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
l1.await(5, TimeUnit.SECONDS) Await.ready(l1, timeout.duration)
agent send (_ + "d") agent send (_ + "d")
agent send countDown agent send countDown
l2.countDown l2.countDown
countDown.await(5 seconds) countDown.await(5 seconds)
agent() must be("abcd") agent() must be("abcd")
agent.close()
} }
"maintain order between alter and alterOff" in { "maintain order between alter and alterOff" in {
import system.dispatcher val l1, l2 = new TestLatch(1)
val l1, l2 = new CountDownLatch(1)
val agent = Agent("a") val agent = Agent("a")
val r1 = agent.alter(_ + "b") val r1 = agent.alter(_ + "b")
val r2 = agent.alterOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) val r2 = agent.alterOff(s { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
l1.await(5, TimeUnit.SECONDS) Await.ready(l1, timeout.duration)
val r3 = agent.alter(_ + "d") val r3 = agent.alter(_ + "d")
val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":")) val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":"))
l2.countDown l2.countDown
@ -68,18 +61,16 @@ class AgentSpec extends AkkaSpec {
Await.result(result, 5 seconds) must be === "ab:abc:abcd" Await.result(result, 5 seconds) must be === "ab:abc:abcd"
agent() must be("abcd") agent() must be("abcd")
agent.close()
} }
"be immediately readable" in { "be immediately readable" in {
val countDown = new CountDownFunction[Int] val countDown = new CountDownFunction[Int]
val readLatch = new CountDownLatch(1) val readLatch = new TestLatch(1)
val readTimeout = 5 seconds val readTimeout = 5 seconds
val agent = Agent(5) val agent = Agent(5)
val f1 = (i: Int) { val f1 = (i: Int) {
readLatch.await(readTimeout.length, readTimeout.unit) Await.ready(readLatch, readTimeout)
i + 5 i + 5
} }
agent send f1 agent send f1
@ -90,15 +81,12 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds) countDown.await(5 seconds)
read must be(5) read must be(5)
agent() must be(10) agent() must be(10)
agent.close()
} }
"be readable within a transaction" in { "be readable within a transaction" in {
val agent = Agent(5) val agent = Agent(5)
val value = atomic { t agent() } val value = atomic { t agent() }
value must be(5) value must be(5)
agent.close()
} }
"dispatch sends in successful transactions" in { "dispatch sends in successful transactions" in {
@ -112,8 +100,6 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds) countDown.await(5 seconds)
agent() must be(10) agent() must be(10)
agent.close()
} }
"not dispatch sends in aborted transactions" in { "not dispatch sends in aborted transactions" in {
@ -132,8 +118,6 @@ class AgentSpec extends AkkaSpec {
countDown.await(5 seconds) countDown.await(5 seconds)
agent() must be(5) agent() must be(5)
agent.close()
} }
"be able to return a 'queued' future" in { "be able to return a 'queued' future" in {
@ -142,8 +126,6 @@ class AgentSpec extends AkkaSpec {
agent send (_ + "c") agent send (_ + "c")
Await.result(agent.future, timeout.duration) must be("abc") Await.result(agent.future, timeout.duration) must be("abc")
agent.close()
} }
"be able to await the value after updates have completed" in { "be able to await the value after updates have completed" in {
@ -151,9 +133,7 @@ class AgentSpec extends AkkaSpec {
agent send (_ + "b") agent send (_ + "b")
agent send (_ + "c") agent send (_ + "c")
agent.await must be("abc") Await.result(agent.future, timeout.duration) must be("abc")
agent.close()
} }
"be able to be mapped" in { "be able to be mapped" in {
@ -162,9 +142,6 @@ class AgentSpec extends AkkaSpec {
agent1() must be(5) agent1() must be(5)
agent2() must be(10) agent2() must be(10)
agent1.close()
agent2.close()
} }
"be able to be used in a 'foreach' for comprehension" in { "be able to be used in a 'foreach' for comprehension" in {
@ -176,8 +153,6 @@ class AgentSpec extends AkkaSpec {
} }
result must be(3) result must be(3)
agent.close()
} }
"be able to be used in a 'map' for comprehension" in { "be able to be used in a 'map' for comprehension" in {
@ -186,9 +161,6 @@ class AgentSpec extends AkkaSpec {
agent1() must be(5) agent1() must be(5)
agent2() must be(10) agent2() must be(10)
agent1.close()
agent2.close()
} }
"be able to be used in a 'flatMap' for comprehension" in { "be able to be used in a 'flatMap' for comprehension" in {
@ -203,10 +175,6 @@ class AgentSpec extends AkkaSpec {
agent1() must be(1) agent1() must be(1)
agent2() must be(2) agent2() must be(2)
agent3() must be(3) agent3() must be(3)
agent1.close()
agent2.close()
agent3.close()
} }
} }
} }

View file

@ -15,77 +15,35 @@ functions that are asynchronously applied to the Agent's state and whose return
value becomes the Agent's new state. The state of an Agent should be immutable. value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always While updates to Agents are asynchronous, the state of an Agent is always
immediately available for reading by any thread (using ``get``) without any immediately available for reading by any thread (using ``get``) without any messages.
messages.
Agents are reactive. The update actions of all Agents get interleaved amongst Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources. dispatched to the same agent from other threads.
If an Agent is used within an enclosing transaction, then it will participate in If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with the STM - any dispatches made in that transaction. Agents are integrated with Scala STM - any dispatches made in
a transaction are held until that transaction commits, and are discarded if it a transaction are held until that transaction commits, and are discarded if it
is retried or aborted. is retried or aborted.
Creating and stopping Agents Creating Agents
============================ ============================
Agents are created by invoking ``new Agent(value, system)`` passing in the Agents are created by invoking ``new Agent<ValueType>(value, executionContext)`` passing in the Agent's initial
Agent's initial value and a reference to the ``ActorSystem`` for your value and providing an ``ExecutionContext`` to be used for it:
application. An ``ActorSystem`` is required to create the underlying Actors. See
:ref:`actor-systems` for more information about actor systems.
Here is an example of creating an Agent:
.. includecode:: code/docs/agent/AgentDocTest.java .. includecode:: code/docs/agent/AgentDocTest.java
:include: import-system,import-agent :include: import-agent,create
:language: java :language: java
.. includecode:: code/docs/agent/AgentDocTest.java#create
:language: java
An Agent will be running until you invoke ``close`` on it. Then it will be
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/docs/agent/AgentDocTest.java#close
:language: java
Updating Agents
===============
You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/docs/agent/AgentDocTest.java#import-function
:language: java
.. includecode:: code/docs/agent/AgentDocTest.java#send
:language: java
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/docs/agent/AgentDocTest.java#send-off
:language: java
Reading an Agent's value Reading an Agent's value
======================== ========================
Agents can be dereferenced (you can get an Agent's value) by calling the get Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
method: with ``get()`` like this:
.. includecode:: code/docs/agent/AgentDocTest.java#read-get .. includecode:: code/docs/agent/AgentDocTest.java#read-get
:language: java :language: java
@ -94,15 +52,55 @@ Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous. state of an Agent is synchronous.
You can also get a ``Future`` to the Agents value, that will be completed after the
currently queued updates have completed:
Awaiting an Agent's value .. includecode:: code/docs/agent/AgentDocTest.java
========================= :include: import-future,read-future
It is also possible to read the value after all currently queued sends have
completed. You can do this with ``await``:
.. includecode:: code/docs/agent/AgentDocTest.java#import-timeout
:language: java :language: java
.. includecode:: code/docs/agent/AgentDocTest.java#read-await See :ref:`futures-java` for more information on ``Futures``.
Updating Agents (send & alter)
==============================
You update an Agent by sending a function (``akka.dispatch.Mapper``) that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/docs/agent/AgentDocTest.java
:include: import-function,send
:language: java :language: java
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/docs/agent/AgentDocTest.java
:include: import-function,send-off
:language: java
All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``.
See :ref:`futures-java` for more information on ``Futures``.
.. includecode:: code/docs/agent/AgentDocTest.java
:include: import-future,import-function,alter
:language: java
.. includecode:: code/docs/agent/AgentDocTest.java
:include: import-future,import-function,alter-off
:language: java
Transactional Agents
====================
If an Agent is used within an enclosing ``Scala STM transaction``, then it will participate in
that transaction. If you send to an Agent within a transaction then the dispatch
to the Agent will be held until that transaction commits, and discarded if the
transaction is aborted.

View file

@ -5,107 +5,114 @@ package docs.agent;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import scala.concurrent.ExecutionContext;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
//#import-system import scala.concurrent.Await;
import akka.actor.ActorSystem; import scala.concurrent.duration.Duration;
//#import-system
//#import-agent //#import-agent
import scala.concurrent.ExecutionContext;
import akka.agent.Agent; import akka.agent.Agent;
import akka.dispatch.ExecutionContexts;
//#import-agent //#import-agent
//#import-function //#import-function
import akka.japi.Function; import akka.dispatch.Mapper;
//#import-function //#import-function
//#import-timeout //#import-future
import akka.util.Timeout; import scala.concurrent.Future;
import static java.util.concurrent.TimeUnit.SECONDS; //#import-future
//#import-timeout
public class AgentDocTest { public class AgentDocTest {
private static ActorSystem testSystem; private static ExecutionContext ec = ExecutionContexts.global();
private static ExecutionContext ec;
@BeforeClass @Test
public static void beforeAll() { public void createAndRead() throws Exception {
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf()); //#create
ec = testSystem.dispatcher(); ExecutionContext ec = ExecutionContexts.global();
} Agent<Integer> agent = new Agent<Integer>(5, ec);
//#create
@AfterClass //#read-get
public static void afterAll() { Integer result = agent.get();
testSystem.shutdown(); //#read-get
testSystem = null;
//#read-future
Future<Integer> future = agent.future();
//#read-future
assertEquals(result, new Integer(5));
assertEquals(Await.result(future, Duration.create(5,"s")), new Integer(5));
} }
@Test @Test
public void createAndClose() { public void sendAndSendOffAndReadAwait() throws Exception {
//#create Agent<Integer> agent = new Agent<Integer>(5, ec);
ActorSystem system = ActorSystem.create("app");
Agent<Integer> agent = new Agent<Integer>(5, system);
//#create
//#close
agent.close();
//#close
system.shutdown();
}
@Test
public void sendAndSendOffAndReadAwait() {
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
//#send //#send
// send a value // send a value, enqueues this change
// of the value of the Agent
agent.send(7); agent.send(7);
// send a function // send a Mapper, enqueues this change
agent.send(new Function<Integer, Integer>() { // to the value of the Agent
agent.send(new Mapper<Integer, Integer>() {
public Integer apply(Integer i) { public Integer apply(Integer i) {
return i * 2; return i * 2;
} }
}); });
//#send //#send
Function<Integer, Integer> longRunningOrBlockingFunction = new Function<Integer, Integer>() { Mapper<Integer, Integer> longRunningOrBlockingFunction = new Mapper<Integer, Integer>() {
public Integer apply(Integer i) { public Integer apply(Integer i) {
return i * 1; return i * 1;
} }
}; };
ExecutionContext theExecutionContextToExecuteItIn = ec;
//#send-off //#send-off
// sendOff a function // sendOff a function
agent.sendOff(longRunningOrBlockingFunction, ec); agent.sendOff(longRunningOrBlockingFunction,
theExecutionContextToExecuteItIn);
//#send-off //#send-off
//#read-await assertEquals(Await.result(agent.future(), Duration.create(5,"s")), new Integer(14));
Integer result = agent.await(new Timeout(5, SECONDS));
//#read-await
assertEquals(result, new Integer(14));
agent.close();
} }
@Test @Test
public void readWithGet() { public void alterAndAlterOff() throws Exception {
Agent<Integer> agent = new Agent<Integer>(5, testSystem); Agent<Integer> agent = new Agent<Integer>(5, ec);
//#read-get //#alter
Integer result = agent.get(); // alter a value
//#read-get Future<Integer> f1 = agent.alter(7);
assertEquals(result, new Integer(5)); // alter a function (Mapper)
Future<Integer> f2 = agent.alter(new Mapper<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 2;
}
});
//#alter
agent.close(); Mapper<Integer, Integer> longRunningOrBlockingFunction = new Mapper<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 1;
}
};
ExecutionContext theExecutionContextToExecuteItIn = ec;
//#alter-off
// alterOff a function (Mapper)
Future<Integer> f3 = agent.alterOff(longRunningOrBlockingFunction,
theExecutionContextToExecuteItIn);
//#alter-off
assertEquals(Await.result(f3, Duration.create(5,"s")), new Integer(14));
} }
} }

View file

@ -53,3 +53,42 @@ ZeroMQ ByteString
``akka.zeromq.Frame`` and the use of ``Seq[Byte]`` in the API has been removed and is replaced by ``akka.util.ByteString``. ``akka.zeromq.Frame`` and the use of ``Seq[Byte]`` in the API has been removed and is replaced by ``akka.util.ByteString``.
``ZMQMessage.firstFrameAsString`` has been removed, please use ``ZMQMessage.frames`` or ``ZMQMessage.frame(int)`` to access the frames. ``ZMQMessage.firstFrameAsString`` has been removed, please use ``ZMQMessage.frames`` or ``ZMQMessage.frame(int)`` to access the frames.
Brand new Agents
================
Akka's ``Agent`` has been rewritten to improve the API and to remove the need to manually ``close`` an Agent.
The Java API has also been harmonized so both Java and Scala call the same methods.
======================================================= =======================================================
Old Java API New Java API
======================================================= =======================================================
``new Agent<type>(value, actorSystem)`` ``new Agent<type>(value, executionContext)``
``agent.update(newValue)`` ``agent.send(newValue)``
``agent.future(Timeout)`` ``agent.future()``
``agent.await(Timeout)`` ``Await.result(agent.future(), Timeout)``
``agent.send(Function)`` ``agent.send(Mapper)``
``agent.sendOff(Function, ExecutionContext)`` ``agent.sendOff(Mapper, ExecutionContext)``
``agent.alter(Function, Timeout)`` ``agent.alter(Mapper)``
``agent.alterOff(Function, Timeout, ExecutionContext)`` ``agent.alter(Mapper, ExecutionContext)``
``agent.map(Function)`` ``agent.map(Mapper)``
``agent.flatMap(Function)`` ``agent.flatMap(Mapper)``
``agent.foreach(Procedure)`` ``agent.foreach(Foreach)``
``agent.suspend()`` ``No replacement, pointless feature``
``agent.resume()`` ``No replacement, pointless feature``
``agent.close()`` ``No replacement, not needed in new implementation``
======================================================= =======================================================
======================================================== ========================================================
Old Scala API New Scala API
======================================================== ========================================================
``Agent[T](value)(implicit ActorSystem)`` ``Agent[T](value)(implicit ExecutionContext)``
``agent.update(newValue)`` ``agent.send(newValue)``
``agent.alterOff(Function1)(Timeout, ExecutionContext)`` ``agent.alterOff(Function1)(ExecutionContext)``
``agent.await(Timeout)`` ``Await.result(agent.future, Timeout)``
``agent.future(Timeout)`` ``agent.future``
``agent.suspend()`` ``No replacement, pointless feature``
``agent.resume()`` ``No replacement, pointless feature``
``agent.close()`` ``No replacement, not needed in new implementation``
======================================================== ========================================================

View file

@ -19,10 +19,10 @@ immediately available for reading by any thread (using ``get`` or ``apply``)
without any messages. without any messages.
Agents are reactive. The update actions of all Agents get interleaved amongst Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources. dispatched to the same agent from other threads.
If an Agent is used within an enclosing transaction, then it will participate in If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with Scala STM - any dispatches made in that transaction. Agents are integrated with Scala STM - any dispatches made in
@ -30,32 +30,33 @@ a transaction are held until that transaction commits, and are discarded if it
is retried or aborted. is retried or aborted.
Creating and stopping Agents Creating Agents
============================ ============================
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial Agents are created by invoking ``Agent(value)`` passing in the Agent's initial
value: value and providing an implicit ``ExecutionContext`` to be used for it, for these
examples we're going to use the default global one, but YMMV:
.. includecode:: code/docs/agent/AgentDocSpec.scala#create .. includecode:: code/docs/agent/AgentDocSpec.scala#create
Note that creating an Agent requires an implicit ``ActorSystem`` (for creating Reading an Agent's value
the underlying actors). See :ref:`actor-systems` for more information about ========================
actor systems. An ActorSystem can be in implicit scope when creating an Agent:
.. includecode:: code/docs/agent/AgentDocSpec.scala#create-implicit-system Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
with parentheses like this:
Or the ActorSystem can be passed explicitly when creating an Agent: .. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply
.. includecode:: code/docs/agent/AgentDocSpec.scala#create-explicit-system Or by using the get method:
An Agent will be running until you invoke ``close`` on it. Then it will be .. includecode:: code/docs/agent/AgentDocSpec.scala#read-get
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/docs/agent/AgentDocSpec.scala#close Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
Updating Agents (send & alter)
Updating Agents ==============================
===============
You update an Agent by sending a function that transforms the current value or You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function by sending just a new value. The Agent will apply the new value or function
@ -75,37 +76,22 @@ in order.
.. includecode:: code/docs/agent/AgentDocSpec.scala#send-off .. includecode:: code/docs/agent/AgentDocSpec.scala#send-off
All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``.
See :ref:`futures-scala` for more information on ``Futures``.
Reading an Agent's value .. includecode:: code/docs/agent/AgentDocSpec.scala#alter
========================
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
with parentheses like this:
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply
Or by using the get method:
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-get
Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
.. includecode:: code/docs/agent/AgentDocSpec.scala#alter-off
Awaiting an Agent's value Awaiting an Agent's value
========================= =========================
It is also possible to read the value after all currently queued sends have You can also get a ``Future`` to the Agents value, that will be completed after the
completed. You can do this with ``await``:
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-await
You can also get a ``Future`` to this value, that will be completed after the
currently queued updates have completed: currently queued updates have completed:
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-future .. includecode:: code/docs/agent/AgentDocSpec.scala#read-future
See :ref:`futures-scala` for more information on ``Futures``.
Transactional Agents Transactional Agents
==================== ====================

View file

@ -7,127 +7,98 @@ import language.postfixOps
import akka.agent.Agent import akka.agent.Agent
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.util.Timeout import scala.concurrent.{ Await, ExecutionContext }
import akka.testkit._ import akka.testkit._
import scala.concurrent.Future
class AgentDocSpec extends AkkaSpec { class AgentDocSpec extends AkkaSpec {
"create" in {
"create and close" in {
//#create //#create
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent import akka.agent.Agent
val agent = Agent(5) val agent = Agent(5)
//#create //#create
//#close
agent.close()
//#close
} }
"create with implicit system" in { "read value" in {
//#create-implicit-system import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor.ActorSystem val agent = Agent(0)
import akka.agent.Agent
implicit val system = ActorSystem("app") {
//#read-apply
val agent = Agent(5) val result = agent()
//#create-implicit-system //#read-apply
result must be === 0
agent.close() }
system.shutdown() {
//#read-get
val result = agent.get
//#read-get
result must be === 0
} }
"create with explicit system" in { {
//#create-explicit-system //#read-future
import akka.actor.ActorSystem val future = agent.future
import akka.agent.Agent //#read-future
Await.result(future, 5 seconds) must be === 0
val system = ActorSystem("app") }
val agent = Agent(5)(system)
//#create-explicit-system
agent.close()
system.shutdown()
} }
"send and sendOff" in { "send and sendOff" in {
val agent = Agent(0) val agent = Agent(0)(ExecutionContext.global)
import system.dispatcher
//#send //#send
// send a value // send a value, enqueues this change
// of the value of the Agent
agent send 7 agent send 7
// send a function // send a function, enqueues this change
// to the value of the Agent
agent send (_ + 1) agent send (_ + 1)
agent send (_ * 2) agent send (_ * 2)
//#send //#send
def longRunningOrBlockingFunction = (i: Int) i * 1 def longRunningOrBlockingFunction = (i: Int) i * 1 // Just for the example code
def someExecutionContext() = scala.concurrent.ExecutionContext.Implicits.global // Just for the example code
//#send-off //#send-off
// the ExecutionContext you want to run the function on
implicit val ec = someExecutionContext()
// sendOff a function // sendOff a function
agent sendOff (longRunningOrBlockingFunction) agent sendOff longRunningOrBlockingFunction
//#send-off //#send-off
val result = agent.await(Timeout(5 seconds)) Await.result(agent.future, 5 seconds) must be === 16
result must be === 16
} }
"read with apply" in { "alter and alterOff" in {
val agent = Agent(0) val agent = Agent(0)(ExecutionContext.global)
//#alter
// alter a value
val f1: Future[Int] = agent alter 7
//#read-apply // alter a function
val result = agent() val f2: Future[Int] = agent alter (_ + 1)
//#read-apply val f3: Future[Int] = agent alter (_ * 2)
//#alter
result must be === 0 def longRunningOrBlockingFunction = (i: Int) i * 1 // Just for the example code
} def someExecutionContext() = ExecutionContext.global // Just for the example code
"read with get" in { //#alter-off
val agent = Agent(0) // the ExecutionContext you want to run the function on
implicit val ec = someExecutionContext()
// alterOff a function
val f4: Future[Int] = agent alterOff longRunningOrBlockingFunction
//#alter-off
//#read-get Await.result(f4, 5 seconds) must be === 16
val result = agent.get
//#read-get
result must be === 0
}
"read with await" in {
val agent = Agent(0)
//#read-await
import scala.concurrent.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val result = agent.await
//#read-await
result must be === 0
}
"read with future" in {
val agent = Agent(0)
//#read-future
import scala.concurrent.Await
implicit val timeout = Timeout(5 seconds)
val future = agent.future
val result = Await.result(future, timeout.duration)
//#read-future
result must be === 0
} }
"transfer example" in { "transfer example" in {
//#transfer-example //#transfer-example
import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent import akka.agent.Agent
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.util.Timeout
import scala.concurrent.stm._ import scala.concurrent.stm._
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = { def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
@ -145,25 +116,25 @@ class AgentDocSpec extends AkkaSpec {
val to = Agent(20) val to = Agent(20)
val ok = transfer(from, to, 50) val ok = transfer(from, to, 50)
implicit val timeout = Timeout(5 seconds) val fromValue = from.future // -> 50
val fromValue = from.await // -> 50 val toValue = to.future // -> 70
val toValue = to.await // -> 70
//#transfer-example //#transfer-example
fromValue must be === 50 Await.result(fromValue, 5 seconds) must be === 50
toValue must be === 70 Await.result(toValue, 5 seconds) must be === 70
ok must be === true
} }
"monadic example" in { "monadic example" in {
def println(a: Any) = ()
//#monadic-example //#monadic-example
import scala.concurrent.ExecutionContext.Implicits.global
val agent1 = Agent(3) val agent1 = Agent(3)
val agent2 = Agent(5) val agent2 = Agent(5)
// uses foreach // uses foreach
var result = 0 for (value agent1)
for (value agent1) { println(value)
result = value + 1
}
// uses map // uses map
val agent3 = for (value agent1) yield value + 1 val agent3 = for (value agent1) yield value + 1
@ -178,15 +149,8 @@ class AgentDocSpec extends AkkaSpec {
} yield value1 + value2 } yield value1 + value2
//#monadic-example //#monadic-example
result must be === 4
agent3() must be === 4 agent3() must be === 4
agent4() must be === 4 agent4() must be === 4
agent5() must be === 8 agent5() must be === 8
agent1.close()
agent2.close()
agent3.close()
agent4.close()
agent5.close()
} }
} }