Renaming completeWithResult to success, completeWithException to failure, adding tryComplete to signal whether the completion was made or not

This commit is contained in:
Viktor Klang 2011-12-12 17:25:34 +01:00
parent 7026ded91d
commit 2d418c188f
13 changed files with 88 additions and 69 deletions

View file

@ -67,7 +67,7 @@ public class JavaFutureTests {
}
});
cf.completeWithResult("foo");
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@ -85,7 +85,7 @@ public class JavaFutureTests {
});
Throwable exception = new NullPointerException();
cf.completeWithException(exception);
cf.failure(exception);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.value().get().left().get(), exception);
}
@ -101,7 +101,7 @@ public class JavaFutureTests {
}
});
cf.completeWithResult("foo");
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@ -117,7 +117,7 @@ public class JavaFutureTests {
}
});
cf.completeWithResult("foo");
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@ -126,13 +126,13 @@ public class JavaFutureTests {
public void mustBeAbleToFlatMapAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = Futures.promise(system.dispatcher());
cf.completeWithResult("1000");
cf.success("1000");
Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(String r) {
latch.countDown();
Promise<Integer> cf = Futures.promise(system.dispatcher());
cf.completeWithResult(Integer.parseInt(r));
cf.success(Integer.parseInt(r));
return cf;
}
});
@ -154,7 +154,7 @@ public class JavaFutureTests {
}
});
cf.completeWithResult("foo");
cf.success("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(r.get(), "foo");
@ -275,7 +275,7 @@ public class JavaFutureTests {
public void BlockMustBeCallable() {
Promise<String> p = Futures.promise(system.dispatcher());
Duration d = Duration.create(1, TimeUnit.SECONDS);
p.completeWithResult("foo");
p.success("foo");
Block.on(p, d);
assertEquals(Block.sync(p, d), "foo");
}

View file

@ -117,11 +117,11 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
def promiseIntercept(f: Actor)(to: Promise[Actor]): Actor = try {
val r = f
to.completeWithResult(r)
to.success(r)
r
} catch {
case e
to.completeWithException(e)
to.failure(e)
throw e
}

View file

@ -61,9 +61,9 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val result = Promise[T]()
val t = new Thread(new Runnable {
def run = try {
result.completeWithResult(fun)
result.success(fun)
} catch {
case e: Throwable result.completeWithException(e)
case e: Throwable result.failure(e)
}
})
t.start

View file

@ -414,9 +414,9 @@ class AskActorRef(
final val result = Promise[Any]()(dispatcher)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match {
case Status.Success(r) result.completeWithResult(r)
case Status.Failure(f) result.completeWithException(f)
case other result.completeWithResult(other)
case Status.Success(r) result.success(r)
case Status.Failure(f) result.failure(f)
case other result.success(other)
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {

View file

@ -171,12 +171,12 @@ object Future {
val ref = new AtomicInteger(futures.size)
val search: Future[T] Unit = f try {
f.value.get match {
case Right(r) if (predicate(r)) result completeWithResult Some(r)
case Right(r) if (predicate(r)) result success Some(r)
case _
}
} finally {
if (ref.decrementAndGet == 0)
result completeWithResult None
result success None
}
futures.foreach(_ onComplete search)
@ -214,11 +214,11 @@ object Future {
val i = results.iterator
var currentValue = zero
while (i.hasNext) { currentValue = foldFun(currentValue, i.next) }
result completeWithResult currentValue
result success currentValue
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
result completeWithException e
result failure e
} finally {
results.clear
}
@ -226,7 +226,7 @@ object Future {
}
case Left(exception)
if (done.switchOn) {
result completeWithException exception
result failure exception
results.clear
}
}
@ -254,7 +254,7 @@ object Future {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case Right(value) result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
case Left(exception) result.completeWithException(exception)
case Left(exception) result.failure(exception)
}
}
}
@ -295,8 +295,8 @@ object Future {
def flow[A](body: A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = {
val future = Promise[A]
dispatchTask({ ()
(reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException {
case e: Exception future completeWithException e
(reify(body) foreachFull (future success, future failure): Future[Any]) onException {
case e: Exception future failure e
}
}, true)
future
@ -597,38 +597,55 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] {
object Promise {
/**
* Creates a non-completed, new, Promise
* Creates a non-completed Promise
*
* Scala API
*/
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
/**
* Creates an already completed Promise with the specified exception
*/
def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception))
/**
* Creates an already completed Promise with the specified result
*/
def fulfilled[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result))
}
/**
* Essentially this is the Promise (or write-side) of a Future (read-side).
*/
trait Promise[T] extends Future[T] {
/**
* Completes this Future with the specified result, if not already completed.
* @return this
*/
def complete(value: Either[Throwable, T]): this.type
/**
* Completes this Future with the specified result, if not already completed.
* @return this
* Completes this Promise with the specified result, if not already completed.
* @return whether this call completed the Promise
*/
final def completeWithResult(result: T): this.type = complete(Right(result))
def tryComplete(value: Either[Throwable, T]): Boolean
/**
* Completes this Future with the specified exception, if not already completed.
* Completes this Promise with the specified result, if not already completed.
* @return this
*/
final def completeWithException(exception: Throwable): this.type = complete(Left(exception))
final def complete(value: Either[Throwable, T]): this.type = { tryComplete(value); this }
/**
* Completes this Future with the specified other Future, when that Future is completed,
* unless this Future has already been completed.
* Completes this Promise with the specified result, if not already completed.
* @return this
*/
final def success(result: T): this.type = complete(Right(result))
/**
* Completes this Promise with the specified exception, if not already completed.
* @return this
*/
final def failure(exception: Throwable): this.type = complete(Left(exception))
/**
* Completes this Promise with the specified other Future, when that Future is completed,
* unless this Promise has already been completed.
* @return this.
*/
final def completeWith(other: Future[T]): this.type = {
@ -646,7 +663,7 @@ trait Promise[T] extends Future[T] {
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
fr failure e
}
}
fr
@ -660,7 +677,7 @@ trait Promise[T] extends Future[T] {
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
fr failure e
}
}
fr
@ -735,8 +752,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
@inline
protected final def getState: FState[T] = updater.get(this)
def complete(value: Either[Throwable, T]): this.type = {
val callbacks = {
def tryComplete(value: Either[Throwable, T]): Boolean = {
val callbacks: List[Future[T] Unit] = {
try {
@tailrec
def tryComplete: List[Future[T] Unit] = {
@ -746,7 +763,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
case Pending(listeners)
if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
else tryComplete
case _ Nil
case _ null
}
}
tryComplete
@ -755,9 +772,11 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
}
}
if (callbacks.nonEmpty) Future.dispatchTask(() callbacks foreach notifyCompleted)
this
callbacks match {
case null false
case cs if cs.isEmpty true
case cs Future.dispatchTask(() cs foreach notifyCompleted); true
}
}
def onComplete(func: Future[T] Unit): this.type = {
@ -790,7 +809,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
val value = Some(suppliedValue)
def complete(value: Either[Throwable, T]): this.type = this
def tryComplete(value: Either[Throwable, T]): Boolean = true
def onComplete(func: Future[T] Unit): this.type = {
Future dispatchTask (() func(this))
this

View file

@ -166,7 +166,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout:
} else enqueue(elem)
} else {
if (_pendOut.compareAndSet(po, po.tail)) {
po.head completeWithResult elem
po.head success elem
if (!po.head.isCompleted) enqueue(elem)
} else enqueue(elem)
}
@ -227,7 +227,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout:
} else dequeue(promise)
} else {
if (_elemOut.compareAndSet(eo, eo.tail)) {
promise completeWithResult eo.head
promise success eo.head
} else dequeue(promise)
}
}

View file

@ -42,7 +42,7 @@ package cps {
if (test)
Future(reify(block) flatMap (_ reify(whileC(test)(block))) foreach c)
else
Promise() completeWithResult (shiftUnitR[Unit, Future[Any]](()) foreach c)
Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c)
}
def repeatC[U](times: Int)(block: U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] =
@ -50,7 +50,7 @@ package cps {
if (times > 0)
Future(reify(block) flatMap (_ reify(repeatC(times - 1)(block))) foreach c)
else
Promise() completeWithResult (shiftUnitR[Unit, Future[Any]](()) foreach c)
Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c)
}
}

