From b0952e5212cc5ecdd18f5bb78f1e98fc1e79fb68 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 2 Jun 2011 13:33:49 -0700 Subject: [PATCH 1/2] Renaming Future.failure to Future.recover --- .../test/scala/akka/dispatch/FutureSpec.scala | 20 +++---- .../src/main/scala/akka/dispatch/Future.scala | 55 +++++++++---------- akka-docs/java/untyped-actors.rst | 1 - akka-docs/scala/futures.rst | 6 +- 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index ab3f45e388..1d18e2c60f 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -146,30 +146,30 @@ class FutureSpec extends JUnitSuite { val future2 = future1 map (_ / 0) val future3 = future2 map (_.toString) - val future4 = future1 failure { + val future4 = future1 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future5 = future2 failure { + val future5 = future2 recover { case e: ArithmeticException ⇒ 0 } map (_.toString) - val future6 = future2 failure { + val future6 = future2 recover { case e: MatchError ⇒ 0 } map (_.toString) - val future7 = future3 failure { case e: ArithmeticException ⇒ "You got ERROR" } + val future7 = future3 recover { case e: ArithmeticException ⇒ "You got ERROR" } val actor = actorOf[TestActor].start() val future8 = actor !!! "Failure" - val future9 = actor !!! "Failure" failure { + val future9 = actor !!! "Failure" recover { case e: RuntimeException ⇒ "FAIL!" } - val future10 = actor !!! "Hello" failure { + val future10 = actor !!! "Hello" recover { case e: RuntimeException ⇒ "FAIL!" } - val future11 = actor !!! "Failure" failure { case _ ⇒ "Oops!" } + val future11 = actor !!! "Failure" recover { case _ ⇒ "Oops!" } assert(future1.get === 5) intercept[ArithmeticException] { future2.get } @@ -269,7 +269,7 @@ class FutureSpec extends JUnitSuite { def receiveShouldExecuteOnComplete { val latch = new StandardLatch val actor = actorOf[TestActor].start() - actor !!! "Hello" receive { case "World" ⇒ latch.open } + actor !!! "Hello" onResult { case "World" ⇒ latch.open } assert(latch.tryAwait(5, TimeUnit.SECONDS)) actor.stop() } @@ -304,13 +304,13 @@ class FutureSpec extends JUnitSuite { val latch = new StandardLatch val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("dispatcher receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) latch.open f2.await assert(f2.resultOrException === Some("success")) f2 foreach (_ ⇒ throw new ThrowableTest("current thread foreach")) - f2 receive { case _ ⇒ throw new ThrowableTest("current thread receive") } + f2 onResult { case _ ⇒ throw new ThrowableTest("current thread receive") } f3.await assert(f3.resultOrException === Some("SUCCESS")) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 84e4b2f79b..3a25eff65f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -320,14 +320,6 @@ sealed trait Future[+T] { */ def await(atMost: Duration): Future[T] - /** - * Blocks the current thread until the Future has been completed. Use - * caution with this method as it ignores the timeout and will block - * indefinitely if the Future is never completed. - */ - @deprecated("Will be removed after 1.1, it's dangerous and can cause deadlocks, agony and insanity.", "1.1") - def awaitBlocking: Future[T] - /** * Tests whether this Future has been completed. */ @@ -383,17 +375,35 @@ sealed trait Future[+T] { * When the future is completed with a valid result, apply the provided * PartialFunction to the result. *
-   *   val result = future receive {
+   *   val result = future onResult {
    *     case Foo => "foo"
    *     case Bar => "bar"
-   *   }.await.result
+   *   }
    * 
*/ - final def receive(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒ + final def onResult(pf: PartialFunction[Any, Unit]): Future[T] = onComplete { f ⇒ val optr = f.result if (optr.isDefined) { val r = optr.get - if (pf.isDefinedAt(r)) pf(r) + if (pf isDefinedAt r) pf(r) + } + } + + /** + * When the future is completed with an exception, apply the provided + * PartialFunction to the exception. + *
+   *   val result = future onException {
+   *     case Foo => "foo"
+   *     case Bar => "bar"
+   *   }
+   * 
+ */ + final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f ⇒ + val opte = f.exception + if (opte.isDefined) { + val e = opte.get + if (pf isDefinedAt e) pf(e) } } @@ -439,12 +449,12 @@ sealed trait Future[+T] { * a valid result then the new Future will contain the same. * Example: *
-   * Future(6 / 0) failure { case e: ArithmeticException => 0 } // result: 0
-   * Future(6 / 0) failure { case e: NotFoundException   => 0 } // result: exception
-   * Future(6 / 2) failure { case e: ArithmeticException => 0 } // result: 3
+   * Future(6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
+   * Future(6 / 0) recover { case e: NotFoundException   => 0 } // result: exception
+   * Future(6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
    * 
*/ - final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception @@ -708,18 +718,6 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } - def awaitBlocking = { - _lock.lock - try { - while (_value.isEmpty) { - _signal.await - } - this - } finally { - _lock.unlock - } - } - def isExpired: Boolean = timeLeft() <= 0 def value: Option[Either[Throwable, T]] = { @@ -816,7 +814,6 @@ sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this - def awaitBlocking: Future[T] = this def isExpired: Boolean = true def timeoutInNanos: Long = 0 } diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index ddeedbe829..7d6cec6013 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -170,7 +170,6 @@ The 'Future' interface looks like this: interface Future { void await(); - void awaitBlocking(); boolean isCompleted(); boolean isExpired(); long timeoutInNanos(); diff --git a/akka-docs/scala/futures.rst b/akka-docs/scala/futures.rst index 0dc492a02a..de58118a4f 100644 --- a/akka-docs/scala/futures.rst +++ b/akka-docs/scala/futures.rst @@ -238,12 +238,12 @@ Exceptions Since the result of a ``Future`` is created concurrently to the rest of the program, exceptions must be handled differently. It doesn't matter if an ``Actor`` or the dispatcher is completing the ``Future``, if an ``Exception`` is caught the ``Future`` will contain it instead of a valid result. If a ``Future`` does contain an ``Exception``, calling ``get`` will cause it to be thrown again so it can be handled properly. -It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``failure`` method. For example: +It is also possible to handle an ``Exception`` by returning a different result. This is done with the ``recover`` method. For example: .. code-block:: scala - val future = actor !!! msg1 failure { + val future = actor !!! msg1 recover { case e: ArithmeticException => 0 } -In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``failure`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``failure`` method. +In this example, if an ``ArithmeticException`` was thrown while the ``Actor`` processed the message, our ``Future`` would have a result of 0. The ``recover`` method works very similarly to the standard try/catch blocks, so multiple ``Exception``\s can be handled in this manner, and if an ``Exception`` is not handled this way it will be behave as if we hadn't used the ``recover`` method. From 07eaf0ba4879774aad1ac6bb7ee76e373b2f674b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 2 Jun 2011 14:29:39 -0700 Subject: [PATCH 2/2] Attempt to solve ticket #902 --- .../src/main/scala/akka/actor/Actor.scala | 15 +++++++------ .../src/main/scala/akka/actor/ActorRef.scala | 21 ++++++++++++------- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2ad30bcd55..8ed782cd61 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -17,6 +17,7 @@ import akka.AkkaException import akka.serialization.{ Format, Serializer } import akka.cluster.ClusterNode import akka.event.EventHandler +import scala.collection.immutable.Stack import scala.reflect.BeanProperty @@ -135,8 +136,8 @@ object Actor extends ListenerManagement { hook } - private[actor] val actorRefInCreation = new ThreadLocal[Option[ActorRef]] { - override def initialValue = None + private[actor] val actorRefInCreation = new ThreadLocal[Stack[ActorRef]] { + override def initialValue = Stack[ActorRef]() } /** @@ -505,16 +506,18 @@ trait Actor { */ @transient implicit val someSelf: Some[ActorRef] = { - val optRef = Actor.actorRefInCreation.get - if (optRef.isEmpty) throw new ActorInitializationException( + val refStack = Actor.actorRefInCreation.get + if (refStack.isEmpty) throw new ActorInitializationException( "ActorRef for instance of actor [" + getClass.getName + "] is not in scope." + "\n\tYou can not create an instance of an actor explicitly using 'new MyActor'." + "\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." + "\n\tEither use:" + "\n\t\t'val actor = Actor.actorOf[MyActor]', or" + "\n\t\t'val actor = Actor.actorOf(new MyActor(..))'") - Actor.actorRefInCreation.set(None) - optRef.asInstanceOf[Some[ActorRef]] + + val ref = refStack.head + Actor.actorRefInCreation.set(refStack.pop) + Some(ref) } /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bc5efa02b0..e3b866ec97 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -869,13 +869,20 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, // ========= PRIVATE FUNCTIONS ========= private[this] def newActor: Actor = { - try { - Actor.actorRefInCreation.set(Some(this)) - val a = actorFactory() - if (a eq null) throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") - a - } finally { - Actor.actorRefInCreation.set(None) + import Actor.{ actorRefInCreation ⇒ refStack } + (try { + refStack.set(refStack.get.push(this)) + actorFactory() + } catch { + case e ⇒ + val stack = refStack.get + //Clean up if failed + if ((stack.nonEmpty) && (stack.head eq this)) refStack.set(stack.pop) + //Then rethrow + throw e + }) match { + case null ⇒ throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'") + case valid ⇒ valid } }