diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index fae81d1dc9..61bb5e73b3 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -8,11 +8,84 @@ import scala.concurrent.stm._ import scala.concurrent.{ ExecutionContext, Future, Promise } import akka.util.{ SerializedSuspendableExecutionContext } -/** - * Factory method for creating an Agent. - */ object Agent { - def apply[T](initialValue: T)(implicit context: ExecutionContext) = new Agent(initialValue, context) + /** + * Factory method for creating an Agent. + */ + def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context) + + /** + * Java API + * Factory method for creating an Agent. + */ + def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context) + + /** + * Default agent implementation. + */ + private final class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] { + private val ref = Ref(initialValue) + private val updater = SerializedSuspendableExecutionContext(10)(context) + + def get(): T = ref.single.get + + def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) }) + + def send(f: T ⇒ T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) }) + + def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = withinTransaction( + new Runnable { + def run = + try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() }) + }) + + def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue }) + + def alter(f: T ⇒ T): Future[T] = doAlter(ref.single.transformAndGet(f)) + + def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] = { + val result = Promise[T]() + withinTransaction(new Runnable { + def run = { + updater.suspend() + result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume()) + } + }) + result.future + } + + /** + * Internal helper method + */ + private final def withinTransaction(run: Runnable): Unit = { + def dispatch = updater.execute(run) + Txn.findCurrent match { + case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn) + case _ ⇒ dispatch + } + } + + /** + * Internal helper method + */ + private final def doAlter(f: ⇒ T): Future[T] = { + Txn.findCurrent match { + case Some(txn) ⇒ + val result = Promise[T]() + Txn.afterCommit(status ⇒ result completeWith Future(f)(updater))(txn) + result.future + case _ ⇒ Future(f)(updater) + } + } + + def future(): Future[T] = Future(ref.single.get)(updater) + + def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(updater) + + def flatMap[B](f: T ⇒ Agent[B]): Agent[B] = f(get) + + def foreach[U](f: T ⇒ U): Unit = f(get) + } } /** @@ -85,15 +158,13 @@ object Agent { * agent4.close * }}} */ -class Agent[T](initialValue: T, context: ExecutionContext) { - private val ref = Ref(initialValue) - private val updater = SerializedSuspendableExecutionContext(10)(context) +abstract class Agent[T] { /** * Read the internal state of the agent. * Java API */ - def get(): T = ref.single.get + def get(): T /** * Read the internal state of the agent. @@ -104,13 +175,13 @@ class Agent[T](initialValue: T, context: ExecutionContext) { * Dispatch a new value for the internal state. Behaves the same * as sending a function (x => newValue). */ - def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) }) + def send(newValue: T): Unit /** * Dispatch a function to update the internal state. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def send(f: T ⇒ T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) }) + def send(f: T ⇒ T): Unit /** * Dispatch a function to update the internal state but on its own thread. @@ -119,25 +190,21 @@ class Agent[T](initialValue: T, context: ExecutionContext) { * still be executed in order. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit = withinTransaction( - new Runnable { - def run = - try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() }) - }) + def sendOff(f: T ⇒ T)(implicit ec: ExecutionContext): Unit /** * Dispatch an update to the internal state, and return a Future where * that new state can be obtained. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue }) + def alter(newValue: T): Future[T] /** * Dispatch a function to update the internal state, and return a Future where * that new state can be obtained. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def alter(f: T ⇒ T): Future[T] = doAlter(ref.single.transformAndGet(f)) + def alter(f: T ⇒ T): Future[T] /** * Dispatch a function to update the internal state but on its own thread, @@ -147,58 +214,31 @@ class Agent[T](initialValue: T, context: ExecutionContext) { * still be executed in order. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] = { - val result = Promise[T]() - withinTransaction(new Runnable { - def run = { - updater.suspend() - result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume()) - } - }) - result.future - } - - private final def withinTransaction(run: Runnable): Unit = { - def dispatch = updater.execute(run) - Txn.findCurrent match { - case Some(txn) ⇒ Txn.afterCommit(status ⇒ dispatch)(txn) - case _ ⇒ dispatch - } - } - - private final def doAlter(f: ⇒ T): Future[T] = { - Txn.findCurrent match { - case Some(txn) ⇒ - val result = Promise[T]() - Txn.afterCommit(status ⇒ result completeWith Future(f)(updater))(txn) - result.future - case _ ⇒ Future(f)(updater) - } - } + def alterOff(f: T ⇒ T)(implicit ec: ExecutionContext): Future[T] /** * A future to the current value that will be completed after any currently * queued updates. */ - def future(): Future[T] = Future(ref.single.get)(updater) + def future(): Future[T] /** * Map this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def map[B](f: T ⇒ B): Agent[B] = Agent(f(get))(updater) + def map[B](f: T ⇒ B): Agent[B] /** * Flatmap this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def flatMap[B](f: T ⇒ Agent[B]): Agent[B] = f(get) + def flatMap[B](f: T ⇒ Agent[B]): Agent[B] /** * Applies the function to the internal state. Does not change the value of this agent. * In Java, pass in an instance of `akka.dispatch.Foreach`. */ - def foreach[U](f: T ⇒ U): Unit = f(get) + def foreach[U](f: T ⇒ U): Unit } \ No newline at end of file diff --git a/akka-docs/rst/java/code/docs/agent/AgentDocTest.java b/akka-docs/rst/java/code/docs/agent/AgentDocTest.java index f3facaca90..3c5883d384 100644 --- a/akka-docs/rst/java/code/docs/agent/AgentDocTest.java +++ b/akka-docs/rst/java/code/docs/agent/AgentDocTest.java @@ -35,7 +35,7 @@ public class AgentDocTest { public void createAndRead() throws Exception { //#create ExecutionContext ec = ExecutionContexts.global(); - Agent agent = new Agent(5, ec); + Agent agent = Agent.create(5, ec); //#create //#read-get @@ -52,7 +52,7 @@ public class AgentDocTest { @Test public void sendAndSendOffAndReadAwait() throws Exception { - Agent agent = new Agent(5, ec); + Agent agent = Agent.create(5, ec); //#send // send a value, enqueues this change @@ -86,7 +86,7 @@ public class AgentDocTest { @Test public void alterAndAlterOff() throws Exception { - Agent agent = new Agent(5, ec); + Agent agent = Agent.create(5, ec); //#alter // alter a value diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index e9b1a058bf..f414095a79 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -64,12 +64,14 @@ Brand new Agents ================ Akka's ``Agent`` has been rewritten to improve the API and to remove the need to manually ``close`` an Agent. +It's also now an abstract class with the potential for subtyping and has a new factory method +allowing Java to correctly infer the type of the Agent. The Java API has also been harmonized so both Java and Scala call the same methods. ======================================================= ======================================================= Old Java API New Java API ======================================================= ======================================================= -``new Agent(value, actorSystem)`` ``new Agent(value, executionContext)`` +``new Agent(value, actorSystem)`` ``Agent.create(value, executionContext)`` ``agent.update(newValue)`` ``agent.send(newValue)`` ``agent.future(Timeout)`` ``agent.future()`` ``agent.await(Timeout)`` ``Await.result(agent.future(), Timeout)``