#1285 - Implementing different internal states for the DefaultPromise
This commit is contained in:
parent
fe3c22fe23
commit
c950679fd8
1 changed files with 51 additions and 26 deletions
|
|
@ -780,21 +780,34 @@ trait Promise[T] extends Future[T] {
|
|||
|
||||
//Companion object to FState, just to provide a cheap, immutable default entry
|
||||
private[akka] object FState {
|
||||
val empty = new FState[Nothing]()
|
||||
def apply[T](): FState[T] = empty.asInstanceOf[FState[T]]
|
||||
}
|
||||
def apply[T](): FState[T] = EmptyPending.asInstanceOf[FState[T]]
|
||||
|
||||
/**
|
||||
* Represents the internal state of the DefaultCompletableFuture
|
||||
*/
|
||||
private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil)
|
||||
/**
|
||||
* Represents the internal state of the DefaultCompletableFuture
|
||||
*/
|
||||
|
||||
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
|
||||
case class Pending[T](listeners: List[Future[T] ⇒ Unit] = Nil) extends FState[T] {
|
||||
def value: Option[Either[Throwable, T]] = None
|
||||
}
|
||||
case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
|
||||
def result: T = value.get.right.get
|
||||
}
|
||||
case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
|
||||
def exception: Throwable = value.get.left.get
|
||||
}
|
||||
case object Expired extends FState[Nothing] {
|
||||
def value: Option[Either[Throwable, Nothing]] = None
|
||||
}
|
||||
val EmptyPending = Pending[Nothing](Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* The default concrete Future implementation.
|
||||
*/
|
||||
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
|
||||
self ⇒
|
||||
|
||||
import FState.{ FState, Success, Failure, Pending, Expired }
|
||||
def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default)
|
||||
|
||||
def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout))
|
||||
|
|
@ -842,7 +855,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
|
||||
|
||||
@inline
|
||||
protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
|
||||
protected final def getState: FState[T] = {
|
||||
|
||||
@tailrec
|
||||
def read(): FState[T] = {
|
||||
val cur = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
|
||||
if (cur.isInstanceOf[Pending[_]] && isExpired) {
|
||||
if (updateState(cur, Expired)) Expired else read()
|
||||
} else cur
|
||||
}
|
||||
|
||||
read()
|
||||
}
|
||||
|
||||
def complete(value: Either[Throwable, T]): this.type = {
|
||||
val callbacks = {
|
||||
|
|
@ -850,15 +874,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
@tailrec
|
||||
def tryComplete: List[Future[T] ⇒ Unit] = {
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) Nil
|
||||
else if ( /*cur.value.isEmpty && */ isExpired) {
|
||||
//Empty and expired, so remove listeners
|
||||
//TODO Perhaps cancel existing onTimeout listeners in the future here?
|
||||
updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
|
||||
Nil
|
||||
} else {
|
||||
if (updateState(cur, FState(Option(value), Nil))) cur.listeners
|
||||
else tryComplete
|
||||
|
||||
cur match {
|
||||
case Pending(listeners) ⇒
|
||||
if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
|
||||
else tryComplete
|
||||
case _ ⇒ Nil
|
||||
}
|
||||
}
|
||||
tryComplete
|
||||
|
|
@ -876,10 +897,14 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
@tailrec //Returns whether the future has already been completed or not
|
||||
def tryAddCallback(): Boolean = {
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) true
|
||||
else if (isExpired) false
|
||||
else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false
|
||||
else tryAddCallback()
|
||||
cur match {
|
||||
case _: Success[_] | _: Failure[_] ⇒ true
|
||||
case Expired ⇒ false
|
||||
case p: Pending[_] ⇒
|
||||
val pt = p.asInstanceOf[Pending[T]]
|
||||
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false
|
||||
else tryAddCallback()
|
||||
}
|
||||
}
|
||||
|
||||
if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
|
||||
|
|
@ -912,10 +937,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
|
||||
final def orElse[A >: T](fallback: ⇒ A): Future[A] =
|
||||
if (timeout.duration.isFinite) {
|
||||
value match {
|
||||
case Some(_) ⇒ this
|
||||
case _ if isExpired ⇒ Future[A](fallback)
|
||||
case _ ⇒
|
||||
getState match {
|
||||
case _: Success[_] | _: Failure[_] ⇒ this
|
||||
case Expired ⇒ Future[A](fallback)
|
||||
case _: Pending[_] ⇒
|
||||
val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
|
||||
promise completeWith this
|
||||
val runnable = new Runnable {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue