A suggestion how things could be done otherwise.

This commit is contained in:
Viktor Klang 2012-07-25 21:24:15 +02:00
parent 58dcfb1004
commit cc3f576f0a
4 changed files with 39 additions and 57 deletions

View file

@ -9,7 +9,8 @@ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.stm._
import scala.concurrent.{ Future, Promise, Await }
import concurrent.{ ExecutionContext, Future, Promise, Await }
import concurrent.util.Duration
/**
* Used internally to send functions.
@ -22,7 +23,7 @@ private[akka] case object Get
* Factory method for creating an Agent.
*/
object Agent {
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system)
def apply[T](initialValue: T)(implicit system: ActorSystem) = new Agent(initialValue, system, system)
}
/**
@ -95,9 +96,11 @@ object Agent {
* agent4.close
* }}}
*/
class Agent[T](initialValue: T, system: ActorSystem) {
class Agent[T](initialValue: T, refFactory: ActorRefFactory, system: ActorSystem) {
private val ref = Ref(initialValue)
private val updater = system.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow?
private val updater = refFactory.actorOf(Props(new AgentUpdater(this, ref))).asInstanceOf[InternalActorRef] //TODO can we avoid this somehow?
def this(initialValue: T, system: ActorSystem) = this(initialValue, system, system)
/**
* Read the internal state of the agent.
@ -114,23 +117,25 @@ class Agent[T](initialValue: T, system: ActorSystem) {
*/
def send(f: T T): Unit = {
def dispatch = updater ! Update(f)
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status dispatch)(txn.get)
else dispatch
Txn.findCurrent match {
case Some(txn) Txn.afterCommit(status dispatch)(txn)
case _ dispatch
}
}
/**
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained within the given timeout.
*/
def alter(f: T T)(timeout: Timeout): Future[T] = {
def dispatch = ask(updater, Alter(f))(timeout).asInstanceOf[Future[T]]
val txn = Txn.findCurrent
if (txn.isDefined) {
val result = Promise[T]()
Txn.afterCommit(status result completeWith dispatch)(txn.get)
result.future
} else dispatch
def alter(f: T T)(implicit timeout: Timeout): Future[T] = {
def dispatch = ask(updater, Alter(f)).asInstanceOf[Future[T]]
Txn.findCurrent match {
case Some(txn)
val result = Promise[T]()
Txn.afterCommit(status result completeWith dispatch)(txn)
result.future
case _ dispatch
}
}
/**
@ -143,7 +148,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* Dispatch a new value for the internal state. Behaves the same
* as sending a function (x => newValue).
*/
def update(newValue: T) = send(newValue)
def update(newValue: T): Unit = send(newValue)
/**
* Dispatch a function to update the internal state but on its own thread.
@ -151,11 +156,10 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: T T): Unit = {
def sendOff(f: T T)(implicit ec: ExecutionContext): Unit = {
send((value: T) {
suspend()
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this, ref)).withDispatcher("akka.agent.send-off-dispatcher"))
threadBased ! Update(f)
Future(ref.single.transformAndGet(f)).andThen({ case _ resume() })
value
})
}
@ -167,12 +171,11 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: T T)(timeout: Timeout): Future[T] = {
def alterOff(f: T T)(implicit timeout: Timeout, ec: ExecutionContext): Future[T] = {
val result = Promise[T]()
send((value: T) {
suspend()
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this, ref)).withDispatcher("akka.agent.alter-off-dispatcher"))
result completeWith ask(threadBased, Alter(f))(timeout).asInstanceOf[Future[T]]
result completeWith Future(ref.single.transformAndGet(f)).andThen({ case _ resume() })
value
})
result.future
@ -182,7 +185,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* A future to the current value that will be completed after any currently
* queued updates.
*/
def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]]
def future(implicit timeout: Timeout): Future[T] = (updater ? Get).asInstanceOf[Future[T]] //Known to be safe
/**
* Gets this agent's value after all currently queued updates have completed.
@ -237,7 +240,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* Dispatch a function to update the internal state, and return a Future where that new state can be obtained
* within the given timeout
*/
def alter(f: JFunc[T, T], timeout: Long): Future[T] = alter(x f(x))(timeout)
def alter(f: JFunc[T, T], timeout: Duration): Future[T] = alter(x f(x))(timeout)
/**
* Java API:
@ -246,7 +249,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* or blocking operations. Dispatches using either `sendOff` or `send` will
* still be executed in order.
*/
def sendOff(f: JFunc[T, T]): Unit = sendOff(x f(x))
def sendOff(f: JFunc[T, T], ec: ExecutionContext): Unit = sendOff(x f(x))(ec)
/**
* Java API:
@ -256,7 +259,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* or blocking operations. Dispatches using either `alterOff` or `alter` will
* still be executed in order.
*/
def alterOff(f: JFunc[T, T], timeout: Long): Unit = alterOff(x f(x))(timeout)
def alterOff(f: JFunc[T, T], timeout: Duration, ec: ExecutionContext): Unit = alterOff(x f(x))(Timeout(timeout), ec)
/**
* Java API:
@ -293,29 +296,4 @@ private[akka] class AgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor
}
def update(function: T T): T = ref.single.transformAndGet(function)
}
/**
* Thread-based agent updater actor. Used internally for `sendOff` actions.
*
* INTERNAL API
*/
private[akka] class ThreadBasedAgentUpdater[T](agent: Agent[T], ref: Ref[T]) extends Actor {
def receive = {
case u: Update[_] try {
update(u.function.asInstanceOf[T T])
} finally {
agent.resume()
context.stop(self)
}
case a: Alter[_] try {
sender ! update(a.function.asInstanceOf[T T])
} finally {
agent.resume()
context.stop(self)
}
case _ context.stop(self)
}
def update(function: T T): T = ref.single.transformAndGet(function)
}
}

