From a0350d03e91ff138e8cd28734b07c3676e020403 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Tue, 2 Aug 2011 10:19:49 -0600 Subject: [PATCH] Future: move implicit dispatcher from methods to constructor --- .../src/main/scala/akka/actor/ActorRef.scala | 4 +- .../src/main/scala/akka/dispatch/Future.scala | 78 ++++++++++--------- project/AkkaBuild.scala | 2 +- 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 71b5e7e2d1..32b779c466 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -680,7 +680,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, channel: UntypedChannel): Future[Any] = { val future = channel match { case f: ActorPromise ⇒ f - case _ ⇒ new ActorPromise(timeout) + case _ ⇒ new ActorPromise(timeout)(dispatcher) } dispatcher dispatchMessage new MessageInvocation(this, message, future) future @@ -1025,7 +1025,7 @@ private[akka] case class RemoteActorRef private[akka] ( case _ ⇒ None } val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout.duration.toMillis, false, this, loader) - if (future.isDefined) ActorPromise(future.get) + if (future.isDefined) ActorPromise(future.get)(dispatcher) else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b3c2b0c1ed..22bfa963df 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -317,6 +317,8 @@ object Future { sealed trait Future[+T] { + implicit def dispatcher: MessageDispatcher + /** * For use only within a Future.flow block or another compatible Delimited Continuations reset block. * @@ -327,7 +329,7 @@ sealed trait Future[+T] { * execution will fail. The normal result of getting a Future from an ActorRef using ? will return * an untyped Future. */ - def apply[A >: T]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): A @cps[Future[Any]] = shift(this flatMap (_: A ⇒ Future[Any])) + def apply[A >: T]()(implicit timeout: Timeout): A @cps[Future[Any]] = shift(this flatMap (_: A ⇒ Future[Any])) /** * Blocks awaiting completion of this Future, then returns the resulting value, @@ -422,7 +424,7 @@ sealed trait Future[+T] { * Future. If the Future has already been completed, this will apply * immediately. */ - def onComplete(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type + def onComplete(func: Future[T] ⇒ Unit): this.type /** * When the future is completed with a valid result, apply the provided @@ -434,7 +436,7 @@ sealed trait Future[+T] { * } * */ - final def onResult(pf: PartialFunction[Any, Unit])(implicit dispatcher: MessageDispatcher): this.type = onComplete { f ⇒ + final def onResult(pf: PartialFunction[Any, Unit]): this.type = onComplete { f ⇒ val optr = f.result if (optr.isDefined) { val r = optr.get @@ -452,7 +454,7 @@ sealed trait Future[+T] { * } * */ - final def onException(pf: PartialFunction[Throwable, Unit])(implicit dispatcher: MessageDispatcher): Future[T] = onComplete { f ⇒ + final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒ val opte = f.exception if (opte.isDefined) { val e = opte.get @@ -460,9 +462,9 @@ sealed trait Future[+T] { } } - def onTimeout(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type + def onTimeout(func: Future[T] ⇒ Unit): this.type - def orElse[A >: T](fallback: ⇒ A)(implicit dispatcher: MessageDispatcher): Future[A] + def orElse[A >: T](fallback: ⇒ A): Future[A] /** * Creates a new Future by applying a PartialFunction to the successful @@ -478,7 +480,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def collect[A](pf: PartialFunction[Any, A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = { + final def collect[A](pf: PartialFunction[Any, A])(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { @@ -510,7 +512,7 @@ sealed trait Future[+T] { * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3 * */ - final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = { + final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { @@ -543,7 +545,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def map[A](f: T ⇒ A)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = { + final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) onComplete { self ⇒ future complete { @@ -567,7 +569,7 @@ sealed trait Future[+T] { * Creates a new Future[A] which is completed with this Future's result if * that conforms to A's erased type or a ClassCastException otherwise. */ - final def mapTo[A](implicit m: Manifest[A], dispatcher: MessageDispatcher = implicitly, timeout: Timeout = this.timeout): Future[A] = { + final def mapTo[A](implicit m: Manifest[A], timeout: Timeout = this.timeout): Future[A] = { val fa = new DefaultPromise[A](timeout) onComplete { ft ⇒ fa complete (ft.value.get match { @@ -597,7 +599,7 @@ sealed trait Future[+T] { * } yield b + "-" + c * */ - final def flatMap[A](f: T ⇒ Future[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[A] = { + final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) onComplete { _.value.get match { @@ -615,23 +617,23 @@ sealed trait Future[+T] { future } - final def foreach(f: T ⇒ Unit)(implicit dispatcher: MessageDispatcher): Unit = onComplete { + final def foreach(f: T ⇒ Unit): Unit = onComplete { _.result match { case Some(v) ⇒ f(v) case None ⇒ } } - final def withFilter(p: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout) = new FutureWithFilter[T](this, p) + final def withFilter(p: T ⇒ Boolean)(implicit timeout: Timeout) = new FutureWithFilter[T](this, p) - final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout) { + final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean)(implicit timeout: Timeout) { def foreach(f: A ⇒ Unit): Unit = self filter p foreach f def map[B](f: A ⇒ B): Future[B] = self filter p map f def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) } - final def filter(p: T ⇒ Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = { + final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = { val future = new DefaultPromise[T](timeout) onComplete { self ⇒ future complete { @@ -684,17 +686,17 @@ object Promise { /** * Creates a non-completed, new, Promise with the supplied timeout in milliseconds */ - def apply[A](timeout: Timeout): Promise[A] = new DefaultPromise[A](timeout) + def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A](timeout) /** * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) */ - def apply[A](): Promise[A] = apply(Timeout.default) + def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = apply(Timeout.default) /** * Construct a completable channel */ - def channel(timeout: Long = Actor.TIMEOUT): ActorPromise = new ActorPromise(timeout) + def channel(timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(timeout) private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None @@ -709,26 +711,26 @@ trait Promise[T] extends Future[T] { * Completes this Future with the specified result, if not already completed. * @return this */ - def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type + def complete(value: Either[Throwable, T]): this.type /** * Completes this Future with the specified result, if not already completed. * @return this */ - final def completeWithResult(result: T)(implicit dispatcher: MessageDispatcher): this.type = complete(Right(result)) + final def completeWithResult(result: T): this.type = complete(Right(result)) /** * Completes this Future with the specified exception, if not already completed. * @return this */ - final def completeWithException(exception: Throwable)(implicit dispatcher: MessageDispatcher): this.type = complete(Left(exception)) + final def completeWithException(exception: Throwable): this.type = complete(Left(exception)) /** * Completes this Future with the specified other Future, when that Future is completed, * unless this Future has already been completed. * @return this. */ - final def completeWith(other: Future[T])(implicit dispatcher: MessageDispatcher): this.type = { + final def completeWith(other: Future[T]): this.type = { other onComplete { f ⇒ complete(f.value.get) } this } @@ -768,14 +770,14 @@ trait Promise[T] extends Future[T] { /** * The default concrete Future implementation. */ -class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { +class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends Promise[T] { self ⇒ - def this() = this(Timeout.default) + def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default) - def this(timeout: Long) = this(Timeout(timeout)) + def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout)) - def this(timeout: Long, timeunit: TimeUnit) = this(Timeout(timeout, timeunit)) + def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit)) private val _startTimeInNanos = currentTimeInNanos private val _lock = new ReentrantLock @@ -834,7 +836,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { } } - def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type = { + def complete(value: Either[Throwable, T]): this.type = { processCallbacks { _lock.lock try { @@ -858,7 +860,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { this } - private def processCallbacks(callbacks: List[Future[T] ⇒ Unit])(implicit dispatcher: MessageDispatcher): Unit = { + private def processCallbacks(callbacks: List[Future[T] ⇒ Unit]): Unit = { if (callbacks.nonEmpty) { // Steps to ensure we don't run into a stack-overflow situation @tailrec def runCallbacks(rest: List[Future[T] ⇒ Unit], callbackStack: Stack[() ⇒ Unit]) { @@ -887,7 +889,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { } } - def onComplete(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type = { + def onComplete(func: Future[T] ⇒ Unit): this.type = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -905,7 +907,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { this } - def onTimeout(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type = { + def onTimeout(func: Future[T] ⇒ Unit): this.type = { if (timeout.duration.isFinite) { _lock.lock val runNow = try { @@ -930,7 +932,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { this } - final def orElse[A >: T](fallback: ⇒ A)(implicit dispatcher: MessageDispatcher): Future[A] = + final def orElse[A >: T](fallback: ⇒ A): Future[A] = if (timeout.duration.isFinite) { value match { case Some(_) ⇒ this @@ -962,7 +964,7 @@ class DefaultPromise[T](val timeout: Timeout) extends Promise[T] { private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) } -class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with ForwardableChannel { +class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) extends DefaultPromise[Any](timeout)(dispatcher) with ForwardableChannel { def !(message: Any)(implicit channel: UntypedChannel = NullChannel) = completeWithResult(message) @@ -980,7 +982,7 @@ class ActorPromise(timeout: Timeout) extends DefaultPromise[Any](timeout) with F } object ActorPromise { - def apply(f: Promise[Any]): ActorPromise = + def apply(f: Promise[Any])(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(f.timeout) { completeWith(f) override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message @@ -992,11 +994,11 @@ object ActorPromise { * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { +sealed class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T])(implicit dispatcher: MessageDispatcher): this.type = this - def onComplete(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type = { + def complete(value: Either[Throwable, T]): this.type = this + def onComplete(func: Future[T] ⇒ Unit): this.type = { dispatcher dispatchTask (() ⇒ func(this)) //TODO: Use pending callback stack this } @@ -1005,7 +1007,7 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise def isExpired: Boolean = true def timeout: Timeout = Timeout.zero - final def onTimeout(func: Future[T] ⇒ Unit)(implicit dispatcher: MessageDispatcher): this.type = this - final def orElse[A >: T](fallback: ⇒ A)(implicit dispatcher: MessageDispatcher): Future[A] = this + final def onTimeout(func: Future[T] ⇒ Unit): this.type = this + final def orElse[A >: T](fallback: ⇒ A): Future[A] = this } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 5f314004dc..63c6ee0890 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -292,7 +292,7 @@ object AkkaBuild extends Build { resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release // compile options - scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"), + scalacOptions ++= Seq("-encoding", "UTF-8", /* "-optimise", */ "-deprecation", "-unchecked"), javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), // add config dir to classpaths