Merge pull request #587 from akka/wip-2350-enable-future-batching-√

#2350 - enabling batching for Futures
This commit is contained in:
Viktor Klang (√) 2012-08-06 08:14:17 -07:00
commit 4805fa7e08
4 changed files with 77 additions and 14 deletions

View file

@ -370,7 +370,7 @@ object SupervisorHierarchySpec {
def printErrors(): Unit = {
val merged = errors flatMap {
case (ref, ErrorLog(msg, log))
println(ref + " " + msg)
println("Error: " + ref + " " + msg)
log map (l (l.time, ref, l.msg.toString))
}
merged.sorted foreach println

View file

@ -1,9 +1,9 @@
package akka.dispatch
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.{ ExecutorService, Executor, Executors }
import scala.concurrent.ExecutionContext
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -19,15 +19,66 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es)
executorService must not be (null)
/*val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
jExecutor must not be (null)
val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es)
jExecutorService must not be (null)
*/
} finally {
es.shutdown
}
}
"be able to use Batching" in {
system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
import system.dispatcher
def batchable[T](f: T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
override def isBatchable = true
override def run: Unit = f
})
val p = Promise[Unit]()
batchable {
val lock, callingThreadLock, count = new AtomicInteger(0)
callingThreadLock.compareAndSet(0, 1) // Enable the lock
(1 to 100) foreach { i
batchable {
if (callingThreadLock.get != 0) p.tryFailure(new IllegalStateException("Batch was executed inline!"))
else if (count.incrementAndGet == 100) p.trySuccess(()) //Done
else if (lock.compareAndSet(0, 1)) {
try Thread.sleep(10) finally lock.compareAndSet(1, 0)
} else p.tryFailure(new IllegalStateException("Executed batch in parallel!"))
}
}
callingThreadLock.compareAndSet(1, 0) // Disable the lock
}
Await.result(p.future, timeout.duration) must be === ()
}
"be able to avoid starvation when Batching is used and Await/blocking is called" in {
system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
import system.dispatcher
def batchable[T](f: T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
override def isBatchable = true
override def run: Unit = f
})
val latch = TestLatch(101)
batchable {
(1 to 100) foreach { i
batchable {
val deadlock = TestLatch(1)
batchable { deadlock.open() }
Await.ready(deadlock, timeout.duration)
latch.countDown()
}
}
latch.countDown()
}
Await.ready(latch, timeout.duration)
}
}
}

View file

@ -116,7 +116,13 @@ private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends S
*/
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Batchable {
final override def isBatchable: Boolean = runnable match {
case b: Batchable b.isBatchable
case _: scala.concurrent.OnCompleteRunnable true
case _ false
}
def run(): Unit =
try runnable.run() catch {
case NonFatal(e) eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
@ -163,7 +169,7 @@ private[akka] object MessageDispatcher {
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
}
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext {
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
@ -209,8 +215,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
final override def execute(runnable: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
final override protected def unbatchedExecute(r: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, r, taskCleanup)
addInhabitants(+1)
try {
executeTask(invocation)

View file

@ -11,7 +11,9 @@ import scala.annotation.tailrec
/**
* All Batchables are automatically batched when submitted to a BatchingExecutor
*/
private[akka] trait Batchable extends Runnable
private[akka] trait Batchable extends Runnable {
def isBatchable: Boolean
}
/**
* Mixin trait for an Executor
@ -100,9 +102,9 @@ private[akka] trait BatchingExecutor extends Executor {
}
}
protected def unbatchedExecute(r: Runnable): Unit = super.execute(r)
protected def unbatchedExecute(r: Runnable): Unit
abstract override def execute(runnable: Runnable): Unit = {
override def execute(runnable: Runnable): Unit = {
if (batchable(runnable)) { // If we can batch the runnable
_tasksLocal.get match {
case null unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
@ -112,5 +114,9 @@ private[akka] trait BatchingExecutor extends Executor {
}
/** Override this to define which runnables will be batched. */
def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable]
def batchable(runnable: Runnable): Boolean = runnable match {
case b: Batchable b.isBatchable
case _: scala.concurrent.OnCompleteRunnable true
case _ false
}
}