View file

@ -39,6 +39,7 @@ class AgentSpec extends AkkaSpec {
"maintain order between send and sendOff" in {
val countDown = new CountDownFunction[String]
val l1, l2 = new CountDownLatch(1)
import system.dispatcher
val agent = Agent("a")
agent send (_ + "b")
@ -58,10 +59,10 @@ class AgentSpec extends AkkaSpec {
val l1, l2 = new CountDownLatch(1)
val agent = Agent("a")
val r1 = agent.alter(_ + "b")(5000)
val r2 = agent.alterOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })(5000)
val r1 = agent.alter(_ + "b")
val r2 = agent.alterOff((s: String) { l1.countDown; l2.await(5, TimeUnit.SECONDS); s + "c" })
l1.await(5, TimeUnit.SECONDS)
val r3 = agent.alter(_ + "d")(5000)
val r3 = agent.alter(_ + "d")
val result = Future.sequence(Seq(r1, r2, r3)).map(_.mkString(":"))
l2.countDown

View file

@ -5,6 +5,7 @@ package docs.agent;
import static org.junit.Assert.*;
import scala.concurrent.ExecutionContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -30,10 +31,12 @@ import static java.util.concurrent.TimeUnit.SECONDS;
public class AgentDocTest {
private static ActorSystem testSystem;
private static ExecutionContext ec;
@BeforeClass
public static void beforeAll() {
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf());
ec = testSystem.dispatcher();
}
@AfterClass
@ -81,7 +84,7 @@ public class AgentDocTest {
//#send-off
// sendOff a function
agent.sendOff(longRunningOrBlockingFunction);
agent.sendOff(longRunningOrBlockingFunction, ec);
//#send-off
//#read-await

View file

@ -54,7 +54,7 @@ class AgentDocSpec extends AkkaSpec {
"send and sendOff" in {
val agent = Agent(0)
import system.dispatcher
//#send
// send a value
agent send 7