From c950679fd853175a80910a124fa41fe1d73ae10a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 12 Oct 2011 16:29:33 +0200 Subject: [PATCH] #1285 - Implementing different internal states for the DefaultPromise --- .../src/main/scala/akka/dispatch/Future.scala | 77 ++++++++++++------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 5f9f779b81..4a92d0b8f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -780,21 +780,34 @@ trait Promise[T] extends Future[T] { //Companion object to FState, just to provide a cheap, immutable default entry private[akka] object FState { - val empty = new FState[Nothing]() - def apply[T](): FState[T] = empty.asInstanceOf[FState[T]] -} + def apply[T](): FState[T] = EmptyPending.asInstanceOf[FState[T]] -/** - * Represents the internal state of the DefaultCompletableFuture - */ -private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil) + /** + * Represents the internal state of the DefaultCompletableFuture + */ + + sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } + case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] { + def value: Option[Either[Throwable, T]] = None + } + case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def result: T = value.get.right.get + } + case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def exception: Throwable = value.get.left.get + } + case object Expired extends FState[Nothing] { + def value: Option[Either[Throwable, Nothing]] = None + } + val EmptyPending = Pending[Nothing](Nil) +} /** * The default concrete Future implementation. */ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { self ⇒ - + import FState.{ FState, Success, Failure, Pending, Expired } def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default) def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout)) @@ -842,7 +855,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState) @inline - protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) + protected final def getState: FState[T] = { + + @tailrec + def read(): FState[T] = { + val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) + if (cur.isInstanceOf[Pending[_]] && isExpired) { + if (updateState(cur, Expired)) Expired else read() + } else cur + } + + read() + } def complete(value: Either[Throwable, T]): this.type = { val callbacks = { @@ -850,15 +874,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec def tryComplete: List[Future[T] ⇒ Unit] = { val cur = getState - if (cur.value.isDefined) Nil - else if ( /*cur.value.isEmpty && */ isExpired) { - //Empty and expired, so remove listeners - //TODO Perhaps cancel existing onTimeout listeners in the future here? - updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails - Nil - } else { - if (updateState(cur, FState(Option(value), Nil))) cur.listeners - else tryComplete + + cur match { + case Pending(listeners) ⇒ + if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners + else tryComplete + case _ ⇒ Nil } } tryComplete @@ -876,10 +897,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Boolean = { val cur = getState - if (cur.value.isDefined) true - else if (isExpired) false - else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false - else tryAddCallback() + cur match { + case _: Success[_] | _: Failure[_] ⇒ true + case Expired ⇒ false + case p: Pending[_] ⇒ + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false + else tryAddCallback() + } } if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func)) @@ -912,10 +937,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi final def orElse[A >: T](fallback: ⇒ A): Future[A] = if (timeout.duration.isFinite) { - value match { - case Some(_) ⇒ this - case _ if isExpired ⇒ Future[A](fallback) - case _ ⇒ + getState match { + case _: Success[_] | _: Failure[_] ⇒ this + case Expired ⇒ Future[A](fallback) + case _: Pending[_] ⇒ val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense. promise completeWith this val runnable = new Runnable {