From 458724de348c37aa86f3da45e40aaf0f0844530c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 5 Aug 2011 22:27:16 +0200 Subject: [PATCH 1/7] Reimplementing DefaultCompletableFuture to be as non-blocking internally as possible --- .../src/main/scala/akka/dispatch/Future.scala | 152 ++++++++---------- 1 file changed, 63 insertions(+), 89 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 35b5cf216f..295e70c121 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,7 +21,9 @@ import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } + +import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } +import scala.Math class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -771,6 +773,17 @@ trait Promise[T] extends Future[T] { } +//Companion object to FState, just to provide a cheap, immutable default entry +private[akka] object FState { + val empty = new FState[Nothing]() + def apply[T](): FState[T] = empty.asInstanceOf[FState[T]] +} + +/** + * Represents the internal state of the DefaultCompletableFuture + */ +private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil) + /** * The default concrete Future implementation. */ @@ -784,78 +797,53 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit)) private val _startTimeInNanos = currentTimeInNanos - private val _lock = new ReentrantLock - private val _signal = _lock.newCondition - private var _value: Option[Either[Throwable, T]] = None - private var _listeners: List[Future[T] ⇒ Unit] = Nil + private val ref = new AtomicReference[FState[T]](FState()) - /** - * Must be called inside _lock.lock<->_lock.unlock - * Returns true if completed within the timeout - */ @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (_value.isEmpty && waitTimeNanos > 0) { - val start = currentTimeInNanos - val remainingNanos = try { - _signal.awaitNanos(waitTimeNanos) - } catch { - case e: InterruptedException ⇒ - waitTimeNanos - (currentTimeInNanos - start) - } - awaitUnsafe(remainingNanos) + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec + val start = System.nanoTime() + try { ref.synchronized { ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + + awaitUnsafe(waitTimeNanos - Math.abs(System.nanoTime() - start)) } else { - _value.isDefined + value.isDefined } } - def await(atMost: Duration) = { - _lock.lock() - try { - if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity - while (_value.isEmpty) { _signal.await } - this - } else { //Limited wait - val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout - else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost - else atMost.toNanos min timeLeft() //Otherwise use the smallest of them - if (awaitUnsafe(time)) this - else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms") - } - } finally { _lock.unlock } - } + protected def awaitThenThrow(waitNanos: Long): this.type = + if (awaitUnsafe(waitNanos)) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") - def await = await(timeout.duration) + def await(atMost: Duration) = awaitThenThrow(atMost.toNanos min timeLeft()) + + def await = awaitThenThrow(timeLeft()) def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false - def value: Option[Either[Throwable, T]] = { - _lock.lock - try { - _value - } finally { - _lock.unlock - } - } + def value: Option[Either[Throwable, T]] = ref.get.value def complete(value: Either[Throwable, T]): this.type = { val callbacks = { - _lock.lock try { - if (_value.isEmpty) { //Only complete if we aren't expired - if (!isExpired) { - _value = Some(value) - val existingListeners = _listeners - _listeners = Nil - existingListeners - } else { - _listeners = Nil + @tailrec + def tryComplete: List[Future[T] ⇒ Unit] = { + val cur = ref.get + if (cur.value.isDefined) Nil + else if ( /*cur.value.isEmpty && */ isExpired) { + //Empty and expired, so remove listeners + ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails Nil + } else { + if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners + else tryComplete } - } else Nil + } + tryComplete } finally { - _signal.signalAll - _lock.unlock + ref.synchronized { ref.notifyAll() } //Notify any evil blockers } } @@ -865,44 +853,34 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } def onComplete(func: Future[T] ⇒ Unit): this.type = { - _lock.lock - val notifyNow = try { - if (_value.isEmpty) { - if (!isExpired) { //Only add the listener if the future isn't expired - _listeners ::= func - false - } else false //Will never run the callback since the future is expired - } else true - } finally { - _lock.unlock + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = ref.get + if (cur.value.isDefined) true + else if (isExpired) false + else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false + else tryAddCallback() } + val notifyNow = tryAddCallback() + if (notifyNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) this } def onTimeout(func: Future[T] ⇒ Unit): this.type = { - if (timeout.duration.isFinite) { - _lock.lock - val runNow = try { - if (_value.isEmpty) { - if (!isExpired) { - val runnable = new Runnable { - def run() { - if (!isCompleted) func(self) - } - } - Scheduler.scheduleOnce(runnable, timeLeft, NANOS) - false - } else true - } else false - } finally { - _lock.unlock - } + val runNow = + if (!timeout.duration.isFinite) false //Not possible + else if (value.isEmpty) { + if (!isExpired) { + val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } } + Scheduler.scheduleOnce(runnable, timeLeft, NANOS) + false + } else true + } else false - if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) - } + if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) this } @@ -926,11 +904,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { - func(this) - } catch { - case e ⇒ EventHandler.error(e, this, "Future onComplete-callback raised an exception") - } + try { func(this) } catch { case e ⇒ EventHandler notify EventHandler.Error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really? } @inline From 9fb91e92c985dda9f16a9a65dc7e60ca804a1cbf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Aug 2011 14:03:04 +0200 Subject: [PATCH 2/7] Removing awaitBlocking from Future since Futures cannot be completed after timed out, also cleaning up a lot of code to use pattern matching instead of if/else while simplifying and avoiding allocations --- .../src/main/scala/akka/dispatch/Future.scala | 223 +++++++++--------- 1 file changed, 109 insertions(+), 114 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 295e70c121..47e0432b11 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -206,7 +206,7 @@ object Futures { /** * Java API. - * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A => Future[B]. + * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. */ @@ -221,15 +221,9 @@ object Futures { /** * Java API. - * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A => Future[B]. + * Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel. - * - * def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - * in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => - * val fb = fn(a.asInstanceOf[A]) - * for (r <- fr; b <-fb) yield (r += b) - * }.map(_.result) */ def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Timeout.default, fn) } @@ -277,11 +271,11 @@ object Future { sequence(in)(cbf, timeout) /** - * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. + * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A ⇒ Future[B]. * This is useful for performing a parallel map. For example, to apply a function to all items of a list * in parallel: *
-   * val myFutureList = Futures.traverse(myList)(x => Future(myFunc(x)))
+   * val myFutureList = Futures.traverse(myList)(x ⇒ Future(myFunc(x)))
    * 
