diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 35b5cf216f..295e70c121 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,7 +21,9 @@ import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } + +import java.util.concurrent.atomic.{ AtomicReference, AtomicBoolean } +import scala.Math class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -771,6 +773,17 @@ trait Promise[T] extends Future[T] { } +//Companion object to FState, just to provide a cheap, immutable default entry +private[akka] object FState { + val empty = new FState[Nothing]() + def apply[T](): FState[T] = empty.asInstanceOf[FState[T]] +} + +/** + * Represents the internal state of the DefaultCompletableFuture + */ +private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, listeners: List[Future[T] ⇒ Unit] = Nil) + /** * The default concrete Future implementation. */ @@ -784,78 +797,53 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit)) private val _startTimeInNanos = currentTimeInNanos - private val _lock = new ReentrantLock - private val _signal = _lock.newCondition - private var _value: Option[Either[Throwable, T]] = None - private var _listeners: List[Future[T] ⇒ Unit] = Nil + private val ref = new AtomicReference[FState[T]](FState()) - /** - * Must be called inside _lock.lock<->_lock.unlock - * Returns true if completed within the timeout - */ @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (_value.isEmpty && waitTimeNanos > 0) { - val start = currentTimeInNanos - val remainingNanos = try { - _signal.awaitNanos(waitTimeNanos) - } catch { - case e: InterruptedException ⇒ - waitTimeNanos - (currentTimeInNanos - start) - } - awaitUnsafe(remainingNanos) + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec + val start = System.nanoTime() + try { ref.synchronized { ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + + awaitUnsafe(waitTimeNanos - Math.abs(System.nanoTime() - start)) } else { - _value.isDefined + value.isDefined } } - def await(atMost: Duration) = { - _lock.lock() - try { - if (!atMost.isFinite && !timeout.duration.isFinite) { //If wait until infinity - while (_value.isEmpty) { _signal.await } - this - } else { //Limited wait - val time = if (!atMost.isFinite) timeLeft() //If atMost is infinity, use preset timeout - else if (!timeout.duration.isFinite) atMost.toNanos //If preset timeout is infinite, use atMost - else atMost.toNanos min timeLeft() //Otherwise use the smallest of them - if (awaitUnsafe(time)) this - else throw new FutureTimeoutException("Future timed out after [" + NANOS.toMillis(time) + "] ms") - } - } finally { _lock.unlock } - } + protected def awaitThenThrow(waitNanos: Long): this.type = + if (awaitUnsafe(waitNanos)) this + else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(waitNanos) + "] milliseconds") - def await = await(timeout.duration) + def await(atMost: Duration) = awaitThenThrow(atMost.toNanos min timeLeft()) + + def await = awaitThenThrow(timeLeft()) def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false - def value: Option[Either[Throwable, T]] = { - _lock.lock - try { - _value - } finally { - _lock.unlock - } - } + def value: Option[Either[Throwable, T]] = ref.get.value def complete(value: Either[Throwable, T]): this.type = { val callbacks = { - _lock.lock try { - if (_value.isEmpty) { //Only complete if we aren't expired - if (!isExpired) { - _value = Some(value) - val existingListeners = _listeners - _listeners = Nil - existingListeners - } else { - _listeners = Nil + @tailrec + def tryComplete: List[Future[T] ⇒ Unit] = { + val cur = ref.get + if (cur.value.isDefined) Nil + else if ( /*cur.value.isEmpty && */ isExpired) { + //Empty and expired, so remove listeners + ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails Nil + } else { + if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners + else tryComplete } - } else Nil + } + tryComplete } finally { - _signal.signalAll - _lock.unlock + ref.synchronized { ref.notifyAll() } //Notify any evil blockers } } @@ -865,44 +853,34 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } def onComplete(func: Future[T] ⇒ Unit): this.type = { - _lock.lock - val notifyNow = try { - if (_value.isEmpty) { - if (!isExpired) { //Only add the listener if the future isn't expired - _listeners ::= func - false - } else false //Will never run the callback since the future is expired - } else true - } finally { - _lock.unlock + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = ref.get + if (cur.value.isDefined) true + else if (isExpired) false + else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false + else tryAddCallback() } + val notifyNow = tryAddCallback() + if (notifyNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) this } def onTimeout(func: Future[T] ⇒ Unit): this.type = { - if (timeout.duration.isFinite) { - _lock.lock - val runNow = try { - if (_value.isEmpty) { - if (!isExpired) { - val runnable = new Runnable { - def run() { - if (!isCompleted) func(self) - } - } - Scheduler.scheduleOnce(runnable, timeLeft, NANOS) - false - } else true - } else false - } finally { - _lock.unlock - } + val runNow = + if (!timeout.duration.isFinite) false //Not possible + else if (value.isEmpty) { + if (!isExpired) { + val runnable = new Runnable { def run() { if (!isCompleted) func(DefaultPromise.this) } } + Scheduler.scheduleOnce(runnable, timeLeft, NANOS) + false + } else true + } else false - if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) - } + if (runNow) Future.dispatchTask(() ⇒ notifyCompleted(func)) this } @@ -926,11 +904,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { - func(this) - } catch { - case e ⇒ EventHandler.error(e, this, "Future onComplete-callback raised an exception") - } + try { func(this) } catch { case e ⇒ EventHandler notify EventHandler.Error(e, this, "Future onComplete-callback raised an exception") } //TODO catch, everything? Really? } @inline