diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index 0f06dbd5c7..ad99a4af58 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -67,7 +67,7 @@ public class JavaFutureTests { } }); - cf.completeWithResult("foo"); + cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); assertEquals(f.get(), "foo"); } @@ -85,7 +85,7 @@ public class JavaFutureTests { }); Throwable exception = new NullPointerException(); - cf.completeWithException(exception); + cf.failure(exception); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); assertEquals(f.value().get().left().get(), exception); } @@ -101,7 +101,7 @@ public class JavaFutureTests { } }); - cf.completeWithResult("foo"); + cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); assertEquals(f.get(), "foo"); } @@ -117,7 +117,7 @@ public class JavaFutureTests { } }); - cf.completeWithResult("foo"); + cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); assertEquals(f.get(), "foo"); } @@ -126,13 +126,13 @@ public class JavaFutureTests { public void mustBeAbleToFlatMapAFuture() throws Throwable { final CountDownLatch latch = new CountDownLatch(1); Promise cf = Futures.promise(system.dispatcher()); - cf.completeWithResult("1000"); + cf.success("1000"); Future f = cf; Future r = f.flatMap(new Function>() { public Future apply(String r) { latch.countDown(); Promise cf = Futures.promise(system.dispatcher()); - cf.completeWithResult(Integer.parseInt(r)); + cf.success(Integer.parseInt(r)); return cf; } }); @@ -154,7 +154,7 @@ public class JavaFutureTests { } }); - cf.completeWithResult("foo"); + cf.success("foo"); assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); assertEquals(f.get(), "foo"); assertEquals(r.get(), "foo"); @@ -275,7 +275,7 @@ public class JavaFutureTests { public void BlockMustBeCallable() { Promise p = Futures.promise(system.dispatcher()); Duration d = Duration.create(1, TimeUnit.SECONDS); - p.completeWithResult("foo"); + p.success("foo"); Block.on(p, d); assertEquals(Block.sync(p, d), "foo"); } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 9a506031a3..7622c597ac 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -117,11 +117,11 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { def promiseIntercept(f: ⇒ Actor)(to: Promise[Actor]): Actor = try { val r = f - to.completeWithResult(r) + to.success(r) r } catch { case e ⇒ - to.completeWithException(e) + to.failure(e) throw e } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index acaa7b5c18..0cd04e5c60 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -61,9 +61,9 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val result = Promise[T]() val t = new Thread(new Runnable { def run = try { - result.completeWithResult(fun) + result.success(fun) } catch { - case e: Throwable ⇒ result.completeWithException(e) + case e: Throwable ⇒ result.failure(e) } }) t.start diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0e2fd1deba..07e03d42cc 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -414,9 +414,9 @@ class AskActorRef( final val result = Promise[Any]()(dispatcher) override def !(message: Any)(implicit sender: ActorRef = null): Unit = if (running.get) message match { - case Status.Success(r) ⇒ result.completeWithResult(r) - case Status.Failure(f) ⇒ result.completeWithException(f) - case other ⇒ result.completeWithResult(other) + case Status.Success(r) ⇒ result.success(r) + case Status.Failure(f) ⇒ result.failure(f) + case other ⇒ result.success(other) } override def sendSystemMessage(message: SystemMessage): Unit = message match { diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 494a387a98..785dc46703 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -171,12 +171,12 @@ object Future { val ref = new AtomicInteger(futures.size) val search: Future[T] ⇒ Unit = f ⇒ try { f.value.get match { - case Right(r) ⇒ if (predicate(r)) result completeWithResult Some(r) + case Right(r) ⇒ if (predicate(r)) result success Some(r) case _ ⇒ } } finally { if (ref.decrementAndGet == 0) - result completeWithResult None + result success None } futures.foreach(_ onComplete search) @@ -214,11 +214,11 @@ object Future { val i = results.iterator var currentValue = zero while (i.hasNext) { currentValue = foldFun(currentValue, i.next) } - result completeWithResult currentValue + result success currentValue } catch { case e: Exception ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage)) - result completeWithException e + result failure e } finally { results.clear } @@ -226,7 +226,7 @@ object Future { } case Left(exception) ⇒ if (done.switchOn) { - result completeWithException exception + result failure exception results.clear } } @@ -254,7 +254,7 @@ object Future { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold f.value.get match { case Right(value) ⇒ result.completeWith(fold(futures.filterNot(_ eq f))(value)(op)) - case Left(exception) ⇒ result.completeWithException(exception) + case Left(exception) ⇒ result.failure(exception) } } } @@ -295,8 +295,8 @@ object Future { def flow[A](body: ⇒ A @cps[Future[Any]])(implicit dispatcher: MessageDispatcher): Future[A] = { val future = Promise[A] dispatchTask({ () ⇒ - (reify(body) foreachFull (future completeWithResult, future completeWithException): Future[Any]) onException { - case e: Exception ⇒ future completeWithException e + (reify(body) foreachFull (future success, future failure): Future[Any]) onException { + case e: Exception ⇒ future failure e } }, true) future @@ -597,38 +597,55 @@ sealed trait Future[+T] extends japi.Future[T] with Block.Blockable[T] { object Promise { /** - * Creates a non-completed, new, Promise + * Creates a non-completed Promise * * Scala API */ def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() + + /** + * Creates an already completed Promise with the specified exception + */ + def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception)) + + /** + * Creates an already completed Promise with the specified result + */ + def fulfilled[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result)) } /** * Essentially this is the Promise (or write-side) of a Future (read-side). */ trait Promise[T] extends Future[T] { - /** - * Completes this Future with the specified result, if not already completed. - * @return this - */ - def complete(value: Either[Throwable, T]): this.type /** - * Completes this Future with the specified result, if not already completed. - * @return this + * Completes this Promise with the specified result, if not already completed. + * @return whether this call completed the Promise */ - final def completeWithResult(result: T): this.type = complete(Right(result)) + def tryComplete(value: Either[Throwable, T]): Boolean /** - * Completes this Future with the specified exception, if not already completed. + * Completes this Promise with the specified result, if not already completed. * @return this */ - final def completeWithException(exception: Throwable): this.type = complete(Left(exception)) + final def complete(value: Either[Throwable, T]): this.type = { tryComplete(value); this } /** - * Completes this Future with the specified other Future, when that Future is completed, - * unless this Future has already been completed. + * Completes this Promise with the specified result, if not already completed. + * @return this + */ + final def success(result: T): this.type = complete(Right(result)) + + /** + * Completes this Promise with the specified exception, if not already completed. + * @return this + */ + final def failure(exception: Throwable): this.type = complete(Left(exception)) + + /** + * Completes this Promise with the specified other Future, when that Future is completed, + * unless this Promise has already been completed. * @return this. */ final def completeWith(other: Future[T]): this.type = { @@ -646,7 +663,7 @@ trait Promise[T] extends Future[T] { } catch { case e: Exception ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) - fr completeWithException e + fr failure e } } fr @@ -660,7 +677,7 @@ trait Promise[T] extends Future[T] { } catch { case e: Exception ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) - fr completeWithException e + fr failure e } } fr @@ -735,8 +752,8 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst @inline protected final def getState: FState[T] = updater.get(this) - def complete(value: Either[Throwable, T]): this.type = { - val callbacks = { + def tryComplete(value: Either[Throwable, T]): Boolean = { + val callbacks: List[Future[T] ⇒ Unit] = { try { @tailrec def tryComplete: List[Future[T] ⇒ Unit] = { @@ -746,7 +763,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst case Pending(listeners) ⇒ if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners else tryComplete - case _ ⇒ Nil + case _ ⇒ null } } tryComplete @@ -755,9 +772,11 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst } } - if (callbacks.nonEmpty) Future.dispatchTask(() ⇒ callbacks foreach notifyCompleted) - - this + callbacks match { + case null ⇒ false + case cs if cs.isEmpty ⇒ true + case cs ⇒ Future.dispatchTask(() ⇒ cs foreach notifyCompleted); true + } } def onComplete(func: Future[T] ⇒ Unit): this.type = { @@ -790,7 +809,7 @@ class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends Abst final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): this.type = this + def tryComplete(value: Either[Throwable, T]): Boolean = true def onComplete(func: Future[T] ⇒ Unit): this.type = { Future dispatchTask (() ⇒ func(this)) this diff --git a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala index 918eb7c080..6460e1e1aa 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PromiseStream.scala @@ -166,7 +166,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: } else enqueue(elem) } else { if (_pendOut.compareAndSet(po, po.tail)) { - po.head completeWithResult elem + po.head success elem if (!po.head.isCompleted) enqueue(elem) } else enqueue(elem) } @@ -227,7 +227,7 @@ class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: } else dequeue(promise) } else { if (_elemOut.compareAndSet(eo, eo.tail)) { - promise completeWithResult eo.head + promise success eo.head } else dequeue(promise) } } diff --git a/akka-actor/src/main/scala/akka/util/cps/package.scala b/akka-actor/src/main/scala/akka/util/cps/package.scala index 6e88ff9cfe..7cbf60aaf2 100644 --- a/akka-actor/src/main/scala/akka/util/cps/package.scala +++ b/akka-actor/src/main/scala/akka/util/cps/package.scala @@ -42,7 +42,7 @@ package cps { if (test) Future(reify(block) flatMap (_ ⇒ reify(whileC(test)(block))) foreach c) else - Promise() completeWithResult (shiftUnitR[Unit, Future[Any]](()) foreach c) + Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c) } def repeatC[U](times: Int)(block: ⇒ U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] = @@ -50,7 +50,7 @@ package cps { if (times > 0) Future(reify(block) flatMap (_ ⇒ reify(repeatC(times - 1)(block))) foreach c) else - Promise() completeWithResult (shiftUnitR[Unit, Future[Any]](()) foreach c) + Promise() success (shiftUnitR[Unit, Future[Any]](()) foreach c) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index 3eb62217d5..639c72d1df 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -210,8 +210,8 @@ class TransactionLog private ( val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] val entries = toByteArrays(enumeration) - if (returnCode == BKException.Code.OK) future.completeWithResult(entries) - else future.completeWithException(BKException.create(returnCode)) + if (returnCode == BKException.Code.OK) future.success(entries) + else future.failure(BKException.create(returnCode)) } }, future) @@ -474,8 +474,8 @@ object TransactionLog { ledgerHandle: LedgerHandle, ctx: AnyRef) { val future = ctx.asInstanceOf[Promise[LedgerHandle]] - if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) - else future.completeWithException(BKException.create(returnCode)) + if (returnCode == BKException.Code.OK) future.success(ledgerHandle) + else future.failure(BKException.create(returnCode)) } }, future) @@ -532,8 +532,8 @@ object TransactionLog { new AsyncCallback.OpenCallback { def openComplete(returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { val future = ctx.asInstanceOf[Promise[LedgerHandle]] - if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) - else future.completeWithException(BKException.create(returnCode)) + if (returnCode == BKException.Code.OK) future.success(ledgerHandle) + else future.failure(BKException.create(returnCode)) } }, future) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala index 548fb149b5..fe7a8f1908 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/metrics/local/LocalMetricsMultiJvmSpec.scala @@ -81,7 +81,7 @@ class LocalMetricsMultiJvmNode1 extends MasterClusterTestNode { def reactsOn(metrics: NodeMetrics) = metrics.usedHeapMemory > 1 - def react(metrics: NodeMetrics) = monitorReponse.completeWithResult("Too much memory is used!") + def react(metrics: NodeMetrics) = monitorReponse.success("Too much memory is used!") }) diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 9defdf4607..7166ea1c49 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -284,8 +284,8 @@ Summary of reply semantics and options Promise represents the write-side of a Future, enabled by the methods -* completeWithResult(..) -* completeWithException(..) +* success(..) +* break(..) Starting actors --------------- diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index e3be9acd79..c54c91ab33 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -46,8 +46,8 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { val result = Promise[Boolean]()(dispatcher) mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)] ⇒ wr match { - case Right((oid, wr)) ⇒ result.completeWithResult(true) - case Left(t) ⇒ result.completeWithException(t) + case Right((oid, wr)) ⇒ result.success(true) + case Left(t) ⇒ result.failure(t) } }) Block.on(result, settings.WriteTimeout) @@ -66,12 +66,12 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { doc match { case Some(msg) ⇒ { log.debug("DEQUEUING message in mongo-based mailbox [{}]", msg) - envelopePromise.completeWithResult(msg.envelope()) + envelopePromise.success(msg.envelope()) log.debug("DEQUEUING messageInvocation in mongo-based mailbox [{}]", envelopePromise) } case None ⇒ log.info("No matching document found. Not an error, just an empty queue.") - envelopePromise.completeWithResult(null) + envelopePromise.success(null) () } } @@ -80,7 +80,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { def numberOfMessages: Int = { val count = Promise[Int]()(dispatcher) - mongo.count()(count.completeWithResult) + mongo.count()(count.success) try { Block.sync(count, settings.ReadTimeout).asInstanceOf[Int] } catch { case _: Exception ⇒ -1 } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 316fb9693c..66dc71f35b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -84,9 +84,9 @@ class RemoteActorRefProvider( if (systemService) local.actorOf(system, props, supervisor, name, systemService) else { val path = supervisor.path / name - val newFuture = Promise[ActorRef]()(dispatcher) + val creationPromise = Promise[ActorRef]()(dispatcher) - actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future + actors.putIfAbsent(path.toString, creationPromise) match { // we won the race -- create the actor and resolve the future case null ⇒ val actor: InternalActorRef = try { deployer.lookupDeploymentFor(path.toString) match { @@ -158,14 +158,14 @@ class RemoteActorRefProvider( } } catch { case e: Exception ⇒ - newFuture completeWithException e // so the other threads gets notified of error + creationPromise failure e // so the other threads gets notified of error throw e } // actor foreach system.registry.register // only for ActorRegistry backward compat, will be removed later - newFuture completeWithResult actor - actors.replace(path.toString, newFuture, actor) + creationPromise success actor + actors.replace(path.toString, creationPromise, actor) actor case actor: InternalActorRef ⇒ actor case future: Future[_] ⇒ Block.sync(future, system.settings.ActorTimeout.duration).asInstanceOf[InternalActorRef] diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 348ac77bd2..8466fa25b3 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -57,7 +57,7 @@ object TestActorRefSpec { class WorkerActor() extends TActor { def receiveT = { case "work" ⇒ sender ! "workDone"; self.stop() - case replyTo: Promise[Any] ⇒ replyTo.completeWithResult("complexReply") + case replyTo: Promise[Any] ⇒ replyTo.success("complexReply") case replyTo: ActorRef ⇒ replyTo ! "complexReply" } }