diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index 742805883b..830a19eb29 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -4,6 +4,7 @@ import java.util.concurrent.{ ExecutorService, Executor, Executors } import java.util.concurrent.atomic.AtomicInteger import scala.concurrent._ import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } +import akka.util.SerializedSuspendableExecutionContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { @@ -81,4 +82,82 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { Await.ready(latch, timeout.duration) } } + + "A SerializedSuspendableExecutionContext" must { + "be suspendable and resumable" in { + val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global) + val counter = new AtomicInteger(0) + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + perform(_ + 1) + perform(x ⇒ { sec.suspend(); x * 2 }) + awaitCond(counter.get == 2) + perform(_ + 4) + perform(_ * 2) + sec.size must be === 2 + Thread.sleep(500) + sec.size must be === 2 + counter.get must be === 2 + sec.resume() + awaitCond(counter.get == 12) + perform(_ * 2) + awaitCond(counter.get == 24) + sec.isEmpty must be === true + } + + "execute 'throughput' number of tasks per sweep" in { + val submissions = new AtomicInteger(0) + val counter = new AtomicInteger(0) + val underlying = new ExecutionContext { + override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) } + override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) } + } + val throughput = 25 + val sec = SerializedSuspendableExecutionContext(throughput)(underlying) + sec.suspend() + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + + val total = 1000 + 1 to total foreach { _ ⇒ perform(_ + 1) } + sec.size() must be === total + sec.resume() + awaitCond(counter.get == total) + submissions.get must be === (total / throughput) + sec.isEmpty must be === true + } + + "execute tasks in serial" in { + val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global) + val total = 10000 + val counter = new AtomicInteger(0) + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + + 1 to total foreach { i ⇒ perform(c ⇒ if (c == (i - 1)) c + 1 else c) } + awaitCond(counter.get == total) + sec.isEmpty must be === true + } + + "should relinquish thread when suspended" in { + val submissions = new AtomicInteger(0) + val counter = new AtomicInteger(0) + val underlying = new ExecutionContext { + override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) } + override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) } + } + val throughput = 25 + val sec = SerializedSuspendableExecutionContext(throughput)(underlying) + sec.suspend() + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + perform(_ + 1) + 1 to 10 foreach { _ ⇒ perform(identity) } + perform(x ⇒ { sec.suspend(); x * 2 }) + perform(_ + 8) + sec.size must be === 13 + sec.resume() + awaitCond(counter.get == 2) + sec.resume() + awaitCond(counter.get == 10) + sec.isEmpty must be === true + submissions.get must be === 2 + } + } } diff --git a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala new file mode 100644 index 0000000000..841cd865fd --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala @@ -0,0 +1,81 @@ +package akka.util + +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal +import scala.annotation.{ tailrec, switch } + +private[akka] object SerializedSuspendableExecutionContext { + final val Off = 0 + final val On = 1 + final val Suspended = 2 + + def apply(batchSize: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext = + new SerializedSuspendableExecutionContext(batchSize)(context match { + case s: SerializedSuspendableExecutionContext ⇒ s.context + case other ⇒ other + }) +} + +/** + * This `ExecutionContext` allows to wrap an underlying `ExecutionContext` and provide guaranteed serial execution + * of tasks submitted to it. On top of that it also allows for *suspending* and *resuming* processing of tasks. + * + * WARNING: This type must never leak into User code as anything but `ExecutionContext` + * + * @param throughput maximum number of tasks to be executed in serial before relinquishing the executing thread. + * @param context the underlying context which will be used to actually execute the submitted tasks + */ +private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)(val context: ExecutionContext) + extends ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext { + import SerializedSuspendableExecutionContext._ + require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput") + + private final val state = new AtomicInteger(Off) + @tailrec private final def addState(newState: Int): Boolean = { + val c = state.get + state.compareAndSet(c, c | newState) || addState(newState) + } + @tailrec private final def remState(oldState: Int) { + val c = state.get + if (state.compareAndSet(c, c & ~oldState)) attach() else remState(oldState) + } + + /** + * Resumes execution of tasks until `suspend` is called, + * if it isn't currently suspended, it is a no-op. + * This operation is idempotent. + */ + final def resume(): Unit = remState(Suspended) + + /** + * Suspends execution of tasks until `resume` is called, + * this operation is idempotent. + */ + final def suspend(): Unit = addState(Suspended) + + final def run(): Unit = { + @tailrec def run(done: Int): Unit = + if (done < throughput && state.get == On) { + poll() match { + case null ⇒ () + case some ⇒ + try some.run() catch { case NonFatal(t) ⇒ context reportFailure t } + run(done + 1) + } + } + try run(0) finally remState(On) + } + + final def attach(): Unit = if (!isEmpty && state.compareAndSet(Off, On)) context execute this + override final def execute(task: Runnable): Unit = try add(task) finally attach() + override final def reportFailure(t: Throwable): Unit = context reportFailure t + + override final def toString: String = (state.get: @switch) match { + case 0 ⇒ "Off" + case 1 ⇒ "On" + case 2 ⇒ "Off & Suspended" + case 3 ⇒ "On & Suspended" + } +} \ No newline at end of file diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index a19d01c338..fae81d1dc9 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -4,26 +4,15 @@ package akka.agent -import akka.actor._ -import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.pattern.ask -import akka.util.Timeout import scala.concurrent.stm._ -import scala.concurrent.{ ExecutionContext, Future, Promise, Await } -import scala.concurrent.duration.{ FiniteDuration, Duration } - -/** - * Used internally to send functions. - */ -private[akka] case class Update[T](function: T ⇒ T) -private[akka] case class Alter[T](function: T ⇒ T) -private[akka] case object Get +import scala.concurrent.{ ExecutionContext, Future, Promise } +import akka.util.{ SerializedSuspendableExecutionContext } /** * Factory method for creating an Agent. */ object Agent { - def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system, system) + def apply[T](initialValue: T)(implicit context: ExecutionContext) = new Agent(initialValue, context) } /** @@ -96,14 +85,13 @@ object Agent { * agent4.close * }}} */ -class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem) { +class Agent[T](initialValue: T, context: ExecutionContext) { private val ref = Ref(initialValue) - private val updater = refFactory.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow? - - def this(initialValue: T, system: ActorSystem) = this(initialValue, system, system) + private val updater = SerializedSuspendableExecutionContext(10)(context) /** * Read the internal state of the agent. + * Java API */ def get(): T = ref.single.get @@ -113,187 +101,104 @@ class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem def apply(): T = get /** - * Dispatch a function to update the internal state. + * Dispatch a new value for the internal state. Behaves the same + * as sending a function (x => newValue). */ - def send(f: T ⇒ T): Unit = { - def dispatch = updater ! Update(f) + def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) }) + + /** + * Dispatch a function to update the internal state. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def send(f: T ⇒ T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) }) + + /** + * Dispatch a function to update the internal state but on its own thread. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `sendOff` or `send` will + * still be executed in order. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = withinTransaction( + new Runnable { + def run = + try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() }) + }) + + /** + * Dispatch an update to the internal state, and return a Future where + * that new state can be obtained. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue }) + + /** + * Dispatch a function to update the internal state, and return a Future where + * that new state can be obtained. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alter(f: T ⇒ T): Future[T] = doAlter(ref.single.transformAndGet(f)) + + /** + * Dispatch a function to update the internal state but on its own thread, + * and return a Future where that new state can be obtained. + * This does not use the reactive thread pool and can be used for long-running + * or blocking operations. Dispatches using either `alterOff` or `alter` will + * still be executed in order. + * In Java, pass in an instance of `akka.dispatch.Mapper`. + */ + def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] = { + val result = Promise[T]() + withinTransaction(new Runnable { + def run = { + updater.suspend() + result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume()) + } + }) + result.future + } + + private final def withinTransaction(run: Runnable): Unit = { + def dispatch = updater.execute(run) Txn.findCurrent match { case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn) case _ ⇒ dispatch } } - /** - * Dispatch a function to update the internal state, and return a Future where - * that new state can be obtained within the given timeout. - */ - def alter(f: T ⇒ T)(implicit timeout: Timeout): Future[T] = { - def dispatch = ask(updater, Alter(f)).asInstanceOf[Future[T]] + private final def doAlter(f: ⇒ T): Future[T] = { Txn.findCurrent match { case Some(txn) ⇒ val result = Promise[T]() - Txn.afterCommit(status ⇒ result completeWith dispatch)(txn) + Txn.afterCommit(status ⇒ result completeWith Future(f)(updater))(txn) result.future - case _ ⇒ dispatch + case _ ⇒ Future(f)(updater) } } - /** - * Dispatch a new value for the internal state. Behaves the same - * as sending a function (x => newValue). - */ - def send(newValue: T): Unit = send(_ ⇒ newValue) - - /** - * Dispatch a new value for the internal state. Behaves the same - * as sending a function (x => newValue). - */ - def update(newValue: T): Unit = send(newValue) - - /** - * Dispatch a function to update the internal state but on its own thread. - * This does not use the reactive thread pool and can be used for long-running - * or blocking operations. Dispatches using either `sendOff` or `send` will - * still be executed in order. - */ - def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = { - send((value: T) ⇒ { - suspend() - Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() }) - value - }) - } - - /** - * Dispatch a function to update the internal state but on its own thread, - * and return a Future where that new state can be obtained within the given timeout. - * This does not use the reactive thread pool and can be used for long-running - * or blocking operations. Dispatches using either `alterOff` or `alter` will - * still be executed in order. - */ - def alterOff(f: T ⇒ T)(implicit timeout: Timeout, ec: ExecutionContext): Future[T] = { - val result = Promise[T]() - send((value: T) ⇒ { - suspend() - result completeWith Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() }) - value - }) - result.future - } - /** * A future to the current value that will be completed after any currently * queued updates. */ - def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] //Known to be safe - - /** - * Gets this agent's value after all currently queued updates have completed. - */ - def await(implicit timeout: Timeout): T = Await.result(future, timeout.duration) + def future(): Future[T] = Future(ref.single.get)(updater) /** * Map this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(system) + def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(updater) /** * Flatmap this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Mapper`. */ def flatMap[B](f: T ⇒ Agent[B]): Agent[B] = f(get) /** * Applies the function to the internal state. Does not change the value of this agent. + * In Java, pass in an instance of `akka.dispatch.Foreach`. */ def foreach[U](f: T ⇒ U): Unit = f(get) - - /** - * Suspends processing of `send` actions for the agent. - */ - def suspend(): Unit = updater.suspend() - - /** - * Resumes processing of `send` actions for the agent. - */ - def resume(): Unit = updater.resume(causedByFailure = null) - - /** - * Closes the agents and makes it eligible for garbage collection. - * A closed agent cannot accept any `send` actions. - */ - def close(): Unit = updater.stop() - - // --------------------------------------------- - // Support for Java API Functions and Procedures - // --------------------------------------------- - - /** - * Java API: - * Dispatch a function to update the internal state. - */ - def send(f: JFunc[T, T]): Unit = send(x ⇒ f(x)) - - /** - * Java API - * Dispatch a function to update the internal state, and return a Future where that new state can be obtained - * within the given timeout - */ - def alter(f: JFunc[T, T], timeout: FiniteDuration): Future[T] = alter(x ⇒ f(x))(timeout) - - /** - * Java API: - * Dispatch a function to update the internal state but on its own thread. - * This does not use the reactive thread pool and can be used for long-running - * or blocking operations. Dispatches using either `sendOff` or `send` will - * still be executed in order. - */ - def sendOff(f: JFunc[T, T], ec: ExecutionContext): Unit = sendOff(x ⇒ f(x))(ec) - - /** - * Java API: - * Dispatch a function to update the internal state but on its own thread, - * and return a Future where that new state can be obtained within the given timeout. - * This does not use the reactive thread pool and can be used for long-running - * or blocking operations. Dispatches using either `alterOff` or `alter` will - * still be executed in order. - */ - def alterOff(f: JFunc[T, T], timeout: FiniteDuration, ec: ExecutionContext): Unit = alterOff(x ⇒ f(x))(Timeout(timeout), ec) - - /** - * Java API: - * Map this agent to a new agent, applying the function to the internal state. - * Does not change the value of this agent. - */ - def map[B](f: JFunc[T, B]): Agent[B] = Agent(f(get))(system) - - /** - * Java API: - * Flatmap this agent to a new agent, applying the function to the internal state. - * Does not change the value of this agent. - */ - def flatMap[B](f: JFunc[T, Agent[B]]): Agent[B] = f(get) - - /** - * Java API: - * Applies the function to the internal state. Does not change the value of this agent. - */ - def foreach(f: JProc[T]): Unit = f(get) -} - -/** - * Agent updater actor. Used internally for `send` actions. - * - * INTERNAL API - */ -private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor { - def receive = { - case u: Update[_] ⇒ update(u.function.asInstanceOf[T ⇒ T]) - case a: Alter[_] ⇒ sender ! update(a.function.asInstanceOf[T ⇒ T]) - case Get ⇒ sender ! agent.get - case _ ⇒ - } - - def update(function: T ⇒ T): T = ref.single.transformAndGet(function) -} +} \ No newline at end of file diff --git a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala index e6fb305151..926720600c 100644 --- a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -18,7 +18,7 @@ class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { class AgentSpec extends AkkaSpec { implicit val timeout = Timeout(5.seconds.dilated) - + import system.dispatcher "Agent" must { "update with send dispatches in order sent" in { val countDown = new CountDownFunction[String] @@ -31,36 +31,29 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be("abcd") - - agent.close() } "maintain order between send and sendOff" in { val countDown = new CountDownFunction[String] - val l1, l2 = new CountDownLatch(1) - import system.dispatcher - + val l1, l2 = new TestLatch(1) val agent = Agent("a") agent send (_ + "b") - agent.sendOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) - l1.await(5, TimeUnit.SECONDS) + agent.sendOff((s: String) ⇒ { l1.countDown; Await.ready(l2, timeout.duration); s + "c" }) + Await.ready(l1, timeout.duration) agent send (_ + "d") agent send countDown l2.countDown countDown.await(5 seconds) agent() must be("abcd") - - agent.close() } "maintain order between alter and alterOff" in { - import system.dispatcher - val l1, l2 = new CountDownLatch(1) + val l1, l2 = new TestLatch(1) val agent = Agent("a") val r1 = agent.alter(_ + "b") - val r2 = agent.alterOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) - l1.await(5, TimeUnit.SECONDS) + val r2 = agent.alterOff(s ⇒ { l1.countDown; Await.ready(l2, timeout.duration); s + "c" }) + Await.ready(l1, timeout.duration) val r3 = agent.alter(_ + "d") val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":")) l2.countDown @@ -68,18 +61,16 @@ class AgentSpec extends AkkaSpec { Await.result(result, 5 seconds) must be === "ab:abc:abcd" agent() must be("abcd") - - agent.close() } "be immediately readable" in { val countDown = new CountDownFunction[Int] - val readLatch = new CountDownLatch(1) + val readLatch = new TestLatch(1) val readTimeout = 5 seconds val agent = Agent(5) val f1 = (i: Int) ⇒ { - readLatch.await(readTimeout.length, readTimeout.unit) + Await.ready(readLatch, readTimeout) i + 5 } agent send f1 @@ -90,15 +81,12 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) read must be(5) agent() must be(10) - - agent.close() } "be readable within a transaction" in { val agent = Agent(5) val value = atomic { t ⇒ agent() } value must be(5) - agent.close() } "dispatch sends in successful transactions" in { @@ -112,8 +100,6 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be(10) - - agent.close() } "not dispatch sends in aborted transactions" in { @@ -132,8 +118,6 @@ class AgentSpec extends AkkaSpec { countDown.await(5 seconds) agent() must be(5) - - agent.close() } "be able to return a 'queued' future" in { @@ -142,8 +126,6 @@ class AgentSpec extends AkkaSpec { agent send (_ + "c") Await.result(agent.future, timeout.duration) must be("abc") - - agent.close() } "be able to await the value after updates have completed" in { @@ -151,9 +133,7 @@ class AgentSpec extends AkkaSpec { agent send (_ + "b") agent send (_ + "c") - agent.await must be("abc") - - agent.close() + Await.result(agent.future, timeout.duration) must be("abc") } "be able to be mapped" in { @@ -162,9 +142,6 @@ class AgentSpec extends AkkaSpec { agent1() must be(5) agent2() must be(10) - - agent1.close() - agent2.close() } "be able to be used in a 'foreach' for comprehension" in { @@ -176,8 +153,6 @@ class AgentSpec extends AkkaSpec { } result must be(3) - - agent.close() } "be able to be used in a 'map' for comprehension" in { @@ -186,9 +161,6 @@ class AgentSpec extends AkkaSpec { agent1() must be(5) agent2() must be(10) - - agent1.close() - agent2.close() } "be able to be used in a 'flatMap' for comprehension" in { @@ -203,10 +175,6 @@ class AgentSpec extends AkkaSpec { agent1() must be(1) agent2() must be(2) agent3() must be(3) - - agent1.close() - agent2.close() - agent3.close() } } } diff --git a/akka-docs/rst/java/agents.rst b/akka-docs/rst/java/agents.rst index 0ba7dd90ce..98884bf37f 100644 --- a/akka-docs/rst/java/agents.rst +++ b/akka-docs/rst/java/agents.rst @@ -15,77 +15,35 @@ functions that are asynchronously applied to the Agent's state and whose return value becomes the Agent's new state. The state of an Agent should be immutable. While updates to Agents are asynchronous, the state of an Agent is always -immediately available for reading by any thread (using ``get``) without any -messages. +immediately available for reading by any thread (using ``get``) without any messages. Agents are reactive. The update actions of all Agents get interleaved amongst -threads in a thread pool. At any point in time, at most one ``send`` action for +threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for each Agent is being executed. Actions dispatched to an agent from another thread will occur in the order they were sent, potentially interleaved with actions -dispatched to the same agent from other sources. +dispatched to the same agent from other threads. If an Agent is used within an enclosing transaction, then it will participate in -that transaction. Agents are integrated with the STM - any dispatches made in +that transaction. Agents are integrated with Scala STM - any dispatches made in a transaction are held until that transaction commits, and are discarded if it is retried or aborted. -Creating and stopping Agents +Creating Agents ============================ -Agents are created by invoking ``new Agent(value, system)`` passing in the -Agent's initial value and a reference to the ``ActorSystem`` for your -application. An ``ActorSystem`` is required to create the underlying Actors. See -:ref:`actor-systems` for more information about actor systems. - -Here is an example of creating an Agent: +Agents are created by invoking ``new Agent(value, executionContext)`` – passing in the Agent's initial +value and providing an ``ExecutionContext`` to be used for it: .. includecode:: code/docs/agent/AgentDocTest.java - :include: import-system,import-agent + :include: import-agent,create :language: java -.. includecode:: code/docs/agent/AgentDocTest.java#create - :language: java - -An Agent will be running until you invoke ``close`` on it. Then it will be -eligible for garbage collection (unless you hold on to it in some way). - -.. includecode:: code/docs/agent/AgentDocTest.java#close - :language: java - - -Updating Agents -=============== - -You update an Agent by sending a function that transforms the current value or -by sending just a new value. The Agent will apply the new value or function -atomically and asynchronously. The update is done in a fire-forget manner and -you are only guaranteed that it will be applied. There is no guarantee of when -the update will be applied but dispatches to an Agent from a single thread will -occur in order. You apply a value or a function by invoking the ``send`` -function. - -.. includecode:: code/docs/agent/AgentDocTest.java#import-function - :language: java - -.. includecode:: code/docs/agent/AgentDocTest.java#send - :language: java - -You can also dispatch a function to update the internal state but on its own -thread. This does not use the reactive thread pool and can be used for -long-running or blocking operations. You do this with the ``sendOff`` -method. Dispatches using either ``sendOff`` or ``send`` will still be executed -in order. - -.. includecode:: code/docs/agent/AgentDocTest.java#send-off - :language: java - - Reading an Agent's value ======================== -Agents can be dereferenced (you can get an Agent's value) by calling the get -method: +Agents can be dereferenced (you can get an Agent's value) by invoking the Agent +with ``get()`` like this: .. includecode:: code/docs/agent/AgentDocTest.java#read-get :language: java @@ -94,15 +52,55 @@ Reading an Agent's current value does not involve any message passing and happens immediately. So while updates to an Agent are asynchronous, reading the state of an Agent is synchronous. +You can also get a ``Future`` to the Agents value, that will be completed after the +currently queued updates have completed: -Awaiting an Agent's value -========================= - -It is also possible to read the value after all currently queued sends have -completed. You can do this with ``await``: - -.. includecode:: code/docs/agent/AgentDocTest.java#import-timeout +.. includecode:: code/docs/agent/AgentDocTest.java + :include: import-future,read-future :language: java -.. includecode:: code/docs/agent/AgentDocTest.java#read-await +See :ref:`futures-java` for more information on ``Futures``. + +Updating Agents (send & alter) +============================== + +You update an Agent by sending a function (``akka.dispatch.Mapper``) that transforms the current value or +by sending just a new value. The Agent will apply the new value or function +atomically and asynchronously. The update is done in a fire-forget manner and +you are only guaranteed that it will be applied. There is no guarantee of when +the update will be applied but dispatches to an Agent from a single thread will +occur in order. You apply a value or a function by invoking the ``send`` +function. + +.. includecode:: code/docs/agent/AgentDocTest.java + :include: import-function,send :language: java + +You can also dispatch a function to update the internal state but on its own +thread. This does not use the reactive thread pool and can be used for +long-running or blocking operations. You do this with the ``sendOff`` +method. Dispatches using either ``sendOff`` or ``send`` will still be executed +in order. + +.. includecode:: code/docs/agent/AgentDocTest.java + :include: import-function,send-off + :language: java + +All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``. +See :ref:`futures-java` for more information on ``Futures``. + +.. includecode:: code/docs/agent/AgentDocTest.java + :include: import-future,import-function,alter + :language: java + +.. includecode:: code/docs/agent/AgentDocTest.java + :include: import-future,import-function,alter-off + :language: java + +Transactional Agents +==================== + +If an Agent is used within an enclosing ``Scala STM transaction``, then it will participate in +that transaction. If you send to an Agent within a transaction then the dispatch +to the Agent will be held until that transaction commits, and discarded if the +transaction is aborted. \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/agent/AgentDocTest.java b/akka-docs/rst/java/code/docs/agent/AgentDocTest.java index 6d7345b52a..f3facaca90 100644 --- a/akka-docs/rst/java/code/docs/agent/AgentDocTest.java +++ b/akka-docs/rst/java/code/docs/agent/AgentDocTest.java @@ -5,107 +5,114 @@ package docs.agent; import static org.junit.Assert.*; -import scala.concurrent.ExecutionContext; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import akka.testkit.AkkaSpec; -//#import-system -import akka.actor.ActorSystem; -//#import-system +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; //#import-agent -import akka.agent.Agent; + import scala.concurrent.ExecutionContext; + import akka.agent.Agent; + import akka.dispatch.ExecutionContexts; //#import-agent //#import-function -import akka.japi.Function; + import akka.dispatch.Mapper; //#import-function -//#import-timeout -import akka.util.Timeout; -import static java.util.concurrent.TimeUnit.SECONDS; -//#import-timeout +//#import-future + import scala.concurrent.Future; +//#import-future public class AgentDocTest { - private static ActorSystem testSystem; - private static ExecutionContext ec; - - @BeforeClass - public static void beforeAll() { - testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf()); - ec = testSystem.dispatcher(); - } - - @AfterClass - public static void afterAll() { - testSystem.shutdown(); - testSystem = null; - } + private static ExecutionContext ec = ExecutionContexts.global(); @Test - public void createAndClose() { - //#create - ActorSystem system = ActorSystem.create("app"); - - Agent agent = new Agent(5, system); + public void createAndRead() throws Exception { + //#create + ExecutionContext ec = ExecutionContexts.global(); + Agent agent = new Agent(5, ec); //#create - //#close - agent.close(); - //#close + //#read-get + Integer result = agent.get(); + //#read-get - system.shutdown(); + //#read-future + Future future = agent.future(); + //#read-future + + assertEquals(result, new Integer(5)); + assertEquals(Await.result(future, Duration.create(5,"s")), new Integer(5)); } @Test - public void sendAndSendOffAndReadAwait() { - Agent agent = new Agent(5, testSystem); + public void sendAndSendOffAndReadAwait() throws Exception { + Agent agent = new Agent(5, ec); //#send - // send a value + // send a value, enqueues this change + // of the value of the Agent agent.send(7); - // send a function - agent.send(new Function() { + // send a Mapper, enqueues this change + // to the value of the Agent + agent.send(new Mapper() { public Integer apply(Integer i) { return i * 2; } }); //#send - Function longRunningOrBlockingFunction = new Function() { + Mapper longRunningOrBlockingFunction = new Mapper() { public Integer apply(Integer i) { return i * 1; } }; + ExecutionContext theExecutionContextToExecuteItIn = ec; //#send-off // sendOff a function - agent.sendOff(longRunningOrBlockingFunction, ec); + agent.sendOff(longRunningOrBlockingFunction, + theExecutionContextToExecuteItIn); //#send-off - //#read-await - Integer result = agent.await(new Timeout(5, SECONDS)); - //#read-await - - assertEquals(result, new Integer(14)); - - agent.close(); + assertEquals(Await.result(agent.future(), Duration.create(5,"s")), new Integer(14)); } - @Test - public void readWithGet() { - Agent agent = new Agent(5, testSystem); + @Test + public void alterAndAlterOff() throws Exception { + Agent agent = new Agent(5, ec); - //#read-get - Integer result = agent.get(); - //#read-get + //#alter + // alter a value + Future f1 = agent.alter(7); - assertEquals(result, new Integer(5)); + // alter a function (Mapper) + Future f2 = agent.alter(new Mapper() { + public Integer apply(Integer i) { + return i * 2; + } + }); + //#alter - agent.close(); - } + Mapper longRunningOrBlockingFunction = new Mapper() { + public Integer apply(Integer i) { + return i * 1; + } + }; + + ExecutionContext theExecutionContextToExecuteItIn = ec; + //#alter-off + // alterOff a function (Mapper) + Future f3 = agent.alterOff(longRunningOrBlockingFunction, + theExecutionContextToExecuteItIn); + //#alter-off + + assertEquals(Await.result(f3, Duration.create(5,"s")), new Integer(14)); + } } diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index 8baa78b249..0a7d3da4df 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -53,3 +53,42 @@ ZeroMQ ByteString ``akka.zeromq.Frame`` and the use of ``Seq[Byte]`` in the API has been removed and is replaced by ``akka.util.ByteString``. ``ZMQMessage.firstFrameAsString`` has been removed, please use ``ZMQMessage.frames`` or ``ZMQMessage.frame(int)`` to access the frames. + +Brand new Agents +================ + +Akka's ``Agent`` has been rewritten to improve the API and to remove the need to manually ``close`` an Agent. +The Java API has also been harmonized so both Java and Scala call the same methods. + +======================================================= ======================================================= +Old Java API New Java API +======================================================= ======================================================= +``new Agent(value, actorSystem)`` ``new Agent(value, executionContext)`` +``agent.update(newValue)`` ``agent.send(newValue)`` +``agent.future(Timeout)`` ``agent.future()`` +``agent.await(Timeout)`` ``Await.result(agent.future(), Timeout)`` +``agent.send(Function)`` ``agent.send(Mapper)`` +``agent.sendOff(Function, ExecutionContext)`` ``agent.sendOff(Mapper, ExecutionContext)`` +``agent.alter(Function, Timeout)`` ``agent.alter(Mapper)`` +``agent.alterOff(Function, Timeout, ExecutionContext)`` ``agent.alter(Mapper, ExecutionContext)`` +``agent.map(Function)`` ``agent.map(Mapper)`` +``agent.flatMap(Function)`` ``agent.flatMap(Mapper)`` +``agent.foreach(Procedure)`` ``agent.foreach(Foreach)`` +``agent.suspend()`` ``No replacement, pointless feature`` +``agent.resume()`` ``No replacement, pointless feature`` +``agent.close()`` ``No replacement, not needed in new implementation`` +======================================================= ======================================================= + + +======================================================== ======================================================== +Old Scala API New Scala API +======================================================== ======================================================== +``Agent[T](value)(implicit ActorSystem)`` ``Agent[T](value)(implicit ExecutionContext)`` +``agent.update(newValue)`` ``agent.send(newValue)`` +``agent.alterOff(Function1)(Timeout, ExecutionContext)`` ``agent.alterOff(Function1)(ExecutionContext)`` +``agent.await(Timeout)`` ``Await.result(agent.future, Timeout)`` +``agent.future(Timeout)`` ``agent.future`` +``agent.suspend()`` ``No replacement, pointless feature`` +``agent.resume()`` ``No replacement, pointless feature`` +``agent.close()`` ``No replacement, not needed in new implementation`` +======================================================== ======================================================== \ No newline at end of file diff --git a/akka-docs/rst/scala/agents.rst b/akka-docs/rst/scala/agents.rst index 5657986a88..29fe8f5e23 100644 --- a/akka-docs/rst/scala/agents.rst +++ b/akka-docs/rst/scala/agents.rst @@ -19,10 +19,10 @@ immediately available for reading by any thread (using ``get`` or ``apply``) without any messages. Agents are reactive. The update actions of all Agents get interleaved amongst -threads in a thread pool. At any point in time, at most one ``send`` action for +threads in an ``ExecutionContext``. At any point in time, at most one ``send`` action for each Agent is being executed. Actions dispatched to an agent from another thread will occur in the order they were sent, potentially interleaved with actions -dispatched to the same agent from other sources. +dispatched to the same agent from other threads. If an Agent is used within an enclosing transaction, then it will participate in that transaction. Agents are integrated with Scala STM - any dispatches made in @@ -30,32 +30,33 @@ a transaction are held until that transaction commits, and are discarded if it is retried or aborted. -Creating and stopping Agents +Creating Agents ============================ Agents are created by invoking ``Agent(value)`` passing in the Agent's initial -value: +value and providing an implicit ``ExecutionContext`` to be used for it, for these +examples we're going to use the default global one, but YMMV: .. includecode:: code/docs/agent/AgentDocSpec.scala#create -Note that creating an Agent requires an implicit ``ActorSystem`` (for creating -the underlying actors). See :ref:`actor-systems` for more information about -actor systems. An ActorSystem can be in implicit scope when creating an Agent: +Reading an Agent's value +======================== -.. includecode:: code/docs/agent/AgentDocSpec.scala#create-implicit-system +Agents can be dereferenced (you can get an Agent's value) by invoking the Agent +with parentheses like this: -Or the ActorSystem can be passed explicitly when creating an Agent: +.. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply -.. includecode:: code/docs/agent/AgentDocSpec.scala#create-explicit-system +Or by using the get method: -An Agent will be running until you invoke ``close`` on it. Then it will be -eligible for garbage collection (unless you hold on to it in some way). +.. includecode:: code/docs/agent/AgentDocSpec.scala#read-get -.. includecode:: code/docs/agent/AgentDocSpec.scala#close +Reading an Agent's current value does not involve any message passing and +happens immediately. So while updates to an Agent are asynchronous, reading the +state of an Agent is synchronous. - -Updating Agents -=============== +Updating Agents (send & alter) +============================== You update an Agent by sending a function that transforms the current value or by sending just a new value. The Agent will apply the new value or function @@ -75,37 +76,22 @@ in order. .. includecode:: code/docs/agent/AgentDocSpec.scala#send-off +All ``send`` methods also have a corresponding ``alter`` method that returns a ``Future``. +See :ref:`futures-scala` for more information on ``Futures``. -Reading an Agent's value -======================== - -Agents can be dereferenced (you can get an Agent's value) by invoking the Agent -with parentheses like this: - -.. includecode:: code/docs/agent/AgentDocSpec.scala#read-apply - -Or by using the get method: - -.. includecode:: code/docs/agent/AgentDocSpec.scala#read-get - -Reading an Agent's current value does not involve any message passing and -happens immediately. So while updates to an Agent are asynchronous, reading the -state of an Agent is synchronous. +.. includecode:: code/docs/agent/AgentDocSpec.scala#alter +.. includecode:: code/docs/agent/AgentDocSpec.scala#alter-off Awaiting an Agent's value ========================= -It is also possible to read the value after all currently queued sends have -completed. You can do this with ``await``: - -.. includecode:: code/docs/agent/AgentDocSpec.scala#read-await - -You can also get a ``Future`` to this value, that will be completed after the +You can also get a ``Future`` to the Agents value, that will be completed after the currently queued updates have completed: .. includecode:: code/docs/agent/AgentDocSpec.scala#read-future +See :ref:`futures-scala` for more information on ``Futures``. Transactional Agents ==================== diff --git a/akka-docs/rst/scala/code/docs/agent/AgentDocSpec.scala b/akka-docs/rst/scala/code/docs/agent/AgentDocSpec.scala index 121d2d695f..b977f823bd 100644 --- a/akka-docs/rst/scala/code/docs/agent/AgentDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/agent/AgentDocSpec.scala @@ -7,127 +7,98 @@ import language.postfixOps import akka.agent.Agent import scala.concurrent.duration._ -import akka.util.Timeout +import scala.concurrent.{ Await, ExecutionContext } import akka.testkit._ +import scala.concurrent.Future class AgentDocSpec extends AkkaSpec { - - "create and close" in { + "create" in { //#create + import scala.concurrent.ExecutionContext.Implicits.global import akka.agent.Agent - val agent = Agent(5) //#create - - //#close - agent.close() - //#close } - "create with implicit system" in { - //#create-implicit-system - import akka.actor.ActorSystem - import akka.agent.Agent + "read value" in { + import scala.concurrent.ExecutionContext.Implicits.global + val agent = Agent(0) - implicit val system = ActorSystem("app") + { + //#read-apply + val result = agent() + //#read-apply + result must be === 0 + } + { + //#read-get + val result = agent.get + //#read-get + result must be === 0 + } - val agent = Agent(5) - //#create-implicit-system - - agent.close() - system.shutdown() - } - - "create with explicit system" in { - //#create-explicit-system - import akka.actor.ActorSystem - import akka.agent.Agent - - val system = ActorSystem("app") - - val agent = Agent(5)(system) - //#create-explicit-system - - agent.close() - system.shutdown() + { + //#read-future + val future = agent.future + //#read-future + Await.result(future, 5 seconds) must be === 0 + } } "send and sendOff" in { - val agent = Agent(0) - import system.dispatcher + val agent = Agent(0)(ExecutionContext.global) //#send - // send a value + // send a value, enqueues this change + // of the value of the Agent agent send 7 - // send a function + // send a function, enqueues this change + // to the value of the Agent agent send (_ + 1) agent send (_ * 2) //#send - def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 - + def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 // Just for the example code + def someExecutionContext() = scala.concurrent.ExecutionContext.Implicits.global // Just for the example code //#send-off + // the ExecutionContext you want to run the function on + implicit val ec = someExecutionContext() // sendOff a function - agent sendOff (longRunningOrBlockingFunction) + agent sendOff longRunningOrBlockingFunction //#send-off - val result = agent.await(Timeout(5 seconds)) - result must be === 16 + Await.result(agent.future, 5 seconds) must be === 16 } - "read with apply" in { - val agent = Agent(0) + "alter and alterOff" in { + val agent = Agent(0)(ExecutionContext.global) + //#alter + // alter a value + val f1: Future[Int] = agent alter 7 - //#read-apply - val result = agent() - //#read-apply + // alter a function + val f2: Future[Int] = agent alter (_ + 1) + val f3: Future[Int] = agent alter (_ * 2) + //#alter - result must be === 0 - } + def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 // Just for the example code + def someExecutionContext() = ExecutionContext.global // Just for the example code - "read with get" in { - val agent = Agent(0) + //#alter-off + // the ExecutionContext you want to run the function on + implicit val ec = someExecutionContext() + // alterOff a function + val f4: Future[Int] = agent alterOff longRunningOrBlockingFunction + //#alter-off - //#read-get - val result = agent.get - //#read-get - - result must be === 0 - } - - "read with await" in { - val agent = Agent(0) - - //#read-await - import scala.concurrent.duration._ - import akka.util.Timeout - - implicit val timeout = Timeout(5 seconds) - val result = agent.await - //#read-await - - result must be === 0 - } - - "read with future" in { - val agent = Agent(0) - - //#read-future - import scala.concurrent.Await - - implicit val timeout = Timeout(5 seconds) - val future = agent.future - val result = Await.result(future, timeout.duration) - //#read-future - - result must be === 0 + Await.result(f4, 5 seconds) must be === 16 } "transfer example" in { //#transfer-example + import scala.concurrent.ExecutionContext.Implicits.global import akka.agent.Agent import scala.concurrent.duration._ - import akka.util.Timeout import scala.concurrent.stm._ def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = { @@ -145,25 +116,25 @@ class AgentDocSpec extends AkkaSpec { val to = Agent(20) val ok = transfer(from, to, 50) - implicit val timeout = Timeout(5 seconds) - val fromValue = from.await // -> 50 - val toValue = to.await // -> 70 + val fromValue = from.future // -> 50 + val toValue = to.future // -> 70 //#transfer-example - fromValue must be === 50 - toValue must be === 70 + Await.result(fromValue, 5 seconds) must be === 50 + Await.result(toValue, 5 seconds) must be === 70 + ok must be === true } "monadic example" in { + def println(a: Any) = () //#monadic-example + import scala.concurrent.ExecutionContext.Implicits.global val agent1 = Agent(3) val agent2 = Agent(5) // uses foreach - var result = 0 - for (value ← agent1) { - result = value + 1 - } + for (value ← agent1) + println(value) // uses map val agent3 = for (value ← agent1) yield value + 1 @@ -178,15 +149,8 @@ class AgentDocSpec extends AkkaSpec { } yield value1 + value2 //#monadic-example - result must be === 4 agent3() must be === 4 agent4() must be === 4 agent5() must be === 8 - - agent1.close() - agent2.close() - agent3.close() - agent4.close() - agent5.close() } }