Merge pull request #668 from akka/wip-1588-cluster-death-watch-patriknw

Death watch hooked up with cluster failure detector, see #1588
This commit is contained in:
Patrik Nordwall 2012-09-11 06:13:44 -07:00
commit 911ef6b97e
9 changed files with 286 additions and 29 deletions

View file

@ -5,7 +5,6 @@
package akka.actor package akka.actor
import language.postfixOps import language.postfixOps
import akka.testkit._ import akka.testkit._
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import java.util.concurrent.atomic._ import java.util.concurrent.atomic._
@ -18,8 +17,17 @@ class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeo
object DeathWatchSpec { object DeathWatchSpec {
def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { def props(target: ActorRef, testActor: ActorRef) = Props(new Actor {
context.watch(target) context.watch(target)
def receive = { case x testActor forward x } def receive = {
case t: Terminated testActor forward WrappedTerminated(t)
case x testActor forward x
}
}) })
/**
* Forwarding `Terminated` to non-watching testActor is not possible,
* and therefore the `Terminated` message is wrapped.
*/
case class WrappedTerminated(t: Terminated)
} }
trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
@ -32,7 +40,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"The Death Watch" must { "The Death Watch" must {
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {
case Terminated(`actorRef`) true case WrappedTerminated(Terminated(`actorRef`)) true
} }
"notify with one Terminated message when an Actor is stopped" in { "notify with one Terminated message when an Actor is stopped" in {
@ -77,7 +85,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
context.unwatch(terminal) context.unwatch(terminal)
def receive = { def receive = {
case "ping" sender ! "pong" case "ping" sender ! "pong"
case t: Terminated testActor ! t case t: Terminated testActor ! WrappedTerminated(t)
} }
})) }))
@ -137,9 +145,9 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
failed ! Kill failed ! Kill
val result = receiveWhile(3 seconds, messages = 3) { val result = receiveWhile(3 seconds, messages = 3) {
case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed 1 case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed 1
case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother 2 case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother 2
case Terminated(`brother`) 3 case WrappedTerminated(Terminated(`brother`)) 3
} }
testActor.isTerminated must not be true testActor.isTerminated must not be true
result must be(Seq(1, 2, 3)) result must be(Seq(1, 2, 3))
@ -165,6 +173,18 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
parent ! "NKOTB" parent ! "NKOTB"
expectMsg("GREEN") expectMsg("GREEN")
} }
"only notify when watching" in {
val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }))
val observer = system.actorOf(Props(new Actor {
context.watch(subject)
def receive = { case x testActor forward x }
}))
subject ! PoisonPill
// the testActor is not watching subject and will discard Terminated msg
expectNoMsg
}
} }
} }

View file