View file

@ -210,8 +210,8 @@ class TransactionLog private (
val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]]
val entries = toByteArrays(enumeration)
if (returnCode == BKException.Code.OK) future.completeWithResult(entries)
else future.completeWithException(BKException.create(returnCode))
if (returnCode == BKException.Code.OK) future.success(entries)
else future.failure(BKException.create(returnCode))
}
},
future)
@ -474,8 +474,8 @@ object TransactionLog {
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
val future = ctx.asInstanceOf[Promise[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
if (returnCode == BKException.Code.OK) future.success(ledgerHandle)
else future.failure(BKException.create(returnCode))
}
},
future)
@ -532,8 +532,8 @@ object TransactionLog {
new AsyncCallback.OpenCallback {
def openComplete(returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) {
val future = ctx.asInstanceOf[Promise[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
if (returnCode == BKException.Code.OK) future.success(ledgerHandle)
else future.failure(BKException.create(returnCode))
}
},
future)

View file

@ -81,7 +81,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode {
def reactsOn(metrics: NodeMetrics) = metrics.usedHeapMemory > 1
def react(metrics: NodeMetrics) = monitorReponse.completeWithResult("Too much memory is used!")
def react(metrics: NodeMetrics) = monitorReponse.success("Too much memory is used!")
})

View file

@ -284,8 +284,8 @@ Summary of reply semantics and options
Promise represents the write-side of a Future, enabled by the methods
* completeWithResult(..)
* completeWithException(..)
* success(..)
* break(..)
Starting actors
---------------

View file

@ -46,8 +46,8 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
val result = Promise[Boolean]()(dispatcher)
mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)]
wr match {
case Right((oid, wr)) result.completeWithResult(true)
case Left(t) result.completeWithException(t)
case Right((oid, wr)) result.success(true)
case Left(t) result.failure(t)
}
})
Block.on(result, settings.WriteTimeout)
@ -66,12 +66,12 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
doc match {
case Some(msg) {
log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg)
envelopePromise.completeWithResult(msg.envelope())
envelopePromise.success(msg.envelope())
log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise)
}
case None
log.info("No matching document found. Not an error, just an empty queue.")
envelopePromise.completeWithResult(null)
envelopePromise.success(null)
()
}
}
@ -80,7 +80,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
def numberOfMessages: Int = {
val count = Promise[Int]()(dispatcher)
mongo.count()(count.completeWithResult)
mongo.count()(count.success)
try { Block.sync(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception -1 }
}

View file

@ -84,9 +84,9 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, name, systemService)
else {
val path = supervisor.path / name
val newFuture = Promise[ActorRef]()(dispatcher)
val creationPromise = Promise[ActorRef]()(dispatcher)
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
actors.putIfAbsent(path.toString, creationPromise) match { // we won the race -- create the actor and resolve the future
case null
val actor: InternalActorRef = try {
deployer.lookupDeploymentFor(path.toString) match {
@ -158,14 +158,14 @@ class RemoteActorRefProvider(
}
} catch {
case e: Exception
newFuture completeWithException e // so the other threads gets notified of error
creationPromise failure e // so the other threads gets notified of error
throw e
}
// actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actors.replace(path.toString, newFuture, actor)
creationPromise success actor
actors.replace(path.toString, creationPromise, actor)
actor
case actor: InternalActorRef actor
case future: Future[_] Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef]

View file

@ -57,7 +57,7 @@ object TestActorRefSpec {
class WorkerActor() extends TActor {
def receiveT = {
case "work" sender ! "workDone"; self.stop()
case replyTo: Promise[Any] replyTo.completeWithResult("complexReply")
case replyTo: Promise[Any] replyTo.success("complexReply")
case replyTo: ActorRef replyTo ! "complexReply"
}
}