Merge branch 'wip-agent-rework-√' of github.com:akka/akka into wip-agent-rework-√

This commit is contained in:
Viktor Klang 2013-01-23 18:26:03 +01:00
commit 68f3dd549d
3 changed files with 54 additions and 14 deletions

View file

@ -93,6 +93,10 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
awaitCond(counter.get == 2)
perform(_ + 4)
perform(_ * 2)
sec.size must be === 2
Thread.sleep(500)
sec.size must be === 2
counter.get must be === 2
sec.resume()
awaitCond(counter.get == 12)
perform(_ * 2)
@ -100,7 +104,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
sec.isEmpty must be === true
}
"execute 'throughput' nmber of tasks per sweep" in {
"execute 'throughput' number of tasks per sweep" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
@ -120,5 +124,40 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
submissions.get must be === (total / throughput)
sec.isEmpty must be === true
}
"execute tasks in serial" in {
val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global)
val total = 10000
val counter = new AtomicInteger(0)
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
1 to total foreach { i perform(c if (c == (i - 1)) c + 1 else c) }
awaitCond(counter.get == total)
sec.isEmpty must be === true
}
"should relinquish thread when suspended" in {
val submissions = new AtomicInteger(0)
val counter = new AtomicInteger(0)
val underlying = new ExecutionContext {
override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) }
override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) }
}
val throughput = 25
val sec = SerializedSuspendableExecutionContext(throughput)(underlying)
sec.suspend()
def perform(f: Int Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) }
perform(_ + 1)
1 to 10 foreach { _ perform(identity) }
perform(x { sec.suspend(); x * 2 })
perform(_ + 8)
sec.size must be === 13
sec.resume()
awaitCond(counter.get == 2)
sec.resume()
awaitCond(counter.get == 10)
sec.isEmpty must be === true
submissions.get must be === 2
}
}
}

View file

@ -32,9 +32,9 @@ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int)
import SerializedSuspendableExecutionContext._
require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput")
private final val state = new AtomicInteger(0)
private final val state = new AtomicInteger(Off)
@tailrec private final def addState(newState: Int): Boolean = {
val c = state.get;
val c = state.get
state.compareAndSet(c, c | newState) || addState(newState)
}
@tailrec private final def remState(oldState: Int) {

View file

@ -135,23 +135,14 @@ class Agent[T](initialValue: T, context: ExecutionContext) {
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(newValue: T): Future[T] = alter(_ newValue)
def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue })
/**
* Dispatch a function to update the internal state, and return a Future where
* that new state can be obtained.
* In Java, pass in an instance of `akka.dispatch.Mapper`.
*/
def alter(f: T T): Future[T] = {
def dispatch = Future(ref.single.transformAndGet(f))(updater)
Txn.findCurrent match {
case Some(txn)
val result = Promise[T]()
Txn.afterCommit(status result completeWith dispatch)(txn)
result.future
case _ dispatch
}
}
def alter(f: T T): Future[T] = doAlter(ref.single.transformAndGet(f))
/**
* Dispatch a function to update the internal state but on its own thread,
@ -180,6 +171,16 @@ class Agent[T](initialValue: T, context: ExecutionContext) {
}
}
private final def doAlter(f: T): Future[T] = {
Txn.findCurrent match {
case Some(txn)
val result = Promise[T]()
Txn.afterCommit(status result completeWith Future(f)(updater))(txn)
result.future
case _ Future(f)(updater)
}
}
/**
* A future to the current value that will be completed after any currently
* queued updates.