@ -60,11 +60,26 @@ case object Kill extends Kill {
} }
/** /**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. * When Death Watch is used, the watcher will receive a Terminated(watched)
* message when watched is terminated.
* Terminated message can't be forwarded to another actor, since that actor
* might not be watching the subject. Instead, if you need to forward Terminated
* to another actor you should send the information in your own message.
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage
/**
* INTERNAL API
*
* Used for remote death watch. The failure detector publishes this to the
* `eventStream` when a remote node is detected to be unreachable.
* The watcher ([[akka.actor.cell.DeathWatch]]) subscribes to the `eventStream`
* and translates this event to [[akka.actor.Terminated]], which it sends to itself.
*/
@SerialVersionUID(1L)
private[akka] case class AddressTerminated(address: Address) extends AutoReceivedMessage
abstract class ReceiveTimeout extends PossiblyHarmful abstract class ReceiveTimeout extends PossiblyHarmful
/** /**

View file

@ -125,6 +125,8 @@ trait ActorContext extends ActorRefFactory {
/** /**
* Registers this actor as a Monitor for the provided ActorRef. * Registers this actor as a Monitor for the provided ActorRef.
* This actor will receive a Terminated(watched) message when watched
* is terminated.
* @return the provided ActorRef * @return the provided ActorRef
*/ */
def watch(subject: ActorRef): ActorRef def watch(subject: ActorRef): ActorRef
@ -316,9 +318,9 @@ private[akka] class ActorCell(
@tailrec final def systemInvoke(message: SystemMessage): Unit = { @tailrec final def systemInvoke(message: SystemMessage): Unit = {
/* /*
* When recreate/suspend/resume are received while restarting (i.e. between * When recreate/suspend/resume are received while restarting (i.e. between
* preRestart and postRestart, waiting for children to terminate), these * preRestart and postRestart, waiting for children to terminate), these
* must not be executed immediately, but instead queued and released after * must not be executed immediately, but instead queued and released after
* finishRecreate returns. This can only ever be triggered by * finishRecreate returns. This can only ever be triggered by
* ChildTerminated, and ChildTerminated is not one of the queued message * ChildTerminated, and ChildTerminated is not one of the queued message
* types (hence the overwrite further down). Mailbox sets message.next=null * types (hence the overwrite further down). Mailbox sets message.next=null
* before systemInvoke, so this will only be non-null during such a replay. * before systemInvoke, so this will only be non-null during such a replay.
@ -375,13 +377,14 @@ private[akka] class ActorCell(
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match { msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid) case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated watchedActorTerminated(t.actor); receiveMessage(t) case t: Terminated watchedActorTerminated(t)
case Kill throw new ActorKilledException("Kill") case AddressTerminated(address) addressTerminated(address)
case PoisonPill self.stop() case Kill throw new ActorKilledException("Kill")
case SelectParent(m) parent.tell(m, msg.sender) case PoisonPill self.stop()
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ } case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
} }
} }

View file

@ -4,7 +4,7 @@
package akka.actor.cell package akka.actor.cell
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor } import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ Watch, Unwatch } import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug } import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal import scala.util.control.NonFatal
@ -17,8 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def watch(subject: ActorRef): ActorRef = subject match { override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef case a: InternalActorRef
if (a != self && !watching.contains(a)) { if (a != self && !watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ maintainAddressTerminatedSubscription(a) {
watching += a a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
} }
a a
} }
@ -26,13 +28,24 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def unwatch(subject: ActorRef): ActorRef = subject match { override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef case a: InternalActorRef
if (a != self && watching.contains(a)) { if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ maintainAddressTerminatedSubscription(a) {
watching -= a a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
} }
a a
} }
protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref /**
* If this actor is watching the subject of the [[akka.actor.Terminated]] message,
* the message is propagated to the user's receive; otherwise it is discarded.
*/
protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) {
maintainAddressTerminatedSubscription(t.actor) {
watching -= t.actor
}
receiveMessage(t)
}
protected def tellWatchersWeDied(actor: Actor): Unit = { protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) { if (!watchedBy.isEmpty) {
@ -56,7 +69,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
} }
} }
} finally watching = ActorCell.emptyActorRefSet } finally {
watching = ActorCell.emptyActorRefSet
unsubscribeAddressTerminated()
}
} }
} }
@ -65,7 +81,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
val watcherSelf = watcher == self val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) { if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) { if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
watchedBy += watcher watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
} }
@ -81,7 +97,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
val watcherSelf = watcher == self val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) { if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) { if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
watchedBy -= watcher watchedBy -= watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
} }
@ -92,4 +108,48 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
} }
} }
/**
* Reacts to an `AddressTerminated` event for the given address.
*
* Drops every watcher located at that address from `watchedBy`, and sends
* a `Terminated` message to `self` for every watched ref located at that
* address. `watching` is not modified here.
*
* @param address the address that was reported as terminated
*/
protected def addressTerminated(address: Address): Unit = {
// cleanup watchedBy since we know they are dead
// called without a `change` ref, so the AddressTerminated subscription
// is always re-evaluated after the removal
maintainAddressTerminatedSubscription() {
for (a watchedBy; if a.path.address == address) watchedBy -= a
}
// send Terminated to self for all matching subjects
// existenceConfirmed = false because we could have been watching a
// non-local ActorRef that had never resolved before the other node went down
for (a watching; if a.path.address == address) {
self ! Terminated(a)(existenceConfirmed = false)
}
}
/**
* Runs `block` (which is expected to modify `watching` and/or `watchedBy`)
* and keeps the `AddressTerminated` subscription in sync with the result.
*
* Starts the subscription to AddressTerminated if not already subscribing and
* the block adds a non-local ref to watching or watchedBy.
* Ends the subscription to AddressTerminated if subscribing and the
* block removes the last non-local ref from watching and watchedBy.
*
* @param change the ref being added or removed; when it is a local ref the
*               block is run without any subscription check. The default
*               `null` is treated as non-local, so the check is always done.
* @param block  the modification to perform
* @return the result of `block`
*/
private def maintainAddressTerminatedSubscription[T](change: ActorRef = null)(block: T): T = {
// NOTE(review): the had/has comparison below only makes sense if `block` is
// evaluated lazily (by-name); the arrow may have been lost in this rendering — confirm
// A ref counts as non-local when it is null (no specific ref given) or an
// InternalActorRef whose isLocal is false
def isNonLocal(ref: ActorRef) = ref match {
case null true
case a: InternalActorRef if !a.isLocal true
case _ false
}
if (isNonLocal(change)) {
def hasNonLocalAddress: Boolean = ((watching exists isNonLocal) || (watchedBy exists isNonLocal))
// compare the state before and after the block to detect a transition
val had = hasNonLocalAddress
val result = block
val has = hasNonLocalAddress
if (had && !has) unsubscribeAddressTerminated()
else if (!had && has) subscribeAddressTerminated()
result
} else {
// a local ref cannot change whether any non-local ref is tracked
block
}
}
// Removes this actor's eventStream subscription for AddressTerminated events.
private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated])
// Subscribes this actor to AddressTerminated events on the eventStream.
private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated])
} }

View file

