diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 5b9f155d75..311523ce96 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach } import akka.actor.TypedActor._ import akka.japi.{ Option ⇒ JOption } import akka.util.Duration -import akka.dispatch.{ Dispatchers, Future, AlreadyCompletedFuture } +import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.routing.CyclicIterator object TypedActorSpec { @@ -43,7 +43,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = new AlreadyCompletedFuture(Right(pigdog)) + def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) futurePigdog @@ -51,7 +51,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - new AlreadyCompletedFuture(Right(pigdog + numbered)) + new KeptPromise(Right(pigdog + numbered)) } def futureComposePigdogFrom(foo: Foo): Future[String] = @@ -132,6 +132,12 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach stop(null) must be(false) } + "throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in { + (intercept[IllegalStateException] { + TypedActor.self[Foo] + }).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!") + } + "have access to itself when executing a method call" in { val t = newFooBar t.self must be(t) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 773a68ef89..e71ca14721 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -68,7 +68,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte //CANDIDATE FOR TESTKIT def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = { - val result = new DefaultCompletableFuture[T](within.length, within.unit) + val result = new DefaultPromise[T](within.length, within.unit) val t = new Thread(new Runnable { def run = try { result.completeWithResult(fun) diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 6214c84da3..df6184c292 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -3,15 +3,14 @@ package akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import java.util.concurrent.{ CyclicBarrier, TimeUnit, CountDownLatch } import org.scalatest.Assertions._ +import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch } +import akka.dispatch.Future object ActorRegistrySpec { - var record = "" class TestActor extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") } } @@ -19,10 +18,8 @@ object ActorRegistrySpec { class TestActor2 extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") case "ping2" ⇒ - record = "pong" + record self.reply("got ping") } } @@ -41,6 +38,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actor2.get.address === actor1.address) assert(actor2.get.address === "test-actor-1") actor2.get.stop + assert(Actor.registry.actorFor(actor1.address).isEmpty) } @Test @@ -54,6 +52,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actorOrNone.get.uuid === uuid) assert(actorOrNone.get.address === "test-actor-1") actor.stop + assert(Actor.registry.local.actorFor(uuid).isEmpty) } @Test @@ -71,10 +70,8 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetAllActorsFromLocalActorRegistry { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = actorOf[TestActor]("test-actor-2") - actor2.start + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) @@ -88,13 +85,15 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = actorOf[TestActor]("test-actor-2") - actor2.start - record = "" - Actor.registry.local.foreach(actor ⇒ actor !! "ping") - assert(record === "pongpong") + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start + val results = new ConcurrentLinkedQueue[Future[String]] + + Actor.registry.local.foreach(actor ⇒ results.add(actor.!!![String]("ping"))) + + assert(results.size === 2) + val i = results.iterator + while (i.hasNext) assert(i.next.get === "got ping") actor1.stop() actor2.stop() } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fe490fdd0a..0fa5b8e017 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -7,11 +7,25 @@ import akka.testing._ import akka.testing.Testing.{ sleepFor, testMillis } import akka.util.duration._ -import akka.actor.Actor import akka.actor.Actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.{ KeptPromise, Future } +import akka.actor.{ TypedActor, Actor } + +object RoutingSpec { + trait Foo { + def sq(x: Int, sleep: Long): Future[Int] + } + + class FooImpl extends Foo { + def sq(x: Int, sleep: Long): Future[Int] = { + if (sleep > 0) Thread.sleep(sleep) + new KeptPromise(Right(x * x)) + } + } +} class RoutingSpec extends WordSpec with MustMatchers { import Routing._ @@ -491,6 +505,29 @@ class RoutingSpec extends WordSpec with MustMatchers { pool.stop() } + + "support typed actors" in { + import RoutingSpec._ + import TypedActor._ + def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) + def receive = _route + } + + val pool = createProxy[Foo](createPool) + + val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) + + for ((i, r) ← results) r.get must equal(i * i) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5094ebed01..8b3e25fc2e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -208,7 +208,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture + def getSenderFuture: Option[Promise[Any]] = senderFuture /** * Is the actor being restarted? @@ -482,7 +482,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + senderFuture: Option[Promise[T]]): Promise[T] protected[akka] def actorInstance: AtomicReference[Actor] @@ -698,10 +698,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + senderFuture: Option[Promise[T]]): Promise[T] = { + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout)) dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]]) future.get } @@ -1022,7 +1022,7 @@ private[akka] case class RemoteActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + senderFuture: Option[Promise[T]]): Promise[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, remoteAddress, timeout, false, this, loader) @@ -1157,7 +1157,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture(): Option[CompletableFuture[Any]] = { + def senderFuture(): Option[Promise[Any]] = { val msg = currentMessage if (msg eq null) None else msg.senderFuture diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index c6710d60ff..f3c38887fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -34,7 +34,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] - private val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] private val guard = new ReadWriteGuard @@ -66,7 +66,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag // throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]") actorsByAddress.put(address, actor) - actorsByUuid.put(actor.uuid.toString, actor) + actorsByUuid.put(actor.uuid, actor) notifyListeners(ActorRegistered(address, actor)) } @@ -121,7 +121,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag */ class LocalActorRegistry( private val actorsByAddress: ConcurrentHashMap[String, ActorRef], - private val actorsByUuid: ConcurrentHashMap[String, ActorRef], + private val actorsByUuid: ConcurrentHashMap[Uuid, ActorRef], private val typedActorsByUuid: ConcurrentHashMap[Uuid, AnyRef]) { /** @@ -153,11 +153,8 @@ class LocalActorRegistry( /** * Finds the actor that have a specific uuid. */ - private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = { - val uuidAsString = uuid.toString - if (actorsByUuid.containsKey(uuidAsString)) Some(actorsByUuid.get(uuidAsString)) - else None - } + private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = + Option(actorsByUuid.get(uuid)) /** * Finds the typed actor that have a specific address. diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 839474a3ba..b7c6b45422 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -13,22 +13,23 @@ import java.util.concurrent.atomic.AtomicReference object TypedActor { private val selfReference = new ThreadLocal[AnyRef] - def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] - class TypedActor[TI <: AnyRef](proxyRef: AtomicReference[AnyRef], createInstance: ⇒ TI) extends Actor { + def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match { + case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!") + case some ⇒ some + } + + class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor { val me = createInstance + def callMethod(methodCall: MethodCall): Unit = methodCall match { + case m if m.isOneWay ⇒ m(me) + case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] + case m ⇒ self reply m(me) + } def receive = { case m: MethodCall ⇒ selfReference set proxyRef.get - try { - m match { - case m if m.isOneWay ⇒ m(me) - case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - case m ⇒ self reply m(me) - } - } finally { - selfReference set null - } + try { callMethod(m) } finally { selfReference set null } } } @@ -42,25 +43,25 @@ object TypedActor { case m if m.isOneWay ⇒ actor ! m null - case m if m.returnsJOption_? ⇒ - (actor !!! m).as[JOption[Any]] match { - case Some(null) | None ⇒ JOption.none[Any] - case Some(joption) ⇒ joption - } - case m if m.returnsOption_? ⇒ - (actor !!! m).as[AnyRef] match { - case Some(null) | None ⇒ None - case Some(option) ⇒ option - } case m if m.returnsFuture_? ⇒ actor !!! m + case m if m.returnsJOption_? || m.returnsOption_? ⇒ + (actor !!! m).as[AnyRef] match { + case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None + case Some(joption) ⇒ joption + } case m ⇒ (actor !!! m).get } } } - case class Configuration(timeout: Duration = Duration(Actor.TIMEOUT, "millis"), dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) + object Configuration { + val defaultTimeout = Duration(Actor.TIMEOUT, "millis") + val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher) + def apply(): Configuration = defaultConfiguration + } + case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) case class MethodCall(method: Method, parameters: Array[AnyRef]) { def isOneWay = method.getReturnType == java.lang.Void.TYPE @@ -83,39 +84,24 @@ object TypedActor { private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues) } - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.newInstance, config, loader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.create, config, loader) def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R = - newTypedActor(impl.getInterfaces, impl.newInstance, config, loader) + createProxyAndTypedActor(impl, impl.newInstance, config, loader) def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) - } - - protected def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[AnyRef](null) - configureAndProxyLocalActorRef[T](interfaces, proxyRef, actorOf(new TypedActor[T](proxyRef, constructor)), config, loader) - } - - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[AnyRef], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { - actor.timeout = config.timeout.toMillis - actor.dispatcher = config.dispatcher - - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] - proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop - proxy + createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) } def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match { @@ -133,4 +119,32 @@ object TypedActor { } def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null + + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + + def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = + createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = + createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomicReference[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader) + } + + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { + actor.timeout = config.timeout.toMillis + actor.dispatcher = config.dispatcher + + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] + proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + proxy + } + + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = + if (clazz.isInterface) Array[Class[_]](clazz) + else clazz.getInterfaces } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3ffdc0e1a6..d235d7f46e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -20,8 +20,6 @@ import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec -import scala.collection.generic.CanBuildFrom -import scala.collection.mutable.Builder import scala.collection.mutable.Stack class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -56,7 +54,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { - val futureResult = new DefaultCompletableFuture[T](timeout) + val futureResult = new DefaultPromise[T](timeout) val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) for (f ← futures) f onComplete completeFirst @@ -83,9 +81,9 @@ object Futures { */ def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { if (futures.isEmpty) { - new AlreadyCompletedFuture[R](Right(zero)) + new KeptPromise[R](Right(zero)) } else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val results = new ConcurrentLinkedQueue[T]() val allDone = futures.size @@ -135,9 +133,9 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): Future[R] = { if (futures.isEmpty) - new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold @@ -202,7 +200,7 @@ object Futures { * in parallel. * * def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - * in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + * in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => * val fb = fn(a.asInstanceOf[A]) * for (r <- fr; b <-fb) yield (r += b) * }.map(_.result) @@ -230,7 +228,7 @@ object Future { /** * Create an empty Future with default timeout */ - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) + def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -240,7 +238,7 @@ object Future { * Useful for reducing many Futures into a single Future. */ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. @@ -251,7 +249,7 @@ object Future { * */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) @@ -267,23 +265,19 @@ object Future { * * This allows working with Futures in an imperative style without blocking for each result. * - * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the + * Completing a Future using 'Promise << Future' will also suspend execution until the * value of the other Future is available. * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ val opte = f.exception if (opte.isDefined) future completeWithException (opte.get) } future } - - private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { - override def initialValue = None - } } sealed trait Future[+T] { @@ -417,7 +411,7 @@ sealed trait Future[+T] { * */ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val v = ft.value.get fa complete { @@ -450,7 +444,7 @@ sealed trait Future[+T] { * */ final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception fa complete { @@ -482,7 +476,7 @@ sealed trait Future[+T] { * */ final def map[A](f: T ⇒ A): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -518,7 +512,7 @@ sealed trait Future[+T] { * */ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -546,7 +540,7 @@ sealed trait Future[+T] { } final def filter(p: Any ⇒ Boolean): Future[Any] = { - val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) + val f = new DefaultPromise[T](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -596,16 +590,19 @@ sealed trait Future[+T] { object Promise { - def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) + def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout) - def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) + def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { + override def initialValue = None + } } /** * Essentially this is the Promise (or write-side) of a Future (read-side). */ -trait CompletableFuture[T] extends Future[T] { +trait Promise[T] extends Future[T] { /** * Completes this Future with the specified result, if not already completed. * @return this @@ -637,7 +634,7 @@ trait CompletableFuture[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) + val fr = new DefaultPromise[Any](Actor.TIMEOUT) this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -655,7 +652,7 @@ trait CompletableFuture[T] extends Future[T] { /** * The default concrete Future implementation. */ -class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { +class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def this() = this(0, MILLIS) @@ -722,7 +719,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired @@ -746,7 +743,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - val pending = Future.callbacksPendingExecution.get + val pending = Promise.callbacksPendingExecution.get if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) pending.get.push(() ⇒ { // Linearize/aggregate callbacks at top level and then execute val doNotify = notifyCompleted _ //Hoist closure to avoid garbage @@ -755,16 +752,16 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } else { try { val callbacks = Stack[() ⇒ Unit]() // Allocate new aggregator for pending callbacks - Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator + Promise.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated - } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup + } finally { Promise.callbacksPendingExecution.set(None) } // Ensure cleanup } } this } - def onComplete(func: Future[T] ⇒ Unit): CompletableFuture[T] = { + def onComplete(func: Future[T] ⇒ Unit): Promise[T] = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -800,10 +797,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { +sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def complete(value: Either[Throwable, T]): Promise[T] = this def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f64c8109cb..f5cda388c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -19,7 +19,7 @@ import akka.actor._ final case class MessageInvocation(receiver: ActorRef, message: Any, sender: Option[ActorRef], - senderFuture: Option[CompletableFuture[Any]]) { + senderFuture: Option[Promise[Any]]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") def invoke() { @@ -32,7 +32,7 @@ final case class MessageInvocation(receiver: ActorRef, } } -final case class FutureInvocation[T](future: CompletableFuture[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { +final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { def run() { future complete (try { Right(function()) @@ -99,7 +99,7 @@ trait MessageDispatcher { private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = { futures.getAndIncrement() try { - val future = new DefaultCompletableFuture[T](timeout) + val future = new DefaultPromise[T](timeout) if (active.isOff) guard withGuard { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 13311138a9..91478c8eb2 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -7,7 +7,7 @@ package akka.remoteinterface import akka.japi.Creator import akka.actor._ import akka.util._ -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import akka.serialization._ import akka.AkkaException @@ -300,10 +300,10 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒ protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] + loader: Option[ClassLoader]): Option[Promise[T]] } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 35196813ae..a75d65ddd9 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,7 +4,7 @@ package akka.util -import akka.dispatch.{ Future, CompletableFuture, MessageInvocation } +import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index abcbbb58ee..5fc58ca42a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -8,7 +8,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.event.EventHandler -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -42,8 +42,8 @@ class ClusterActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = - route[T](message, timeout)(senderOption).asInstanceOf[CompletableFuture[T]] + senderFuture: Option[Promise[T]]): Promise[T] = + route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { addresses set (addresses.get map { diff --git a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala new file mode 100644 index 0000000000..da7a6a07b4 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala @@ -0,0 +1,111 @@ +package akka.cluster + +import zookeeper.AkkaZkClient +import java.lang.UnsupportedOperationException +import akka.AkkaException +import org.apache.zookeeper.{ KeeperException, CreateMode } +import org.apache.zookeeper.data.Stat +import scala.Some + +/** + * Simple abstraction to store an Array of bytes based on some String key. + * + * Nothing is being said about ACID, transactions etc. It depends on the implementation + * of this Storage interface of what is and isn't done on the lowest level. + * + * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. + * (This is supported by ZooKeeper). + * TODO: Class is up for better names. + * TODO: Instead of a String as key, perhaps also a byte-array. + */ +trait RawStorage { + + /** + * Inserts a byte-array based on some key. + * + * TODO: What happens when given key already exists + * @throws NodeExistsException + */ + def insert(key: String, bytes: Array[Byte]): Unit + + /** + * Stores a array of bytes based on some key. + * + * TODO: What happens when the given key doesn't exist yet + */ + def update(key: String, bytes: Array[Byte]): Unit + + /** + * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. + */ + def load(key: String): Option[Array[Byte]] +} + +/** + * An AkkaException thrown by the RawStorage module. + */ +class RawStorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause) + +/** + * * + * A RawStorageException thrown when an operation is done on a non existing node. + */ +class MissingNodeException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause) + +/** + * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. + */ +class NodeExistsException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause) + +/** + * A RawStorage implementation based on ZooKeeper. + * + * The store method is atomic: + * - so everything is written or nothing is written + * - is isolated, so threadsafe, + * but it will not participate in any transactions. + * //todo: unclear, is only a single connection used in the JVM?? + * + */ +class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage { + + override def insert(key: String, bytes: Array[Byte]) { + try { + zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); + } catch { + case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException("failed to insert key" + key, e) + case e: KeeperException ⇒ throw new RawStorageException("failed to insert key" + key, e) + } + } + + override def load(key: String) = try { + Some(zkClient.connection.readData(key, new Stat, false)) + } catch { + case e: KeeperException.NoNodeException ⇒ None + case e: KeeperException ⇒ throw new RawStorageException("failed to load key" + key, e) + } + + override def update(key: String, bytes: Array[Byte]) { + try { + zkClient.connection.writeData(key, bytes) + } catch { + case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException("failed to update key", e) + case e: KeeperException ⇒ throw new RawStorageException("failed to update key", e) + } + } +} + +class VoldemortRawStorage extends RawStorage { + + def load(Key: String) = { + throw new UnsupportedOperationException() + } + + override def insert(key: String, bytes: Array[Byte]) { + throw new UnsupportedOperationException() + } + + def update(key: String, bytes: Array[Byte]) { + throw new UnsupportedOperationException() + } +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala index cf4bff9859..4b075c7f91 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala @@ -86,7 +86,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String) message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) + senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance protected[akka] def supervisor_=(sup: Option[ActorRef]) { actorRef.supervisor_=(sup) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index b6e30fca1c..f5c96250b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -13,7 +13,7 @@ import akka.config._ import Config._ import akka.util._ import akka.event.EventHandler -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture } +import akka.dispatch.{ DefaultPromise, Promise } import akka.AkkaException import akka.cluster.zookeeper._ @@ -140,7 +140,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: "Reading entries [%s -> %s] for log [%s]".format(from, to, logId)) if (isAsync) { - val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout) + val future = new DefaultPromise[Vector[Array[Byte]]](timeout) ledger.asyncReadEntries( from, to, new AsyncCallback.ReadCallback { @@ -149,7 +149,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]] + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] var entries = Vector[Array[Byte]]() while (enumeration.hasMoreElements) { entries = entries :+ enumeration.nextElement.getEntry @@ -362,7 +362,7 @@ object TransactionLog { if (zkClient.exists(txLogPath)) throw new ReplicationException( "Transaction log for UUID [" + id + "] already exists") - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) if (isAsync) { bookieClient.asyncCreateLedger( ensembleSize, quorumSize, digestType, password, @@ -371,7 +371,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -422,7 +422,7 @@ object TransactionLog { val ledger = try { if (isAsync) { - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) bookieClient.asyncOpenLedger( logId, digestType, password, new AsyncCallback.OpenCallback { @@ -430,7 +430,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -447,7 +447,7 @@ object TransactionLog { TransactionLog(ledger, id, isAsync) } - private[akka] def await[T](future: CompletableFuture[T]): T = { + private[akka] def await[T](future: Promise[T]): T = { future.await if (future.result.isDefined) future.result.get else if (future.exception.isDefined) handleError(future.exception.get) diff --git a/akka-docs/java/dataflow.rst b/akka-docs/java/dataflow.rst index 52437647a5..901fdb2e38 100644 --- a/akka-docs/java/dataflow.rst +++ b/akka-docs/java/dataflow.rst @@ -8,7 +8,7 @@ Dataflow Concurrency (Java) Introduction ------------ -**IMPORTANT: As of Akka 1.1, Akka Future, CompletableFuture and DefaultCompletableFuture have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** +**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** Akka implements `Oz-style dataflow concurrency `_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index c03dd8e50c..ddeedbe829 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -308,9 +308,9 @@ Reply using the sender future If a message was sent with the 'sendRequestReply' or 'sendRequestReplyFuture' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. +The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. -CompletableFuture is a future with methods for 'completing the future: +Promise is a future with methods for 'completing the future: * completeWithResult(..) * completeWithException(..) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index bd550b807b..38076d7b58 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -331,7 +331,7 @@ Reply using the sender future If a message was sent with the ``!!`` or ``!!!`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the ``senderFuture: Option[CompletableFuture[_]]`` member field in the ``ActorRef`` class. +The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class. Here is an example of how it can be used: diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index de04a44a0f..6411104295 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future } +import akka.dispatch.{ DefaultPromise, Promise, Future } import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization @@ -73,12 +73,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] = + loader: Option[ClassLoader]): Option[Promise[T]] = withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)) private[akka] def withClientFor[T]( @@ -154,7 +154,7 @@ abstract class RemoteClient private[akka] ( remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -191,11 +191,11 @@ abstract class RemoteClient private[akka] ( def send[T]( message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, - actorRef: ActorRef): Option[CompletableFuture[T]] = + actorRef: ActorRef): Option[Promise[T]] = send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build, senderFuture) @@ -205,7 +205,7 @@ abstract class RemoteClient private[akka] ( */ def send[T]( request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { if (request.getOneWay) { try { @@ -227,7 +227,7 @@ abstract class RemoteClient private[akka] ( None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + else new DefaultPromise[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails @@ -410,7 +410,7 @@ class ActiveRemoteClient private[akka] ( */ class ActiveRemoteClientPipelineFactory( name: String, - futures: ConcurrentMap[Uuid, CompletableFuture[_]], + futures: ConcurrentMap[Uuid, Promise[_]], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -439,7 +439,7 @@ class ActiveRemoteClientPipelineFactory( @ChannelHandler.Sharable class ActiveRemoteClientHandler( val name: String, - val futures: ConcurrentMap[Uuid, CompletableFuture[_]], + val futures: ConcurrentMap[Uuid, Promise[_]], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -457,7 +457,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]] if (reply.hasMessage) { if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") @@ -891,7 +891,7 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout). + Some(new DefaultPromise[Any](request.getActorInfo.getTimeout). onComplete(_.value.get match { case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) case r: Right[Throwable, Any] ⇒ diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index b556b90e50..dfa3cfd6b8 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -7,7 +7,7 @@ package akka.agent import akka.stm._ import akka.actor.Actor import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.dispatch.{ DefaultCompletableFuture, Dispatchers, Future } +import akka.dispatch.{ DefaultPromise, Dispatchers, Future } /** * Used internally to send functions. @@ -122,7 +122,7 @@ class Agent[T](initialValue: T) { def alter(f: T ⇒ T)(timeout: Long): Future[T] = { def dispatch = updater.!!!(Update(f), timeout) if (Stm.activeTransaction) { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) get //Join xa deferred { result completeWith dispatch @@ -164,7 +164,7 @@ class Agent[T](initialValue: T) { * still be executed in order. */ def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) send((value: T) ⇒ { suspend val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start()