Adding AbstractPromise to create an AtomicReferenceFieldUpdater to get rid of the AtomicReference allocation
This commit is contained in:
parent
c4508a3e26
commit
56f8f858be
2 changed files with 32 additions and 11 deletions
|
|
@ -21,7 +21,7 @@ import java.util.{ LinkedList ⇒ JLinkedList }
|
|||
import scala.annotation.tailrec
|
||||
import scala.collection.mutable.Stack
|
||||
import akka.util.{ Switch, Duration, BoxedType }
|
||||
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference, AtomicBoolean }
|
||||
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicReference, AtomicBoolean }
|
||||
|
||||
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
def this(message: String) = this(message, null)
|
||||
|
|
@ -792,7 +792,7 @@ private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, l
|
|||
/**
|
||||
* The default concrete Future implementation.
|
||||
*/
|
||||
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
|
||||
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
|
||||
self ⇒
|
||||
|
||||
def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default)
|
||||
|
|
@ -802,7 +802,6 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit))
|
||||
|
||||
private val _startTimeInNanos = currentTimeInNanos
|
||||
private val ref = new AtomicReference[FState[T]](FState()) //FIXME create a base-class in Java with an AtomicReferenceFieldUpdater and avoid allocating the AtomicReference for each
|
||||
|
||||
@tailrec
|
||||
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
|
||||
|
|
@ -810,7 +809,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
val ms = NANOS.toMillis(waitTimeNanos)
|
||||
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
|
||||
val start = currentTimeInNanos
|
||||
try { ref.synchronized { if (value.isEmpty) ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
|
||||
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
|
||||
|
||||
awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start))
|
||||
} else {
|
||||
|
|
@ -836,28 +835,35 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
|
||||
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
|
||||
|
||||
def value: Option[Either[Throwable, T]] = ref.get.value
|
||||
def value: Option[Either[Throwable, T]] = getState.value
|
||||
|
||||
@inline
|
||||
protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean =
|
||||
AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState)
|
||||
|
||||
@inline
|
||||
protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this)
|
||||
|
||||
def complete(value: Either[Throwable, T]): this.type = {
|
||||
val callbacks = {
|
||||
try {
|
||||
@tailrec
|
||||
def tryComplete: List[Future[T] ⇒ Unit] = {
|
||||
val cur = ref.get
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) Nil
|
||||
else if ( /*cur.value.isEmpty && */ isExpired) {
|
||||
//Empty and expired, so remove listeners
|
||||
//TODO Perhaps cancel existing onTimeout listeners in the future here?
|
||||
ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
|
||||
updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
|
||||
Nil
|
||||
} else {
|
||||
if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners
|
||||
if (updateState(cur, FState(Option(value), Nil))) cur.listeners
|
||||
else tryComplete
|
||||
}
|
||||
}
|
||||
tryComplete
|
||||
} finally {
|
||||
ref.synchronized { ref.notifyAll() } //Notify any evil blockers
|
||||
synchronized { notifyAll() } //Notify any evil blockers
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -869,10 +875,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
|
|||
def onComplete(func: Future[T] ⇒ Unit): this.type = {
|
||||
@tailrec //Returns whether the future has already been completed or not
|
||||
def tryAddCallback(): Boolean = {
|
||||
val cur = ref.get
|
||||
val cur = getState
|
||||
if (cur.value.isDefined) true
|
||||
else if (isExpired) false
|
||||
else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false
|
||||
else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false
|
||||
else tryAddCallback()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue