From aa52486fdc8679f6f90aa2b05d590cd7afb59b70 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 11:27:02 +0200 Subject: [PATCH 01/12] Fixing erronous use of actor uuid as string in ActorRegistry, closing ticket #886 --- .../src/main/scala/akka/actor/ActorRegistry.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index c6710d60ff..f3c38887fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -34,7 +34,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] - private val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] private val guard = new ReadWriteGuard @@ -66,7 +66,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag // throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]") actorsByAddress.put(address, actor) - actorsByUuid.put(actor.uuid.toString, actor) + actorsByUuid.put(actor.uuid, actor) notifyListeners(ActorRegistered(address, actor)) } @@ -121,7 +121,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag */ class LocalActorRegistry( private val actorsByAddress: ConcurrentHashMap[String, ActorRef], - private val actorsByUuid: ConcurrentHashMap[String, ActorRef], + private val actorsByUuid: ConcurrentHashMap[Uuid, ActorRef], private val typedActorsByUuid: ConcurrentHashMap[Uuid, AnyRef]) { /** @@ -153,11 +153,8 @@ class LocalActorRegistry( /** * Finds the actor that have a specific uuid. */ - private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = { - val uuidAsString = uuid.toString - if (actorsByUuid.containsKey(uuidAsString)) Some(actorsByUuid.get(uuidAsString)) - else None - } + private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = + Option(actorsByUuid.get(uuid)) /** * Finds the typed actor that have a specific address. 
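The core of the patch above is twofold: the registry map is now keyed directly by Uuid instead of its String form, and the containsKey-then-get pair is collapsed into a single get wrapped in Option. Below is a minimal standalone sketch of that lookup idiom under assumed, illustrative names (SimpleRegistry, register, lookup) — it is not the Akka ActorRegistry itself, just the pattern the diff applies:

    import java.util.concurrent.ConcurrentHashMap

    final class SimpleRegistry[K, V] {
      private val entries = new ConcurrentHashMap[K, V]()

      def register(key: K, value: V): Unit = entries.put(key, value)

      // ConcurrentHashMap.get returns null when the key is absent;
      // Option(...) turns that into None in one lookup, avoiding the
      // separate containsKey/get calls (and the window between them)
      // that the old actorFor implementation relied on.
      def lookup(key: K): Option[V] = Option(entries.get(key))
    }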
From 8a790b1ddfd6789f937590eae25f991dab10ad21 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 11:31:01 +0200 Subject: [PATCH 02/12] Renaming CompletableFuture to Promise, Renaming AlreadyCompletedFuture to KeptPromise, closing ticket #854 --- .../akka/actor/actor/TypedActorSpec.scala | 6 +-- .../akka/dispatch/MailboxConfigSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorRef.scala | 14 +++--- .../src/main/scala/akka/dispatch/Future.scala | 50 +++++++++---------- .../scala/akka/dispatch/MessageHandling.scala | 6 +-- .../remoteinterface/RemoteInterface.scala | 6 +-- .../scala/akka/util/ReflectiveAccess.scala | 2 +- .../scala/akka/cluster/ClusterActorRef.scala | 6 +-- .../akka/cluster/ReplicatedClusterRef.scala | 2 +- .../scala/akka/cluster/TransactionLog.scala | 16 +++--- akka-docs/java/dataflow.rst | 2 +- akka-docs/java/untyped-actors.rst | 4 +- akka-docs/scala/actors.rst | 2 +- .../remote/netty/NettyRemoteSupport.scala | 24 ++++----- .../src/main/scala/akka/agent/Agent.scala | 6 +-- 15 files changed, 74 insertions(+), 74 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 5b9f155d75..f21f49b023 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach } import akka.actor.TypedActor._ import akka.japi.{ Option ⇒ JOption } import akka.util.Duration -import akka.dispatch.{ Dispatchers, Future, AlreadyCompletedFuture } +import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.routing.CyclicIterator object TypedActorSpec { @@ -43,7 +43,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = new AlreadyCompletedFuture(Right(pigdog)) + def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) futurePigdog @@ -51,7 +51,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - new AlreadyCompletedFuture(Right(pigdog + numbered)) + new KeptPromise(Right(pigdog + numbered)) } def futureComposePigdogFrom(foo: Foo): Future[String] = diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 773a68ef89..e71ca14721 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -68,7 +68,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte //CANDIDATE FOR TESTKIT def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = { - val result = new DefaultCompletableFuture[T](within.length, within.unit) + val result = new DefaultPromise[T](within.length, within.unit) val t = new Thread(new Runnable { def run = try { result.completeWithResult(fun) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 61fbf56be9..f41f2b46ae 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -208,7 +208,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The reference sender future of the last 
received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture + def getSenderFuture: Option[Promise[Any]] = senderFuture /** * Is the actor being restarted? @@ -482,7 +482,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + senderFuture: Option[Promise[T]]): Promise[T] protected[akka] def actorInstance: AtomicReference[Actor] @@ -698,10 +698,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + senderFuture: Option[Promise[T]]): Promise[T] = { + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout)) dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]]) future.get } @@ -1020,7 +1020,7 @@ private[akka] case class RemoteActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + senderFuture: Option[Promise[T]]): Promise[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, remoteAddress, timeout, false, this, loader) @@ -1155,7 +1155,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. 
*/ - def senderFuture(): Option[CompletableFuture[Any]] = { + def senderFuture(): Option[Promise[Any]] = { val msg = currentMessage if (msg eq null) None else msg.senderFuture diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3ffdc0e1a6..4ebe2b7f05 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -56,7 +56,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { - val futureResult = new DefaultCompletableFuture[T](timeout) + val futureResult = new DefaultPromise[T](timeout) val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) for (f ← futures) f onComplete completeFirst @@ -83,9 +83,9 @@ object Futures { */ def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { if (futures.isEmpty) { - new AlreadyCompletedFuture[R](Right(zero)) + new KeptPromise[R](Right(zero)) } else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val results = new ConcurrentLinkedQueue[T]() val allDone = futures.size @@ -135,9 +135,9 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): Future[R] = { if (futures.isEmpty) - new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold @@ -202,7 +202,7 @@ object Futures { * in parallel. * * def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - * in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + * in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => * val fb = fn(a.asInstanceOf[A]) * for (r <- fr; b <-fb) yield (r += b) * }.map(_.result) @@ -230,7 +230,7 @@ object Future { /** * Create an empty Future with default timeout */ - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) + def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -240,7 +240,7 @@ object Future { * Useful for reducing many Futures into a single Future. 
*/ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. @@ -251,7 +251,7 @@ object Future { * */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) @@ -267,14 +267,14 @@ object Future { * * This allows working with Futures in an imperative style without blocking for each result. * - * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the + * Completing a Future using 'Promise << Future' will also suspend execution until the * value of the other Future is available. * * The Delimited Continuations compiler plugin must be enabled in order to use this method. */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ val opte = f.exception if (opte.isDefined) future completeWithException (opte.get) } @@ -417,7 +417,7 @@ sealed trait Future[+T] { * */ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val v = ft.value.get fa complete { @@ -450,7 +450,7 @@ sealed trait Future[+T] { * */ final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception fa complete { @@ -482,7 +482,7 @@ sealed trait Future[+T] { * */ final def map[A](f: T ⇒ A): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -518,7 +518,7 @@ sealed trait Future[+T] { * */ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -546,7 +546,7 @@ sealed trait Future[+T] { } final def filter(p: Any ⇒ Boolean): Future[Any] = { - val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) + val f = new DefaultPromise[T](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -596,16 
+596,16 @@ sealed trait Future[+T] { object Promise { - def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) + def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout) - def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) + def apply[A](): Promise[A] = apply(Actor.TIMEOUT) } /** * Essentially this is the Promise (or write-side) of a Future (read-side). */ -trait CompletableFuture[T] extends Future[T] { +trait Promise[T] extends Future[T] { /** * Completes this Future with the specified result, if not already completed. * @return this @@ -637,7 +637,7 @@ trait CompletableFuture[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) + val fr = new DefaultPromise[Any](Actor.TIMEOUT) this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -655,7 +655,7 @@ trait CompletableFuture[T] extends Future[T] { /** * The default concrete Future implementation. */ -class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { +class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def this() = this(0, MILLIS) @@ -722,7 +722,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired @@ -764,7 +764,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com this } - def onComplete(func: Future[T] ⇒ Unit): CompletableFuture[T] = { + def onComplete(func: Future[T] ⇒ Unit): Promise[T] = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -800,10 +800,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com * An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. 
*/ -sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { +sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def complete(value: Either[Throwable, T]): Promise[T] = this def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f64c8109cb..f5cda388c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -19,7 +19,7 @@ import akka.actor._ final case class MessageInvocation(receiver: ActorRef, message: Any, sender: Option[ActorRef], - senderFuture: Option[CompletableFuture[Any]]) { + senderFuture: Option[Promise[Any]]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") def invoke() { @@ -32,7 +32,7 @@ final case class MessageInvocation(receiver: ActorRef, } } -final case class FutureInvocation[T](future: CompletableFuture[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { +final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { def run() { future complete (try { Right(function()) @@ -99,7 +99,7 @@ trait MessageDispatcher { private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = { futures.getAndIncrement() try { - val future = new DefaultCompletableFuture[T](timeout) + val future = new DefaultPromise[T](timeout) if (active.isOff) guard withGuard { diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 13311138a9..91478c8eb2 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -7,7 +7,7 @@ package akka.remoteinterface import akka.japi.Creator import akka.actor._ import akka.util._ -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import akka.serialization._ import akka.AkkaException @@ -300,10 +300,10 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒ protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] + loader: Option[ClassLoader]): Option[Promise[T]] } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 3efdb67a63..e47973984c 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,7 +4,7 @@ package akka.util -import akka.dispatch.{ Future, CompletableFuture, MessageInvocation } +import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala 
b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index c8ac1012f4..7776e8f1d5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -8,7 +8,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.event.EventHandler -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -42,8 +42,8 @@ class ClusterActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = - route[T](message, timeout)(senderOption).asInstanceOf[CompletableFuture[T]] + senderFuture: Option[Promise[T]]): Promise[T] = + route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { addresses set (addresses.get map { diff --git a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala index cf4bff9859..4b075c7f91 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala @@ -86,7 +86,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String) message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) + senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance protected[akka] def supervisor_=(sup: Option[ActorRef]) { actorRef.supervisor_=(sup) diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index b6e30fca1c..f5c96250b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -13,7 +13,7 @@ import akka.config._ import Config._ import akka.util._ import akka.event.EventHandler -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture } +import akka.dispatch.{ DefaultPromise, Promise } import akka.AkkaException import akka.cluster.zookeeper._ @@ -140,7 +140,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: "Reading entries [%s -> %s] for log [%s]".format(from, to, logId)) if (isAsync) { - val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout) + val future = new DefaultPromise[Vector[Array[Byte]]](timeout) ledger.asyncReadEntries( from, to, new AsyncCallback.ReadCallback { @@ -149,7 +149,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]] + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] var entries = Vector[Array[Byte]]() while (enumeration.hasMoreElements) { entries = entries :+ enumeration.nextElement.getEntry @@ -362,7 +362,7 @@ object TransactionLog { if (zkClient.exists(txLogPath)) throw new ReplicationException( "Transaction log for 
UUID [" + id + "] already exists") - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) if (isAsync) { bookieClient.asyncCreateLedger( ensembleSize, quorumSize, digestType, password, @@ -371,7 +371,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -422,7 +422,7 @@ object TransactionLog { val ledger = try { if (isAsync) { - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) bookieClient.asyncOpenLedger( logId, digestType, password, new AsyncCallback.OpenCallback { @@ -430,7 +430,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -447,7 +447,7 @@ object TransactionLog { TransactionLog(ledger, id, isAsync) } - private[akka] def await[T](future: CompletableFuture[T]): T = { + private[akka] def await[T](future: Promise[T]): T = { future.await if (future.result.isDefined) future.result.get else if (future.exception.isDefined) handleError(future.exception.get) diff --git a/akka-docs/java/dataflow.rst b/akka-docs/java/dataflow.rst index 52437647a5..901fdb2e38 100644 --- a/akka-docs/java/dataflow.rst +++ b/akka-docs/java/dataflow.rst @@ -8,7 +8,7 @@ Dataflow Concurrency (Java) Introduction ------------ -**IMPORTANT: As of Akka 1.1, Akka Future, CompletableFuture and DefaultCompletableFuture have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** +**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.** Akka implements `Oz-style dataflow concurrency `_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads. diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index c03dd8e50c..ddeedbe829 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -308,9 +308,9 @@ Reply using the sender future If a message was sent with the 'sendRequestReply' or 'sendRequestReplyFuture' methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the 'reply' method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. 
+The reference to the Future resides in the 'ActorRef' instance and can be retrieved using 'Option getSenderFuture()'. -CompletableFuture is a future with methods for 'completing the future: +Promise is a future with methods for 'completing the future: * completeWithResult(..) * completeWithException(..) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index bd550b807b..38076d7b58 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -331,7 +331,7 @@ Reply using the sender future If a message was sent with the ``!!`` or ``!!!`` methods, which both implements request-reply semantics using Future's, then you either have the option of replying using the ``reply`` method as above. This method will then resolve the Future. But you can also get a reference to the Future directly and resolve it yourself or if you would like to store it away to resolve it later, or pass it on to some other Actor to resolve it. -The reference to the Future resides in the ``senderFuture: Option[CompletableFuture[_]]`` member field in the ``ActorRef`` class. +The reference to the Future resides in the ``senderFuture: Option[Promise[_]]`` member field in the ``ActorRef`` class. Here is an example of how it can be used: diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index de04a44a0f..6411104295 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future } +import akka.dispatch.{ DefaultPromise, Promise, Future } import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings } import akka.remote.protocol.RemoteProtocol._ import akka.serialization.RemoteActorSerialization @@ -73,12 +73,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] = + loader: Option[ClassLoader]): Option[Promise[T]] = withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef)) private[akka] def withClientFor[T]( @@ -154,7 +154,7 @@ abstract class RemoteClient private[akka] ( remoteAddress.getAddress.getHostAddress + "::" + remoteAddress.getPort - protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]] + protected val futures = new ConcurrentHashMap[Uuid, Promise[_]] protected val pendingRequests = { if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)] else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity) @@ -191,11 +191,11 @@ abstract class RemoteClient private[akka] ( def send[T]( message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, - actorRef: ActorRef): Option[CompletableFuture[T]] = + actorRef: ActorRef): Option[Promise[T]] = send(createRemoteMessageProtocolBuilder( Some(actorRef), Left(actorRef.uuid), actorRef.address, 
timeout, Right(message), isOneWay, senderOption).build, senderFuture) @@ -205,7 +205,7 @@ abstract class RemoteClient private[akka] ( */ def send[T]( request: RemoteMessageProtocol, - senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { + senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { if (request.getOneWay) { try { @@ -227,7 +227,7 @@ abstract class RemoteClient private[akka] ( None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get - else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) + else new DefaultPromise[T](request.getActorInfo.getTimeout) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails @@ -410,7 +410,7 @@ class ActiveRemoteClient private[akka] ( */ class ActiveRemoteClientPipelineFactory( name: String, - futures: ConcurrentMap[Uuid, CompletableFuture[_]], + futures: ConcurrentMap[Uuid, Promise[_]], bootstrap: ClientBootstrap, remoteAddress: InetSocketAddress, timer: HashedWheelTimer, @@ -439,7 +439,7 @@ class ActiveRemoteClientPipelineFactory( @ChannelHandler.Sharable class ActiveRemoteClientHandler( val name: String, - val futures: ConcurrentMap[Uuid, CompletableFuture[_]], + val futures: ConcurrentMap[Uuid, Promise[_]], val bootstrap: ClientBootstrap, val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, @@ -457,7 +457,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ val reply = arp.getMessage val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) - val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] + val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]] if (reply.hasMessage) { if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist") @@ -891,7 +891,7 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout). + Some(new DefaultPromise[Any](request.getActorInfo.getTimeout). onComplete(_.value.get match { case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) case r: Right[Throwable, Any] ⇒ diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index b556b90e50..dfa3cfd6b8 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -7,7 +7,7 @@ package akka.agent import akka.stm._ import akka.actor.Actor import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } -import akka.dispatch.{ DefaultCompletableFuture, Dispatchers, Future } +import akka.dispatch.{ DefaultPromise, Dispatchers, Future } /** * Used internally to send functions. @@ -122,7 +122,7 @@ class Agent[T](initialValue: T) { def alter(f: T ⇒ T)(timeout: Long): Future[T] = { def dispatch = updater.!!!(Update(f), timeout) if (Stm.activeTransaction) { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) get //Join xa deferred { result completeWith dispatch @@ -164,7 +164,7 @@ class Agent[T](initialValue: T) { * still be executed in order. 
*/ def alterOff(f: T ⇒ T)(timeout: Long): Future[T] = { - val result = new DefaultCompletableFuture[T](timeout) + val result = new DefaultPromise[T](timeout) send((value: T) ⇒ { suspend val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this)).start() From cf0970d27719c018ddfba8856c16c94ebe1b37b8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 11:54:41 +0200 Subject: [PATCH 03/12] Removing duplicate code for TypedActor --- .../src/main/scala/akka/actor/TypedActor.scala | 15 +++++---------- .../src/main/scala/akka/dispatch/Future.scala | 15 ++++++--------- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 839474a3ba..f09e070c9c 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -42,18 +42,13 @@ object TypedActor { case m if m.isOneWay ⇒ actor ! m null - case m if m.returnsJOption_? ⇒ - (actor !!! m).as[JOption[Any]] match { - case Some(null) | None ⇒ JOption.none[Any] - case Some(joption) ⇒ joption - } - case m if m.returnsOption_? ⇒ - (actor !!! m).as[AnyRef] match { - case Some(null) | None ⇒ None - case Some(option) ⇒ option - } case m if m.returnsFuture_? ⇒ actor !!! m + case m if m.returnsJOption_? || m.returnsOption_? ⇒ + (actor !!! m).as[AnyRef] match { + case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None + case Some(joption) ⇒ joption + } case m ⇒ (actor !!! m).get } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 4ebe2b7f05..d235d7f46e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -20,8 +20,6 @@ import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec -import scala.collection.generic.CanBuildFrom -import scala.collection.mutable.Builder import scala.collection.mutable.Stack class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -280,10 +278,6 @@ object Future { } future } - - private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { - override def initialValue = None - } } sealed trait Future[+T] { @@ -600,6 +594,9 @@ object Promise { def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { + override def initialValue = None + } } /** @@ -746,7 +743,7 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { } } - val pending = Future.callbacksPendingExecution.get + val pending = Promise.callbacksPendingExecution.get if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) pending.get.push(() ⇒ { // Linearize/aggregate callbacks at top level and then execute val doNotify = notifyCompleted _ //Hoist closure to avoid garbage @@ -755,9 +752,9 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { } else { try { val callbacks = Stack[() ⇒ Unit]() // Allocate new aggregator for pending callbacks - Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator + Promise.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator runCallbacks(notifyTheseListeners, callbacks) // Execute 
callbacks, if they trigger new callbacks, they are aggregated - } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup + } finally { Promise.callbacksPendingExecution.set(None) } // Ensure cleanup } } From 3b8c39582a07adf8b74a2ed3513b98bee8c71c50 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 11:59:17 +0200 Subject: [PATCH 04/12] Adding assertions to ensure that the registry doesnt include the actor after stop --- .../src/test/scala/akka/misc/ActorRegistrySpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 6214c84da3..7f264bd3de 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -41,6 +41,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actor2.get.address === actor1.address) assert(actor2.get.address === "test-actor-1") actor2.get.stop + assert(Actor.registry.actorFor(actor1.address).isEmpty) } @Test @@ -54,6 +55,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actorOrNone.get.uuid === uuid) assert(actorOrNone.get.address === "test-actor-1") actor.stop + assert(Actor.registry.local.actorFor(uuid).isEmpty) } @Test From 19cf26b6a922d0dac3ab584cf169496a1b6bba7b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 12:08:38 +0200 Subject: [PATCH 05/12] Rewriting one of the tests in ActorRegistrySpec not to use non-volatile global state for verification --- .../scala/akka/misc/ActorRegistrySpec.scala | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 7f264bd3de..df6184c292 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -3,15 +3,14 @@ package akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import java.util.concurrent.{ CyclicBarrier, TimeUnit, CountDownLatch } import org.scalatest.Assertions._ +import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch } +import akka.dispatch.Future object ActorRegistrySpec { - var record = "" class TestActor extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") } } @@ -19,10 +18,8 @@ object ActorRegistrySpec { class TestActor2 extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") case "ping2" ⇒ - record = "pong" + record self.reply("got ping") } } @@ -73,10 +70,8 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetAllActorsFromLocalActorRegistry { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = actorOf[TestActor]("test-actor-2") - actor2.start + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) @@ -90,13 +85,15 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = 
actorOf[TestActor]("test-actor-2") - actor2.start - record = "" - Actor.registry.local.foreach(actor ⇒ actor !! "ping") - assert(record === "pongpong") + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start + val results = new ConcurrentLinkedQueue[Future[String]] + + Actor.registry.local.foreach(actor ⇒ results.add(actor.!!![String]("ping"))) + + assert(results.size === 2) + val i = results.iterator + while (i.hasNext) assert(i.next.get === "got ping") actor1.stop() actor2.stop() } From 1f5a04c678e12c09241c3866290d736cc4f44d2e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 13:50:58 +0200 Subject: [PATCH 06/12] Adding support for customization of the TypedActor impl to be used when creating a new TypedActor, internal only, intended for things like ActorPool etc --- .../main/scala/akka/actor/TypedActor.scala | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index f09e070c9c..52a6cf7622 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -15,20 +15,23 @@ object TypedActor { private val selfReference = new ThreadLocal[AnyRef] def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] - class TypedActor[TI <: AnyRef](proxyRef: AtomicReference[AnyRef], createInstance: ⇒ TI) extends Actor { - val me = createInstance - def receive = { + trait TypedActor[Iface <: AnyRef, Impl <: Iface] { self: Actor ⇒ + val proxyRef: AtomicReference[Iface] + def callMethod(methodCall: MethodCall): Unit + def receive: Receive = { case m: MethodCall ⇒ selfReference set proxyRef.get - try { - m match { - case m if m.isOneWay ⇒ m(me) - case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - case m ⇒ self reply m(me) - } - } finally { - selfReference set null - } + try { callMethod(m) } finally { selfReference set null } + } + } + + class DefaultTypedActor[Iface <: AnyRef, Impl <: Iface]( + val proxyRef: AtomicReference[Iface], createInstance: ⇒ Impl) extends TypedActor[Iface, Impl] with Actor { + val me = createInstance + def callMethod(methodCall: MethodCall): Unit = methodCall match { + case m if m.isOneWay ⇒ m(me) + case m if m.returnsFuture_? 
⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] + case m ⇒ self reply m(me) } } @@ -98,12 +101,15 @@ object TypedActor { newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) } - protected def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[AnyRef](null) - configureAndProxyLocalActorRef[T](interfaces, proxyRef, actorOf(new TypedActor[T](proxyRef, constructor)), config, loader) + private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + newTypedActor[R, T](interfaces, (ref: AtomicReference[R]) ⇒ new DefaultTypedActor[R, T](ref, constructor), config, loader) + + private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ TypedActor[R, T], config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomicReference[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef).asInstanceOf[Actor]), config, loader) } - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[AnyRef], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { actor.timeout = config.timeout.toMillis actor.dispatcher = config.dispatcher From e3208251379fc5089adfd21161b351e727aec395 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 17:08:45 +0200 Subject: [PATCH 07/12] Added some API to be able to wrap interfaces on top of Actors, solving the ActorPool for TypedActor dilemma, closing ticket #724 --- .../test/scala/akka/routing/RoutingSpec.scala | 39 ++++++++- .../main/scala/akka/actor/TypedActor.scala | 86 ++++++++++--------- 2 files changed, 85 insertions(+), 40 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fe490fdd0a..0fa5b8e017 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -7,11 +7,25 @@ import akka.testing._ import akka.testing.Testing.{ sleepFor, testMillis } import akka.util.duration._ -import akka.actor.Actor import akka.actor.Actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.{ KeptPromise, Future } +import akka.actor.{ TypedActor, Actor } + +object RoutingSpec { + trait Foo { + def sq(x: Int, sleep: Long): Future[Int] + } + + class FooImpl extends Foo { + def sq(x: Int, sleep: Long): Future[Int] = { + if (sleep > 0) Thread.sleep(sleep) + new KeptPromise(Right(x * x)) + } + } +} class RoutingSpec extends WordSpec with MustMatchers { import Routing._ @@ -491,6 +505,29 @@ class RoutingSpec extends WordSpec with MustMatchers { pool.stop() } + + "support typed actors" in { + import RoutingSpec._ + import TypedActor._ + def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = 
true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) + def receive = _route + } + + val pool = createProxy[Foo](createPool) + + val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) + + for ((i, r) ← results) r.get must equal(i * i) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 52a6cf7622..e3c6c8e9e1 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -13,26 +13,24 @@ import java.util.concurrent.atomic.AtomicReference object TypedActor { private val selfReference = new ThreadLocal[AnyRef] - def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] - trait TypedActor[Iface <: AnyRef, Impl <: Iface] { self: Actor ⇒ - val proxyRef: AtomicReference[Iface] - def callMethod(methodCall: MethodCall): Unit - def receive: Receive = { - case m: MethodCall ⇒ - selfReference set proxyRef.get - try { callMethod(m) } finally { selfReference set null } - } + def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match { + case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!") + case some ⇒ some } - class DefaultTypedActor[Iface <: AnyRef, Impl <: Iface]( - val proxyRef: AtomicReference[Iface], createInstance: ⇒ Impl) extends TypedActor[Iface, Impl] with Actor { + class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor { val me = createInstance def callMethod(methodCall: MethodCall): Unit = methodCall match { case m if m.isOneWay ⇒ m(me) case m if m.returnsFuture_? 
⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] case m ⇒ self reply m(me) } + def receive = { + case m: MethodCall ⇒ + selfReference set proxyRef.get + try { callMethod(m) } finally { selfReference set null } + } } case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { @@ -81,42 +79,24 @@ object TypedActor { private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues) } - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.newInstance, config, loader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.create, config, loader) def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R = - newTypedActor(impl.getInterfaces, impl.newInstance, config, loader) + createProxyAndTypedActor(impl, impl.newInstance, config, loader) def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) - } - - private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = - newTypedActor[R, T](interfaces, (ref: AtomicReference[R]) ⇒ new DefaultTypedActor[R, T](ref, constructor), config, loader) - - private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ TypedActor[R, T], config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[R] - configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef).asInstanceOf[Actor]), config, loader) - } - - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { - actor.timeout = config.timeout.toMillis - actor.dispatcher = config.dispatcher - - val proxy: T = 
Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] - proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop - proxy + createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) } def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match { @@ -134,4 +114,32 @@ object TypedActor { } def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null + + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + + def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = + createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = + createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomicReference[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader) + } + + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { + actor.timeout = config.timeout.toMillis + actor.dispatcher = config.dispatcher + + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] + proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + proxy + } + + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = + if (clazz.isInterface) Array[Class[_]](clazz) + else clazz.getInterfaces } \ No newline at end of file From 39481f0a5fd5b284b907a0320e81e6c7f7d9255d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 18:10:57 +0200 Subject: [PATCH 08/12] Adding a test to verify usage of TypedActor.self outside of a TypedActor --- .../src/test/scala/akka/actor/actor/TypedActorSpec.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index f21f49b023..311523ce96 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -132,6 +132,12 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach 
stop(null) must be(false) } + "throw an IllegalStateExcpetion when TypedActor.self is called in the wrong scope" in { + (intercept[IllegalStateException] { + TypedActor.self[Foo] + }).getMessage must equal("Calling TypedActor.self outside of a TypedActor implementation method!") + } + "have access to itself when executing a method call" in { val t = newFooBar t.self must be(t) From 556ee4b5e981d136793f96b8541a24049f87024e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 May 2011 18:17:31 +0200 Subject: [PATCH 09/12] Added a default configuration object to avoid object allocation for the trivial case --- akka-actor/src/main/scala/akka/actor/TypedActor.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index e3c6c8e9e1..b7c6b45422 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -56,7 +56,12 @@ object TypedActor { } } - case class Configuration(timeout: Duration = Duration(Actor.TIMEOUT, "millis"), dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) + object Configuration { + val defaultTimeout = Duration(Actor.TIMEOUT, "millis") + val defaultConfiguration = new Configuration(defaultTimeout, Dispatchers.defaultGlobalDispatcher) + def apply(): Configuration = defaultConfiguration + } + case class Configuration(timeout: Duration = Configuration.defaultTimeout, dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) case class MethodCall(method: Method, parameters: Array[AnyRef]) { def isOneWay = method.getReturnType == java.lang.Void.TYPE From 27e9d71f769dc75e3a3bb5b4bde8fc89c240de90 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Mon, 23 May 2011 20:03:40 +0300 Subject: [PATCH 10/12] - initial checkin of the storage functionality --- .../main/scala/akka/cluster/RawStorage.scala | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 akka-cluster/src/main/scala/akka/cluster/RawStorage.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala new file mode 100644 index 0000000000..906547c437 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala @@ -0,0 +1,110 @@ +package akka.cluster + +import zookeeper.AkkaZkClient +import java.lang.UnsupportedOperationException +import akka.AkkaException +import org.apache.zookeeper.{KeeperException, CreateMode} +import org.apache.zookeeper.data.Stat +import scala.Some + +/** + * Simple abstraction to store an Array of bytes based on some String key. + * + * Nothing is being said about ACID, transactions etc. + * + * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. + * (This is supported by ZooKeeper). + * TODO: Class is up for better names. + * TODO: Instead of a String as key, perhaps also a byte-array. + */ +trait RawStorage { + + /** + * Inserts a byte-array based on some key. + * + * TODO: What happens when given key already exists + */ + def insert(key: String, bytes: Array[Byte]): Unit + + /** + * Stores a array of bytes based on some key. + * + * TODO: What happens when the given key doesn't exist yet + */ + def update(key: String, bytes: Array[Byte]): Unit + + /** + * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. 
+ */ + def load(key: String): Option[Array[Byte]] +} + +/** + * An AkkaException thrown by the RawStorage module. + */ +class RawStorageException(msg: String = null, cause: Throwable = null) extends AkkaException(msg, cause) + +/*** + * A RawStorageException thrown when an operation is done on a non existing node. + */ +class MissingNodeException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) + +/** + * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. + */ +class NodeExistsException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) + + +/** + * A RawStorage implementation based on ZooKeeper. + * + * The store method is atomic: + * - so everything is written or nothing is written + * - is isolated, so threadsafe, + * but it will not participate in any transactions. + * //todo: unclear, is only a single connection used in the JVM?? + + */ +class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage { + + override def insert(key: String, bytes: Array[Byte]) { + try { + zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); + } catch { + case e: KeeperException.NodeExistsException => throw new NodeExistsException(e) + case e: KeeperException => throw new RawStorageException(e) + } + } + + override def load(key: String) = try { + Some(zkClient.connection.readData(key, new Stat, false)) + } catch { + case e: KeeperException.NoNodeException => None + case e: KeeperException => throw new RawStorageException(e) + } + + override def update(key: String, bytes: Array[Byte]) { + try { + zkClient.connection.writeData(key, bytes) + } catch { + case e: KeeperException.NoNodeException => throw new MissingNodeException(e) + case e: KeeperException => throw new RawStorageException(e) + } + } +} + + +class VoldemortRawStorage extends RawStorage { + + def load(Key: String) = { + throw new UnsupportedOperationException() + } + + override def insert(key: String, bytes: Array[Byte]) { + throw new UnsupportedOperationException() + } + + def update(key: String, bytes: Array[Byte]) { + throw new UnsupportedOperationException() + } +} \ No newline at end of file From e84a7cb5ead86d15127abb5d6488fba57046114f Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Mon, 23 May 2011 21:08:59 +0300 Subject: [PATCH 11/12] - disabled the functionality for the rawstorage, will inspect it locally. But at least the build will be running again. --- .../main/scala/akka/cluster/RawStorage.scala | 207 +++++++++--------- 1 file changed, 104 insertions(+), 103 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala index 906547c437..ca8f0df938 100644 --- a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala +++ b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala @@ -3,108 +3,109 @@ package akka.cluster import zookeeper.AkkaZkClient import java.lang.UnsupportedOperationException import akka.AkkaException -import org.apache.zookeeper.{KeeperException, CreateMode} +import org.apache.zookeeper.{ KeeperException, CreateMode } import org.apache.zookeeper.data.Stat import scala.Some - -/** - * Simple abstraction to store an Array of bytes based on some String key. - * - * Nothing is being said about ACID, transactions etc. - * - * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. - * (This is supported by ZooKeeper). - * TODO: Class is up for better names. 
- * TODO: Instead of a String as key, perhaps also a byte-array. - */ -trait RawStorage { - - /** - * Inserts a byte-array based on some key. - * - * TODO: What happens when given key already exists - */ - def insert(key: String, bytes: Array[Byte]): Unit - - /** - * Stores a array of bytes based on some key. - * - * TODO: What happens when the given key doesn't exist yet - */ - def update(key: String, bytes: Array[Byte]): Unit - - /** - * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. - */ - def load(key: String): Option[Array[Byte]] -} - -/** - * An AkkaException thrown by the RawStorage module. - */ -class RawStorageException(msg: String = null, cause: Throwable = null) extends AkkaException(msg, cause) - -/*** - * A RawStorageException thrown when an operation is done on a non existing node. - */ -class MissingNodeException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) - -/** - * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. - */ -class NodeExistsException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) - - -/** - * A RawStorage implementation based on ZooKeeper. - * - * The store method is atomic: - * - so everything is written or nothing is written - * - is isolated, so threadsafe, - * but it will not participate in any transactions. - * //todo: unclear, is only a single connection used in the JVM?? - - */ -class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage { - - override def insert(key: String, bytes: Array[Byte]) { - try { - zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); - } catch { - case e: KeeperException.NodeExistsException => throw new NodeExistsException(e) - case e: KeeperException => throw new RawStorageException(e) - } - } - - override def load(key: String) = try { - Some(zkClient.connection.readData(key, new Stat, false)) - } catch { - case e: KeeperException.NoNodeException => None - case e: KeeperException => throw new RawStorageException(e) - } - - override def update(key: String, bytes: Array[Byte]) { - try { - zkClient.connection.writeData(key, bytes) - } catch { - case e: KeeperException.NoNodeException => throw new MissingNodeException(e) - case e: KeeperException => throw new RawStorageException(e) - } - } -} - - -class VoldemortRawStorage extends RawStorage { - - def load(Key: String) = { - throw new UnsupportedOperationException() - } - - override def insert(key: String, bytes: Array[Byte]) { - throw new UnsupportedOperationException() - } - - def update(key: String, bytes: Array[Byte]) { - throw new UnsupportedOperationException() - } -} \ No newline at end of file +// +///** +// * Simple abstraction to store an Array of bytes based on some String key. +// * +// * Nothing is being said about ACID, transactions etc. It depends on the implementation +// * of this Storage interface of what is and isn't done on the lowest level. +// * +// * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. +// * (This is supported by ZooKeeper). +// * TODO: Class is up for better names. +// * TODO: Instead of a String as key, perhaps also a byte-array. +// */ +//trait RawStorage { +// +// /** +// * Inserts a byte-array based on some key. 
+// * +// * TODO: What happens when given key already exists +// * @throws NodeExistsException +// */ +// def insert(key: String, bytes: Array[Byte]): Unit +// +// /** +// * Stores a array of bytes based on some key. +// * +// * TODO: What happens when the given key doesn't exist yet +// */ +// def update(key: String, bytes: Array[Byte]): Unit +// +// /** +// * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. +// */ +// def load(key: String): Option[Array[Byte]] +//} +// +///** +// * An AkkaException thrown by the RawStorage module. +// */ +//class RawStorageException(msg: String = null, cause: Throwable = null) extends AkkaException(msg, cause) +// +///** +// * * +// * A RawStorageException thrown when an operation is done on a non existing node. +// */ +//class MissingNodeException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) +// +///** +// * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. +// */ +//class NodeExistsException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) +// +///** +// * A RawStorage implementation based on ZooKeeper. +// * +// * The store method is atomic: +// * - so everything is written or nothing is written +// * - is isolated, so threadsafe, +// * but it will not participate in any transactions. +// * //todo: unclear, is only a single connection used in the JVM?? +// * +// */ +//class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage { +// +// override def insert(key: String, bytes: Array[Byte]) { +// try { +// zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); +// } catch { +// case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException(e) +// case e: KeeperException ⇒ throw new RawStorageException(e) +// } +// } +// +// override def load(key: String) = try { +// Some(zkClient.connection.readData(key, new Stat, false)) +// } catch { +// case e: KeeperException.NoNodeException ⇒ None +// case e: KeeperException ⇒ throw new RawStorageException(e) +// } +// +// override def update(key: String, bytes: Array[Byte]) { +// try { +// zkClient.connection.writeData(key, bytes) +// } catch { +// case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException(e) +// case e: KeeperException ⇒ throw new RawStorageException(e) +// } +// } +//} +// +//class VoldemortRawStorage extends RawStorage { +// +// def load(Key: String) = { +// throw new UnsupportedOperationException() +// } +// +// override def insert(key: String, bytes: Array[Byte]) { +// throw new UnsupportedOperationException() +// } +// +// def update(key: String, bytes: Array[Byte]) { +// throw new UnsupportedOperationException() +// } +//} \ No newline at end of file From 694134090ddc5f0e87f913440c649bf98b75bb16 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Mon, 23 May 2011 21:17:33 +0300 Subject: [PATCH 12/12] - fixed the type problems. This time with a compile (since no tests currently are available for this code). 
--- .../main/scala/akka/cluster/RawStorage.scala | 206 +++++++++--------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala index ca8f0df938..da7a6a07b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala +++ b/akka-cluster/src/main/scala/akka/cluster/RawStorage.scala @@ -6,106 +6,106 @@ import akka.AkkaException import org.apache.zookeeper.{ KeeperException, CreateMode } import org.apache.zookeeper.data.Stat import scala.Some -// -///** -// * Simple abstraction to store an Array of bytes based on some String key. -// * -// * Nothing is being said about ACID, transactions etc. It depends on the implementation -// * of this Storage interface of what is and isn't done on the lowest level. -// * -// * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. -// * (This is supported by ZooKeeper). -// * TODO: Class is up for better names. -// * TODO: Instead of a String as key, perhaps also a byte-array. -// */ -//trait RawStorage { -// -// /** -// * Inserts a byte-array based on some key. -// * -// * TODO: What happens when given key already exists -// * @throws NodeExistsException -// */ -// def insert(key: String, bytes: Array[Byte]): Unit -// -// /** -// * Stores a array of bytes based on some key. -// * -// * TODO: What happens when the given key doesn't exist yet -// */ -// def update(key: String, bytes: Array[Byte]): Unit -// -// /** -// * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. -// */ -// def load(key: String): Option[Array[Byte]] -//} -// -///** -// * An AkkaException thrown by the RawStorage module. -// */ -//class RawStorageException(msg: String = null, cause: Throwable = null) extends AkkaException(msg, cause) -// -///** -// * * -// * A RawStorageException thrown when an operation is done on a non existing node. -// */ -//class MissingNodeException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) -// -///** -// * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. -// */ -//class NodeExistsException(msg: String = null, cause: Throwable = null) extends RawStorageException(msg, cause) -// -///** -// * A RawStorage implementation based on ZooKeeper. -// * -// * The store method is atomic: -// * - so everything is written or nothing is written -// * - is isolated, so threadsafe, -// * but it will not participate in any transactions. -// * //todo: unclear, is only a single connection used in the JVM?? 
-// * -// */ -//class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage { -// -// override def insert(key: String, bytes: Array[Byte]) { -// try { -// zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); -// } catch { -// case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException(e) -// case e: KeeperException ⇒ throw new RawStorageException(e) -// } -// } -// -// override def load(key: String) = try { -// Some(zkClient.connection.readData(key, new Stat, false)) -// } catch { -// case e: KeeperException.NoNodeException ⇒ None -// case e: KeeperException ⇒ throw new RawStorageException(e) -// } -// -// override def update(key: String, bytes: Array[Byte]) { -// try { -// zkClient.connection.writeData(key, bytes) -// } catch { -// case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException(e) -// case e: KeeperException ⇒ throw new RawStorageException(e) -// } -// } -//} -// -//class VoldemortRawStorage extends RawStorage { -// -// def load(Key: String) = { -// throw new UnsupportedOperationException() -// } -// -// override def insert(key: String, bytes: Array[Byte]) { -// throw new UnsupportedOperationException() -// } -// -// def update(key: String, bytes: Array[Byte]) { -// throw new UnsupportedOperationException() -// } -//} \ No newline at end of file + +/** + * Simple abstraction to store an Array of bytes based on some String key. + * + * Nothing is being said about ACID, transactions etc. It depends on the implementation + * of this Storage interface of what is and isn't done on the lowest level. + * + * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. + * (This is supported by ZooKeeper). + * TODO: Class is up for better names. + * TODO: Instead of a String as key, perhaps also a byte-array. + */ +trait RawStorage { + + /** + * Inserts a byte-array based on some key. + * + * TODO: What happens when given key already exists + * @throws NodeExistsException + */ + def insert(key: String, bytes: Array[Byte]): Unit + + /** + * Stores a array of bytes based on some key. + * + * TODO: What happens when the given key doesn't exist yet + */ + def update(key: String, bytes: Array[Byte]): Unit + + /** + * Loads the given entry. If it exists, a 'Some[Array[Byte]]' will be returned, else a None. + */ + def load(key: String): Option[Array[Byte]] +} + +/** + * An AkkaException thrown by the RawStorage module. + */ +class RawStorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause) + +/** + * * + * A RawStorageException thrown when an operation is done on a non existing node. + */ +class MissingNodeException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause) + +/** + * A RawStorageException thrown when an operation is done on an existing node, but no node was expected. + */ +class NodeExistsException(msg: String = null, cause: java.lang.Throwable = null) extends RawStorageException(msg, cause) + +/** + * A RawStorage implementation based on ZooKeeper. + * + * The store method is atomic: + * - so everything is written or nothing is written + * - is isolated, so threadsafe, + * but it will not participate in any transactions. + * //todo: unclear, is only a single connection used in the JVM?? 
+ *
+ */
+class ZooKeeperRawStorage(zkClient: AkkaZkClient) extends RawStorage {
+
+  override def insert(key: String, bytes: Array[Byte]) {
+    try {
+      zkClient.connection.create(key, bytes, CreateMode.PERSISTENT);
+    } catch {
+      case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException("failed to insert key " + key, e)
+      case e: KeeperException ⇒ throw new RawStorageException("failed to insert key " + key, e)
+    }
+  }
+
+  override def load(key: String) = try {
+    Some(zkClient.connection.readData(key, new Stat, false))
+  } catch {
+    case e: KeeperException.NoNodeException ⇒ None
+    case e: KeeperException ⇒ throw new RawStorageException("failed to load key " + key, e)
+  }
+
+  override def update(key: String, bytes: Array[Byte]) {
+    try {
+      zkClient.connection.writeData(key, bytes)
+    } catch {
+      case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException("failed to update key", e)
+      case e: KeeperException ⇒ throw new RawStorageException("failed to update key", e)
+    }
+  }
+}
+
+class VoldemortRawStorage extends RawStorage {
+
+  def load(key: String) = {
+    throw new UnsupportedOperationException()
+  }
+
+  override def insert(key: String, bytes: Array[Byte]) {
+    throw new UnsupportedOperationException()
+  }
+
+  def update(key: String, bytes: Array[Byte]) {
+    throw new UnsupportedOperationException()
+  }
+}
\ No newline at end of file
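
To make the storage contract introduced in patches 10 to 12 concrete, here is a minimal usage sketch. It is illustrative only and not part of any patch: the object and method names (RawStorageUsage, storeAndRead), the sample key and payload are hypothetical, an already connected AkkaZkClient is assumed, and the full import path for AkkaZkClient is inferred from the relative import in RawStorage.scala; only RawStorage, ZooKeeperRawStorage and NodeExistsException are taken directly from the code above.

import akka.cluster.{ RawStorage, ZooKeeperRawStorage, NodeExistsException }
import akka.cluster.zookeeper.AkkaZkClient

object RawStorageUsage {

  // Assumes an already connected AkkaZkClient; obtaining one is outside the scope of this sketch.
  def storeAndRead(zkClient: AkkaZkClient): Unit = {
    val storage: RawStorage = new ZooKeeperRawStorage(zkClient)

    val key = "/akka/sample-entry" // hypothetical ZooKeeper path, not taken from the patches
    val payload = "hello".getBytes("UTF-8")

    // insert creates the node and throws NodeExistsException if the key is already present,
    // so fall back to update (which conversely expects the key to exist) in that case.
    try storage.insert(key, payload) catch {
      case _: NodeExistsException => storage.update(key, payload)
    }

    // load returns Some(bytes) when the key exists and None otherwise.
    storage.load(key) match {
      case Some(bytes) => println(new String(bytes, "UTF-8"))
      case None        => println("nothing stored under " + key)
    }
  }
}

The insert/update split mirrors the distinction between ZooKeeper's create and writeData calls used in the implementation, which is why the two failure modes surface as NodeExistsException and MissingNodeException respectively.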