Migrating Agents to greener pastures
This commit is contained in:
parent
13b1324509
commit
9522add9b7
9 changed files with 396 additions and 444 deletions
|
|
@ -425,7 +425,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
|
|||
Thread.getAllStackTraces().asScala foreach {
|
||||
case (thread, stack) ⇒
|
||||
println(s"$thread:")
|
||||
stack foreach (s => println(s"\t$s"))
|
||||
stack foreach (s ⇒ println(s"\t$s"))
|
||||
}
|
||||
}
|
||||
assert(Await.result(f1, remaining) === "foo")
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import java.util.concurrent.{ ExecutorService, Executor, Executors }
|
|||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.concurrent._
|
||||
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
|
||||
import akka.util.SerializedSuspendableExecutionContext
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
|
||||
|
|
@ -81,4 +82,43 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
|
|||
Await.ready(latch, timeout.duration)
|
||||
}
|
||||
}
|
||||
|
||||
"A SerializedSuspendableExecutionContext" must {
|
||||
"be suspendable and resumable" in {
|
||||
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
|
||||
val counter = new AtomicInteger(0)
|
||||
def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
|
||||
perform(_ + 1)
|
||||
perform(x ⇒ { sec.suspend(); x * 2 })
|
||||
awaitCond(counter.get == 2)
|
||||
perform(_ + 4)
|
||||
perform(_ * 2)
|
||||
sec.resume()
|
||||
awaitCond(counter.get == 12)
|
||||
perform(_ * 2)
|
||||
awaitCond(counter.get == 24)
|
||||
sec.isEmpty must be === true
|
||||
}
|
||||
|
||||
"execute 'throughput' nmber of tasks per sweep" in {
|
||||
val submissions = new AtomicInteger(0)
|
||||
val counter = new AtomicInteger(0)
|
||||
val underlying = new ExecutionContext {
|
||||
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
|
||||
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
|
||||
}
|
||||
val throughput = 25
|
||||
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
|
||||
sec.suspend()
|
||||
def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
|
||||
|
||||
val total = 1000
|
||||
1 to total foreach { _ ⇒ perform(_ + 1) }
|
||||
sec.size() must be === total
|
||||
sec.resume()
|
||||
awaitCond(counter.get == total)
|
||||
submissions.get must be === (total / throughput)
|
||||
sec.isEmpty must be === true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
package akka.util
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.control.NonFatal
|
||||
import scala.annotation.{ tailrec, switch }
|
||||
|
||||
private[akka] object SerializedSuspendableExecutionContext {
|
||||
final val Off = 0
|
||||
final val On = 1
|
||||
final val Suspended = 2
|
||||
|
||||
def apply(batchSize: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext =
|
||||
new SerializedSuspendableExecutionContext(batchSize)(context match {
|
||||
case s: SerializedSuspendableExecutionContext ⇒ s.context
|
||||
case other ⇒ other
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* This `ExecutionContext` allows to wrap an underlying `ExecutionContext` and provide guaranteed serial execution
|
||||
* of tasks submitted to it. On top of that it also allows for *suspending* and *resuming* processing of tasks.
|
||||
*
|
||||
* WARNING: This type must never leak into User code as anything but `ExecutionContext`
|
||||
*
|
||||
* @param throughput maximum number of tasks to be executed in serial before relinquishing the executing thread.
|
||||
* @param context the underlying context which will be used to actually execute the submitted tasks
|
||||
*/
|
||||
private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext)
|
||||
extends ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext {
|
||||
import SerializedSuspendableExecutionContext._
|
||||
require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput")
|
||||
|
||||
private final val state = new AtomicInteger(0)
|
||||
@tailrec private final def addState(newState: Int): Boolean = {
|
||||
val c = state.get;
|
||||
state.compareAndSet(c, c | newState) || addState(newState)
|
||||
}
|
||||
@tailrec private final def remState(oldState: Int) {
|
||||
val c = state.get
|
||||
if (state.compareAndSet(c, c & ~oldState)) attach() else remState(oldState)
|
||||
}
|
||||
|
||||
/**
|
||||
* Resumes execution of tasks until `suspend` is called,
|
||||
* if it isn't currently suspended, it is a no-op.
|
||||
* This operation is idempotent.
|
||||
*/
|
||||
final def resume(): Unit = remState(Suspended)
|
||||
|
||||
/**
|
||||
* Suspends execution of tasks until `resume` is called,
|
||||
* this operation is idempotent.
|
||||
*/
|
||||
final def suspend(): Unit = addState(Suspended)
|
||||
|
||||
final def run(): Unit = {
|
||||
@tailrec def run(done: Int): Unit =
|
||||
if (done < throughput && state.get == On) {
|
||||
poll() match {
|
||||
case null ⇒ ()
|
||||
case some ⇒
|
||||
try some.run() catch { case NonFatal(t) ⇒ context reportFailure t }
|
||||
run(done + 1)
|
||||
}
|
||||
}
|
||||
try run(0) finally remState(On)
|
||||
}
|
||||
|
||||
final def attach(): Unit = if (!isEmpty && state.compareAndSet(Off, On)) context execute this
|
||||
override final def execute(task: Runnable): Unit = try add(task) finally attach()
|
||||
override final def reportFailure(t: Throwable): Unit = context reportFailure t
|
||||
|
||||
override final def toString: String = (state.get: @switch) match {
|
||||
case 0 ⇒ "Off"
|
||||
case 1 ⇒ "On"
|
||||
case 2 ⇒ "Off & Suspended"
|
||||
case 3 ⇒ "On & Suspended"
|
||||
}
|
||||
}
|
||||
|
|
@ -7,23 +7,17 @@ package akka.agent
|
|||
import akka.actor._
|
||||
import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc }
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.stm._
|
||||
import scala.concurrent.{ ExecutionContext, Future, Promise, Await }
|
||||
import scala.concurrent.duration.{ FiniteDuration, Duration }
|
||||
|
||||
/**
|
||||
* Used internally to send functions.
|
||||
*/
|
||||
private[akka] case class Update[T](function: T ⇒ T)
|
||||
private[akka] case class Alter[T](function: T ⇒ T)
|
||||
private[akka] case object Get
|
||||
import akka.util.{ SerializedSuspendableExecutionContext, Timeout }
|
||||
import util.Try
|
||||
|
||||
/**
|
||||
* Factory method for creating an Agent.
|
||||
*/
|
||||
object Agent {
|
||||
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system, system)
|
||||
def apply[T](initialValue: T)(implicit context: ExecutionContext) = new Agent(initialValue, context)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -96,14 +90,13 @@ object Agent {
|
|||
* agent4.close
|
||||
* }}}
|
||||
*/
|
||||
class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem) {
|
||||
class Agent[T](initialValue: T, context: ExecutionContext) {
|
||||
private val ref = Ref(initialValue)
|
||||
private val updater = refFactory.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow?
|
||||
|
||||
def this(initialValue: T, system: ActorSystem) = this(initialValue, system, system)
|
||||
private val updater = SerializedSuspendableExecutionContext(10)(context)
|
||||
|
||||
/**
|
||||
* Read the internal state of the agent.
|
||||
* Java API
|
||||
*/
|
||||
def get(): T = ref.single.get
|
||||
|
||||
|
|
@ -113,22 +106,44 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
|
|||
def apply(): T = get
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state.
|
||||
* Dispatch a new value for the internal state. Behaves the same
|
||||
* as sending a function (x => newValue).
|
||||
*/
|
||||
def send(f: T ⇒ T): Unit = {
|
||||
def dispatch = updater ! Update(f)
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn)
|
||||
case _ ⇒ dispatch
|
||||
}
|
||||
}
|
||||
def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def send(f: T ⇒ T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state but on its own thread.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `sendOff` or `send` will
|
||||
* still be executed in order.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = withinTransaction(
|
||||
new Runnable {
|
||||
def run =
|
||||
try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() })
|
||||
})
|
||||
|
||||
/**
|
||||
* Dispatch an update to the internal state, and return a Future where
|
||||
* that new state can be obtained.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alter(newValue: T): Future[T] = alter(_ ⇒ newValue)
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state, and return a Future where
|
||||
* that new state can be obtained within the given timeout.
|
||||
* that new state can be obtained.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alter(f: T ⇒ T)(implicit timeout: Timeout): Future[T] = {
|
||||
def dispatch = ask(updater, Alter(f)).asInstanceOf[Future[T]]
|
||||
def alter(f: T ⇒ T): Future[T] = {
|
||||
def dispatch = Future(ref.single.transformAndGet(f))(updater)
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) ⇒
|
||||
val result = Promise[T]()
|
||||
|
|
@ -138,162 +153,56 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a new value for the internal state. Behaves the same
|
||||
* as sending a function (x => newValue).
|
||||
*/
|
||||
def send(newValue: T): Unit = send(_ ⇒ newValue)
|
||||
|
||||
/**
|
||||
* Dispatch a new value for the internal state. Behaves the same
|
||||
* as sending a function (x => newValue).
|
||||
*/
|
||||
def update(newValue: T): Unit = send(newValue)
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state but on its own thread.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `sendOff` or `send` will
|
||||
* still be executed in order.
|
||||
*/
|
||||
def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = {
|
||||
send((value: T) ⇒ {
|
||||
suspend()
|
||||
Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() })
|
||||
value
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch a function to update the internal state but on its own thread,
|
||||
* and return a Future where that new state can be obtained within the given timeout.
|
||||
* and return a Future where that new state can be obtained.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `alterOff` or `alter` will
|
||||
* still be executed in order.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def alterOff(f: T ⇒ T)(implicit timeout: Timeout, ec: ExecutionContext): Future[T] = {
|
||||
def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] = {
|
||||
val result = Promise[T]()
|
||||
send((value: T) ⇒ {
|
||||
suspend()
|
||||
result completeWith Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() })
|
||||
value
|
||||
withinTransaction(new Runnable {
|
||||
def run = {
|
||||
updater.suspend()
|
||||
result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume())
|
||||
}
|
||||
})
|
||||
result.future
|
||||
}
|
||||
|
||||
private final def withinTransaction(run: Runnable): Unit = {
|
||||
def dispatch = updater.execute(run)
|
||||
Txn.findCurrent match {
|
||||
case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn)
|
||||
case _ ⇒ dispatch
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A future to the current value that will be completed after any currently
|
||||
* queued updates.
|
||||
*/
|
||||
def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] //Known to be safe
|
||||
|
||||
/**
|
||||
* Gets this agent's value after all currently queued updates have completed.
|
||||
*/
|
||||
def await(implicit timeout: Timeout): T = Await.result(future, timeout.duration)
|
||||
def future(): Future[T] = Future(ref.single.get)(updater)
|
||||
|
||||
/**
|
||||
* Map this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(system)
|
||||
def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(updater)
|
||||
|
||||
/**
|
||||
* Flatmap this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Mapper`.
|
||||
*/
|
||||
def flatMap[B](f: T ⇒ Agent[B]): Agent[B] = f(get)
|
||||
|
||||
/**
|
||||
* Applies the function to the internal state. Does not change the value of this agent.
|
||||
* In Java, pass in an instance of `akka.dispatch.Foreach`.
|
||||
*/
|
||||
def foreach[U](f: T ⇒ U): Unit = f(get)
|
||||
|
||||
/**
|
||||
* Suspends processing of `send` actions for the agent.
|
||||
*/
|
||||
def suspend(): Unit = updater.suspend()
|
||||
|
||||
/**
|
||||
* Resumes processing of `send` actions for the agent.
|
||||
*/
|
||||
def resume(): Unit = updater.resume(causedByFailure = null)
|
||||
|
||||
/**
|
||||
* Closes the agents and makes it eligible for garbage collection.
|
||||
* A closed agent cannot accept any `send` actions.
|
||||
*/
|
||||
def close(): Unit = updater.stop()
|
||||
|
||||
// ---------------------------------------------
|
||||
// Support for Java API Functions and Procedures
|
||||
// ---------------------------------------------
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Dispatch a function to update the internal state.
|
||||
*/
|
||||
def send(f: JFunc[T, T]): Unit = send(x ⇒ f(x))
|
||||
|
||||
/**
|
||||
* Java API
|
||||
* Dispatch a function to update the internal state, and return a Future where that new state can be obtained
|
||||
* within the given timeout
|
||||
*/
|
||||
def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x ⇒ f(x))(timeout)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Dispatch a function to update the internal state but on its own thread.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `sendOff` or `send` will
|
||||
* still be executed in order.
|
||||
*/
|
||||
def sendOff(f: JFunc[T, T], ec: ExecutionContext): Unit = sendOff(x ⇒ f(x))(ec)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Dispatch a function to update the internal state but on its own thread,
|
||||
* and return a Future where that new state can be obtained within the given timeout.
|
||||
* This does not use the reactive thread pool and can be used for long-running
|
||||
* or blocking operations. Dispatches using either `alterOff` or `alter` will
|
||||
* still be executed in order.
|
||||
*/
|
||||
def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x ⇒ f(x))(Timeout(timeout), ec)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Map this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
*/
|
||||
def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(system)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Flatmap this agent to a new agent, applying the function to the internal state.
|
||||
* Does not change the value of this agent.
|
||||
*/
|
||||
def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = f(get)
|
||||
|
||||
/**
|
||||
* Java API:
|
||||
* Applies the function to the internal state. Does not change the value of this agent.
|
||||
*/
|
||||
def foreach(f: JProc[T]): Unit = f(get)
|
||||
}
|
||||
|
||||
/**
|
||||
* Agent updater actor. Used internally for `send` actions.
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor {
|
||||
def receive = {
|
||||
case u: Update[_] ⇒ update(u.function.asInstanceOf[T ⇒ T])
|
||||
case a: Alter[_] ⇒ sender ! update(a.function.asInstanceOf[T ⇒ T])
|
||||
case Get ⇒ sender ! agent.get
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
def update(function: T ⇒ T): T = ref.single.transformAndGet(function)
|
||||
}
|
||||
}
|
||||
|
|
@ -18,7 +18,7 @@ class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
|
|||
class AgentSpec extends AkkaSpec {
|
||||
|
||||
implicit val timeout = Timeout(5.seconds.dilated)
|
||||
|
||||
import system.dispatcher
|
||||
"Agent" must {
|
||||
"update with send dispatches in order sent" in {
|
||||
val countDown = new CountDownFunction[String]
|
||||
|
|
@ -31,36 +31,29 @@ class AgentSpec extends AkkaSpec {
|
|||
|
||||
countDown.await(5 seconds)
|
||||
agent() must be("abcd")
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"maintain order between send and sendOff" in {
|
||||
val countDown = new CountDownFunction[String]
|
||||
val l1, l2 = new CountDownLatch(1)
|
||||
import system.dispatcher
|
||||
|
||||
val l1, l2 = new TestLatch(1)
|
||||
val agent = Agent("a")
|
||||
agent send (_ + "b")
|
||||
agent.sendOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })
|
||||
l1.await(5, TimeUnit.SECONDS)
|
||||
agent.sendOff((s: String) ⇒ { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
|
||||
Await.ready(l1, timeout.duration)
|
||||
agent send (_ + "d")
|
||||
agent send countDown
|
||||
l2.countDown
|
||||
countDown.await(5 seconds)
|
||||
agent() must be("abcd")
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"maintain order between alter and alterOff" in {
|
||||
import system.dispatcher
|
||||
val l1, l2 = new CountDownLatch(1)
|
||||
val l1, l2 = new TestLatch(1)
|
||||
val agent = Agent("a")
|
||||
|
||||
val r1 = agent.alter(_ + "b")
|
||||
val r2 = agent.alterOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })
|
||||
l1.await(5, TimeUnit.SECONDS)
|
||||
val r2 = agent.alterOff(s ⇒ { l1.countDown; Await.ready(l2, timeout.duration); s + "c" })
|
||||
Await.ready(l1, timeout.duration)
|
||||
val r3 = agent.alter(_ + "d")
|
||||
val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":"))
|
||||
l2.countDown
|
||||
|
|
@ -68,18 +61,16 @@ class AgentSpec extends AkkaSpec {
|
|||
Await.result(result, 5 seconds) must be === "ab:abc:abcd"
|
||||
|
||||
agent() must be("abcd")
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"be immediately readable" in {
|
||||
val countDown = new CountDownFunction[Int]
|
||||
val readLatch = new CountDownLatch(1)
|
||||
val readLatch = new TestLatch(1)
|
||||
val readTimeout = 5 seconds
|
||||
|
||||
val agent = Agent(5)
|
||||
val f1 = (i: Int) ⇒ {
|
||||
readLatch.await(readTimeout.length, readTimeout.unit)
|
||||
Await.ready(readLatch, readTimeout)
|
||||
i + 5
|
||||
}
|
||||
agent send f1
|
||||
|
|
@ -90,15 +81,12 @@ class AgentSpec extends AkkaSpec {
|
|||
countDown.await(5 seconds)
|
||||
read must be(5)
|
||||
agent() must be(10)
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"be readable within a transaction" in {
|
||||
val agent = Agent(5)
|
||||
val value = atomic { t ⇒ agent() }
|
||||
value must be(5)
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"dispatch sends in successful transactions" in {
|
||||
|
|
@ -112,8 +100,6 @@ class AgentSpec extends AkkaSpec {
|
|||
|
||||
countDown.await(5 seconds)
|
||||
agent() must be(10)
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"not dispatch sends in aborted transactions" in {
|
||||
|
|
@ -132,8 +118,6 @@ class AgentSpec extends AkkaSpec {
|
|||
|
||||
countDown.await(5 seconds)
|
||||
agent() must be(5)
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"be able to return a 'queued' future" in {
|
||||
|
|
@ -142,8 +126,6 @@ class AgentSpec extends AkkaSpec {
|
|||
agent send (_ + "c")
|
||||
|
||||
Await.result(agent.future, timeout.duration) must be("abc")
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"be able to await the value after updates have completed" in {
|
||||
|
|
@ -151,9 +133,7 @@ class AgentSpec extends AkkaSpec {
|
|||
agent send (_ + "b")
|
||||
agent send (_ + "c")
|
||||
|
||||
agent.await must be("abc")
|
||||
|
||||
agent.close()
|
||||
Await.result(agent.future, timeout.duration) must be("abc")
|
||||
}
|
||||
|
||||
"be able to be mapped" in {
|
||||
|
|
@ -162,9 +142,6 @@ class AgentSpec extends AkkaSpec {
|
|||
|
||||
agent1() must be(5)
|
||||
agent2() must be(10)
|
||||
|
||||
agent1.close()
|
||||
agent2.close()
|
||||
}
|
||||
|
||||
"be able to be used in a 'foreach' for comprehension" in {
|
||||
|
|
@ -176,8 +153,6 @@ class AgentSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
result must be(3)
|
||||
|
||||
agent.close()
|
||||
}
|
||||
|
||||
"be able to be used in a 'map' for comprehension" in {
|
||||
|
|
@ -186,9 +161,6 @@ class AgentSpec extends AkkaSpec {
|
|||
|
||||
agent1() must be(5)
|
||||
agent2() must be(10)
|
||||
|
||||
agent1.close()
|
||||
agent2.close()
|
||||
}
|
||||
|
||||
"be able to be used in a 'flatMap' for comprehension" in {
|
||||
|
|
@ -203,10 +175,6 @@ class AgentSpec extends AkkaSpec {
|
|||
agent1() must be(1)
|
||||
agent2() must be(2)
|
||||
agent3() must be(3)
|
||||
|
||||
agent1.close()
|
||||
agent2.close()
|
||||
agent3.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,77 +15,35 @@ functions that are asynchronously applied to the Agent's state and whose return
|
|||
value becomes the Agent's new state. The state of an Agent should be immutable.
|
||||
|
||||
While updates to Agents are asynchronous, the state of an Agent is always
|
||||
immediately available for reading by any thread (using ``get``) without any
|
||||
messages.
|
||||
immediately available for reading by any thread (using ``get``) without any messages.
|
||||
|
||||
Agents are reactive. The update actions of all Agents get interleaved amongst
|
||||
threads in a thread pool. At any point in time, at most one ``send`` action for
|
||||
threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for
|
||||
each Agent is being executed. Actions dispatched to an agent from another thread
|
||||
will occur in the order they were sent, potentially interleaved with actions
|
||||
dispatched to the same agent from other sources.
|
||||
dispatched to the same agent from other threads.
|
||||
|
||||
If an Agent is used within an enclosing transaction, then it will participate in
|
||||
that transaction. Agents are integrated with the STM - any dispatches made in
|
||||
that transaction. Agents are integrated with Scala STM - any dispatches made in
|
||||
a transaction are held until that transaction commits, and are discarded if it
|
||||
is retried or aborted.
|
||||
|
||||
|
||||
Creating and stopping Agents
|
||||
Creating Agents
|
||||
============================
|
||||
|
||||
Agents are created by invoking ``new Agent(value, system)`` passing in the
|
||||
Agent's initial value and a reference to the ``ActorSystem`` for your
|
||||
application. An ``ActorSystem`` is required to create the underlying Actors. See
|
||||
:ref:`actor-systems` for more information about actor systems.
|
||||
|
||||
Here is an example of creating an Agent:
|
||||
Agents are created by invoking ``new Agent<ValueType>(value, executionContext)`` – passing in the Agent's initial
|
||||
value and providing an ``ExecutionContext`` to be used for it:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-system,import-agent
|
||||
:include: import-agent,create
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#create
|
||||
:language: java
|
||||
|
||||
An Agent will be running until you invoke ``close`` on it. Then it will be
|
||||
eligible for garbage collection (unless you hold on to it in some way).
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#close
|
||||
:language: java
|
||||
|
||||
|
||||
Updating Agents
|
||||
===============
|
||||
|
||||
You update an Agent by sending a function that transforms the current value or
|
||||
by sending just a new value. The Agent will apply the new value or function
|
||||
atomically and asynchronously. The update is done in a fire-forget manner and
|
||||
you are only guaranteed that it will be applied. There is no guarantee of when
|
||||
the update will be applied but dispatches to an Agent from a single thread will
|
||||
occur in order. You apply a value or a function by invoking the ``send``
|
||||
function.
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#import-function
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#send
|
||||
:language: java
|
||||
|
||||
You can also dispatch a function to update the internal state but on its own
|
||||
thread. This does not use the reactive thread pool and can be used for
|
||||
long-running or blocking operations. You do this with the ``sendOff``
|
||||
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
|
||||
in order.
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#send-off
|
||||
:language: java
|
||||
|
||||
|
||||
Reading an Agent's value
|
||||
========================
|
||||
|
||||
Agents can be dereferenced (you can get an Agent's value) by calling the get
|
||||
method:
|
||||
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
|
||||
with ``get()`` like this:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#read-get
|
||||
:language: java
|
||||
|
|
@ -94,15 +52,58 @@ Reading an Agent's current value does not involve any message passing and
|
|||
happens immediately. So while updates to an Agent are asynchronous, reading the
|
||||
state of an Agent is synchronous.
|
||||
|
||||
You can also get a ``Future`` to the Agents value, that will be completed after the
|
||||
currently queued updates have completed:
|
||||
|
||||
Awaiting an Agent's value
|
||||
=========================
|
||||
|
||||
It is also possible to read the value after all currently queued sends have
|
||||
completed. You can do this with ``await``:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#import-timeout
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-future,read-future
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#read-await
|
||||
See :ref:`futures-java` for more information on ``Futures``.
|
||||
|
||||
Updating Agents (send & alter)
|
||||
==============================
|
||||
|
||||
You update an Agent by sending a function (``akka.dispatch.Mapper``) that transforms the current value or
|
||||
by sending just a new value. The Agent will apply the new value or function
|
||||
atomically and asynchronously. The update is done in a fire-forget manner and
|
||||
you are only guaranteed that it will be applied. There is no guarantee of when
|
||||
the update will be applied but dispatches to an Agent from a single thread will
|
||||
occur in order. You apply a value or a function by invoking the ``send``
|
||||
function.
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-function,send
|
||||
:language: java
|
||||
|
||||
You can also dispatch a function to update the internal state but on its own
|
||||
thread. This does not use the reactive thread pool and can be used for
|
||||
long-running or blocking operations. You do this with the ``sendOff``
|
||||
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
|
||||
in order.
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-function,send-off
|
||||
:language: java
|
||||
|
||||
All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``.
|
||||
See :ref:`futures-java` for more information on ``Futures``.
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-future,import-function,alter
|
||||
:language: java
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java
|
||||
:include: import-future,import-function,alter-off
|
||||
:language: java
|
||||
|
||||
Transactional Agents
|
||||
====================
|
||||
|
||||
If an Agent is used within an enclosing transaction, then it will participate in
|
||||
that transaction. If you send to an Agent within a transaction then the dispatch
|
||||
to the Agent will be held until that transaction commits, and discarded if the
|
||||
transaction is aborted. Here's an example:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocTest.java#transfer-example
|
||||
:language: java
|
||||
|
|
@ -5,107 +5,112 @@ package docs.agent;
|
|||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import akka.testkit.AkkaSpec;
|
||||
|
||||
//#import-system
|
||||
import akka.actor.ActorSystem;
|
||||
//#import-system
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
//#import-agent
|
||||
import akka.agent.Agent;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import akka.agent.Agent;
|
||||
import akka.dispatch.ExecutionContexts;
|
||||
//#import-agent
|
||||
|
||||
//#import-function
|
||||
import akka.japi.Function;
|
||||
import akka.dispatch.Mapper;
|
||||
//#import-function
|
||||
|
||||
//#import-timeout
|
||||
import akka.util.Timeout;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
//#import-timeout
|
||||
//#import-future
|
||||
import scala.concurrent.Future;
|
||||
//#import-future
|
||||
|
||||
public class AgentDocTest {
|
||||
|
||||
private static ActorSystem testSystem;
|
||||
private static ExecutionContext ec;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAll() {
|
||||
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf());
|
||||
ec = testSystem.dispatcher();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterAll() {
|
||||
testSystem.shutdown();
|
||||
testSystem = null;
|
||||
}
|
||||
private static ExecutionContext ec = ExecutionContexts.global();
|
||||
|
||||
@Test
|
||||
public void createAndClose() {
|
||||
//#create
|
||||
ActorSystem system = ActorSystem.create("app");
|
||||
|
||||
Agent<Integer> agent = new Agent<Integer>(5, system);
|
||||
public void createAndRead() throws Exception {
|
||||
//#create
|
||||
ExecutionContext ec = ExecutionContexts.global();
|
||||
Agent<Integer> agent = new Agent<Integer>(5, ec);
|
||||
//#create
|
||||
|
||||
//#close
|
||||
agent.close();
|
||||
//#close
|
||||
//#read-get
|
||||
Integer result = agent.get();
|
||||
//#read-get
|
||||
|
||||
system.shutdown();
|
||||
//#read-future
|
||||
Future<Integer> future = agent.future();
|
||||
//#read-future
|
||||
|
||||
assertEquals(result, new Integer(5));
|
||||
assertEquals(Await.result(future, Duration.create(5,"s")), new Integer(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sendAndSendOffAndReadAwait() {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
|
||||
public void sendAndSendOffAndReadAwait() throws Exception {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, ec);
|
||||
|
||||
//#send
|
||||
// send a value
|
||||
agent.send(7);
|
||||
|
||||
// send a function
|
||||
agent.send(new Function<Integer, Integer>() {
|
||||
agent.send(new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
});
|
||||
//#send
|
||||
|
||||
Function<Integer, Integer> longRunningOrBlockingFunction = new Function<Integer, Integer>() {
|
||||
Mapper<Integer, Integer> longRunningOrBlockingFunction = new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 1;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutionContext theExecutionContextToExecuteItIn = ec;
|
||||
//#send-off
|
||||
// sendOff a function
|
||||
agent.sendOff(longRunningOrBlockingFunction, ec);
|
||||
agent.sendOff(longRunningOrBlockingFunction,
|
||||
theExecutionContextToExecuteItIn);
|
||||
//#send-off
|
||||
|
||||
//#read-await
|
||||
Integer result = agent.await(new Timeout(5, SECONDS));
|
||||
//#read-await
|
||||
|
||||
assertEquals(result, new Integer(14));
|
||||
|
||||
agent.close();
|
||||
assertEquals(Await.result(agent.future(), Duration.create(5,"s")), new Integer(14));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readWithGet() {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
|
||||
@Test
|
||||
public void alterAndAlterOff() throws Exception {
|
||||
Agent<Integer> agent = new Agent<Integer>(5, ec);
|
||||
|
||||
//#read-get
|
||||
Integer result = agent.get();
|
||||
//#read-get
|
||||
//#alter
|
||||
// alter a value
|
||||
Future<Integer> f1 = agent.alter(7);
|
||||
|
||||
assertEquals(result, new Integer(5));
|
||||
// alter a function (Mapper)
|
||||
Future<Integer> f2 = agent.alter(new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 2;
|
||||
}
|
||||
});
|
||||
//#alter
|
||||
|
||||
agent.close();
|
||||
}
|
||||
Mapper<Integer, Integer> longRunningOrBlockingFunction = new Mapper<Integer, Integer>() {
|
||||
public Integer apply(Integer i) {
|
||||
return i * 1;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutionContext theExecutionContextToExecuteItIn = ec;
|
||||
//#alter-off
|
||||
// alterOff a function (Mapper)
|
||||
Future<Integer> f3 = agent.alterOff(longRunningOrBlockingFunction,
|
||||
theExecutionContextToExecuteItIn);
|
||||
//#alter-off
|
||||
|
||||
assertEquals(Await.result(f3, Duration.create(5,"s")), new Integer(14));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,10 +19,10 @@ immediately available for reading by any thread (using ``get`` or ``apply``)
|
|||
without any messages.
|
||||
|
||||
Agents are reactive. The update actions of all Agents get interleaved amongst
|
||||
threads in a thread pool. At any point in time, at most one ``send`` action for
|
||||
threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for
|
||||
each Agent is being executed. Actions dispatched to an agent from another thread
|
||||
will occur in the order they were sent, potentially interleaved with actions
|
||||
dispatched to the same agent from other sources.
|
||||
dispatched to the same agent from other threads.
|
||||
|
||||
If an Agent is used within an enclosing transaction, then it will participate in
|
||||
that transaction. Agents are integrated with Scala STM - any dispatches made in
|
||||
|
|
@ -30,32 +30,33 @@ a transaction are held until that transaction commits, and are discarded if it
|
|||
is retried or aborted.
|
||||
|
||||
|
||||
Creating and stopping Agents
|
||||
Creating Agents
|
||||
============================
|
||||
|
||||
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial
|
||||
value:
|
||||
value and providing an implicit ``ExecutionContext`` to be used for it, for these
|
||||
examples we're going to use the default global one, but YMMV:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#create
|
||||
|
||||
Note that creating an Agent requires an implicit ``ActorSystem`` (for creating
|
||||
the underlying actors). See :ref:`actor-systems` for more information about
|
||||
actor systems. An ActorSystem can be in implicit scope when creating an Agent:
|
||||
Reading an Agent's value
|
||||
========================
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#create-implicit-system
|
||||
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
|
||||
with parentheses like this:
|
||||
|
||||
Or the ActorSystem can be passed explicitly when creating an Agent:
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#create-explicit-system
|
||||
Or by using the get method:
|
||||
|
||||
An Agent will be running until you invoke ``close`` on it. Then it will be
|
||||
eligible for garbage collection (unless you hold on to it in some way).
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-get
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#close
|
||||
Reading an Agent's current value does not involve any message passing and
|
||||
happens immediately. So while updates to an Agent are asynchronous, reading the
|
||||
state of an Agent is synchronous.
|
||||
|
||||
|
||||
Updating Agents
|
||||
===============
|
||||
Updating Agents (send & alter)
|
||||
======================
|
||||
|
||||
You update an Agent by sending a function that transforms the current value or
|
||||
by sending just a new value. The Agent will apply the new value or function
|
||||
|
|
@ -75,37 +76,22 @@ in order.
|
|||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#send-off
|
||||
|
||||
All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``.
|
||||
See :ref:`futures-scala` for more information on ``Futures``.
|
||||
|
||||
Reading an Agent's value
|
||||
========================
|
||||
|
||||
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
|
||||
with parentheses like this:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply
|
||||
|
||||
Or by using the get method:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-get
|
||||
|
||||
Reading an Agent's current value does not involve any message passing and
|
||||
happens immediately. So while updates to an Agent are asynchronous, reading the
|
||||
state of an Agent is synchronous.
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#alter
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#alter-off
|
||||
|
||||
Awaiting an Agent's value
|
||||
=========================
|
||||
|
||||
It is also possible to read the value after all currently queued sends have
|
||||
completed. You can do this with ``await``:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-await
|
||||
|
||||
You can also get a ``Future`` to this value, that will be completed after the
|
||||
You can also get a ``Future`` to the Agents value, that will be completed after the
|
||||
currently queued updates have completed:
|
||||
|
||||
.. includecode:: code/docs/agent/AgentDocSpec.scala#read-future
|
||||
|
||||
See :ref:`futures-scala` for more information on ``Futures``.
|
||||
|
||||
Transactional Agents
|
||||
====================
|
||||
|
|
|
|||
|
|
@ -7,54 +7,46 @@ import language.postfixOps
|
|||
|
||||
import akka.agent.Agent
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.{ Await, ExecutionContext }
|
||||
import akka.testkit._
|
||||
import scala.concurrent.Future
|
||||
|
||||
class AgentDocSpec extends AkkaSpec {
|
||||
|
||||
"create and close" in {
|
||||
"create" in {
|
||||
//#create
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import akka.agent.Agent
|
||||
|
||||
val agent = Agent(5)
|
||||
//#create
|
||||
|
||||
//#close
|
||||
agent.close()
|
||||
//#close
|
||||
}
|
||||
|
||||
"create with implicit system" in {
|
||||
//#create-implicit-system
|
||||
import akka.actor.ActorSystem
|
||||
import akka.agent.Agent
|
||||
"read value" in {
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val agent = Agent(0)
|
||||
|
||||
implicit val system = ActorSystem("app")
|
||||
{
|
||||
//#read-apply
|
||||
val result = agent()
|
||||
//#read-apply
|
||||
result must be === 0
|
||||
}
|
||||
{
|
||||
//#read-get
|
||||
val result = agent.get
|
||||
//#read-get
|
||||
result must be === 0
|
||||
}
|
||||
|
||||
val agent = Agent(5)
|
||||
//#create-implicit-system
|
||||
|
||||
agent.close()
|
||||
system.shutdown()
|
||||
}
|
||||
|
||||
"create with explicit system" in {
|
||||
//#create-explicit-system
|
||||
import akka.actor.ActorSystem
|
||||
import akka.agent.Agent
|
||||
|
||||
val system = ActorSystem("app")
|
||||
|
||||
val agent = Agent(5)(system)
|
||||
//#create-explicit-system
|
||||
|
||||
agent.close()
|
||||
system.shutdown()
|
||||
{
|
||||
//#read-future
|
||||
val future = agent.future
|
||||
//#read-future
|
||||
Await.result(future, 5 seconds) must be === 0
|
||||
}
|
||||
}
|
||||
|
||||
"send and sendOff" in {
|
||||
val agent = Agent(0)
|
||||
import system.dispatcher
|
||||
val agent = Agent(0)(ExecutionContext.global)
|
||||
//#send
|
||||
// send a value
|
||||
agent send 7
|
||||
|
|
@ -64,70 +56,47 @@ class AgentDocSpec extends AkkaSpec {
|
|||
agent send (_ * 2)
|
||||
//#send
|
||||
|
||||
def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1
|
||||
|
||||
def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 // Just for the example code
|
||||
def someExecutionContext() = scala.concurrent.ExecutionContext.Implicits.global // Just for the example code
|
||||
//#send-off
|
||||
// the ExecutionContext you want to run the function on
|
||||
implicit val ec = someExecutionContext()
|
||||
// sendOff a function
|
||||
agent sendOff (longRunningOrBlockingFunction)
|
||||
agent sendOff longRunningOrBlockingFunction
|
||||
//#send-off
|
||||
|
||||
val result = agent.await(Timeout(5 seconds))
|
||||
result must be === 16
|
||||
Await.result(agent.future, 5 seconds) must be === 16
|
||||
}
|
||||
|
||||
"read with apply" in {
|
||||
val agent = Agent(0)
|
||||
"alter and alterOff" in {
|
||||
val agent = Agent(0)(ExecutionContext.global)
|
||||
//#alter
|
||||
// alter a value
|
||||
val f1: Future[Int] = agent alter 7
|
||||
|
||||
//#read-apply
|
||||
val result = agent()
|
||||
//#read-apply
|
||||
// alter a function
|
||||
val f2: Future[Int] = agent alter (_ + 1)
|
||||
val f3: Future[Int] = agent alter (_ * 2)
|
||||
//#alter
|
||||
|
||||
result must be === 0
|
||||
}
|
||||
def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 // Just for the example code
|
||||
def someExecutionContext() = ExecutionContext.global // Just for the example code
|
||||
|
||||
"read with get" in {
|
||||
val agent = Agent(0)
|
||||
//#alter-off
|
||||
// the ExecutionContext you want to run the function on
|
||||
implicit val ec = someExecutionContext()
|
||||
// alterOff a function
|
||||
val f4: Future[Int] = agent alterOff longRunningOrBlockingFunction
|
||||
//#alter-off
|
||||
|
||||
//#read-get
|
||||
val result = agent.get
|
||||
//#read-get
|
||||
|
||||
result must be === 0
|
||||
}
|
||||
|
||||
"read with await" in {
|
||||
val agent = Agent(0)
|
||||
|
||||
//#read-await
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
val result = agent.await
|
||||
//#read-await
|
||||
|
||||
result must be === 0
|
||||
}
|
||||
|
||||
"read with future" in {
|
||||
val agent = Agent(0)
|
||||
|
||||
//#read-future
|
||||
import scala.concurrent.Await
|
||||
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
val future = agent.future
|
||||
val result = Await.result(future, timeout.duration)
|
||||
//#read-future
|
||||
|
||||
result must be === 0
|
||||
Await.result(f4, 5 seconds) must be === 16
|
||||
}
|
||||
|
||||
"transfer example" in {
|
||||
//#transfer-example
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import akka.agent.Agent
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
import scala.concurrent.stm._
|
||||
|
||||
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
|
||||
|
|
@ -145,25 +114,25 @@ class AgentDocSpec extends AkkaSpec {
|
|||
val to = Agent(20)
|
||||
val ok = transfer(from, to, 50)
|
||||
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
val fromValue = from.await // -> 50
|
||||
val toValue = to.await // -> 70
|
||||
val fromValue = from.future // -> 50
|
||||
val toValue = to.future // -> 70
|
||||
//#transfer-example
|
||||
|
||||
fromValue must be === 50
|
||||
toValue must be === 70
|
||||
Await.result(fromValue, 5 seconds) must be === 50
|
||||
Await.result(toValue, 5 seconds) must be === 70
|
||||
ok must be === true
|
||||
}
|
||||
|
||||
"monadic example" in {
|
||||
def println(a: Any) = ()
|
||||
//#monadic-example
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
val agent1 = Agent(3)
|
||||
val agent2 = Agent(5)
|
||||
|
||||
// uses foreach
|
||||
var result = 0
|
||||
for (value ← agent1) {
|
||||
result = value + 1
|
||||
}
|
||||
for (value ← agent1)
|
||||
println(value)
|
||||
|
||||
// uses map
|
||||
val agent3 = for (value ← agent1) yield value + 1
|
||||
|
|
@ -178,15 +147,8 @@ class AgentDocSpec extends AkkaSpec {
|
|||
} yield value1 + value2
|
||||
//#monadic-example
|
||||
|
||||
result must be === 4
|
||||
agent3() must be === 4
|
||||
agent4() must be === 4
|
||||
agent5() must be === 8
|
||||
|
||||
agent1.close()
|
||||
agent2.close()
|
||||
agent3.close()
|
||||
agent4.close()
|
||||
agent5.close()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue