diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d5f9b5ed54..d940aa2c20 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -560,9 +560,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { - def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ runnable.run()) - } + def run(timeout: org.jboss.netty.akka.util.Timeout) { dispatcher.execute(runnable) } } private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = @@ -575,7 +573,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createSingleTask(f: ⇒ Unit): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ f) + dispatcher.execute(new Runnable { def run = f }) } } @@ -598,7 +596,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ f) + dispatcher.execute(new Runnable { def run = f }) try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } @@ -609,7 +607,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.dispatchTask(() ⇒ runnable.run()) + dispatcher.execute(runnable) try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 24b9da4447..52f35fd952 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -74,10 +74,10 @@ case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to sup case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch -final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { +final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable { def run() { try { - function() + runnable.run() } catch { // FIXME catching all and continue isn't good for OOME, ticket #1418 case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) @@ -87,15 +87,48 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, } } +object ExecutionContext { + implicit def defaultExecutionContext(implicit system: ActorSystem): ExecutionContext = system.dispatcher + + /** + * Creates an ExecutionContext from the given ExecutorService + */ + def fromExecutorService(e: ExecutorService): ExecutionContext = new WrappedExecutorService(e) + + /** + * Creates an ExecutionContext from the given Executor + */ + def fromExecutor(e: Executor): ExecutionContext = new WrappedExecutor(e) + + private class WrappedExecutorService(val executor: ExecutorService) extends ExecutorServiceDelegate with ExecutionContext + + private class WrappedExecutor(val executor: Executor) extends Executor with ExecutionContext { + override final def execute(runnable: Runnable): Unit = executor.execute(runnable) + } +} + +/** + * An ExecutionContext is essentially the same thing as a java.util.concurrent.Executor + * This interface/trait exists to decouple the concept of execution from Actors & MessageDispatchers + * It is also needed to provide a fallback implicit default instance (in the companion object). + */ +trait ExecutionContext { + + /** + * Submits the runnable for execution + */ + def execute(runnable: Runnable): Unit +} + object MessageDispatcher { val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher val SCHEDULED = 1 val RESCHEDULED = 2 - implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher + implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } -abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable { +abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable with Executor with ExecutionContext { import MessageDispatcher._ import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater } @@ -131,8 +164,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext ifSensibleToDoSoThenScheduleShutdown() } - protected[akka] final def dispatchTask(block: () ⇒ Unit) { - val invocation = TaskInvocation(eventStream, block, taskCleanup) + final def execute(runnable: Runnable) { + val invocation = TaskInvocation(eventStream, runnable, taskCleanup) inhabitantsUpdater.incrementAndGet(this) try { executeTask(invocation) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 59996e9311..76de3d07c3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,8 +21,9 @@ import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } -import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable } import akka.dispatch.Await.CanAwait +import java.util.concurrent._ +import akka.actor.ActorSystem object Await { sealed trait CanAwait @@ -55,37 +56,37 @@ object Futures { /** * Java API, equivalent to Future.apply */ - def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher) + def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor) /** * Java API, equivalent to Promise.apply */ - def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher) + def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor) /** * Java API, creates an already completed Promise with the specified exception */ - def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher) + def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor) /** * Java API, Creates an already completed Promise with the specified result */ - def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher) + def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor) /** * Java API. * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = { - Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_)) + def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = { + Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(executor).map(JOption.fromScalaOption(_)) } /** * Java API. * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] = - Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher) + def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] = + Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(executor) /** * Java API @@ -94,23 +95,23 @@ object Futures { * the result will be the first failure of any of the futures, or any failure in the actual fold, * or the result of the fold. */ - def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] = - Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher) + def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] = + Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(executor) /** * Java API. * Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first */ - def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] = - Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher) + def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], executor: ExecutionContext): Future[R] = + Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(executor) /** * Java API. * Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = { - implicit val d = dispatcher + def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = { + implicit val d = executor scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa) ⇒ for (r ← fr; a ← fa) yield { r add a @@ -124,8 +125,8 @@ object Futures { * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ - def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = { - implicit val d = dispatcher + def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], executor: ExecutionContext): Future[JIterable[B]] = { + implicit val d = executor scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a) ⇒ val fb = fn(a) for (r ← fr; b ← fb) yield { r add b; r } @@ -139,18 +140,19 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: ⇒ T)(implicit dispatcher: MessageDispatcher): Future[T] = { + def apply[T](body: ⇒ T)(implicit executor: ExecutionContext): Future[T] = { val promise = Promise[T]() - dispatcher dispatchTask { () ⇒ - promise complete { - try { - Right(body) - } catch { - // FIXME catching all and continue isn't good for OOME, ticket #1418 - case e ⇒ Left(e) + executor.execute(new Runnable { + def run = + promise complete { + try { + Right(body) + } catch { + // FIXME catching all and continue isn't good for OOME, ticket #1418 + case e ⇒ Left(e) + } } - } - } + }) promise } @@ -161,13 +163,13 @@ object Future { * Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]]. * Useful for reducing many Futures into a single Future. */ - def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] = + def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Returns a Future to the result of the first future in the list that is completed */ - def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = { + def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = { val futureResult = Promise[T]() val completeFirst: Either[Throwable, T] ⇒ Unit = futureResult complete _ @@ -179,7 +181,7 @@ object Future { /** * Returns a Future that will hold the optional result of the first Future with a result that matches the predicate */ - def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = { + def find[T](futures: Traversable[Future[T]])(predicate: T ⇒ Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = { if (futures.isEmpty) Promise.successful[Option[T]](None) else { val result = Promise[Option[T]]() @@ -210,7 +212,7 @@ object Future { * val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds) * */ - def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit dispatcher: MessageDispatcher): Future[R] = { + def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) ⇒ R)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise.successful(zero) else sequence(futures).map(_.foldLeft(zero)(foldFun)) } @@ -222,7 +224,7 @@ object Future { * val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds) * */ - def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit dispatcher: MessageDispatcher): Future[R] = { + def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) ⇒ T)(implicit executor: ExecutionContext): Future[R] = { if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection")) else sequence(futures).map(_ reduce op) } @@ -234,7 +236,7 @@ object Future { * val myFutureList = Future.traverse(myList)(x ⇒ Future(myFunc(x))) * */ - def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] = + def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) @@ -256,7 +258,7 @@ object Future { * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ - def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = { + def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { val future = Promise[A] dispatchTask({ () ⇒ (reify(body) foreachFull (future success, future failure): Future[Any]) onFailure { @@ -290,7 +292,7 @@ object Future { * } * */ - def blocking(implicit dispatcher: MessageDispatcher): Unit = + def blocking(implicit executor: ExecutionContext): Unit = _taskStack.get match { case Some(taskStack) if taskStack.nonEmpty ⇒ val tasks = taskStack.elems @@ -308,32 +310,38 @@ object Future { /** * Internal API, do not call */ - private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit = + private[akka] def dispatchTask(task: () ⇒ Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit = _taskStack.get match { case Some(taskStack) if !force ⇒ taskStack push task - case _ ⇒ - dispatcher dispatchTask { () ⇒ - try { - val taskStack = Stack[() ⇒ Unit](task) - _taskStack set Some(taskStack) - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case e ⇒ - // FIXME catching all and continue isn't good for OOME, ticket #1418 - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage)) + case _ ⇒ executor.execute( + new Runnable { + def run = + try { + val taskStack = Stack[() ⇒ Unit](task) + _taskStack set Some(taskStack) + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e ⇒ + // FIXME catching all and continue isn't good for OOME, ticket #1418 + executor match { + case m: MessageDispatcher ⇒ + m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage)) + case other ⇒ + e.printStackTrace() + } + } } - } - } finally { _taskStack set None } - } + } finally { _taskStack set None } + }) } } sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { - implicit def dispatcher: MessageDispatcher + implicit def executor: ExecutionContext protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) ⇒ Right(t.value.asInstanceOf[X]) @@ -463,7 +471,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage)) + logError("Future.map", e) Left(e) }) } @@ -513,7 +521,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { } catch { case e: Exception ⇒ p complete Left(e) - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage)) + logError("Future.flatMap", e) } } p @@ -541,12 +549,19 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] { if (pred(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage)) + logError("Future.filter", e) Left(e) }) } p } + + protected def logError(msg: String, problem: Throwable): Unit = { + executor match { + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case other ⇒ problem.printStackTrace() + } + } } object Promise { @@ -555,17 +570,17 @@ object Promise { * * Scala API */ - def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() + def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]() /** * Creates an already completed Promise with the specified exception */ - def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception)) + def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception)) /** * Creates an already completed Promise with the specified result */ - def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result)) + def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result)) } /** @@ -622,7 +637,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(thisPromise) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) + logError("Promise.completeWith", e) fr failure e } } @@ -637,7 +652,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) + logError("Promise.completeWith", e) fr failure e } } @@ -669,7 +684,7 @@ private[dispatch] object DefaultPromise { /** * The default concrete Future implementation. */ -class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { +class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { self ⇒ import DefaultPromise.{ FState, Success, Failure, Pending } @@ -759,7 +774,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst } private final def notifyCompleted(func: Either[Throwable, T] ⇒ Unit, result: Either[Throwable, T]) { - try { func(result) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } + try { func(result) } catch { case e ⇒ logError("Future onComplete-callback raised an exception", e) } } } @@ -767,7 +782,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { +final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { val value = Some(resolve(suppliedValue)) def tryComplete(value: Either[Throwable, T]): Boolean = true diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index fa42c0c402..1b6a755ede 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -70,8 +70,6 @@ class FutureDocSpec extends AkkaSpec { import akka.dispatch.Future import akka.util.duration._ - implicit def dispatcher = system.dispatcher - val future = Future { "Hello" + "World" }