Adding support for task batching complete with tests

This commit is contained in:
Viktor Klang 2012-08-03 23:33:45 +02:00
parent eb23b5b88d
commit 2af08127de
7 changed files with 84 additions and 18 deletions

View file

@ -370,7 +370,7 @@ object SupervisorHierarchySpec {
def printErrors(): Unit = {
val merged = errors flatMap {
case (ref, ErrorLog(msg, log))
println(ref + " " + msg)
println("Error: " + ref + " " + msg)
log map (l (l.time, ref, l.msg.toString))
}
merged.sorted foreach println

View file

@ -1,9 +1,9 @@
package akka.dispatch
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import java.util.concurrent.{ ExecutorService, Executor, Executors }
import scala.concurrent.ExecutionContext
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@ -19,15 +19,66 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es)
executorService must not be (null)
/*val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
jExecutor must not be (null)
val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es)
jExecutorService must not be (null)
*/
} finally {
es.shutdown
}
}
"be able to use Batching" in {
system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
import system.dispatcher
def batchable[T](f: T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
override def isBatchable = true
override def run: Unit = f
})
val p = Promise[Unit]()
batchable {
val lock, callingThreadLock, count = new AtomicInteger(0)
callingThreadLock.compareAndSet(0, 1) // Enable the lock
(1 to 100) foreach { i
batchable {
if (callingThreadLock.get != 0) p.tryFailure(new IllegalStateException("Batch was executed inline!"))
else if (count.incrementAndGet == 100) p.trySuccess(()) //Done
else if (lock.compareAndSet(0, 1)) {
try Thread.sleep(10) finally lock.compareAndSet(1, 0)
} else p.tryFailure(new IllegalStateException("Executed batch in parallel!"))
}
}
callingThreadLock.compareAndSet(1, 0) // Disable the lock
}
Await.result(p.future, timeout.duration) must be === ()
}
"be able to avoid starvation when Batching is used and Await/blocking is called" in {
system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
import system.dispatcher
def batchable[T](f: T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
override def isBatchable = true
override def run: Unit = f
})
val latch = TestLatch(101)
batchable {
(1 to 100) foreach { i
batchable {
val deadlock = TestLatch(1)
batchable { deadlock.open() }
Await.ready(deadlock, timeout.duration)
latch.countDown()
}
}
latch.countDown()
}
Await.ready(latch, timeout.duration)
}
}
}

View file

@ -116,7 +116,13 @@ private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends S
*/
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Batchable {
final override def isBatchable: Boolean = runnable match {
case b: Batchable b.isBatchable
case _: scala.concurrent.OnCompleteRunnable true
case _ false
}
def run(): Unit =
try runnable.run() catch {
case NonFatal(e) eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
@ -163,7 +169,7 @@ private[akka] object MessageDispatcher {
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
}
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext {
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
@ -209,8 +215,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
final override def execute(runnable: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
final override protected def unbatchedExecute(r: Runnable): Unit = {
val invocation = TaskInvocation(eventStream, r, taskCleanup)
addInhabitants(+1)
try {
executeTask(invocation)

View file

@ -11,7 +11,9 @@ import scala.annotation.tailrec
/**
* All Batchables are automatically batched when submitted to a BatchingExecutor
*/
private[akka] trait Batchable extends Runnable
private[akka] trait Batchable extends Runnable {
def isBatchable: Boolean
}
/**
* Mixin trait for an Executor
@ -100,9 +102,9 @@ private[akka] trait BatchingExecutor extends Executor {
}
}
protected def unbatchedExecute(r: Runnable): Unit = super.execute(r)
protected def unbatchedExecute(r: Runnable): Unit
abstract override def execute(runnable: Runnable): Unit = {
override def execute(runnable: Runnable): Unit = {
if (batchable(runnable)) { // If we can batch the runnable
_tasksLocal.get match {
case null unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
@ -112,5 +114,9 @@ private[akka] trait BatchingExecutor extends Executor {
}
/** Override this to define which runnables will be batched. */
def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable]
def batchable(runnable: Runnable): Boolean = runnable match {
case b: Batchable b.isBatchable
case _: scala.concurrent.OnCompleteRunnable true
case _ false
}
}

View file

@ -128,9 +128,11 @@ object Futures {
*/
def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]())) { (fr, fa)
for (r fr; a fa) yield { r add a; r }
}
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
r
})
}
/**

View file

@ -86,7 +86,7 @@ public class FutureDocTestBase {
ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor();
//#diy-execution-context
ExecutionContext ec =
ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere);
ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere);
//Use ec with your Futures
Future<String> f1 = Futures.successful("foo");

View file

@ -75,6 +75,7 @@ object AkkaBuild extends Build {
base = file("akka-actor"),
settings = defaultSettings ++ OSGi.actor ++ Seq(
autoCompilerPlugins := true,
libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-reflect" % v },
packagedArtifact in (Compile, packageBin) <<= (artifact in (Compile, packageBin), OsgiKeys.bundle).identityMap,
artifact in (Compile, packageBin) ~= (_.copy(`type` = "bundle")),
// to fix scaladoc generation