From fcc6169edea2a0e89b47e0590404d56095b7eec6 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 2 Dec 2011 01:00:55 +0100 Subject: [PATCH] Removing the final usages of startsWatching/stopsWatching --- .../scala/akka/actor/DeathWatchSpec.scala | 34 +++++++--------- .../src/main/scala/akka/actor/Actor.scala | 10 ++--- .../src/main/scala/akka/actor/ActorCell.scala | 8 +++- .../src/main/scala/akka/actor/ActorRef.scala | 39 ------------------- .../akka/camel/component/ActorComponent.scala | 3 -- .../scala/akka/testkit/TestActorRef.scala | 18 +++++++++ .../scala/akka/testkit/TestActorRefSpec.scala | 5 ++- 7 files changed, 48 insertions(+), 69 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 90e398e4cb..88b31f25d9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -11,6 +11,10 @@ import java.util.concurrent.atomic._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { + def startWatching(target: ActorRef) = actorOf(Props(new Actor { + watch(target) + def receive = { case x ⇒ testActor forward x } + })) "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { @@ -19,8 +23,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - - testActor startsWatching terminal + startWatching(terminal) testActor ! "ping" expectMsg("ping") @@ -32,11 +35,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with all monitors with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - val monitor1, monitor2, monitor3 = - actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor1, monitor2, monitor3 = startWatching(terminal) terminal ! PoisonPill @@ -51,11 +50,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende "notify with _current_ monitors with one Terminated message when an Actor is stopped" in { val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) - val monitor1, monitor3 = - actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor1, monitor3 = startWatching(terminal) val monitor2 = actorOf(Props(new Actor { watch(terminal) unwatch(terminal) @@ -85,10 +80,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x }) val terminal = (supervisor ? terminalProps).as[ActorRef].get - val monitor = actorOf(Props(new Actor { - watch(terminal) - def receive = { case t: Terminated ⇒ testActor ! t } - })) + val monitor = startWatching(terminal) terminal ! Kill terminal ! Kill @@ -113,9 +105,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende } })) - val failed, brother = (supervisor ? Props.empty).as[ActorRef].get - brother startsWatching failed - testActor startsWatching brother + val failed = (supervisor ? Props.empty).as[ActorRef].get + val brother = (supervisor ? Props(new Actor { + watch(failed) + def receive = Actor.emptyBehavior + })).as[ActorRef].get + + startWatching(brother) failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index b8c0bbb327..8f613e02f5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -363,24 +363,24 @@ trait Actor { * Puts the behavior on top of the hotswap stack. * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ - def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) } + final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) } /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome() { context.unbecome() } + final def unbecome() { context.unbecome() } /** * Registers this actor as a Monitor for the provided ActorRef * @return the provided ActorRef */ - def watch(subject: ActorRef): ActorRef = self startsWatching subject + final def watch(subject: ActorRef): ActorRef = context startsWatching subject /** * Unregisters this actor as Monitor for the provided ActorRef * @return the provided ActorRef */ - def unwatch(subject: ActorRef): ActorRef = self stopsWatching subject + final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== @@ -395,6 +395,6 @@ trait Actor { } } - private val processingBehavior = receive //ProcessingBehavior is the original behavior + private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index afd462ff1e..704d78d38b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -47,6 +47,10 @@ trait ActorContext extends ActorRefFactory { def system: ActorSystem def parent: ActorRef + + def startsWatching(subject: ActorRef): ActorRef + + def stopsWatching(subject: ActorRef): ActorRef } private[akka] object ActorCell { @@ -136,13 +140,13 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) - final def startsWatching(subject: ActorRef): ActorRef = { + override final def startsWatching(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Link(subject)) subject } - final def stopsWatching(subject: ActorRef): ActorRef = { + override final def stopsWatching(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Unlink(subject)) subject diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e62a04938a..c86a2dc66e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -128,24 +128,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def isTerminated: Boolean - /** - * Registers this actor to be a death monitor of the provided ActorRef - * This means that this actor will get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS - - /** - * Deregisters this actor from being a death monitor of the provided ActorRef - * This means that this actor will not get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS - override def hashCode: Int = HashCode.hash(HashCode.SEED, address) override def equals(that: Any): Boolean = { @@ -215,24 +197,6 @@ class LocalActorRef private[akka] ( */ def stop(): Unit = actorCell.stop() - /** - * Registers this actor to be a death monitor of the provided ActorRef - * This means that this actor will get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def startsWatching(subject: ActorRef): ActorRef = actorCell.startsWatching(subject) - - /** - * Deregisters this actor from being a death monitor of the provided ActorRef - * This means that this actor will not get a Terminated()-message when the provided actor - * is permanently terminated. - * - * @return the same ActorRef that is provided to it, to allow for cleaner invocations - */ - def stopsWatching(subject: ActorRef): ActorRef = actorCell.stopsWatching(subject) - // ========= AKKA PROTECTED FUNCTIONS ========= protected[akka] def underlying: ActorCell = actorCell @@ -330,9 +294,6 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { private[akka] val uuid: Uuid = newUuid() def name: String = uuid.toString - def startsWatching(actorRef: ActorRef): ActorRef = actorRef - def stopsWatching(actorRef: ActorRef): ActorRef = actorRef - def suspend(): Unit = () def resume(): Unit = () diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index 795fbf5a54..c4ec7dcf31 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -293,9 +293,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall callback.done(false) } - def startsWatching(actorRef: ActorRef): ActorRef = unsupported - def stopsWatching(actorRef: ActorRef): ActorRef = unsupported - def ?(message: Any)(implicit timeout: Timeout): Future[Any] = new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName)))) def restart(reason: Throwable): Unit = unsupported diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 6f03df59b2..18a181eb3f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -42,6 +42,24 @@ class TestActorRef[T <: Actor]( */ def underlyingActor: T = underlyingActorInstance.asInstanceOf[T] + /** + * Registers this actor to be a death monitor of the provided ActorRef + * This means that this actor will get a Terminated()-message when the provided actor + * is permanently terminated. + * + * @return the same ActorRef that is provided to it, to allow for cleaner invocations + */ + def startsWatching(subject: ActorRef): ActorRef = underlying.startsWatching(subject) + + /** + * Deregisters this actor from being a death monitor of the provided ActorRef + * This means that this actor will not get a Terminated()-message when the provided actor + * is permanently terminated. + * + * @return the same ActorRef that is provided to it, to allow for cleaner invocations + */ + def stopsWatching(subject: ActorRef): ActorRef = underlying.stopsWatching(subject) + override def toString = "TestActor[" + address + "]" override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index a6248ff63c..12096a61b1 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -155,7 +155,10 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "stop when sent a poison pill" in { EventFilter[ActorKilledException]() intercept { val a = TestActorRef(Props[WorkerActor]) - testActor startsWatching a + val forwarder = actorOf(Props(new Actor { + watch(a) + def receive = { case x ⇒ testActor forward x } + })) a.!(PoisonPill)(testActor) expectMsgPF(5 seconds) { case Terminated(`a`) ⇒ true