diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 7fde49c606..eee5049d3f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -370,7 +370,7 @@ object SupervisorHierarchySpec { def printErrors(): Unit = { val merged = errors flatMap { case (ref, ErrorLog(msg, log)) ⇒ - println(ref + " " + msg) + println("Error: " + ref + " " + msg) log map (l ⇒ (l.time, ref, l.msg.toString)) } merged.sorted foreach println diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index ab24192c5a..742805883b 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -1,9 +1,9 @@ package akka.dispatch -import akka.testkit.AkkaSpec -import akka.testkit.DefaultTimeout import java.util.concurrent.{ ExecutorService, Executor, Executors } -import scala.concurrent.ExecutionContext +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent._ +import akka.testkit.{ TestLatch, AkkaSpec, DefaultTimeout } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { @@ -19,15 +19,66 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es) executorService must not be (null) - /*val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es) + val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es) jExecutor must not be (null) val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es) jExecutorService must not be (null) - */ } finally { es.shutdown } } + + "be able to use Batching" in { + system.dispatcher.isInstanceOf[BatchingExecutor] must be(true) + + import system.dispatcher + + def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable { + override def isBatchable = true + override def run: Unit = f + }) + + val p = Promise[Unit]() + batchable { + val lock, callingThreadLock, count = new AtomicInteger(0) + callingThreadLock.compareAndSet(0, 1) // Enable the lock + (1 to 100) foreach { i ⇒ + batchable { + if (callingThreadLock.get != 0) p.tryFailure(new IllegalStateException("Batch was executed inline!")) + else if (count.incrementAndGet == 100) p.trySuccess(()) //Done + else if (lock.compareAndSet(0, 1)) { + try Thread.sleep(10) finally lock.compareAndSet(1, 0) + } else p.tryFailure(new IllegalStateException("Executed batch in parallel!")) + } + } + callingThreadLock.compareAndSet(1, 0) // Disable the lock + } + Await.result(p.future, timeout.duration) must be === () + } + + "be able to avoid starvation when Batching is used and Await/blocking is called" in { + system.dispatcher.isInstanceOf[BatchingExecutor] must be(true) + import system.dispatcher + + def batchable[T](f: ⇒ T)(implicit ec: ExecutionContext): Unit = ec.execute(new Batchable { + override def isBatchable = true + override def run: Unit = f + }) + + val latch = TestLatch(101) + batchable { + (1 to 100) foreach { i ⇒ + batchable { + val deadlock = TestLatch(1) + batchable { deadlock.open() } + Await.ready(deadlock, timeout.duration) + latch.countDown() + } + } + latch.countDown() + } + Await.ready(latch, timeout.duration) + } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index fff56a3776..a8c8d91d57 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -116,7 +116,13 @@ private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends S */ private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination -final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable { +final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable { + final override def isBatchable: Boolean = runnable match { + case b: Batchable ⇒ b.isBatchable + case _: scala.concurrent.OnCompleteRunnable ⇒ true + case _ ⇒ false + } + def run(): Unit = try runnable.run() catch { case NonFatal(e) ⇒ eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage)) @@ -163,7 +169,7 @@ private[akka] object MessageDispatcher { implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } -abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Executor with ExecutionContext { +abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext { import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset } @@ -209,8 +215,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown() - final override def execute(runnable: Runnable): Unit = { - val invocation = TaskInvocation(eventStream, runnable, taskCleanup) + final override protected def unbatchedExecute(r: Runnable): Unit = { + val invocation = TaskInvocation(eventStream, r, taskCleanup) addInhabitants(+1) try { executeTask(invocation) diff --git a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala index d0092d77e0..cde2034f64 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BatchingExecutor.scala @@ -11,7 +11,9 @@ import scala.annotation.tailrec /** * All Batchables are automatically batched when submitted to a BatchingExecutor */ -private[akka] trait Batchable extends Runnable +private[akka] trait Batchable extends Runnable { + def isBatchable: Boolean +} /** * Mixin trait for an Executor @@ -100,9 +102,9 @@ private[akka] trait BatchingExecutor extends Executor { } } - protected def unbatchedExecute(r: Runnable): Unit = super.execute(r) + protected def unbatchedExecute(r: Runnable): Unit - abstract override def execute(runnable: Runnable): Unit = { + override def execute(runnable: Runnable): Unit = { if (batchable(runnable)) { // If we can batch the runnable _tasksLocal.get match { case null ⇒ unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch @@ -112,5 +114,9 @@ private[akka] trait BatchingExecutor extends Executor { } /** Override this to define which runnables will be batched. */ - def batchable(runnable: Runnable): Boolean = runnable.isInstanceOf[Batchable] + def batchable(runnable: Runnable): Boolean = runnable match { + case b: Batchable ⇒ b.isBatchable + case _: scala.concurrent.OnCompleteRunnable ⇒ true + case _ ⇒ false + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 0ab004fc59..80d57a6dca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -128,9 +128,11 @@ object Futures { */ def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = { implicit val d = executor - scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]())) { (fr, fa) ⇒ - for (r ← fr; a ← fa) yield { r add a; r } - } + scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ + for (r ← fr; a ← fa) yield { + r add a + r + }) } /** diff --git a/akka-docs/java/code/docs/future/FutureDocTestBase.java b/akka-docs/java/code/docs/future/FutureDocTestBase.java index d84c728dfe..e6c482a66d 100644 --- a/akka-docs/java/code/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/docs/future/FutureDocTestBase.java @@ -86,7 +86,7 @@ public class FutureDocTestBase { ExecutorService yourExecutorServiceGoesHere = Executors.newSingleThreadExecutor(); //#diy-execution-context ExecutionContext ec = - ExecutionContexts.fromExecutorService(yourExecutorServiceGoesHere); + ExecutionContext$.MODULE$.fromExecutorService(yourExecutorServiceGoesHere); //Use ec with your Futures Future f1 = Futures.successful("foo"); diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 863773e88c..536ef49f18 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -75,6 +75,7 @@ object AkkaBuild extends Build { base = file("akka-actor"), settings = defaultSettings ++ OSGi.actor ++ Seq( autoCompilerPlugins := true, + libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-reflect" % v }, packagedArtifact in (Compile, packageBin) <<= (artifact in (Compile, packageBin), OsgiKeys.bundle).identityMap, artifact in (Compile, packageBin) ~= (_.copy(`type` = "bundle")), // to fix scaladoc generation