From b0952e5212cc5ecdd18f5bb78f1e98fc1e79fb68 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 2 Jun 2011 13:33:49 -0700 Subject: [PATCH 1/9] Renaming Future.failure to Future.recover --- .../test/scala/akka/dispatch/FutureSpec.scala | 20 +++---- .../src/main/scala/akka/dispatch/Future.scala | 55 +++++++++---------- akka-docs/java/untyped-actors.rst | 1 - akka-docs/scala/futures.rst | 6 +- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index ab3f45e388..1d18e2c60f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -146,30 +146,30 @@ class FutureSpec extends JUnitSuite { val future2 = future1 map (_ / 0) val future3 = future2 map (_.toString) - val future4 = future1 failure { + val future4 = future1 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future5 = future2 failure { + val future5 = future2 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future6 = future2 failure { + val future6 = future2 recover { case e: MatchError ⇒ 0 } map (_.toString) - val future7 = future3 failure { case e: ArithmeticException ⇒ "You got ERROR" } + val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } val actor = actorOf[TestActor].start() val future8 = actor !!! "Failure" - val future9 = actor !!! "Failure" failure { + val future9 = actor !!! "Failure" recover { case e: RuntimeException ⇒ "FAIL!" } - val future10 = actor !!! "Hello" failure { + val future10 = actor !!! "Hello" recover { case e: RuntimeException ⇒ "FAIL!" } - val future11 = actor !!! "Failure" failure { case _ ⇒ "Oops!" } + val future11 = actor !!! "Failure" recover { case _ ⇒ "Oops!" } assert(future1.get === 5) intercept[ArithmeticException] { future2.get } @@ -269,7 +269,7 @@ class FutureSpec extends JUnitSuite { def receiveShouldExecuteOnComplete { val latch = new StandardLatch val actor = actorOf[TestActor].start() - actor !!! "Hello" receive { case "World" ⇒ latch.open } + actor !!! "Hello" onResult { case "World" ⇒ latch.open } assert(latch.tryAwait(5, TimeUnit.SECONDS)) actor.stop() } @@ -304,13 +304,13 @@ class FutureSpec extends JUnitSuite { val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("dispatcher receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open f2.await assert(f2.resultOrException === Some("success")) f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("current thread receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } f3.await assert(f3.resultOrException === Some("SUCCESS")) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 84e4b2f79b..3a25eff65f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -320,14 +320,6 @@ sealed trait Future[+T] { */ def await(atMost: Duration): Future[T] - /** - * Blocks the current thread until the Future has been completed. Use - * caution with this method as it ignores the timeout and will block - * indefinitely if the Future is never completed. - */ - @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1") - def awaitBlocking: Future[T] - /** * Tests whether this Future has been completed. */ @@ -383,17 +375,35 @@ sealed trait Future[+T] { * When the future is completed with a valid result, apply the provided * PartialFunction to the result. *
-   *   val result = future receive {
+   *   val result = future onResult {
    *     case Foo => "foo"
    *     case Bar => "bar"
-   *   }.await.result
+   *   }
    * 
*/ - final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒ + final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒ val optr = f.result if (optr.isDefined) { val r = optr.get - if (pf.isDefinedAt(r)) pf(r) + if (pf isDefinedAt r) pf(r) + } + } + + /** + * When the future is completed with an exception, apply the provided + * PartialFunction to the exception. + *
+   *   val result = future onException {
+   *     case Foo => "foo"
+   *     case Bar => "bar"
+   *   }
+   * 
+ */ + final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒ + val opte = f.exception + if (opte.isDefined) { + val e = opte.get + if (pf isDefinedAt e) pf(e) } } @@ -439,12 +449,12 @@ sealed trait Future[+T] { * a valid result then the new Future will contain the same. * Example: *
-   * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
-   * Future(6 / 0) failure { case e: NotFoundException   => 0 } // result: exception
-   * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
+   * Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
+   * Future(6 / 0) recover { case e: NotFoundException   => 0 } // result: exception
+   * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
    * 
*/ - final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception @@ -708,18 +718,6 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } - def awaitBlocking = { - _lock.lock - try { - while (_value.isEmpty) { - _signal.await - } - this - } finally { - _lock.unlock - } - } - def isExpired: Boolean = timeLeft() <= 0 def value: Option[Either[Throwable, T]] = { @@ -816,7 +814,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this - def awaitBlocking: Future[T] = this def isExpired: Boolean = true def timeoutInNanos: Long = 0 } diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index ddeedbe829..7d6cec6013 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -170,7 +170,6 @@ The 'Future' interface looks like this: interface Future { void await(); - void awaitBlocking(); boolean isCompleted(); boolean isExpired(); long timeoutInNanos(); diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 0dc492a02a..de58118a4f 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -238,12 +238,12 @@ Exceptions Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``Actor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly. -It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``failure`` method. For example: +It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``recover`` method. For example: .. code-block:: scala - val future = actor !!! msg1 failure { + val future = actor !!! msg1 recover { case e: ArithmeticException => 0 } -In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``failure`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``failure`` method. +In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``recover`` method. From 07eaf0ba4879774aad1ac6bb7ee76e373b2f674b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 2 Jun 2011 14:29:39 -0700 Subject: [PATCH 2/9] Attempt to solve ticket #902 --- .../src/main/scala/akka/actor/Actor.scala | 15 +++++++------ .../src/main/scala/akka/actor/ActorRef.scala | 21 ++++++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2ad30bcd55..8ed782cd61 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -17,6 +17,7 @@ import akka.AkkaException import akka.serialization.{ Format, Serializer } import akka.cluster.ClusterNode import akka.event.EventHandler +import scala.collection.immutable.Stack import scala.reflect.BeanProperty @@ -135,8 +136,8 @@ object Actor extends ListenerManagement { hook } - private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] { - override def initialValue = None + private[actor] val actorRefInCreation = new ThreadLocal[Stack[ActorRef]] { + override def initialValue = Stack[ActorRef]() } /** @@ -505,16 +506,18 @@ trait Actor { */ @transient implicit val someSelf: Some[ActorRef] = { - val optRef = Actor.actorRefInCreation.get - if (optRef.isEmpty) throw new ActorInitializationException( + val refStack = Actor.actorRefInCreation.get + if (refStack.isEmpty) throw new ActorInitializationException( "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + "\n\tEither use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") - Actor.actorRefInCreation.set(None) - optRef.asInstanceOf[Some[ActorRef]] + + val ref = refStack.head + Actor.actorRefInCreation.set(refStack.pop) + Some(ref) } /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bc5efa02b0..e3b866ec97 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -869,13 +869,20 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, // ========= PRIVATE FUNCTIONS ========= private[this] def newActor: Actor = { - try { - Actor.actorRefInCreation.set(Some(this)) - val a = actorFactory() - if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - a - } finally { - Actor.actorRefInCreation.set(None) + import Actor.{ actorRefInCreation ⇒ refStack } + (try { + refStack.set(refStack.get.push(this)) + actorFactory() + } catch { + case e ⇒ + val stack = refStack.get + //Clean up if failed + if ((stack.nonEmpty) && (stack.head eq this)) refStack.set(stack.pop) + //Then rethrow + throw e + }) match { + case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + case valid ⇒ valid } } From f9d0b188af61109bfdfc331ec6006d57d11591c0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 4 Jun 2011 12:42:06 -0700 Subject: [PATCH 3/9] Removing ActorRef.isDefinedAt and Future.empty and moving Future.channel to Promise, renaming future to promise for the channel --- .../test/scala/akka/routing/RoutingSpec.scala | 32 ------------- .../src/main/scala/akka/actor/Actor.scala | 15 ------- .../src/main/scala/akka/actor/ActorRef.scala | 12 +---- .../src/main/scala/akka/dispatch/Future.scala | 45 ++++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 4 -- .../scala/akka/kernel/EmbeddedAppServer.scala | 2 +- .../scala/akka/testkit/TestActorRef.scala | 5 --- .../scala/akka/testkit/TestActorRefSpec.scala | 10 ++--- 8 files changed, 29 insertions(+), 96 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 0fa5b8e017..ddd37f760c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -166,38 +166,6 @@ class RoutingSpec extends WordSpec with MustMatchers { for (a ← List(broadcast, a1, a2, a3)) a.stop() } - - "be defined at" in { - import akka.actor.ActorRef - - val Yes = "yes" - val No = "no" - - def testActor() = actorOf(new Actor() { - def receive = { - case Yes ⇒ "yes" - } - }).start() - - val t1 = testActor() - val t2 = testActor() - val t3 = testActor() - val t4 = testActor() - - val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) - - t1.isDefinedAt(Yes) must be(true) - t1.isDefinedAt(No) must be(false) - t2.isDefinedAt(Yes) must be(true) - t2.isDefinedAt(No) must be(false) - d1.isDefinedAt(Yes) must be(true) - d1.isDefinedAt(No) must be(false) - d2.isDefinedAt(Yes) must be(true) - d2.isDefinedAt(No) must be(false) - - for (a ← List(t1, t2, d1, d2)) a.stop() - } } "Actor Pool" must { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8ed782cd61..1dd597bf89 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -619,21 +619,6 @@ trait Actor { throw new UnhandledMessageException(msg, self) } - /** - * Is the actor able to handle the message passed in as arguments? - */ - def isDefinedAt(message: Any): Boolean = { - val behaviorStack = self.hotswap - message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage ⇒ true - case msg if behaviorStack.nonEmpty && - behaviorStack.head.isDefinedAt(msg) ⇒ true - case msg if behaviorStack.isEmpty && - processingBehavior.isDefinedAt(msg) ⇒ true - case _ ⇒ false - } - } - /** * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e3b866ec97..7affe6c104 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -233,12 +233,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED - /** - * Is the actor able to handle the message passed in as arguments? - */ - @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") - def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - /** * Only for internal use. UUID is effectively final. */ @@ -801,11 +795,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, } def tooManyRestarts() { - _supervisor.foreach { sup ⇒ - // can supervisor handle the notification? - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - } + notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) stop() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3a25eff65f..e760f97bb7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -217,20 +217,6 @@ object Future { def apply[T](body: ⇒ T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = dispatcher.dispatchFuture(() ⇒ body, timeout) - /** - * Construct a completable channel - */ - def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { - val future = empty[Any](timeout) - def !(msg: Any) = future completeWithResult msg - } - - /** - * Create an empty Future with default timeout - */ - @deprecated("Superceded by Promise.apply", "1.2") - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) - import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -273,9 +259,11 @@ object Future { */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ - val opte = f.exception - if (opte.isDefined) future completeWithException (opte.get) + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { + _.exception match { + case Some(e) ⇒ future completeWithException e + case None ⇒ + } } future } @@ -616,6 +604,14 @@ object Promise { def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + /** + * Construct a completable channel + */ + def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { + val promise = Promise[Any](timeout) + def !(msg: Any) = promise completeWithResult msg + } + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None } @@ -732,11 +728,16 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { - if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired - _value = Some(value) - val existingListeners = _listeners - _listeners = Nil - existingListeners + if (_value.isEmpty) { //Only complete if we aren't expired + if (!isExpired) { + _value = Some(value) + val existingListeners = _listeners + _listeners = Nil + existingListeners + } else { + _listeners = Nil + Nil + } } else Nil } finally { _signal.signalAll diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 83fd45a5cb..454760594a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -90,8 +90,6 @@ trait LoadBalancer extends Router { self: Actor ⇒ } override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) } /** @@ -106,8 +104,6 @@ abstract class UntypedLoadBalancer extends UntypedRouter { else null override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) } object Routing { diff --git a/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala index 286d2b3b04..471f45213e 100644 --- a/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala @@ -40,7 +40,7 @@ trait EmbeddedAppServer extends Bootable { super.onLoad if (isRestEnabled) { - val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!"))) + val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(sys.error("microkernel-server.xml not found!"))) System.setProperty("jetty.port", REST_PORT.toString) System.setProperty("jetty.host", REST_HOSTNAME) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 7b656278b8..4e9d7b527d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -24,11 +24,6 @@ class TestActorRef[T <: Actor](factory: () ⇒ T, address: String) extends Local dispatcher = CallingThreadDispatcher.global receiveTimeout = None - /** - * Query actor's current receive behavior. - */ - override def isDefinedAt(o: Any) = actor.isDefinedAt(o) - /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index ffe39dee57..5d74db9b34 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.config.Supervision.OneForOneStrategy import akka.event.EventHandler -import akka.dispatch.Future +import akka.dispatch.{ Future, Promise } /** * Test whether TestActorRef behaves as an ActorRef should, besides its own spec. @@ -234,14 +234,12 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac EventHandler.removeListener(log) } - "proxy isDefinedAt/apply for the underlying actor" in { + "proxy apply for the underlying actor" in { val ref = TestActorRef[WorkerActor].start() - ref.isDefinedAt("work") must be(true) - ref.isDefinedAt("sleep") must be(false) intercept[IllegalActorStateException] { ref("work") } - val ch = Future.channel() + val ch = Promise.channel() ref ! ch - val f = ch.future + val f = ch.promise f must be('completed) f.get must be("complexReply") } From 21fe2055f66ff332dcf8bee8423286d5b41ffdad Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 4 Jun 2011 16:00:14 -0700 Subject: [PATCH 4/9] Removing deprecation warnings --- akka-actor/src/main/scala/akka/actor/Deployer.scala | 1 + .../src/test/scala/akka/cluster/ClusterDeployerSpec.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 5943e28117..9da6df072f 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -139,6 +139,7 @@ object DeploymentConfig { case LeastRAM() ⇒ RouterType.LeastRAM case LeastMessages ⇒ RouterType.LeastMessages case LeastMessages() ⇒ RouterType.LeastMessages + case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 3ab9acdb8a..435f20d094 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -47,7 +47,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter val deployments2 = ClusterDeployer.fetchDeploymentsFromCluster deployments2.size must equal(1) - deployments2.first must equal(deployments1.first) + deployments2.head must equal(deployments1.head) } } From 66ad188222a982d88b6cc8da37f36c862519a1de Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 4 Jun 2011 16:01:07 -0700 Subject: [PATCH 5/9] Fixing #648, adding transparent support for Java Serialization of ActorRef (local + remote) --- .../src/main/scala/akka/actor/Actor.scala | 4 ++-- .../src/main/scala/akka/actor/ActorRef.scala | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1dd597bf89..1f14cefbb4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -419,8 +419,8 @@ object Actor extends ListenerManagement { } val isStateful = state match { - case Stateless ⇒ false - case Stateful ⇒ true + case _: Stateless | Stateless ⇒ false + case _: Stateful | Stateful ⇒ true } if (isStateful && isHomeNode) { // stateful actor's home node diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 7affe6c104..54336dd2d2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -679,6 +679,11 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, def supervisor: Option[ActorRef] = _supervisor // ========= AKKA PROTECTED FUNCTIONS ========= + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + val inetaddr = Actor.remote.address + SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout) + } protected[akka] def supervisor_=(sup: Option[ActorRef]) { _supervisor = sup @@ -1028,6 +1033,12 @@ private[akka] case class RemoteActorRef private[akka] ( } // ==== NOT SUPPORTED ==== + + @throws(classOf[java.io.ObjectStreamException]) + private def writeReplace(): AnyRef = { + SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, timeout) + } + @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") def actorClass: Class[_ <: Actor] = unsupported def dispatcher_=(md: MessageDispatcher) { @@ -1248,3 +1259,15 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ } else false } } + +case class SerializedActorRef(val uuid: Uuid, + val address: String, + val hostname: String, + val port: Int, + val timeout: Long) { + @throws(classOf[java.io.ObjectStreamException]) + def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match { + case Some(actor) ⇒ actor + case None ⇒ RemoteActorRef(new InetSocketAddress(hostname, port), address, timeout, None) + } +} From 56a172bbdd2bf32e127e74929050e37fdba49cab Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 4 Jun 2011 16:29:23 -0700 Subject: [PATCH 6/9] Removing residue of old ActorRef interface --- .../src/main/scala/akka/camel/component/ActorComponent.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index f1a643fc1a..d06bdcb70e 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -295,7 +295,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall } def actorClass: Class[_ <: Actor] = unsupported - def actorClassName = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher: MessageDispatcher = unsupported def makeRemote(hostname: String, port: Int): Unit = unsupported From 3e989b5324c3eec81c40ec9a0c3b3f95c342e148 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 5 Jun 2011 16:02:50 -0700 Subject: [PATCH 7/9] Removing AOP stuff from the project --- project/build/AkkaProject.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 7f3b8de710..06a24035eb 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -67,7 +67,6 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec lazy val beanstalkModuleConfig = ModuleConfiguration("beanstalk", AkkaRepo) lazy val lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", "h2-lzf", AkkaRepo) lazy val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo) - lazy val aspectWerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", "aspectwerkz", "2.2.3", AkkaRepo) lazy val objenesisModuleConfig = ModuleConfiguration("org.objenesis", sbt.DefaultMavenRepository) lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) @@ -99,8 +98,6 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec // Compile - lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain - lazy val aspectwerkz = "org.codehaus.aspectwerkz" % "aspectwerkz" % "2.2.3" % "compile" //ApacheV2 lazy val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" //New BSD lazy val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % ZOOKEEPER_VERSION //ApacheV2 lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2 From c6019ce447628258701d1530390dfb188ba4a14d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 5 Jun 2011 16:03:15 -0700 Subject: [PATCH 8/9] Removing pointless guard in ActorRegistry --- akka-actor/src/main/scala/akka/actor/ActorRegistry.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index 383c6d9545..6c9c7a50cf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -12,7 +12,7 @@ import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } import java.util.{ Set ⇒ JSet } import akka.util.ReflectiveAccess._ -import akka.util.{ ReflectiveAccess, ReadWriteGuard, ListenerManagement } +import akka.util.ListenerManagement import akka.serialization._ /** @@ -36,7 +36,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] - private val guard = new ReadWriteGuard val local = new LocalActorRegistry(actorsByAddress, actorsByUuid, typedActorsByUuid) From b600d0cf528b422396d703818d6d291d3004f896 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 5 Jun 2011 16:03:35 -0700 Subject: [PATCH 9/9] Rewriting some serialization hook in Actor --- .../src/main/scala/akka/actor/Actor.scala | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1f14cefbb4..a8aab54a90 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -397,14 +397,15 @@ object Actor extends ListenerManagement { "] for serialization of actor [" + address + "] since " + reason) - val serializer: Serializer = { - if ((serializerClassName eq null) || - (serializerClassName == "") || - (serializerClassName == Format.defaultSerializerName)) { - Format.Default - } else { - val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { - case Right(clazz) ⇒ clazz + val serializer: Serializer = serializerClassName match { + case null | "" | Format.`defaultSerializerName` ⇒ Format.Default + case specialSerializer ⇒ + ReflectiveAccess.getClassFor(specialSerializer) match { + case Right(clazz) ⇒ + clazz.newInstance match { + case s: Serializer ⇒ s + case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") + } case Left(exception) ⇒ val cause = exception match { case i: InvocationTargetException ⇒ i.getTargetException @@ -412,10 +413,6 @@ object Actor extends ListenerManagement { } serializerErrorDueTo(cause.toString) } - val f = clazz.newInstance.asInstanceOf[AnyRef] - if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] - else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") - } } val isStateful = state match {