Removing the final usages of startsWatching/stopsWatching
This commit is contained in:
parent
54e2e9a357
commit
fcc6169ede
7 changed files with 48 additions and 69 deletions
|
|
@ -11,6 +11,10 @@ import java.util.concurrent.atomic._
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
|
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
|
||||||
|
def startWatching(target: ActorRef) = actorOf(Props(new Actor {
|
||||||
|
watch(target)
|
||||||
|
def receive = { case x ⇒ testActor forward x }
|
||||||
|
}))
|
||||||
|
|
||||||
"The Death Watch" must {
|
"The Death Watch" must {
|
||||||
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {
|
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {
|
||||||
|
|
@ -19,8 +23,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
|
|
||||||
"notify with one Terminated message when an Actor is stopped" in {
|
"notify with one Terminated message when an Actor is stopped" in {
|
||||||
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
||||||
|
startWatching(terminal)
|
||||||
testActor startsWatching terminal
|
|
||||||
|
|
||||||
testActor ! "ping"
|
testActor ! "ping"
|
||||||
expectMsg("ping")
|
expectMsg("ping")
|
||||||
|
|
@ -32,11 +35,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
|
|
||||||
"notify with all monitors with one Terminated message when an Actor is stopped" in {
|
"notify with all monitors with one Terminated message when an Actor is stopped" in {
|
||||||
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
||||||
val monitor1, monitor2, monitor3 =
|
val monitor1, monitor2, monitor3 = startWatching(terminal)
|
||||||
actorOf(Props(new Actor {
|
|
||||||
watch(terminal)
|
|
||||||
def receive = { case t: Terminated ⇒ testActor ! t }
|
|
||||||
}))
|
|
||||||
|
|
||||||
terminal ! PoisonPill
|
terminal ! PoisonPill
|
||||||
|
|
||||||
|
|
@ -51,11 +50,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
|
|
||||||
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
|
"notify with _current_ monitors with one Terminated message when an Actor is stopped" in {
|
||||||
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
val terminal = actorOf(Props(context ⇒ { case _ ⇒ }))
|
||||||
val monitor1, monitor3 =
|
val monitor1, monitor3 = startWatching(terminal)
|
||||||
actorOf(Props(new Actor {
|
|
||||||
watch(terminal)
|
|
||||||
def receive = { case t: Terminated ⇒ testActor ! t }
|
|
||||||
}))
|
|
||||||
val monitor2 = actorOf(Props(new Actor {
|
val monitor2 = actorOf(Props(new Actor {
|
||||||
watch(terminal)
|
watch(terminal)
|
||||||
unwatch(terminal)
|
unwatch(terminal)
|
||||||
|
|
@ -85,10 +80,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
val terminalProps = Props(context ⇒ { case x ⇒ context.sender ! x })
|
||||||
val terminal = (supervisor ? terminalProps).as[ActorRef].get
|
val terminal = (supervisor ? terminalProps).as[ActorRef].get
|
||||||
|
|
||||||
val monitor = actorOf(Props(new Actor {
|
val monitor = startWatching(terminal)
|
||||||
watch(terminal)
|
|
||||||
def receive = { case t: Terminated ⇒ testActor ! t }
|
|
||||||
}))
|
|
||||||
|
|
||||||
terminal ! Kill
|
terminal ! Kill
|
||||||
terminal ! Kill
|
terminal ! Kill
|
||||||
|
|
@ -113,9 +105,13 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
||||||
val failed, brother = (supervisor ? Props.empty).as[ActorRef].get
|
val failed = (supervisor ? Props.empty).as[ActorRef].get
|
||||||
brother startsWatching failed
|
val brother = (supervisor ? Props(new Actor {
|
||||||
testActor startsWatching brother
|
watch(failed)
|
||||||
|
def receive = Actor.emptyBehavior
|
||||||
|
})).as[ActorRef].get
|
||||||
|
|
||||||
|
startWatching(brother)
|
||||||
|
|
||||||
failed ! Kill
|
failed ! Kill
|
||||||
val result = receiveWhile(3 seconds, messages = 3) {
|
val result = receiveWhile(3 seconds, messages = 3) {
|
||||||
|
|
|
||||||
|
|
@ -363,24 +363,24 @@ trait Actor {
|
||||||
* Puts the behavior on top of the hotswap stack.
|
* Puts the behavior on top of the hotswap stack.
|
||||||
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
|
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
|
||||||
*/
|
*/
|
||||||
def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) }
|
final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reverts the Actor behavior to the previous one in the hotswap stack.
|
* Reverts the Actor behavior to the previous one in the hotswap stack.
|
||||||
*/
|
*/
|
||||||
def unbecome() { context.unbecome() }
|
final def unbecome() { context.unbecome() }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers this actor as a Monitor for the provided ActorRef
|
* Registers this actor as a Monitor for the provided ActorRef
|
||||||
* @return the provided ActorRef
|
* @return the provided ActorRef
|
||||||
*/
|
*/
|
||||||
def watch(subject: ActorRef): ActorRef = self startsWatching subject
|
final def watch(subject: ActorRef): ActorRef = context startsWatching subject
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unregisters this actor as Monitor for the provided ActorRef
|
* Unregisters this actor as Monitor for the provided ActorRef
|
||||||
* @return the provided ActorRef
|
* @return the provided ActorRef
|
||||||
*/
|
*/
|
||||||
def unwatch(subject: ActorRef): ActorRef = self stopsWatching subject
|
final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject
|
||||||
|
|
||||||
// =========================================
|
// =========================================
|
||||||
// ==== INTERNAL IMPLEMENTATION DETAILS ====
|
// ==== INTERNAL IMPLEMENTATION DETAILS ====
|
||||||
|
|
@ -395,6 +395,6 @@ trait Actor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val processingBehavior = receive //ProcessingBehavior is the original behavior
|
private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -47,6 +47,10 @@ trait ActorContext extends ActorRefFactory {
|
||||||
def system: ActorSystem
|
def system: ActorSystem
|
||||||
|
|
||||||
def parent: ActorRef
|
def parent: ActorRef
|
||||||
|
|
||||||
|
def startsWatching(subject: ActorRef): ActorRef
|
||||||
|
|
||||||
|
def stopsWatching(subject: ActorRef): ActorRef
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] object ActorCell {
|
private[akka] object ActorCell {
|
||||||
|
|
@ -136,13 +140,13 @@ private[akka] class ActorCell(
|
||||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||||
private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
|
private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
|
||||||
|
|
||||||
final def startsWatching(subject: ActorRef): ActorRef = {
|
override final def startsWatching(subject: ActorRef): ActorRef = {
|
||||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||||
dispatcher.systemDispatch(this, Link(subject))
|
dispatcher.systemDispatch(this, Link(subject))
|
||||||
subject
|
subject
|
||||||
}
|
}
|
||||||
|
|
||||||
final def stopsWatching(subject: ActorRef): ActorRef = {
|
override final def stopsWatching(subject: ActorRef): ActorRef = {
|
||||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||||
dispatcher.systemDispatch(this, Unlink(subject))
|
dispatcher.systemDispatch(this, Unlink(subject))
|
||||||
subject
|
subject
|
||||||
|
|
|
||||||
|
|
@ -128,24 +128,6 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
|
||||||
*/
|
*/
|
||||||
def isTerminated: Boolean
|
def isTerminated: Boolean
|
||||||
|
|
||||||
/**
|
|
||||||
* Registers this actor to be a death monitor of the provided ActorRef
|
|
||||||
* This means that this actor will get a Terminated()-message when the provided actor
|
|
||||||
* is permanently terminated.
|
|
||||||
*
|
|
||||||
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
|
||||||
*/
|
|
||||||
def startsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deregisters this actor from being a death monitor of the provided ActorRef
|
|
||||||
* This means that this actor will not get a Terminated()-message when the provided actor
|
|
||||||
* is permanently terminated.
|
|
||||||
*
|
|
||||||
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
|
||||||
*/
|
|
||||||
def stopsWatching(subject: ActorRef): ActorRef //TODO FIXME REMOVE THIS
|
|
||||||
|
|
||||||
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
|
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
|
||||||
|
|
||||||
override def equals(that: Any): Boolean = {
|
override def equals(that: Any): Boolean = {
|
||||||
|
|
@ -215,24 +197,6 @@ class LocalActorRef private[akka] (
|
||||||
*/
|
*/
|
||||||
def stop(): Unit = actorCell.stop()
|
def stop(): Unit = actorCell.stop()
|
||||||
|
|
||||||
/**
|
|
||||||
* Registers this actor to be a death monitor of the provided ActorRef
|
|
||||||
* This means that this actor will get a Terminated()-message when the provided actor
|
|
||||||
* is permanently terminated.
|
|
||||||
*
|
|
||||||
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
|
||||||
*/
|
|
||||||
def startsWatching(subject: ActorRef): ActorRef = actorCell.startsWatching(subject)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Deregisters this actor from being a death monitor of the provided ActorRef
|
|
||||||
* This means that this actor will not get a Terminated()-message when the provided actor
|
|
||||||
* is permanently terminated.
|
|
||||||
*
|
|
||||||
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
|
||||||
*/
|
|
||||||
def stopsWatching(subject: ActorRef): ActorRef = actorCell.stopsWatching(subject)
|
|
||||||
|
|
||||||
// ========= AKKA PROTECTED FUNCTIONS =========
|
// ========= AKKA PROTECTED FUNCTIONS =========
|
||||||
|
|
||||||
protected[akka] def underlying: ActorCell = actorCell
|
protected[akka] def underlying: ActorCell = actorCell
|
||||||
|
|
@ -330,9 +294,6 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef {
|
||||||
private[akka] val uuid: Uuid = newUuid()
|
private[akka] val uuid: Uuid = newUuid()
|
||||||
def name: String = uuid.toString
|
def name: String = uuid.toString
|
||||||
|
|
||||||
def startsWatching(actorRef: ActorRef): ActorRef = actorRef
|
|
||||||
def stopsWatching(actorRef: ActorRef): ActorRef = actorRef
|
|
||||||
|
|
||||||
def suspend(): Unit = ()
|
def suspend(): Unit = ()
|
||||||
def resume(): Unit = ()
|
def resume(): Unit = ()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -293,9 +293,6 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
|
||||||
callback.done(false)
|
callback.done(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
def startsWatching(actorRef: ActorRef): ActorRef = unsupported
|
|
||||||
def stopsWatching(actorRef: ActorRef): ActorRef = unsupported
|
|
||||||
|
|
||||||
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
|
def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
|
||||||
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))
|
new KeptPromise[Any](Left(new UnsupportedOperationException("Ask/? is not supported for %s".format(getClass.getName))))
|
||||||
def restart(reason: Throwable): Unit = unsupported
|
def restart(reason: Throwable): Unit = unsupported
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,24 @@ class TestActorRef[T <: Actor](
|
||||||
*/
|
*/
|
||||||
def underlyingActor: T = underlyingActorInstance.asInstanceOf[T]
|
def underlyingActor: T = underlyingActorInstance.asInstanceOf[T]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers this actor to be a death monitor of the provided ActorRef
|
||||||
|
* This means that this actor will get a Terminated()-message when the provided actor
|
||||||
|
* is permanently terminated.
|
||||||
|
*
|
||||||
|
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
||||||
|
*/
|
||||||
|
def startsWatching(subject: ActorRef): ActorRef = underlying.startsWatching(subject)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deregisters this actor from being a death monitor of the provided ActorRef
|
||||||
|
* This means that this actor will not get a Terminated()-message when the provided actor
|
||||||
|
* is permanently terminated.
|
||||||
|
*
|
||||||
|
* @return the same ActorRef that is provided to it, to allow for cleaner invocations
|
||||||
|
*/
|
||||||
|
def stopsWatching(subject: ActorRef): ActorRef = underlying.stopsWatching(subject)
|
||||||
|
|
||||||
override def toString = "TestActor[" + address + "]"
|
override def toString = "TestActor[" + address + "]"
|
||||||
|
|
||||||
override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address
|
override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].address == address
|
||||||
|
|
|
||||||
|
|
@ -155,7 +155,10 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
|
||||||
"stop when sent a poison pill" in {
|
"stop when sent a poison pill" in {
|
||||||
EventFilter[ActorKilledException]() intercept {
|
EventFilter[ActorKilledException]() intercept {
|
||||||
val a = TestActorRef(Props[WorkerActor])
|
val a = TestActorRef(Props[WorkerActor])
|
||||||
testActor startsWatching a
|
val forwarder = actorOf(Props(new Actor {
|
||||||
|
watch(a)
|
||||||
|
def receive = { case x ⇒ testActor forward x }
|
||||||
|
}))
|
||||||
a.!(PoisonPill)(testActor)
|
a.!(PoisonPill)(testActor)
|
||||||
expectMsgPF(5 seconds) {
|
expectMsgPF(5 seconds) {
|
||||||
case Terminated(`a`) ⇒ true
|
case Terminated(`a`) ⇒ true
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue