diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index be5176aee9..830a19eb29 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -93,6 +93,10 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { awaitCond(counter.get == 2) perform(_ + 4) perform(_ * 2) + sec.size must be === 2 + Thread.sleep(500) + sec.size must be === 2 + counter.get must be === 2 sec.resume() awaitCond(counter.get == 12) perform(_ * 2) @@ -100,7 +104,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { sec.isEmpty must be === true } - "execute 'throughput' nmber of tasks per sweep" in { + "execute 'throughput' number of tasks per sweep" in { val submissions = new AtomicInteger(0) val counter = new AtomicInteger(0) val underlying = new ExecutionContext { @@ -120,5 +124,40 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { submissions.get must be === (total / throughput) sec.isEmpty must be === true } + + "execute tasks in serial" in { + val sec = SerializedSuspendableExecutionContext(1)(ExecutionContext.global) + val total = 10000 + val counter = new AtomicInteger(0) + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + + 1 to total foreach { i ⇒ perform(c ⇒ if (c == (i - 1)) c + 1 else c) } + awaitCond(counter.get == total) + sec.isEmpty must be === true + } + + "should relinquish thread when suspended" in { + val submissions = new AtomicInteger(0) + val counter = new AtomicInteger(0) + val underlying = new ExecutionContext { + override def execute(r: Runnable) { submissions.incrementAndGet(); ExecutionContext.global.execute(r) } + override def reportFailure(t: Throwable) { ExecutionContext.global.reportFailure(t) } + } + val throughput = 25 + val sec = SerializedSuspendableExecutionContext(throughput)(underlying) + sec.suspend() + def perform(f: Int ⇒ Int) = sec execute new Runnable { def run = counter.set(f(counter.get)) } + perform(_ + 1) + 1 to 10 foreach { _ ⇒ perform(identity) } + perform(x ⇒ { sec.suspend(); x * 2 }) + perform(_ + 8) + sec.size must be === 13 + sec.resume() + awaitCond(counter.get == 2) + sec.resume() + awaitCond(counter.get == 10) + sec.isEmpty must be === true + submissions.get must be === 2 + } } } diff --git a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala index 1e6cbb60e7..841cd865fd 100644 --- a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala +++ b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala @@ -32,9 +32,9 @@ private[akka] final class SerializedSuspendableExecutionContext(throughput: Int) import SerializedSuspendableExecutionContext._ require(throughput > 0, s"SerializedSuspendableExecutionContext.throughput must be greater than 0 but was $throughput") - private final val state = new AtomicInteger(0) + private final val state = new AtomicInteger(Off) @tailrec private final def addState(newState: Int): Boolean = { - val c = state.get; + val c = state.get state.compareAndSet(c, c | newState) || addState(newState) } @tailrec private final def remState(oldState: Int) { diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index bc6f12c3a1..9c8e6c93bb 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -135,23 +135,14 @@ class Agent[T](initialValue: T, context: ExecutionContext) { * that new state can be obtained. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def alter(newValue: T): Future[T] = alter(_ ⇒ newValue) + def alter(newValue: T): Future[T] = doAlter({ ref.single.update(newValue); newValue }) /** * Dispatch a function to update the internal state, and return a Future where * that new state can be obtained. * In Java, pass in an instance of `akka.dispatch.Mapper`. */ - def alter(f: T ⇒ T): Future[T] = { - def dispatch = Future(ref.single.transformAndGet(f))(updater) - Txn.findCurrent match { - case Some(txn) ⇒ - val result = Promise[T]() - Txn.afterCommit(status ⇒ result completeWith dispatch)(txn) - result.future - case _ ⇒ dispatch - } - } + def alter(f: T ⇒ T): Future[T] = doAlter(ref.single.transformAndGet(f)) /** * Dispatch a function to update the internal state but on its own thread, @@ -180,6 +171,16 @@ class Agent[T](initialValue: T, context: ExecutionContext) { } } + private final def doAlter(f: ⇒ T): Future[T] = { + Txn.findCurrent match { + case Some(txn) ⇒ + val result = Promise[T]() + Txn.afterCommit(status ⇒ result completeWith Future(f)(updater))(txn) + result.future + case _ ⇒ Future(f)(updater) + } + } + /** * A future to the current value that will be completed after any currently * queued updates.