@ -5,11 +5,11 @@ package akka.cluster
import language.postfixOps import language.postfixOps
import scala.collection.immutable.SortedSet import scala.collection.immutable.SortedSet
import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.actor.{ Actor, ActorLogging, ActorRef, Address }
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._ import akka.cluster.MemberStatus._
import akka.event.EventStream import akka.event.EventStream
import akka.actor.AddressTerminated
/** /**
* Domain events published to the event bus. * Domain events published to the event bus.
@ -200,7 +200,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = {
// keep the latestGossip to be sent to new subscribers // keep the latestGossip to be sent to new subscribers
latestGossip = newGossip latestGossip = newGossip
diff(oldGossip, newGossip) foreach { eventStream publish } diff(oldGossip, newGossip) foreach { event
eventStream publish event
// notify DeathWatch about unreachable node
event match {
case MemberUnreachable(m) eventStream publish AddressTerminated(m.address)
case _
}
}
} }
def publishInternalStats(currentStats: CurrentInternalStats): Unit = { def publishInternalStats(currentStats: CurrentInternalStats): Unit = {

View file

@ -0,0 +1,118 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.actor.Props
import akka.actor.Actor
import akka.actor.Address
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.actor.Address
object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy
class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy
class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy
class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy
abstract class ClusterDeathWatchSpec
extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec)
with MultiNodeClusterSpec {
import ClusterDeathWatchMultiJvmSpec._
"An actor watching a remote actor in the cluster" must {
"receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in {
awaitClusterUp(roles: _*)
enterBarrier("cluster-up")
runOn(first) {
enterBarrier("subjected-started")
val path2 = RootActorPath(second) / "user" / "subject"
val path3 = RootActorPath(third) / "user" / "subject"
val watchEstablished = TestLatch(1)
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path2))
context.watch(context.actorFor(path3))
watchEstablished.countDown
def receive = {
case t: Terminated testActor ! t.actor.path
}
}), name = "observer1")
watchEstablished.await
enterBarrier("watch-established")
expectMsg(path2)
expectNoMsg
enterBarrier("second-terminated")
markNodeAsUnavailable(third)
expectMsg(path3)
enterBarrier("third-terminated")
}
runOn(second, third, fourth) {
system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject")
enterBarrier("subjected-started")
enterBarrier("watch-established")
runOn(third) {
markNodeAsUnavailable(second)
}
enterBarrier("second-terminated")
enterBarrier("third-terminated")
}
enterBarrier("after-1")
}
"receive Terminated when watched node is unknown host" taggedAs LongRunningTest in {
runOn(first) {
val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject"
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path))
def receive = {
case t: Terminated testActor ! t.actor.path
}
}), name = "observer2")
expectMsg(path)
}
enterBarrier("after-2")
}
"receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in {
runOn(first) {
val path = RootActorPath(second) / "user" / "non-existing"
system.actorOf(Props(new Actor {
context.watch(context.actorFor(path))
def receive = {
case t: Terminated testActor ! t.actor.path
}
}), name = "observer3")
expectMsg(path)
}
enterBarrier("after-3")
}
}
}

View file

@ -287,3 +287,26 @@ stashed messages are put into the dead letters when the actor stops, make sure y
super.postStop if you override it. super.postStop if you override it.
Forward of Terminated message
=============================
Forwarding of the ``Terminated`` message is no longer supported. Instead, if you need
to forward ``Terminated`` you should send the information in your own message.
v2.0::
context.watch(subject)
def receive = {
case t @ Terminated => someone forward t
}
v2.1::
case class MyTerminated(subject: ActorRef)
context.watch(subject)
def receive = {
case Terminated(s) => someone forward MyTerminated(s)
}

View file

@ -235,6 +235,7 @@ private[akka] class RemoteActorRef private[akka] (
catch { catch {
case e @ (_: InterruptedException | NonFatal(_)) case e @ (_: InterruptedException | NonFatal(_))
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send"))
provider.deadLetters ! message
} }
override def !(message: Any)(implicit sender: ActorRef = null): Unit = override def !(message: Any)(implicit sender: ActorRef = null): Unit =
@ -242,6 +243,7 @@ private[akka] class RemoteActorRef private[akka] (
catch { catch {
case e @ (_: InterruptedException | NonFatal(_)) case e @ (_: InterruptedException | NonFatal(_))
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send"))
provider.deadLetters ! message
} }
def suspend(): Unit = sendSystemMessage(Suspend()) def suspend(): Unit = sendSystemMessage(Suspend())

View file

@ -96,6 +96,12 @@ object TestActorRefSpec {
} }
} }
/**
* Forwarding `Terminated` to non-watching testActor is not possible,
* and therefore the `Terminated` message is wrapped.
*/
case class WrappedTerminated(t: Terminated)
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -169,11 +175,14 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
val a = TestActorRef(Props[WorkerActor]) val a = TestActorRef(Props[WorkerActor])
val forwarder = system.actorOf(Props(new Actor { val forwarder = system.actorOf(Props(new Actor {
context.watch(a) context.watch(a)
def receive = { case x testActor forward x } def receive = {
case t: Terminated testActor forward WrappedTerminated(t)
case x testActor forward x
}
})) }))
a.!(PoisonPill)(testActor) a.!(PoisonPill)(testActor)
expectMsgPF(5 seconds) { expectMsgPF(5 seconds) {
case Terminated(`a`) true case WrappedTerminated(Terminated(`a`)) true
} }
a.isTerminated must be(true) a.isTerminated must be(true)
assertThread assertThread