diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 77e1add959..bb72d9cdff 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -65,6 +65,17 @@ case object Kill extends Kill { @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +/** + * INTERNAL API + * + * Used for remote death watch. The failure detector publishes this to the + * `eventStream` when a remote node is detected to be unreachable. + * [[akka.actor.cell.DeathWatch]] subscribes to the `eventStream` and translates this + * event to [[akka.actor.Terminated]], which is received by the watcher. + */ +@SerialVersionUID(1L) +private[akka] case class NodeUnreachable(address: Address) extends AutoReceivedMessage + abstract class ReceiveTimeout extends PossiblyHarmful /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1699a27505..69deafa306 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -316,9 +316,9 @@ private[akka] class ActorCell( @tailrec final def systemInvoke(message: SystemMessage): Unit = { /* * When recreate/suspend/resume are received while restarting (i.e. between - * preRestart and postRestart, waiting for children to terminate), these + * preRestart and postRestart, waiting for children to terminate), these * must not be executed immediately, but instead queued and released after - * finishRecreate returns. This can only ever be triggered by + * finishRecreate returns. This can only ever be triggered by * ChildTerminated, and ChildTerminated is not one of the queued message * types (hence the overwrite further down). 
Mailbox sets message.next=null * before systemInvoke, so this will only be non-null during such a replay. @@ -377,6 +377,7 @@ private[akka] class ActorCell( msg.message match { case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) + case NodeUnreachable(address) ⇒ watchedNodeUnreachable(address) foreach receiveMessage case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index 031019f3f6..908539ef69 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -4,7 +4,7 @@ package akka.actor.cell -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, NodeUnreachable } import akka.dispatch.{ Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal @@ -17,6 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && !watching.contains(a)) { + // start subscription to NodeUnreachable if non-local subject and not already subscribing + if (!a.isLocal && !isSubscribingToNodeUnreachable) + system.eventStream.subscribe(self, classOf[NodeUnreachable]) + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ watching += a } @@ -92,4 +96,19 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } + protected def watchedNodeUnreachable(address: Address): Iterable[Terminated] = { + val subjects = watching filter { _.path.address == address } + subjects foreach watchedActorTerminated + + // FIXME 
should we cleanup (remove watchedBy) since we know they are dead? + + // FIXME existenceConfirmed? + subjects map { Terminated(_)(existenceConfirmed = false) } + } + + private def isSubscribingToNodeUnreachable: Boolean = watching.exists { + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index cbcee78cc9..bcbeebb147 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,11 +5,11 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet - import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream +import akka.actor.NodeUnreachable /** * Domain events published to the event bus. @@ -200,7 +200,10 @@ private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnv def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - diff(oldGossip, newGossip) foreach { eventStream publish } + val events = diff(oldGossip, newGossip) + events foreach { eventStream publish } + // notify DeathWatch about the unreachable node + events collect { case MemberUnreachable(m) ⇒ NodeUnreachable(m.address) } foreach { eventStream publish } } def publishInternalStats(currentStats: CurrentInternalStats): Unit = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala new file mode 100644 index 0000000000..467b2e49c0 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Props +import akka.actor.Actor +import akka.actor.RootActorPath +import akka.actor.Terminated + +object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy + +abstract class ClusterDeathWatchSpec + extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec) + with MultiNodeClusterSpec { + + import ClusterDeathWatchMultiJvmSpec._ + + "An actor watching a remote actor in the cluster" must { + "receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in { + awaitClusterUp(roles: _*) + enterBarrier("cluster-up") + + runOn(first) { + enterBarrier("subjected-started") + + val path2 = RootActorPath(second) / "user" / "subject" + val path3 = RootActorPath(third) / "user" / "subject" + val watchEstablished = TestLatch(1) + system.actorOf(Props(new Actor { + + context.watch(context.actorFor(path2)) + context.watch(context.actorFor(path3)) + watchEstablished.countDown + + def receive = { + case t: Terminated ⇒ testActor ! 
t.actor.path + } + }), name = "observer") + + watchEstablished.await + enterBarrier("watch-established") + expectMsg(path2) + enterBarrier("second-terminated") + + markNodeAsUnavailable(third) + expectMsg(path3) + enterBarrier("third-terminated") + + } + + runOn(second, third) { + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") + enterBarrier("subjected-started") + enterBarrier("watch-established") + runOn(third) { + markNodeAsUnavailable(second) + } + enterBarrier("second-terminated") + enterBarrier("third-terminated") + } + + enterBarrier("after") + + } + + } +}