From 8a790b1ddfd6789f937590eae25f991dab10ad21 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 11:31:01 +0200 Subject: [PATCH] Renaming CompletableFuture to Promise, Renaming AlreadyCompletedFuture to KeptPromise, closing ticket #854 --- .../akka/actor/actor/TypedActorSpec.scala | 6 +-- .../akka/dispatch/MailboxConfigSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorRef.scala | 14 +++--- .../src/main/scala/akka/dispatch/Future.scala | 50 +++++++++---------- .../scala/akka/dispatch/MessageHandling.scala | 6 +-- .../remoteinterface/RemoteInterface.scala | 6 +-- .../scala/akka/util/ReflectiveAccess.scala | 2 +- .../scala/akka/cluster/ClusterActorRef.scala | 6 +-- .../akka/cluster/ReplicatedClusterRef.scala | 2 +- .../scala/akka/cluster/TransactionLog.scala | 16 +++--- akka-docs/java/dataflow.rst | 2 +- akka-docs/java/untyped-actors.rst | 4 +- akka-docs/scala/actors.rst | 2 +- .../remote/netty/NettyRemoteSupport.scala | 24 ++++----- .../src/main/scala/akka/agent/Agent.scala | 6 +-- 15 files changed, 74 insertions(+), 74 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 5b9f155d75..f21f49b023 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach } import akka.actor.TypedActor._ import akka.japi.{ Option ⇒ JOption } import akka.util.Duration -import akka.dispatch.{ Dispatchers, Future, AlreadyCompletedFuture } +import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.routing.CyclicIterator object TypedActorSpec { @@ -43,7 +43,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = new AlreadyCompletedFuture(Right(pigdog)) + def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) futurePigdog @@ -51,7 +51,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - new AlreadyCompletedFuture(Right(pigdog + numbered)) + new KeptPromise(Right(pigdog + numbered)) } def futureComposePigdogFrom(foo: Foo): Future[String] = diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 773a68ef89..e71ca14721 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -68,7 +68,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte //CANDIDATE FOR TESTKIT def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = { - val result = new DefaultCompletableFuture[T](within.length, within.unit) + val result = new DefaultPromise[T](within.length, within.unit) val t = new Thread(new Runnable { def run = try { result.completeWithResult(fun) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 61fbf56be9..f41f2b46ae 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -208,7 +208,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture + def getSenderFuture: Option[Promise[Any]] = senderFuture /** * Is the actor being restarted? @@ -482,7 +482,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + senderFuture: Option[Promise[T]]): Promise[T] protected[akka] def actorInstance: AtomicReference[Actor] @@ -698,10 +698,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + senderFuture: Option[Promise[T]]): Promise[T] = { + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout)) dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]]) future.get } @@ -1020,7 +1020,7 @@ private[akka] case class RemoteActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + senderFuture: Option[Promise[T]]): Promise[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, remoteAddress, timeout, false, this, loader) @@ -1155,7 +1155,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture(): Option[CompletableFuture[Any]] = { + def senderFuture(): Option[Promise[Any]] = { val msg = currentMessage if (msg eq null) None else msg.senderFuture diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3ffdc0e1a6..4ebe2b7f05 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -56,7 +56,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { - val futureResult = new DefaultCompletableFuture[T](timeout) + val futureResult = new DefaultPromise[T](timeout) val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) for (f ← futures) f onComplete completeFirst @@ -83,9 +83,9 @@ object Futures { */ def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { if (futures.isEmpty) { - new AlreadyCompletedFuture[R](Right(zero)) + new KeptPromise[R](Right(zero)) } else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val results = new ConcurrentLinkedQueue[T]() val allDone = futures.size @@ -135,9 +135,9 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): Future[R] = { if (futures.isEmpty) - new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold @@ -202,7 +202,7 @@ object Futures { * in parallel. * * def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - * in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + * in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => * val fb = fn(a.asInstanceOf[A]) * for (r <- fr; b <-fb) yield (r += b) * }.map(_.result) @@ -230,7 +230,7 @@ object Future { /** * Create an empty Future with default timeout */ - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) + def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -240,7 +240,7 @@ object Future { * Useful for reducing many Futures into a single Future. */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. @@ -251,7 +251,7 @@ object Future { * */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) @@ -267,14 +267,14 @@ object Future { * * This allows working with Futures in an imperative style without blocking for each result. * - * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the + * Completing a Future using 'Promise << Future' will also suspend execution until the * value of the other Future is available. * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ val opte = f.exception if (opte.isDefined) future completeWithException (opte.get) } @@ -417,7 +417,7 @@ sealed trait Future[+T] { * */ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val v = ft.value.get fa complete { @@ -450,7 +450,7 @@ sealed trait Future[+T] { * */ final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception fa complete { @@ -482,7 +482,7 @@ sealed trait Future[+T] { * */ final def map[A](f: T ⇒ A): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -518,7 +518,7 @@ sealed trait Future[+T] { * */ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -546,7 +546,7 @@ sealed trait Future[+T] { } final def filter(p: Any ⇒ Boolean): Future[Any] = { - val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) + val f = new DefaultPromise[T](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -596,16 +596,16 @@ sealed trait Future[+T] { object Promise { - def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) + def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout) - def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) + def apply[A](): Promise[A] = apply(Actor.TIMEOUT) } /** * Essentially this is the Promise (or write-side) of a Future (read-side). */ -trait CompletableFuture[T] extends Future[T] { +trait Promise[T] extends Future[T] { /** * Completes this Future with the specified result, if not already completed. * @return this @@ -637,7 +637,7 @@ trait CompletableFuture[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) + val fr = new DefaultPromise[Any](Actor.TIMEOUT) this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -655,7 +655,7 @@ trait CompletableFuture[T] extends Future[T] { /** * The default concrete Future implementation. */ -class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { +class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def this() = this(0, MILLIS) @@ -722,7 +722,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired @@ -764,7 +764,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com this } - def onComplete(func: Future[T] ⇒ Unit): CompletableFuture[T] = { + def onComplete(func: Future[T] ⇒ Unit): Promise[T] = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -800,10 +800,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { +sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def complete(value: Either[Throwable, T]): Promise[T] = this def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f64c8109cb..f5cda388c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -19,7 +19,7 @@ import akka.actor._ final case class MessageInvocation(receiver: ActorRef, message: Any, sender: Option[ActorRef], - senderFuture: Option[CompletableFuture[Any]]) { + senderFuture: Option[Promise[Any]]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") def invoke() { @@ -32,7 +32,7 @@ final case class MessageInvocation(receiver: ActorRef, } } -final case class FutureInvocation[T](future: CompletableFuture[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { +final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { def run() { future complete (try { Right(function()) @@ -99,7 +99,7 @@ trait MessageDispatcher { private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = { futures.getAndIncrement() try { - val future = new DefaultCompletableFuture[T](timeout) + val future = new DefaultPromise[T](timeout) if (active.isOff) guard withGuard { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 13311138a9..91478c8eb2 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -7,7 +7,7 @@ package akka.remoteinterface import akka.japi.Creator import akka.actor._ import akka.util._ -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import akka.serialization._ import akka.AkkaException @@ -300,10 +300,10 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒ protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] + loader: Option[ClassLoader]): Option[Promise[T]] } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 3efdb67a63..e47973984c 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,7 +4,7 @@ package akka.util -import akka.dispatch.{ Future, CompletableFuture, MessageInvocation } +import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index c8ac1012f4..7776e8f1d5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -8,7 +8,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.event.EventHandler -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -42,8 +42,8 @@ class ClusterActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = - route[T](message, timeout)(senderOption).asInstanceOf[CompletableFuture[T]] + senderFuture: Option[Promise[T]]): Promise[T] = + route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { addresses set (addresses.get map { diff --git a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala index cf4bff9859..4b075c7f91 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala @@ -86,7 +86,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String) message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) + senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance protected[akka] def supervisor_=(sup: Option[ActorRef]) { actorRef.supervisor_=(sup) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index b6e30fca1c..f5c96250b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -13,7 +13,7 @@ import akka.config._ import Config._ import akka.util._ import akka.event.EventHandler -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture } +import akka.dispatch.{ DefaultPromise, Promise } import akka.AkkaException import akka.cluster.zookeeper._ @@ -140,7 +140,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: "Reading entries [%s -> %s] for log [%s]".format(from, to, logId)) if (isAsync) { - val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout) + val future = new DefaultPromise[Vector[Array[Byte]]](timeout) ledger.asyncReadEntries( from, to, new AsyncCallback.ReadCallback { @@ -149,7 +149,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]] + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] var entries = Vector[Array[Byte]]() while (enumeration.hasMoreElements) { entries = entries :+ enumeration.nextElement.getEntry @@ -362,7 +362,7 @@ object TransactionLog { if (zkClient.exists(txLogPath)) throw new ReplicationException( "Transaction log for UUID [" + id + "] already exists") - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) if (isAsync) { bookieClient.asyncCreateLedger( ensembleSize, quorumSize, digestType, password, @@ -371,7 +371,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -422,7 +422,7 @@ object TransactionLog { val ledger = try { if (isAsync) { - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) bookieClient.asyncOpenLedger( logId, digestType, password, new AsyncCallback.OpenCallback { @@ -430,7 +430,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -447,7 +447,7 @@ object TransactionLog { TransactionLog(ledger, id, isAsync) } - private[akka] def await[T](future: CompletableFuture[T]): T = { + private[akka] def await[T](future: Promise[T]): T = { future.await if (future.result.isDefined) future.result.get else if (future.exception.isDefined) handleError(future.exception.get) diff --git a/akka-docs/java/dataflow.rst b/akka-docs/java/dataflow.rst index 52437647a5..901fdb2e38 100644 --- a/akka-docs/java/dataflow.rst +++ b/akka-docs/java/dataflow.rst @@ -8,7 +8,7 @@ Dataflow Concurrency (Java) Introduction ------------ -**IMPORTANT: As of Akka 1.1, Akka Future, CompletableFuture and DefaultCompletableFuture have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** +**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** Akka implements `Oz-style dataflow concurrency `_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index c03dd8e50c..ddeedbe829 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -308,9 +308,9 @@ Reply using the sender future If a message was sent with the 'sendRequestReply' or 'sendRequestReplyFuture' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. +The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. -CompletableFuture is a future with methods for 'completing the future: +Promise is a future with methods for 'completing the future: * completeWithResult(..) * completeWithException(..) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index bd550b807b..38076d7b58 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -331,7 +331,7 @@ Reply using the sender future If a message was sent with the ``!!`` or ``!!!`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the ``senderFuture: Option[CompletableFuture[_]]`` member field in the ``ActorRef`` class. +The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class. Here is an example of how it can be used: diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index de04a44a0f..6411104295 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future } +import akka.dispatch.{ DefaultPromise, Promise, Future } import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization @@ -73,12 +73,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] = + loader: Option[ClassLoader]): Option[Promise[T]] = withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)) private[akka] def withClientFor[T]( @@ -154,7 +154,7 @@ abstract class RemoteClient private[akka] ( remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -191,11 +191,11 @@ abstract class RemoteClient private[akka] ( def send[T]( message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, - actorRef: ActorRef): Option[CompletableFuture[T]] = + actorRef: ActorRef): Option[Promise[T]] = send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build, senderFuture) @@ -205,7 +205,7 @@ abstract class RemoteClient private[akka] ( */ def send[T]( request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { if (request.getOneWay) { try { @@ -227,7 +227,7 @@ abstract class RemoteClient private[akka] ( None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + else new DefaultPromise[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails @@ -410,7 +410,7 @@ class ActiveRemoteClient private[akka] ( */ class ActiveRemoteClientPipelineFactory( name: String, - futures: ConcurrentMap[Uuid, CompletableFuture[_]], + futures: ConcurrentMap[Uuid, Promise[_]], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -439,7 +439,7 @@ class ActiveRemoteClientPipelineFactory( @ChannelHandler.Sharable class ActiveRemoteClientHandler( val name: String, - val futures: ConcurrentMap[Uuid, CompletableFuture[_]], + val futures: ConcurrentMap[Uuid, Promise[_]], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -457,7 +457,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]] if (reply.hasMessage) { if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") @@ -891,7 +891,7 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout). + Some(new DefaultPromise[Any](request.getActorInfo.getTimeout). onComplete(_.value.get match { case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) case r: Right[Throwable, Any] ⇒ diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index b556b90e50..dfa3cfd6b8 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -7,7 +7,7 @@ package akka.agent import akka.stm._ import akka.actor.Actor import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.dispatch.{ DefaultCompletableFuture, Dispatchers, Future } +import akka.dispatch.{ DefaultPromise, Dispatchers, Future } /** * Used internally to send functions. @@ -122,7 +122,7 @@ class Agent[T](initialValue: T) { def alter(f: T ⇒ T)(timeout: Long): Future[T] = { def dispatch = updater.!!!(Update(f), timeout) if (Stm.activeTransaction) { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) get //Join xa deferred { result completeWith dispatch @@ -164,7 +164,7 @@ class Agent[T](initialValue: T) { * still be executed in order. */ def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) send((value: T) ⇒ { suspend val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start()