*/ def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout): Future[M[B]] = @@ -384,20 +378,20 @@ sealed trait Future[+T] extends japi.Future[T] { /** * Await completion of this Future (as `await`) and return its value if it * conforms to A's erased type. - * - * def as[A](implicit m: Manifest[A]): Option[A] = - * try { - * await - * value match { - * case None ⇒ None - * case Some(_: Left[_, _]) ⇒ None - * case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) - * } - * } catch { - * case _: Exception ⇒ None - * } */ + def as[A](implicit m: Manifest[A]): Option[A] = + try { + await + value match { + case None ⇒ None + case Some(_: Left[_, _]) ⇒ None + case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) + } + } catch { + case _: Exception ⇒ None + } + /** * Tests whether this Future has been completed. */ @@ -429,19 +423,17 @@ sealed trait Future[+T] extends japi.Future[T] { /** * Returns the successful result of this Future if it exists. */ - final def result: Option[T] = { - val v = value - if (v.isDefined) v.get.right.toOption - else None + final def result: Option[T] = value match { + case Some(r) ⇒ r.right.toOption + case _ ⇒ None } /** * Returns the contained exception of this Future if it exists. */ - final def exception: Option[Throwable] = { - val v = value - if (v.isDefined) v.get.left.toOption - else None + final def exception: Option[Throwable] = value match { + case Some(r) ⇒ r.left.toOption + case _ ⇒ None } /** @@ -455,17 +447,29 @@ sealed trait Future[+T] extends japi.Future[T] { * When the future is completed with a valid result, apply the provided * PartialFunction to the result. *
-   *   val result = future onResult {
-   *     case Foo => "foo"
-   *     case Bar => "bar"
+   *   future receive {
+   *     case Foo ⇒ target ! "foo"
+   *     case Bar ⇒ target ! "bar"
    *   }
    * 
*/ - final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete { f ⇒ - val optr = f.result - if (optr.isDefined) { - val r = optr.get - if (pf isDefinedAt r) pf(r) + @deprecated("Use `onResult` instead, will be removed in the future", "1.2") + final def receive(pf: PartialFunction[Any, Unit]): this.type = onResult(pf) + + /** + * When the future is completed with a valid result, apply the provided + * PartialFunction to the result. + *
+   *   future onResult {
+   *     case Foo ⇒ target ! "foo"
+   *     case Bar ⇒ target ! "bar"
+   *   }
+   * 
+ */ + final def onResult(pf: PartialFunction[T, Unit]): this.type = onComplete { + _.value match { + case Some(Right(r)) if pf isDefinedAt r ⇒ pf(r) + case _ ⇒ } } @@ -473,17 +477,15 @@ sealed trait Future[+T] extends japi.Future[T] { * When the future is completed with an exception, apply the provided * PartialFunction to the exception. *
-   *   val result = future onException {
-   *     case Foo => "foo"
-   *     case Bar => "bar"
+   *   future onException {
+   *     case NumberFormatException ⇒ target ! "wrong format"
    *   }
    * 
*/ - final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒ - val opte = f.exception - if (opte.isDefined) { - val e = opte.get - if (pf isDefinedAt e) pf(e) + final def onException(pf: PartialFunction[Throwable, Unit]): this.type = onComplete { + _.value match { + case Some(Left(ex)) if pf isDefinedAt ex ⇒ pf(ex) + case _ ⇒ } } @@ -514,26 +516,31 @@ sealed trait Future[+T] extends japi.Future[T] { * a valid result then the new Future will contain the same. * Example: *
-   * Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
-   * Future(6 / 0) recover { case e: NotFoundException   => 0 } // result: exception
-   * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
+   * Future(6 / 0) failure { case e: ArithmeticException ⇒ 0 } // result: 0
+   * Future(6 / 0) failure { case e: NotFoundException   ⇒ 0 } // result: exception
+   * Future(6 / 2) failure { case e: ArithmeticException ⇒ 0 } // result: 3
+   * 
+ */ + @deprecated("will be replaced by `recover`", "1.2") + final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = recover(pf) + + /** + * Creates a new Future that will handle any matching Throwable that this + * Future might contain. If there is no match, or if this Future contains + * a valid result then the new Future will contain the same. + * Example: + *
+   * Future(6 / 0) failure { case e: ArithmeticException ⇒ 0 } // result: 0
+   * Future(6 / 0) failure { case e: NotFoundException   ⇒ 0 } // result: exception
+   * Future(6 / 2) failure { case e: ArithmeticException ⇒ 0 } // result: 3
    * 
*/ final def recover[A >: T](pf: PartialFunction[Throwable, A])(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) - onComplete { self ⇒ - future complete { - self.value.get match { - case Left(e) ⇒ - try { - if (pf isDefinedAt e) Right(pf(e)) - else Left(e) - } catch { - case x: Exception ⇒ - Left(x) - } - case v ⇒ v - } + onComplete { + _.value.get match { + case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ future complete otherwise } } future @@ -554,19 +561,17 @@ sealed trait Future[+T] extends japi.Future[T] { */ final def map[A](f: T ⇒ A)(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) - onComplete { self ⇒ - future complete { - self.value.get match { - case Right(r) ⇒ - try { - Right(f(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - } - case v ⇒ v.asInstanceOf[Either[Throwable, A]] - } + onComplete { + _.value.get match { + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] + case Right(res) ⇒ + future complete (try { + Right(f(res)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + }) } } future @@ -608,26 +613,26 @@ sealed trait Future[+T] extends japi.Future[T] { */ final def flatMap[A](f: T ⇒ Future[A])(implicit timeout: Timeout): Future[A] = { val future = new DefaultPromise[A](timeout) + onComplete { _.value.get match { - case Right(r) ⇒ - try { - future completeWith f(r) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - future complete Left(e) - } - case v ⇒ future complete v.asInstanceOf[Either[Throwable, A]] + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] + case Right(r) ⇒ try { + future.completeWith(f(r)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + future complete Left(e) + } } } future } final def foreach(f: T ⇒ Unit): Unit = onComplete { - _.result match { - case Some(v) ⇒ f(v) - case None ⇒ + _.value.get match { + case Right(r) ⇒ f(r) + case _ ⇒ } } @@ -642,22 +647,16 @@ sealed trait Future[+T] extends japi.Future[T] { final def filter(p: T ⇒ Boolean)(implicit timeout: Timeout): Future[T] = { val future = new DefaultPromise[T](timeout) - onComplete { self ⇒ - future complete { - self.value.get match { - case Right(r) ⇒ - try { - if (p(r)) - Right(r) - else - Left(new MatchError(r)) - } catch { - case e: Exception ⇒ - EventHandler.error(e, this, e.getMessage) - Left(e) - } - case v ⇒ v - } + onComplete { + _.value.get match { + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, T]] + case r @ Right(res) ⇒ future complete (try { + if (p(res)) r else Left(new MatchError(res)) + } catch { + case e: Exception ⇒ + EventHandler.error(e, this, e.getMessage) + Left(e) + }) } } future @@ -666,13 +665,10 @@ sealed trait Future[+T] extends japi.Future[T] { /** * Returns the current result, throws the exception if one has been raised, else returns None */ - final def resultOrException: Option[T] = { - val v = value - if (v.isDefined) { - val r = v.get - if (r.isLeft) throw r.left.get - else r.right.toOption - } else None + final def resultOrException: Option[T] = value match { + case Some(Left(e)) ⇒ throw e + case Some(Right(r)) ⇒ Some(r) + case _ ⇒ None } } @@ -834,6 +830,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi if (cur.value.isDefined) Nil else if ( /*cur.value.isEmpty && */ isExpired) { //Empty and expired, so remove listeners + //TODO Perhaps cancel existing onTimeout listeners in the future here? ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails Nil } else { @@ -862,9 +859,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi else tryAddCallback() } - val notifyNow = tryAddCallback() - - if (notifyNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) + if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func)) this } @@ -874,7 +869,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi if (!timeout.duration.isFinite) false //Not possible else if (value.isEmpty) { if (!isExpired) { - val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } } + val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } } //TODO Reschedule is run prematurely Scheduler.scheduleOnce(runnable, timeLeft, NANOS) false } else true From 811e14e081aed754f83109c537453dceb4a8dd0b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Aug 2011 20:05:43 +0200 Subject: [PATCH 3/7] Fixing await so that it respects infinite timeouts --- .../src/main/scala/akka/dispatch/Future.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 47e0432b11..d01085e4df 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -809,13 +809,21 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } } - protected def awaitThenThrow(waitNanos: Long): this.type = + def await(atMost: Duration): this.type = { + val waitNanos = + if (timeout.duration.isFinite && atMost.isFinite) + atMost.toNanos min timeLeft() + else if (atMost.isFinite) + atMost.toNanos + else if (timeout.duration.isFinite) + timeLeft() + else Long.MaxValue //If both are infinite, use Long.MaxValue + if (awaitUnsafe(waitNanos)) this else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") + } - def await(atMost: Duration) = awaitThenThrow(atMost.toNanos min timeLeft()) - - def await = awaitThenThrow(timeLeft()) + def await = await(timeout.duration) def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false From 8a1d31691c573862d24d6677858ca3910b68c2a9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Aug 2011 20:44:49 +0200 Subject: [PATCH 4/7] Removing deprecated methods from Future and removing one of the bad guys _as_ --- .../src/main/scala/akka/dispatch/Future.scala | 69 ++----------------- 1 file changed, 4 insertions(+), 65 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d01085e4df..3b8be4bb94 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -375,23 +375,6 @@ sealed trait Future[+T] extends japi.Future[T] { */ def await(atMost: Duration): Future[T] - /** - * Await completion of this Future (as `await`) and return its value if it - * conforms to A's erased type. - */ - - def as[A](implicit m: Manifest[A]): Option[A] = - try { - await - value match { - case None ⇒ None - case Some(_: Left[_, _]) ⇒ None - case Some(Right(v)) ⇒ Some(BoxedType(m.erasure).cast(v).asInstanceOf[A]) - } - } catch { - case _: Exception ⇒ None - } - /** * Tests whether this Future has been completed. */ @@ -443,19 +426,6 @@ sealed trait Future[+T] extends japi.Future[T] { */ def onComplete(func: Future[T] ⇒ Unit): this.type - /** - * When the future is completed with a valid result, apply the provided - * PartialFunction to the result. - *
-   *   future receive {
-   *     case Foo ⇒ target ! "foo"
-   *     case Bar ⇒ target ! "bar"
-   *   }
-   * 
- */ - @deprecated("Use `onResult` instead, will be removed in the future", "1.2") - final def receive(pf: PartialFunction[Any, Unit]): this.type = onResult(pf) - /** * When the future is completed with a valid result, apply the provided * PartialFunction to the result. @@ -493,37 +463,6 @@ sealed trait Future[+T] extends japi.Future[T] { def orElse[A >: T](fallback: ⇒ A): Future[A] - /** - * Creates a new Future by applying a PartialFunction to the successful - * result of this Future if a match is found, or else return a MatchError. - * If this Future is completed with an exception then the new Future will - * also contain this exception. - * Example: - *
-   * val future1 = for {
-   *   a <- actor ? Req("Hello") collect { case Res(x: Int)    => x }
-   *   b <- actor ? Req(a)       collect { case Res(x: String) => x }
-   *   c <- actor ? Req(7)       collect { case Res(x: String) => x }
-   * } yield b + "-" + c
-   * 
- */ - @deprecated("No longer needed, use 'map' instead. Removed in 2.0", "2.0") - final def collect[A](pf: PartialFunction[T, A])(implicit timeout: Timeout): Future[A] = this map pf - - /** - * Creates a new Future that will handle any matching Throwable that this - * Future might contain. If there is no match, or if this Future contains - * a valid result then the new Future will contain the same. - * Example: - *
-   * Future(6 / 0) failure { case e: ArithmeticException ⇒ 0 } // result: 0
-   * Future(6 / 0) failure { case e: NotFoundException   ⇒ 0 } // result: exception
-   * Future(6 / 2) failure { case e: ArithmeticException ⇒ 0 } // result: 3
-   * 
- */ - @deprecated("will be replaced by `recover`", "1.2") - final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = recover(pf) - /** * Creates a new Future that will handle any matching Throwable that this * Future might contain. If there is no match, or if this Future contains @@ -800,10 +739,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi if (value.isEmpty && waitTimeNanos > 0) { val ms = NANOS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec - val start = System.nanoTime() + val start = currentTimeInNanos try { ref.synchronized { ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } - awaitUnsafe(waitTimeNanos - Math.abs(System.nanoTime() - start)) + awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start)) } else { value.isDefined } @@ -894,7 +833,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi case Some(_) ⇒ this case _ if isExpired ⇒ Future[A](fallback) case _ ⇒ - val promise = new DefaultPromise[A](Timeout.never) + val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense. promise completeWith this val runnable = new Runnable { def run() { @@ -911,7 +850,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } @inline - private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) + private def currentTimeInNanos: Long = MILLIS.toNanos(System.currentTimeMillis) //TODO Switch to math.abs(System.nanoTime)? @inline private def timeLeft(): Long = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) } From bc1f7565b7ff175c4bf6e9c08a8d30f9c1983cc8 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 6 Aug 2011 13:32:03 -0600 Subject: [PATCH 5/7] Fixed race in Future.await, and minor changes to Future.result and Future.exception --- akka-actor/src/main/scala/akka/dispatch/Future.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3b8be4bb94..cfd729a25e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -407,16 +407,16 @@ sealed trait Future[+T] extends japi.Future[T] { * Returns the successful result of this Future if it exists. */ final def result: Option[T] = value match { - case Some(r) ⇒ r.right.toOption - case _ ⇒ None + case Some(Right(r)) ⇒ Some(r) + case _ ⇒ None } /** * Returns the contained exception of this Future if it exists. */ final def exception: Option[Throwable] = value match { - case Some(r) ⇒ r.left.toOption - case _ ⇒ None + case Some(Left(e)) ⇒ Some(e) + case _ ⇒ None } /** @@ -740,7 +740,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val ms = NANOS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = currentTimeInNanos - try { ref.synchronized { ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + try { ref.synchronized { if (value.isEmpty) ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start)) } else { From 0ae7a72f3aa7b38c97166a664cb010090583aeb6 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 6 Aug 2011 14:10:36 -0600 Subject: [PATCH 6/7] Future: Reschedule onTimeout/orElse if not yet expired --- .../src/main/scala/akka/dispatch/Future.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index cfd729a25e..3876634628 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -816,7 +816,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi if (!timeout.duration.isFinite) false //Not possible else if (value.isEmpty) { if (!isExpired) { - val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } } //TODO Reschedule is run prematurely + val runnable = new Runnable { + def run() { + if (!isCompleted) { + if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS) + else func(DefaultPromise.this) + } + } + } Scheduler.scheduleOnce(runnable, timeLeft, NANOS) false } else true @@ -837,7 +844,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi promise completeWith this val runnable = new Runnable { def run() { - if (!isCompleted) promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) }) + if (!isCompleted) { + if (!isExpired) Scheduler.scheduleOnce(this, timeLeft, NANOS) + else promise complete (try { Right(fallback) } catch { case e: Exception ⇒ Left(e) }) + } } } Scheduler.scheduleOnce(runnable, timeLeft, NANOS) From a17b75fc09409d684a379a395e216d061712b72a Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 6 Aug 2011 15:00:15 -0600 Subject: [PATCH 7/7] Add check for jdk7 to disable -optimize --- project/AkkaBuild.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index bd30d3b4fd..7a1fb598d6 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -298,7 +298,8 @@ object AkkaBuild extends Build { resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release // compile options - scalacOptions ++= Seq("-encoding", "UTF-8", "-optimise", "-deprecation", "-unchecked"), + scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked") ++ ( + if (System getProperty "java.runtime.version" startsWith "1.7") Seq() else Seq("-optimize")), // -optimize fails with jdk7 javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"), // add config dir to classpaths