diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
index 7fde49c606..eee5049d3f 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala
@@ -370,7 +370,7 @@ object SupervisorHierarchySpec {
     def printErrors(): Unit = {
       val merged = errors flatMap {
         case (ref, ErrorLog(msg, log)) ⇒
-          println(ref + " " + msg)
+          println("Error: " + ref + " " + msg)
           log map (l ⇒ (l.time, ref, l.msg.toString))
       }
       merged.sorted foreach println
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala
index ab24192c5a..742805883b 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala
@@ -1,9 +1,9 @@
 package akka.dispatch
 
-import akka.testkit.AkkaSpec
-import akka.testkit.DefaultTimeout
 import java.util.concurrent.{ ExecutorService, Executor, Executors }
-import scala.concurrent.ExecutionContext
+import java.util.concurrent.atomic.AtomicInteger
+import scala.concurrent._
+import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout }
 
 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
 class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
@@ -19,15 +19,66 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
         val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es)
         executorService must not be (null)
 
-        /*val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
+        val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
         jExecutor must not be (null)
 
         val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es)
         jExecutorService must not be (null)
-        */
       } finally {
         es.shutdown
       }
     }
+
+    "be able to use Batching" in {
+      system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
+
+      import system.dispatcher
+
+      def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
+        override def isBatchable = true
+        override def run: Unit = f
+      })
+
+      val p = Promise[Unit]()
+      batchable {
+        val lock, callingThreadLock, count = new AtomicInteger(0)
+        callingThreadLock.compareAndSet(0, 1) // Enable the lock
+        (1 to 100) foreach { i ⇒
+          batchable {
+            if (callingThreadLock.get != 0) p.tryFailure(new IllegalStateException("Batch was executed inline!"))
+            else if (count.incrementAndGet == 100) p.trySuccess(()) //Done
+            else if (lock.compareAndSet(0, 1)) {
+              try Thread.sleep(10) finally lock.compareAndSet(1, 0)
+            } else p.tryFailure(new IllegalStateException("Executed batch in parallel!"))
+          }
+        }
+        callingThreadLock.compareAndSet(1, 0) // Disable the lock
+      }
+      Await.result(p.future, timeout.duration) must be === ()
+    }
+
+    "be able to avoid starvation when Batching is used and Await/blocking is called" in {
+      system.dispatcher.isInstanceOf[BatchingExecutor] must be(true)
+      import system.dispatcher
+
+      def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable {
+        override def isBatchable = true
+        override def run: Unit = f
+      })
+
+      val latch = TestLatch(101)
+      batchable {
+        (1 to 100) foreach { i ⇒
+          batchable {
+            val deadlock = TestLatch(1)
+            batchable { deadlock.open() }
+            Await.ready(deadlock, timeout.duration)
+            latch.countDown()
+          }
+        }
+        latch.countDown()
+      }
+      Await.ready(latch, timeout.duration)
+    }
   }
 }
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index fff56a3776..a8c8d91d57 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -116,7 +116,13 @@ private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends S
  */
 private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
 
-final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable {
+final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable {
+  final override def isBatchable: Boolean = runnable match {
+    case b: Batchable ⇒ b.isBatchable
+    case _: scala.concurrent.OnCompleteRunnable ⇒ true
+    case _ ⇒ false
+  }
+
   def run(): Unit =
     try runnable.run() catch {
       case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
@@ -163,7 +169,7 @@ private[akka] object MessageDispatcher {
   implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
 }
 
-abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext {
+abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
 
   import MessageDispatcher._
   import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
@@ -209,8 +215,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
    */
   final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()
 
-  final override def execute(runnable: Runnable): Unit = {
-    val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
+  final override protected def unbatchedExecute(r: Runnable): Unit = {
+    val invocation = TaskInvocation(eventStream, r, taskCleanup)
     addInhabitants(+1)
     try {
       executeTask(invocation)
diff --git a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala
index d0092d77e0..cde2034f64 100644
--- a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala
@@ -11,7 +11,9 @@ import scala.annotation.tailrec
 /**
  * All Batchables are automatically batched when submitted to a BatchingExecutor
  */
-private[akka] trait Batchable extends Runnable
+private[akka] trait Batchable extends Runnable {
+  def isBatchable: Boolean
+}
 
 /**
  * Mixin trait for an Executor
@@ -100,9 +102,9 @@ private[akka] trait BatchingExecutor extends Executor {
     }
   }
 
-  protected def unbatchedExecute(r: Runnable): Unit = super.execute(r)
+  protected def unbatchedExecute(r: Runnable): Unit
 
-  abstract override def execute(runnable: Runnable): Unit = {
+  override def execute(runnable: Runnable): Unit = {
     if (batchable(runnable)) { // If we can batch the runnable
       _tasksLocal.get match {
         case null ⇒ unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
@@ -112,5 +114,9 @@
   }
 
   /** Override this to define which runnables will be batched. */
-  def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable]
+  def batchable(runnable: Runnable): Boolean = runnable match {
+    case b: Batchable ⇒ b.isBatchable
+    case _: scala.concurrent.OnCompleteRunnable ⇒ true
+    case _ ⇒ false
+  }
 }
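
Note (not part of the patch): the core of this change is the new opt-in batching contract. A Batchable is batched only when its isBatchable returns true, plain OnCompleteRunnables (Future callbacks) are always batched, and everything else falls through to unbatchedExecute. The snippet below is a minimal standalone sketch of that classification rule under those assumptions; Batchable is re-declared locally, BatchableCheck is a hypothetical name, and it assumes a Scala 2.10/2.11-era standard library where scala.concurrent.OnCompleteRunnable is available.

import scala.concurrent.OnCompleteRunnable

// Local stand-in for the private[akka] trait touched above (illustrative only).
trait Batchable extends Runnable {
  def isBatchable: Boolean
}

object BatchableCheck {
  // Same classification rule as the updated BatchingExecutor.batchable /
  // TaskInvocation.isBatchable: batch a Batchable only if it opts in,
  // always batch Future callbacks, never batch anything else.
  def batchable(runnable: Runnable): Boolean = runnable match {
    case b: Batchable ⇒ b.isBatchable
    case _: OnCompleteRunnable ⇒ true
    case _ ⇒ false
  }

  def main(args: Array[String]): Unit = {
    val optIn = new Batchable { def isBatchable = true; def run(): Unit = () }
    val optOut = new Batchable { def isBatchable = false; def run(): Unit = () }
    val plain = new Runnable { def run(): Unit = () }
    println(batchable(optIn)) // true
    println(batchable(optOut)) // false: a Batchable can now opt out of batching
    println(batchable(plain)) // false
  }
}

This is also why MessageDispatcher now implements only unbatchedExecute: the execute entry point lives in BatchingExecutor, which routes every submitted Runnable through this check before deciding whether to batch it.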