Introducing ExecutionContext to MessageDispatcher and attaching it to Future

This commit is contained in:
Viktor Klang 2011-12-30 13:48:31 +01:00
parent 018033b9d5
commit b34dc4c33b
4 changed files with 97 additions and 78 deletions

View file

@ -560,9 +560,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() runnable.run())
}
def run(timeout: org.jboss.netty.akka.util.Timeout) { dispatcher.execute(runnable) }
}
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
@ -575,7 +573,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createSingleTask(f: Unit): TimerTask =
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() f)
dispatcher.execute(new Runnable { def run = f })
}
}
@ -598,7 +596,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createContinuousTask(delay: Duration, f: Unit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() f)
dispatcher.execute(new Runnable { def run = f })
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
@ -609,7 +607,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.dispatchTask(() runnable.run())
dispatcher.execute(runnable)
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}

View file

@ -74,10 +74,10 @@ case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to sup
case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch
case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch
final case class TaskInvocation(eventStream: EventStream, function: () Unit, cleanup: () Unit) extends Runnable {
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run() {
try {
function()
runnable.run()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
@ -87,15 +87,23 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
}
}
object ExecutionContext {
implicit def defaultExecutionContext(implicit system: ActorSystem): ExecutionContext = system.dispatcher
}
trait ExecutionContext {
def execute(runnable: Runnable): Unit
}
object MessageDispatcher {
val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
val SCHEDULED = 1
val RESCHEDULED = 2
implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
}
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable {
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable with Executor with ExecutionContext {
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
@ -131,8 +139,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
ifSensibleToDoSoThenScheduleShutdown()
}
protected[akka] final def dispatchTask(block: () Unit) {
val invocation = TaskInvocation(eventStream, block, taskCleanup)
final def execute(runnable: Runnable) {
val invocation = TaskInvocation(eventStream, runnable, taskCleanup)
inhabitantsUpdater.incrementAndGet(this)
try {
executeTask(invocation)

View file

@ -21,8 +21,9 @@ import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.{ Switch, Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean }
import java.util.concurrent.{ TimeoutException, ConcurrentLinkedQueue, TimeUnit, Callable }
import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.actor.ActorSystem
object Await {
sealed trait CanAwait
@ -55,37 +56,37 @@ object Futures {
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] = Future(body.call)(dispatcher)
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)
/**
* Java API, equivalent to Promise.apply
*/
def promise[T](dispatcher: MessageDispatcher): Promise[T] = Promise[T]()(dispatcher)
def promise[T](executor: ExecutionContext): Promise[T] = Promise[T]()(executor)
/**
* Java API, creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable, dispatcher: MessageDispatcher): Promise[T] = Promise.failed(exception)(dispatcher)
def failed[T](exception: Throwable, executor: ExecutionContext): Promise[T] = Promise.failed(exception)(executor)
/**
* Java API, Creates an already completed Promise with the specified result
*/
def successful[T](result: T, dispatcher: MessageDispatcher): Promise[T] = Promise.successful(result)(dispatcher)
def successful[T](result: T, executor: ExecutionContext): Promise[T] = Promise.successful(result)(executor)
/**
* Java API.
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], dispatcher: MessageDispatcher): Future[JOption[T]] = {
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(dispatcher).map(JOption.fromScalaOption(_))
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], executor: ExecutionContext): Future[JOption[T]] = {
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)))(predicate.apply(_))(executor).map(JOption.fromScalaOption(_))
}
/**
* Java API.
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], dispatcher: MessageDispatcher): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(dispatcher)
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(executor)
/**
* Java API
@ -94,23 +95,23 @@ object Futures {
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], dispatcher: MessageDispatcher): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(dispatcher)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] =
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(zero)(fun.apply _)(executor)
/**
* Java API.
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
*/
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], dispatcher: MessageDispatcher): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(dispatcher)
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, T], executor: ExecutionContext): Future[R] =
Future.reduce(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)(executor)
/**
* Java API.
* Simple version of Future.traverse. Transforms a JIterable[Future[A]] into a Future[JIterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], dispatcher: MessageDispatcher): Future[JIterable[A]] = {
implicit val d = dispatcher
def sequence[A](in: JIterable[Future[A]], executor: ExecutionContext): Future[JIterable[A]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
@ -124,8 +125,8 @@ object Futures {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], dispatcher: MessageDispatcher): Future[JIterable[B]] = {
implicit val d = dispatcher
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]], executor: ExecutionContext): Future[JIterable[B]] = {
implicit val d = executor
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a)
for (r fr; b fb) yield { r add b; r }
@ -139,18 +140,19 @@ object Future {
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T)(implicit dispatcher: MessageDispatcher): Future[T] = {
def apply[T](body: T)(implicit executor: ExecutionContext): Future[T] = {
val promise = Promise[T]()
dispatcher dispatchTask { ()
promise complete {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
executor.execute(new Runnable {
def run =
promise complete {
try {
Right(body)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e Left(e)
}
}
}
}
})
promise
}
@ -161,13 +163,13 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit dispatcher: MessageDispatcher): Future[T] = {
def firstCompletedOf[T](futures: Traversable[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val futureResult = Promise[T]()
val completeFirst: Either[Throwable, T] Unit = futureResult complete _
@ -179,7 +181,7 @@ object Future {
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] = {
def find[T](futures: Traversable[Future[T]])(predicate: T Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
if (futures.isEmpty) Promise.successful[Option[T]](None)
else {
val result = Promise[Option[T]]()
@ -210,7 +212,7 @@ object Future {
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* </pre>
*/
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] = {
def fold[T, R](futures: Traversable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise.successful(zero)
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
@ -222,7 +224,7 @@ object Future {
* val result = Await.result(Futures.reduce(futures)(_ + _), 5 seconds)
* </pre>
*/
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] = {
def reduce[T, R >: T](futures: Traversable[Future[T]])(op: (R, T) T)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Promise[R].failure(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduce op)
}
@ -234,7 +236,7 @@ object Future {
* val myFutureList = Future.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Promise.successful(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
@ -256,7 +258,7 @@ object Future {
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = {
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val future = Promise[A]
dispatchTask({ ()
(reify(body) foreachFull (future success, future failure): Future[Any]) onFailure {
@ -290,7 +292,7 @@ object Future {
* }
* </pre>
*/
def blocking(implicit dispatcher: MessageDispatcher): Unit =
def blocking(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case Some(taskStack) if taskStack.nonEmpty
val tasks = taskStack.elems
@ -308,32 +310,38 @@ object Future {
/**
* Internal API, do not call
*/
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit dispatcher: MessageDispatcher): Unit =
private[akka] def dispatchTask(task: () Unit, force: Boolean = false)(implicit executor: ExecutionContext): Unit =
_taskStack.get match {
case Some(taskStack) if !force taskStack push task
case _
dispatcher dispatchTask { ()
try {
val taskStack = Stack[() Unit](task)
_taskStack set Some(taskStack)
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", "Failed to dispatch task, due to: " + e.getMessage))
case _ executor.execute(
new Runnable {
def run =
try {
val taskStack = Stack[() Unit](task)
_taskStack set Some(taskStack)
while (taskStack.nonEmpty) {
val next = taskStack.pop()
try {
next.apply()
} catch {
case e
// FIXME catching all and continue isn't good for OOME, ticket #1418
executor match {
case m: MessageDispatcher
m.prerequisites.eventStream.publish(Error(e, "Future.dispatchTask", e.getMessage))
case other
e.printStackTrace()
}
}
}
}
} finally { _taskStack set None }
}
} finally { _taskStack set None }
})
}
}
sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
implicit def dispatcher: MessageDispatcher
implicit def executor: ExecutionContext
protected final def resolve[X](source: Either[Throwable, X]): Either[Throwable, X] = source match {
case Left(t: scala.runtime.NonLocalReturnControl[_]) Right(t.value.asInstanceOf[X])
@ -463,7 +471,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
Right(f(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
logError("Future.map", e)
Left(e)
})
}
@ -513,7 +521,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
} catch {
case e: Exception
p complete Left(e)
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
logError("Future.flatMap", e)
}
}
p
@ -541,12 +549,19 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
if (pred(res)) r else Left(new MatchError(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
logError("Future.filter", e)
Left(e)
})
}
p
}
protected def logError(msg: String, problem: Throwable): Unit = {
executor match {
case m: MessageDispatcher m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage))
case other problem.printStackTrace()
}
}
}
object Promise {
@ -555,17 +570,17 @@ object Promise {
*
* Scala API
*/
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
def apply[A]()(implicit executor: ExecutionContext): Promise[A] = new DefaultPromise[A]()
/**
* Creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception))
def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Left(exception))
/**
* Creates an already completed Promise with the specified result
*/
def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result))
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new KeptPromise[T](Right(result))
}
/**
@ -622,7 +637,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(thisPromise)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
logError("Promise.completeWith", e)
fr failure e
}
}
@ -637,7 +652,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
logError("Promise.completeWith", e)
fr failure e
}
}
@ -669,7 +684,7 @@ private[dispatch] object DefaultPromise {
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self
import DefaultPromise.{ FState, Success, Failure, Pending }
@ -759,7 +774,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
}
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) {
try { func(result) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
try { func(result) } catch { case e logError("Future onComplete-callback raised an exception", e) }
}
}
@ -767,7 +782,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
val value = Some(resolve(suppliedValue))
def tryComplete(value: Either[Throwable, T]): Boolean = true

View file

@ -70,8 +70,6 @@ class FutureDocSpec extends AkkaSpec {
import akka.dispatch.Future
import akka.util.duration._
implicit def dispatcher = system.dispatcher
val future = Future {
"Hello" + "World"
}