Made Agent an abstract class, pushed existing implementation into a private SecretAgent class and added a java-lovin' factory method. See #3108

This commit is contained in:
Kevin Wright 2013-02-27 12:12:58 +00:00
parent 7884c6a71c
commit f305ed72a8
2 changed files with 91 additions and 52 deletions

View file

@ -8,11 +8,83 @@ import scala.concurrent.stm._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import akka.util.{ SerializedSuspendableExecutionContext }
/**
* Factory method for creating an Agent.
*/
object Agent {
def apply[T](initialValue: T)(implicit context: ExecutionContext) = new Agent(initialValue, context)
/**
* Factory method for creating an Agent.
*/
def apply[T](initialValue: T)(implicit context: ExecutionContext): Agent[T] = new SecretAgent(initialValue, context)
/**
* Factory method for Java Iterop.
*/
def create[T](initialValue: T, context: ExecutionContext): Agent[T] = Agent(initialValue)(context)
/**
* Default agent implementation.
*/
private class SecretAgent[T](initialValue: T, context: ExecutionContext) extends Agent[T] {
private val ref = Ref(initialValue)
private val updater = SerializedSuspendableExecutionContext(10)(context)
def get(): T = ref.single.get
def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })
def send(f: T T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit = withinTransaction(
new Runnable {
def run =
try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() })
})
def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })
def alter(f: T T): Future[T] = doAlter(ref.single.transformAndGet(f))
def alterOff(f: T T)(implicit ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
withinTransaction(new Runnable {
def run = {
updater.suspend()
result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume())
}
})
result.future
}
/**
* Internal helper method
*/
private final def withinTransaction(run: Runnable): Unit = {
def dispatch = updater.execute(run)
Txn.findCurrent match {
case Some(txn) Txn.afterCommit(status dispatch)(txn)
case _ dispatch
}
}
/**
* Internal helper method
*/
private final def doAlter(f: T): Future[T] = {
Txn.findCurrent match {
case Some(txn)
val result = Promise[T]()
Txn.afterCommit(status result completeWith Future(f)(updater))(txn)
result.future
case _ Future(f)(updater)
}
}
def future(): Future[T] = Future(ref.single.get)(updater)
def map[B](f: T B): Agent[B] = Agent(f(get))(updater)
def flatMap[B](f: T Agent[B]): Agent[B] = f(get)
def foreach[U](f: T U): Unit = f(get)
}
}
/**
@ -85,15 +157,13 @@ object Agent {
* agent4.close
* }}}
*/
class Agent[T](initialValue: T, context: ExecutionContext) {
private val ref = Ref(initialValue)
private val updater = SerializedSuspendableExecutionContext(10)(context)
abstract class Agent[T] {
/**
* Read the internal state of the agent.
* Java API
*/
def get(): T = ref.single.get
def get(): T
/**
* Read the internal state of the agent.
@ -104,13 +174,13 @@ class Agent[T](initialValue: T, context: ExecutionContext) {
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def send(newValue: T): Unit = withinTransaction(new Runnable { def run = ref.single.update(newValue) })
def send(newValue: T): Unit
/**
* Dispatch a function to update the internal state.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def send(f: T T): Unit = withinTransaction(new Runnable { def run = ref.single.transform(f) })
def send(f: T T): Unit
/**
* Dispatch a function to update the internal state but on its own thread.
@ -119,25 +189,21 @@ class Agent[T](initialValue: T, context: ExecutionContext) {
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit = withinTransaction(
new Runnable {
def run =
try updater.suspend() finally ec.execute(new Runnable { def run = try ref.single.transform(f) finally updater.resume() })
})
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit
/**
* Dispatch an update to the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })
def alter(newValue: T): Future[T]
/**
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(f: T T): Future[T] = doAlter(ref.single.transformAndGet(f))
def alter(f: T T): Future[T]
/**
* Dispatch a function to update the internal state but on its own thread,
@ -147,58 +213,31 @@ class Agent[T](initialValue: T, context: ExecutionContext) {
* still be executed in order.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alterOff(f: T T)(implicit ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
withinTransaction(new Runnable {
def run = {
updater.suspend()
result completeWith Future(try ref.single.transformAndGet(f) finally updater.resume())
}
})
result.future
}
private final def withinTransaction(run: Runnable): Unit = {
def dispatch = updater.execute(run)
Txn.findCurrent match {
case Some(txn) Txn.afterCommit(status dispatch)(txn)
case _ dispatch
}
}
private final def doAlter(f: T): Future[T] = {
Txn.findCurrent match {
case Some(txn)
val result = Promise[T]()
Txn.afterCommit(status result completeWith Future(f)(updater))(txn)
result.future
case _ Future(f)(updater)
}
}
def alterOff(f: T T)(implicit ec: ExecutionContext): Future[T]
/**
* A future to the current value that will be completed after any currently
* queued updates.
*/
def future(): Future[T] = Future(ref.single.get)(updater)
def future(): Future[T]
/**
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def map[B](f: T B): Agent[B] = Agent(f(get))(updater)
def map[B](f: T B): Agent[B]
/**
* Flatmap this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def flatMap[B](f: T Agent[B]): Agent[B] = f(get)
def flatMap[B](f: T Agent[B]): Agent[B]
/**
* Applies the function to the internal state. Does not change the value of this agent.
* In Java, pass in an instance of `akka.dispatch.Foreach`.
*/
def foreach[U](f: T U): Unit = f(get)
def foreach[U](f: T U): Unit
}

View file

@ -35,7 +35,7 @@ public class AgentDocTest {
public void createAndRead() throws Exception {
//#create
ExecutionContext ec = ExecutionContexts.global();
Agent<Integer> agent = new Agent<Integer>(5, ec);
Agent<Integer> agent = Agent.create(5, ec);
//#create
//#read-get
@ -52,7 +52,7 @@ public class AgentDocTest {
@Test
public void sendAndSendOffAndReadAwait() throws Exception {
Agent<Integer> agent = new Agent<Integer>(5, ec);
Agent<Integer> agent = Agent.create(5, ec);
//#send
// send a value, enqueues this change
@ -86,7 +86,7 @@ public class AgentDocTest {
@Test
public void alterAndAlterOff() throws Exception {
Agent<Integer> agent = new Agent<Integer>(5, ec);
Agent<Integer> agent = Agent.create(5, ec);
//#alter
// alter a value