Reimplementing DefaultCompletableFuture to be as non-blocking internally as possible

This commit is contained in:
Viktor Klang 2011-08-05 22:27:16 +02:00
parent b1c64652ce
commit 458724de34

View file

@ -21,7 +21,9 @@ import java.util.{ LinkedList ⇒ JLinkedList }
import scala.annotation.tailrec
import scala.collection.mutable.Stack
import akka.util.{ Switch, Duration, BoxedType }
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean }
import scala.Math
class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -771,6 +773,17 @@ trait Promise[T] extends Future[T] {
}
//Companion object to FState, just to provide a cheap, immutable default entry
private[akka] object FState {
val empty = new FState[Nothing]()
def apply[T](): FState[T] = empty.asInstanceOf[FState[T]]
}
/**
* Represents the internal state of the DefaultCompletableFuture
*/
private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] Unit] = Nil)
/**
* The default concrete Future implementation.
*/
@ -784,78 +797,53 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit))
private val _startTimeInNanos = currentTimeInNanos
private val _lock = new ReentrantLock
private val _signal = _lock.newCondition
private var _value: Option[Either[Throwable, T]] = None
private var _listeners: List[Future[T] Unit] = Nil
private val ref = new AtomicReference[FState[T]](FState())
/**
* Must be called inside _lock.lock<->_lock.unlock
* Returns true if completed within the timeout
*/
@tailrec
private def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (_value.isEmpty && waitTimeNanos > 0) {
val start = currentTimeInNanos
val remainingNanos = try {
_signal.awaitNanos(waitTimeNanos)
} catch {
case e: InterruptedException
waitTimeNanos - (currentTimeInNanos - start)
}
awaitUnsafe(remainingNanos)
if (value.isEmpty && waitTimeNanos > 0) {
val ms = NANOS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = System.nanoTime()
try { ref.synchronized { ref.wait(ms, ns) } } catch { case e: InterruptedException }
awaitUnsafe(waitTimeNanos - Math.abs(System.nanoTime() - start))
} else {
_value.isDefined
value.isDefined
}
}
def await(atMost: Duration) = {
_lock.lock()
try {
if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity
while (_value.isEmpty) { _signal.await }
this
} else { //Limited wait
val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout
else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost
else atMost.toNanos min timeLeft() //Otherwise use the smallest of them
if (awaitUnsafe(time)) this
else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms")
}
} finally { _lock.unlock }
}
protected def awaitThenThrow(waitNanos: Long): this.type =
if (awaitUnsafe(waitNanos)) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds")
def await = await(timeout.duration)
def await(atMost: Duration) = awaitThenThrow(atMost.toNanos min timeLeft())
def await = awaitThenThrow(timeLeft())
def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false
def value: Option[Either[Throwable, T]] = {
_lock.lock
try {
_value
} finally {
_lock.unlock
}
}
def value: Option[Either[Throwable, T]] = ref.get.value
def complete(value: Either[Throwable, T]): this.type = {
val callbacks = {
_lock.lock
try {
if (_value.isEmpty) { //Only complete if we aren't expired
if (!isExpired) {
_value = Some(value)
val existingListeners = _listeners
_listeners = Nil
existingListeners
} else {
_listeners = Nil
@tailrec
def tryComplete: List[Future[T] Unit] = {
val cur = ref.get
if (cur.value.isDefined) Nil
else if ( /*cur.value.isEmpty && */ isExpired) {
//Empty and expired, so remove listeners
ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails
Nil
} else {
if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners
else tryComplete
}
} else Nil
}
tryComplete
} finally {
_signal.signalAll
_lock.unlock
ref.synchronized { ref.notifyAll() } //Notify any evil blockers
}
}
@ -865,44 +853,34 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
}
def onComplete(func: Future[T] Unit): this.type = {
_lock.lock
val notifyNow = try {
if (_value.isEmpty) {
if (!isExpired) { //Only add the listener if the future isn't expired
_listeners ::= func
false
} else false //Will never run the callback since the future is expired
} else true
} finally {
_lock.unlock
@tailrec //Returns whether the future has already been completed or not
def tryAddCallback(): Boolean = {
val cur = ref.get
if (cur.value.isDefined) true
else if (isExpired) false
else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false
else tryAddCallback()
}
val notifyNow = tryAddCallback()
if (notifyNow) Future.dispatchTask(() notifyCompleted(func))
this
}
def onTimeout(func: Future[T] Unit): this.type = {
if (timeout.duration.isFinite) {
_lock.lock
val runNow = try {
if (_value.isEmpty) {
if (!isExpired) {
val runnable = new Runnable {
def run() {
if (!isCompleted) func(self)
}
}
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
false
} else true
} else false
} finally {
_lock.unlock
}
val runNow =
if (!timeout.duration.isFinite) false //Not possible
else if (value.isEmpty) {
if (!isExpired) {
val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } }
Scheduler.scheduleOnce(runnable, timeLeft, NANOS)
false
} else true
} else false
if (runNow) Future.dispatchTask(() notifyCompleted(func))
}
if (runNow) Future.dispatchTask(() notifyCompleted(func))
this
}
@ -926,11 +904,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} else this
private def notifyCompleted(func: Future[T] Unit) {
try {
func(this)
} catch {
case e EventHandler.error(e, this, "Future onComplete-callback raised an exception")
}
try { func(this) } catch { case e EventHandler notify EventHandler.Error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really?
}
@inline