diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index bd77c1d9ff..e3fb838d3f 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -9,7 +9,8 @@ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } import akka.pattern.ask import akka.util.Timeout import scala.concurrent.stm._ -import scala.concurrent.{ Future, Promise, Await } +import concurrent.{ ExecutionContext, Future, Promise, Await } +import concurrent.util.Duration /** * Used internally to send functions. @@ -22,7 +23,7 @@ private[akka] case object Get * Factory method for creating an Agent. */ object Agent { - def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system) + def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system, system) } /** @@ -95,9 +96,11 @@ object Agent { * agent4.close * }}} */ -class Agent[T](initialValue: T, system: ActorSystem) { +class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem) { private val ref = Ref(initialValue) - private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow? + private val updater = refFactory.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow? + + def this(initialValue: T, system: ActorSystem) = this(initialValue, system, system) /** * Read the internal state of the agent. @@ -114,23 +117,25 @@ class Agent[T](initialValue: T, system: ActorSystem) { */ def send(f: T ⇒ T): Unit = { def dispatch = updater ! Update(f) - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCommit(status ⇒ dispatch)(txn.get) - else dispatch + Txn.findCurrent match { + case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn) + case _ ⇒ dispatch + } } /** * Dispatch a function to update the internal state, and return a Future where * that new state can be obtained within the given timeout. */ - def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { - def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]] - val txn = Txn.findCurrent - if (txn.isDefined) { - val result = Promise[T]() - Txn.afterCommit(status ⇒ result completeWith dispatch)(txn.get) - result.future - } else dispatch + def alter(f: T ⇒ T)(implicit timeout: Timeout): Future[T] = { + def dispatch = ask(updater, Alter(f)).asInstanceOf[Future[T]] + Txn.findCurrent match { + case Some(txn) ⇒ + val result = Promise[T]() + Txn.afterCommit(status ⇒ result completeWith dispatch)(txn) + result.future + case _ ⇒ dispatch + } } /** @@ -143,7 +148,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * Dispatch a new value for the internal state. Behaves the same * as sending a function (x => newValue). */ - def update(newValue: T) = send(newValue) + def update(newValue: T): Unit = send(newValue) /** * Dispatch a function to update the internal state but on its own thread. @@ -151,11 +156,10 @@ class Agent[T](initialValue: T, system: ActorSystem) { * or blocking operations. Dispatches using either `sendOff` or `send` will * still be executed in order. */ - def sendOff(f: T ⇒ T): Unit = { + def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = { send((value: T) ⇒ { suspend() - val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this, ref)).withDispatcher("akka.agent.send-off-dispatcher")) - threadBased ! Update(f) + Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() }) value }) } @@ -167,12 +171,11 @@ class Agent[T](initialValue: T, system: ActorSystem) { * or blocking operations. Dispatches using either `alterOff` or `alter` will * still be executed in order. */ - def alterOff(f: T ⇒ T)(timeout: Timeout): Future[T] = { + def alterOff(f: T ⇒ T)(implicit timeout: Timeout, ec: ExecutionContext): Future[T] = { val result = Promise[T]() send((value: T) ⇒ { suspend() - val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this, ref)).withDispatcher("akka.agent.alter-off-dispatcher")) - result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]] + result completeWith Future(ref.single.transformAndGet(f)).andThen({ case _ ⇒ resume() }) value }) result.future @@ -182,7 +185,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * A future to the current value that will be completed after any currently * queued updates. */ - def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] + def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] //Known to be safe /** * Gets this agent's value after all currently queued updates have completed. @@ -237,7 +240,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * Dispatch a function to update the internal state, and return a Future where that new state can be obtained * within the given timeout */ - def alter(f: JFunc[T, T], timeout: Long): Future[T] = alter(x ⇒ f(x))(timeout) + def alter(f: JFunc[T, T], timeout: Duration): Future[T] = alter(x ⇒ f(x))(timeout) /** * Java API: @@ -246,7 +249,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * or blocking operations. Dispatches using either `sendOff` or `send` will * still be executed in order. */ - def sendOff(f: JFunc[T, T]): Unit = sendOff(x ⇒ f(x)) + def sendOff(f: JFunc[T, T], ec: ExecutionContext): Unit = sendOff(x ⇒ f(x))(ec) /** * Java API: @@ -256,7 +259,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { * or blocking operations. Dispatches using either `alterOff` or `alter` will * still be executed in order. */ - def alterOff(f: JFunc[T, T], timeout: Long): Unit = alterOff(x ⇒ f(x))(timeout) + def alterOff(f: JFunc[T, T], timeout: Duration, ec: ExecutionContext): Unit = alterOff(x ⇒ f(x))(Timeout(timeout), ec) /** * Java API: @@ -293,29 +296,4 @@ private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor } def update(function: T ⇒ T): T = ref.single.transformAndGet(function) -} - -/** - * Thread-based agent updater actor. Used internally for `sendOff` actions. - * - * INTERNAL API - */ -private[akka] class ThreadBasedAgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor { - def receive = { - case u: Update[_] ⇒ try { - update(u.function.asInstanceOf[T ⇒ T]) - } finally { - agent.resume() - context.stop(self) - } - case a: Alter[_] ⇒ try { - sender ! update(a.function.asInstanceOf[T ⇒ T]) - } finally { - agent.resume() - context.stop(self) - } - case _ ⇒ context.stop(self) - } - - def update(function: T ⇒ T): T = ref.single.transformAndGet(function) -} +} \ No newline at end of file diff --git a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala index dd57abe33f..746cc18fae 100644 --- a/akka-agent/src/test/scala/akka/agent/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -39,6 +39,7 @@ class AgentSpec extends AkkaSpec { "maintain order between send and sendOff" in { val countDown = new CountDownFunction[String] val l1, l2 = new CountDownLatch(1) + import system.dispatcher val agent = Agent("a") agent send (_ + "b") @@ -58,10 +59,10 @@ class AgentSpec extends AkkaSpec { val l1, l2 = new CountDownLatch(1) val agent = Agent("a") - val r1 = agent.alter(_ + "b")(5000) - val r2 = agent.alterOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })(5000) + val r1 = agent.alter(_ + "b") + val r2 = agent.alterOff((s: String) ⇒ { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" }) l1.await(5, TimeUnit.SECONDS) - val r3 = agent.alter(_ + "d")(5000) + val r3 = agent.alter(_ + "d") val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":")) l2.countDown diff --git a/akka-docs/java/code/docs/agent/AgentDocTest.java b/akka-docs/java/code/docs/agent/AgentDocTest.java index 0da96ebfc9..b57eaa4189 100644 --- a/akka-docs/java/code/docs/agent/AgentDocTest.java +++ b/akka-docs/java/code/docs/agent/AgentDocTest.java @@ -5,6 +5,7 @@ package docs.agent; import static org.junit.Assert.*; +import scala.concurrent.ExecutionContext; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -30,10 +31,12 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class AgentDocTest { private static ActorSystem testSystem; + private static ExecutionContext ec; @BeforeClass public static void beforeAll() { testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf()); + ec = testSystem.dispatcher(); } @AfterClass @@ -81,7 +84,7 @@ public class AgentDocTest { //#send-off // sendOff a function - agent.sendOff(longRunningOrBlockingFunction); + agent.sendOff(longRunningOrBlockingFunction, ec); //#send-off //#read-await diff --git a/akka-docs/scala/code/docs/agent/AgentDocSpec.scala b/akka-docs/scala/code/docs/agent/AgentDocSpec.scala index 6c259bb34b..1f855057e4 100644 --- a/akka-docs/scala/code/docs/agent/AgentDocSpec.scala +++ b/akka-docs/scala/code/docs/agent/AgentDocSpec.scala @@ -54,7 +54,7 @@ class AgentDocSpec extends AkkaSpec { "send and sendOff" in { val agent = Agent(0) - + import system.dispatcher //#send // send a value agent send 7