diff --git a/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java b/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java new file mode 100644 index 0000000000..73d2c004cb --- /dev/null +++ b/akka-actor/src/main/java/akka/dispatch/AbstractPromise.java @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.dispatch; + +import sun.tools.tree.FinallyStatement; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +abstract class AbstractPromise { + private volatile Object _ref = FState.apply(); + protected final static AtomicReferenceFieldUpdater updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 7287377a8b..5f9f779b81 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -21,7 +21,7 @@ import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack import akka.util.{ Switch, Duration, BoxedType } -import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference, AtomicBoolean } +import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicReference, AtomicBoolean } class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(message: String) = this(message, null) @@ -792,7 +792,7 @@ private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, l /** * The default concrete Future implementation. */ -class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends Promise[T] { +class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { self ⇒ def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default) @@ -802,7 +802,6 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def this(timeout: Long, timeunit: TimeUnit)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout, timeunit)) private val _startTimeInNanos = currentTimeInNanos - private val ref = new AtomicReference[FState[T]](FState()) //FIXME create a base-class in Java with an AtomicReferenceFieldUpdater and avoid allocating the AtomicReference for each @tailrec private def awaitUnsafe(waitTimeNanos: Long): Boolean = { @@ -810,7 +809,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val ms = NANOS.toMillis(waitTimeNanos) val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec val start = currentTimeInNanos - try { ref.synchronized { if (value.isEmpty) ref.wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } awaitUnsafe(waitTimeNanos - (currentTimeInNanos - start)) } else { @@ -836,28 +835,35 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def isExpired: Boolean = if (timeout.duration.isFinite) timeLeft() <= 0 else false - def value: Option[Either[Throwable, T]] = ref.get.value + def value: Option[Either[Throwable, T]] = getState.value + + @inline + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = + AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].compareAndSet(this, oldState, newState) + + @inline + protected final def getState: FState[T] = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]].get(this) def complete(value: Either[Throwable, T]): this.type = { val callbacks = { try { @tailrec def tryComplete: List[Future[T] ⇒ Unit] = { - val cur = ref.get + val cur = getState if (cur.value.isDefined) Nil else if ( /*cur.value.isEmpty && */ isExpired) { //Empty and expired, so remove listeners //TODO Perhaps cancel existing onTimeout listeners in the future here? - ref.compareAndSet(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails + updateState(cur, FState()) //Try to reset the state to the default, doesn't matter if it fails Nil } else { - if (ref.compareAndSet(cur, FState(Option(value), Nil))) cur.listeners + if (updateState(cur, FState(Option(value), Nil))) cur.listeners else tryComplete } } tryComplete } finally { - ref.synchronized { ref.notifyAll() } //Notify any evil blockers + synchronized { notifyAll() } //Notify any evil blockers } } @@ -869,10 +875,10 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi def onComplete(func: Future[T] ⇒ Unit): this.type = { @tailrec //Returns whether the future has already been completed or not def tryAddCallback(): Boolean = { - val cur = ref.get + val cur = getState if (cur.value.isDefined) true else if (isExpired) false - else if (ref.compareAndSet(cur, cur.copy(listeners = func :: cur.listeners))) false + else if (updateState(cur, cur.copy(listeners = func :: cur.listeners))) false else tryAddCallback() }