Removed FState, switched to isCompleted checks instead of value.isDefined, added zip etc

This commit is contained in:
Viktor Klang 2012-01-19 13:50:02 +01:00
parent 97280ffeed
commit 44c7f49f92
2 changed files with 74 additions and 44 deletions

View file

@ -11,11 +11,11 @@ import akka.testkit.{ EventFilter, filterEvents, filterException }
import akka.util.duration._
import akka.testkit.AkkaSpec
import org.scalatest.junit.JUnitSuite
import java.lang.ArithmeticException
import akka.testkit.DefaultTimeout
import akka.testkit.TestLatch
import java.util.concurrent.{ TimeoutException, TimeUnit, CountDownLatch }
import scala.runtime.NonLocalReturnControl
import java.lang.{ IllegalStateException, ArithmeticException }
object FutureSpec {
class TestActor extends Actor {
@ -327,6 +327,24 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Await.result(Future.fold(futures)(0)(_ + _), timeout millis) must be(45)
}
"zip" in {
val timeout = 10000 millis
val f = new IllegalStateException("test")
intercept[IllegalStateException] {
Await.result(Promise.failed[String](f) zip Promise.successful("foo"), timeout)
} must be(f)
intercept[IllegalStateException] {
Await.result(Promise.successful("foo") zip Promise.failed[String](f), timeout)
} must be(f)
intercept[IllegalStateException] {
Await.result(Promise.failed[String](f) zip Promise.failed[String](f), timeout)
} must be(f)
Await.result(Promise.successful("foo") zip Promise.successful("foo"), timeout) must be(("foo", "foo"))
}
"fold by composing" in {
val actors = (1 to 10).toList map { _
system.actorOf(Props(new Actor {
@ -859,6 +877,12 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Await.result(p, timeout.duration) must be(result)
}
}
"zip properly" in {
f { (future, result)
Await.result(future zip Promise.successful("foo"), timeout.duration) must be((result, "foo"))
(evaluating { Await.result(future zip Promise.failed(new RuntimeException("ohnoes")), timeout.duration) } must produce[RuntimeException]).getMessage must be("ohnoes")
}
}
"not recover from exception" in { f((future, result) Await.result(future.recover({ case _ "pigdog" }), timeout.duration) must be(result)) }
"perform action on result" in {
f { (future, result)
@ -892,6 +916,10 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"retain exception with map" in { f((future, message) (evaluating { Await.result(future map (_.toString.length), timeout.duration) } must produce[E]).getMessage must be(message)) }
"retain exception with flatMap" in { f((future, message) (evaluating { Await.result(future flatMap (_ Promise.successful[Any]("foo")), timeout.duration) } must produce[E]).getMessage must be(message)) }
"not perform action with foreach" is pending
"zip properly" in {
f { (future, message) (evaluating { Await.result(future zip Promise.successful("foo"), timeout.duration) } must produce[E]).getMessage must be(message) }
}
"recover from exception" in { f((future, message) Await.result(future.recover({ case e if e.getMessage == message "pigdog" }), timeout.duration) must be("pigdog")) }
"not perform action on result" is pending
"project a failure" in { f((future, message) Await.result(future.failed, timeout.duration).getMessage must be(message)) }

View file

@ -346,6 +346,21 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
case _ source
}
/**
* @returns a new Future that will contain a tuple containing the successful result of this and that Future.
* If this or that fail, they will race to complete the returned Future with their failure.
* The returned Future will not be completed if neither this nor that are completed.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
val p = Promise[(T, U)]()
onComplete {
case Left(t) p failure t
case Right(r) that onSuccess { case r2 p success ((r, r2)) }
}
that onFailure { case f p failure f }
p
}
/**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
@ -357,7 +372,7 @@ sealed trait Future[+T] extends japi.Future[T] with Await.Awaitable[T] {
/**
* Tests whether this Future has been completed.
*/
final def isCompleted: Boolean = value.isDefined
def isCompleted: Boolean
/**
* The contained value of this Future. Before this Future is completed
@ -676,23 +691,7 @@ trait Promise[T] extends Future[T] {
//Companion object to FState, just to provide a cheap, immutable default entry
private[dispatch] object DefaultPromise {
def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
/**
* Represents the internal state of the DefaultCompletableFuture
*/
sealed trait FState[+T] { def value: Option[Either[Throwable, T]] }
case class Pending[T](listeners: List[Either[Throwable, T] Unit] = Nil) extends FState[T] {
def value: Option[Either[Throwable, T]] = None
}
case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
def result: T = value.get.right.get
}
case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] {
def exception: Throwable = value.get.left.get
}
private val emptyPendingValue = Pending[Nothing](Nil)
def EmptyPending[T](): List[T] = Nil
}
/**
@ -701,28 +700,25 @@ private[dispatch] object DefaultPromise {
class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self
import DefaultPromise.{ FState, Success, Failure, Pending }
protected final def tryAwait(atMost: Duration): Boolean = {
Future.blocking
@tailrec
def awaitUnsafe(waitTimeNanos: Long): Boolean = {
if (value.isEmpty && waitTimeNanos > 0) {
if (!isCompleted && waitTimeNanos > 0) {
val ms = NANOSECONDS.toMillis(waitTimeNanos)
val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
val start = System.nanoTime()
try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException }
try { synchronized { if (!isCompleted) wait(ms, ns) } } catch { case e: InterruptedException }
awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
} else
value.isDefined
} else isCompleted
}
awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (value.isDefined || tryAwait(atMost)) this
if (isCompleted || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
def result(atMost: Duration)(implicit permit: CanAwait): T =
@ -731,16 +727,24 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
case Right(r) r
}
def value: Option[Either[Throwable, T]] = getState.value
def value: Option[Either[Throwable, T]] = getState match {
case _: List[_] None
case c: Either[_, _] Some(c.asInstanceOf[Either[Throwable, T]])
}
def isCompleted(): Boolean = getState match {
case _: Either[_, _] true
case _ false
}
@inline
private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, AnyRef]]
@inline
protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
protected final def updateState(oldState: AnyRef, newState: AnyRef): Boolean = updater.compareAndSet(this, oldState, newState)
@inline
protected final def getState: FState[T] = updater.get(this)
protected final def getState: AnyRef = updater.get(this)
def tryComplete(value: Either[Throwable, T]): Boolean = {
val callbacks: List[Either[Throwable, T] Unit] = {
@ -748,9 +752,9 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
@tailrec
def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] Unit] = {
getState match {
case cur @ Pending(listeners)
if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners
else tryComplete(v)
case raw: List[_]
val cur = raw.asInstanceOf[List[Either[Throwable, T] Unit]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ null
}
}
@ -769,22 +773,20 @@ class DefaultPromise[T](implicit val executor: ExecutionContext) extends Abstrac
def onComplete(func: Either[Throwable, T] Unit): this.type = {
@tailrec //Returns whether the future has already been completed or not
def tryAddCallback(): Boolean = {
def tryAddCallback(): Either[Throwable, T] = {
val cur = getState
cur match {
case _: Success[_] | _: Failure[_] true
case p: Pending[_]
val pt = p.asInstanceOf[Pending[T]]
if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
case r: Either[_, _] r.asInstanceOf[Either[Throwable, T]]
case listeners: List[_] if (updateState(listeners, func :: listeners)) null else tryAddCallback()
}
}
if (tryAddCallback()) {
val result = value.get
Future.dispatchTask(() notifyCompleted(func, result))
tryAddCallback() match {
case null this
case completed
Future.dispatchTask(() notifyCompleted(func, completed))
this
}
this
}
private final def notifyCompleted(func: Either[Throwable, T] Unit, result: Either[Throwable, T]) {
@ -805,7 +807,7 @@ final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val exe
Future dispatchTask (() func(completedAs))
this
}
def isCompleted(): Boolean = true
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case Left(e) throw e