From f9d0b188af61109bfdfc331ec6006d57d11591c0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 4 Jun 2011 12:42:06 -0700 Subject: [PATCH] Removing ActorRef.isDefinedAt and Future.empty and moving Future.channel to Promise, renaming future to promise for the channel --- .../test/scala/akka/routing/RoutingSpec.scala | 32 ------------- .../src/main/scala/akka/actor/Actor.scala | 15 ------- .../src/main/scala/akka/actor/ActorRef.scala | 12 +---- .../src/main/scala/akka/dispatch/Future.scala | 45 ++++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 4 -- .../scala/akka/kernel/EmbeddedAppServer.scala | 2 +- .../scala/akka/testkit/TestActorRef.scala | 5 --- .../scala/akka/testkit/TestActorRefSpec.scala | 10 ++--- 8 files changed, 29 insertions(+), 96 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 0fa5b8e017..ddd37f760c 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -166,38 +166,6 @@ class RoutingSpec extends WordSpec with MustMatchers { for (a ← List(broadcast, a1, a2, a3)) a.stop() } - - "be defined at" in { - import akka.actor.ActorRef - - val Yes = "yes" - val No = "no" - - def testActor() = actorOf(new Actor() { - def receive = { - case Yes ⇒ "yes" - } - }).start() - - val t1 = testActor() - val t2 = testActor() - val t3 = testActor() - val t4 = testActor() - - val d1 = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) - val d2 = loadBalancerActor(new CyclicIterator[ActorRef](t3 :: t4 :: Nil)) - - t1.isDefinedAt(Yes) must be(true) - t1.isDefinedAt(No) must be(false) - t2.isDefinedAt(Yes) must be(true) - t2.isDefinedAt(No) must be(false) - d1.isDefinedAt(Yes) must be(true) - d1.isDefinedAt(No) must be(false) - d2.isDefinedAt(Yes) must be(true) - d2.isDefinedAt(No) must be(false) - - for (a ← List(t1, t2, d1, d2)) a.stop() - } } "Actor Pool" must { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8ed782cd61..1dd597bf89 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -619,21 +619,6 @@ trait Actor { throw new UnhandledMessageException(msg, self) } - /** - * Is the actor able to handle the message passed in as arguments? - */ - def isDefinedAt(message: Any): Boolean = { - val behaviorStack = self.hotswap - message match { //Same logic as apply(msg) but without the unhandled catch-all - case l: AutoReceivedMessage ⇒ true - case msg if behaviorStack.nonEmpty && - behaviorStack.head.isDefinedAt(msg) ⇒ true - case msg if behaviorStack.isEmpty && - processingBehavior.isDefinedAt(msg) ⇒ true - case _ ⇒ false - } - } - /** * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e3b866ec97..7affe6c104 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -233,12 +233,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def isUnstarted: Boolean = _status == ActorRefInternals.UNSTARTED - /** - * Is the actor able to handle the message passed in as arguments? - */ - @deprecated("Will be removed without replacement, it's just not reliable in the face of `become` and `unbecome`", "1.1") - def isDefinedAt(message: Any): Boolean = actor.isDefinedAt(message) - /** * Only for internal use. UUID is effectively final. */ @@ -801,11 +795,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, } def tooManyRestarts() { - _supervisor.foreach { sup ⇒ - // can supervisor handle the notification? - val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) - if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) - } + notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)) stop() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3a25eff65f..e760f97bb7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -217,20 +217,6 @@ object Future { def apply[T](body: ⇒ T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = dispatcher.dispatchFuture(() ⇒ body, timeout) - /** - * Construct a completable channel - */ - def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { - val future = empty[Any](timeout) - def !(msg: Any) = future completeWithResult msg - } - - /** - * Create an empty Future with default timeout - */ - @deprecated("Superceded by Promise.apply", "1.2") - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) - import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -273,9 +259,11 @@ object Future { */ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ - val opte = f.exception - if (opte.isDefined) future completeWithException (opte.get) + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { + _.exception match { + case Some(e) ⇒ future completeWithException e + case None ⇒ + } } future } @@ -616,6 +604,14 @@ object Promise { def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + /** + * Construct a completable channel + */ + def channel(timeout: Long = Actor.TIMEOUT) = new Channel[Any] { + val promise = Promise[Any](timeout) + def !(msg: Any) = promise completeWithResult msg + } + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { override def initialValue = None } @@ -732,11 +728,16 @@ class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { - if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired - _value = Some(value) - val existingListeners = _listeners - _listeners = Nil - existingListeners + if (_value.isEmpty) { //Only complete if we aren't expired + if (!isExpired) { + _value = Some(value) + val existingListeners = _listeners + _listeners = Nil + existingListeners + } else { + _listeners = Nil + Nil + } } else Nil } finally { _signal.signalAll diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 83fd45a5cb..454760594a 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -90,8 +90,6 @@ trait LoadBalancer extends Router { self: Actor ⇒ } override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) } /** @@ -106,8 +104,6 @@ abstract class UntypedLoadBalancer extends UntypedRouter { else null override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) } object Routing { diff --git a/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala b/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala index 286d2b3b04..471f45213e 100644 --- a/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala +++ b/akka-kernel/src/main/scala/akka/kernel/EmbeddedAppServer.scala @@ -40,7 +40,7 @@ trait EmbeddedAppServer extends Bootable { super.onLoad if (isRestEnabled) { - val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!"))) + val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(sys.error("microkernel-server.xml not found!"))) System.setProperty("jetty.port", REST_PORT.toString) System.setProperty("jetty.host", REST_HOSTNAME) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 7b656278b8..4e9d7b527d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -24,11 +24,6 @@ class TestActorRef[T <: Actor](factory: () ⇒ T, address: String) extends Local dispatcher = CallingThreadDispatcher.global receiveTimeout = None - /** - * Query actor's current receive behavior. - */ - override def isDefinedAt(o: Any) = actor.isDefinedAt(o) - /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index ffe39dee57..5d74db9b34 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -8,7 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec } import akka.actor._ import akka.config.Supervision.OneForOneStrategy import akka.event.EventHandler -import akka.dispatch.Future +import akka.dispatch.{ Future, Promise } /** * Test whether TestActorRef behaves as an ActorRef should, besides its own spec. @@ -234,14 +234,12 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac EventHandler.removeListener(log) } - "proxy isDefinedAt/apply for the underlying actor" in { + "proxy apply for the underlying actor" in { val ref = TestActorRef[WorkerActor].start() - ref.isDefinedAt("work") must be(true) - ref.isDefinedAt("sleep") must be(false) intercept[IllegalActorStateException] { ref("work") } - val ch = Future.channel() + val ch = Promise.channel() ref ! ch - val f = ch.future + val f = ch.promise f must be('completed) f.get must be("complexReply") }