Future: move implicit dispatcher from methods to constructor

This commit is contained in:
Derek Williams 2011-08-02 10:19:49 -06:00
parent 377fc2b1cf
commit a0350d03e9
3 changed files with 43 additions and 41 deletions

View file

@ -680,7 +680,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
channel: UntypedChannel): Future[Any] = {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)
case _ new ActorPromise(timeout)(dispatcher)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
@ -1025,7 +1025,7 @@ private[akka] case class RemoteActorRef private[akka] (
case _ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout.duration.toMillis, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
if (future.isDefined) ActorPromise(future.get)(dispatcher)
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}

View file

@ -317,6 +317,8 @@ object Future {
sealed trait Future[+T] {
implicit def dispatcher: MessageDispatcher
/**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
@ -327,7 +329,7 @@ sealed trait Future[+T] {
* execution will fail. The normal result of getting a Future from an ActorRef using ? will return
* an untyped Future.
*/
def apply[A >: T]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): A @cps[Future[Any]] = shift(this flatMap (_: A Future[Any]))
def apply[A >: T]()(implicit timeout: Timeout): A @cps[Future[Any]] = shift(this flatMap (_: A Future[Any]))
/**
* Blocks awaiting completion of this Future, then returns the resulting value,
@ -422,7 +424,7 @@ sealed trait Future[+T] {
* Future. If the Future has already been completed, this will apply
* immediately.
*/
def onComplete(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type
def onComplete(func: Future[T] Unit): this.type
/**
* When the future is completed with a valid result, apply the provided
@ -434,7 +436,7 @@ sealed trait Future[+T] {
* }
* </pre>
*/
final def onResult(pf: PartialFunction[Any, Unit])(implicit dispatcher: MessageDispatcher): this.type = onComplete { f
final def onResult(pf: PartialFunction[Any, Unit]): this.type = onComplete { f
val optr = f.result
if (optr.isDefined) {
val r = optr.get
@ -452,7 +454,7 @@ sealed trait Future[+T] {
* }
* </pre>
*/
final def onException(pf: PartialFunction[Throwable, Unit])(implicit dispatcher: MessageDispatcher): Future[T] = onComplete { f
final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f
val opte = f.exception
if (opte.isDefined) {
val e = opte.get
@ -460,9 +462,9 @@ sealed trait Future[+T] {
}
}
def onTimeout(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type
def onTimeout(func: Future[T] Unit): this.type
def orElse[A >: T](fallback: A)(implicit dispatcher: MessageDispatcher): Future[A]
def orElse[A >: T](fallback: A): Future[A]
/**
* Creates a new Future by applying a PartialFunction to the successful
@ -478,7 +480,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def collect[A](pf: PartialFunction[Any, A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
final def collect[A](pf: PartialFunction[Any, A])(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
@ -510,7 +512,7 @@ sealed trait Future[+T] {
* Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* </pre>
*/
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
@ -543,7 +545,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def map[A](f: T A)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
final def map[A](f: T A)(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
onComplete { self
future complete {
@ -567,7 +569,7 @@ sealed trait Future[+T] {
* Creates a new Future[A] which is completed with this Future's result if
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[A](implicit m: Manifest[A], dispatcher: MessageDispatcher = implicitly, timeout: Timeout = this.timeout): Future[A] = {
final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = {
val fa = new DefaultPromise[A](timeout)
onComplete { ft
fa complete (ft.value.get match {
@ -597,7 +599,7 @@ sealed trait Future[+T] {
* } yield b + "-" + c
* </pre>
*/
final def flatMap[A](f: T Future[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = {
final def flatMap[A](f: T Future[A])(implicit timeout: Timeout): Future[A] = {
val future = new DefaultPromise[A](timeout)
onComplete {
_.value.get match {
@ -615,23 +617,23 @@ sealed trait Future[+T] {
future
}
final def foreach(f: T Unit)(implicit dispatcher: MessageDispatcher): Unit = onComplete {
final def foreach(f: T Unit): Unit = onComplete {
_.result match {
case Some(v) f(v)
case None
}
}
final def withFilter(p: T Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout) = new FutureWithFilter[T](this, p)
final def withFilter(p: T Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p)
final class FutureWithFilter[+A](self: Future[A], p: A Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout) {
final class FutureWithFilter[+A](self: Future[A], p: A Boolean)(implicit timeout: Timeout) {
def foreach(f: A Unit): Unit = self filter p foreach f
def map[B](f: A B): Future[B] = self filter p map f
def flatMap[B](f: A Future[B]): Future[B] = self filter p flatMap f
def withFilter(q: A Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x p(x) && q(x))
}
final def filter(p: T Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
final def filter(p: T Boolean)(implicit timeout: Timeout): Future[T] = {
val future = new DefaultPromise[T](timeout)
onComplete { self
future complete {
@ -684,17 +686,17 @@ object Promise {
/**
* Creates a non-completed, new, Promise with the supplied timeout in milliseconds
*/
def apply[A](timeout: Timeout): Promise[A] = new DefaultPromise[A](timeout)
def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A](timeout)
/**
* Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
*/
def apply[A](): Promise[A] = apply(Timeout.default)
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = apply(Timeout.default)
/**
* Construct a completable channel
*/
def channel(timeout: Long = Actor.TIMEOUT): ActorPromise = new ActorPromise(timeout)
def channel(timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(timeout)
private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() Unit]]]() {
override def initialValue = None
@ -709,26 +711,26 @@ trait Promise[T] extends Future[T] {
* Completes this Future with the specified result, if not already completed.
* @return this
*/
def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type
def complete(value: Either[Throwable, T]): this.type
/**
* Completes this Future with the specified result, if not already completed.
* @return this
*/
final def completeWithResult(result: T)(implicit dispatcher: MessageDispatcher): this.type = complete(Right(result))
final def completeWithResult(result: T): this.type = complete(Right(result))
/**
* Completes this Future with the specified exception, if not already completed.
* @return this
*/
final def completeWithException(exception: Throwable)(implicit dispatcher: MessageDispatcher): this.type = complete(Left(exception))
final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed.
* @return this.
*/
final def completeWith(other: Future[T])(implicit dispatcher: MessageDispatcher): this.type = {
final def completeWith(other: Future[T]): this.type = {
other onComplete { f complete(f.value.get) }
this
}
@ -768,14 +770,14 @@ trait Promise[T] extends Future[T] {
/**
* The default concrete Future implementation.
*/
class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
self
def this() = this(Timeout.default)
def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default)
def this(timeout: Long) = this(Timeout(timeout))
def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout))
def this(timeout: Long, timeunit: TimeUnit) = this(Timeout(timeout, timeunit))
def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit))
private val _startTimeInNanos = currentTimeInNanos
private val _lock = new ReentrantLock
@ -834,7 +836,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
}
}
def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type = {
def complete(value: Either[Throwable, T]): this.type = {
processCallbacks {
_lock.lock
try {
@ -858,7 +860,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
this
}
private def processCallbacks(callbacks: List[Future[T] Unit])(implicit dispatcher: MessageDispatcher): Unit = {
private def processCallbacks(callbacks: List[Future[T] Unit]): Unit = {
if (callbacks.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation
@tailrec
def runCallbacks(rest: List[Future[T] Unit], callbackStack: Stack[() Unit]) {
@ -887,7 +889,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
}
}
def onComplete(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type = {
def onComplete(func: Future[T] Unit): this.type = {
_lock.lock
val notifyNow = try {
if (_value.isEmpty) {
@ -905,7 +907,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
this
}
def onTimeout(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type = {
def onTimeout(func: Future[T] Unit): this.type = {
if (timeout.duration.isFinite) {
_lock.lock
val runNow = try {
@ -930,7 +932,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
this
}
final def orElse[A >: T](fallback: A)(implicit dispatcher: MessageDispatcher): Future[A] =
final def orElse[A >: T](fallback: A): Future[A] =
if (timeout.duration.isFinite) {
value match {
case Some(_) this
@ -962,7 +964,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] {
private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
}
class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel {
class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with ForwardableChannel {
def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message)
@ -980,7 +982,7 @@ class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with F
}
object ActorPromise {
def apply(f: Promise[Any]): ActorPromise =
def apply(f: Promise[Any])(implicit dispatcher: MessageDispatcher): ActorPromise =
new ActorPromise(f.timeout) {
completeWith(f)
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
@ -992,11 +994,11 @@ object ActorPromise {
* An already completed Future is seeded with it's result at creation, is useful for when you are participating in
* a Future-composition but you already have a value to contribute.
*/
sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
val value = Some(suppliedValue)
def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type = this
def onComplete(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type = {
def complete(value: Either[Throwable, T]): this.type = this
def onComplete(func: Future[T] Unit): this.type = {
dispatcher dispatchTask (() func(this)) //TODO: Use pending callback stack
this
}
@ -1005,7 +1007,7 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise
def isExpired: Boolean = true
def timeout: Timeout = Timeout.zero
final def onTimeout(func: Future[T] Unit)(implicit dispatcher: MessageDispatcher): this.type = this
final def orElse[A >: T](fallback: A)(implicit dispatcher: MessageDispatcher): Future[A] = this
final def onTimeout(func: Future[T] Unit): this.type = this
final def orElse[A >: T](fallback: A): Future[A] = this
}

View file

@ -292,7 +292,7 @@ object AkkaBuild extends Build {
resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release
// compile options
scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"),
scalacOptions ++= Seq("-encoding", "UTF-8", /* "-optimise", */ "-deprecation", "-unchecked"),
javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
// add config dir to classpaths