diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 7dc121604f..ea491dcbd1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -5,7 +5,6 @@ package akka.actor import language.postfixOps - import akka.testkit._ import scala.concurrent.util.duration._ import java.util.concurrent.atomic._ @@ -18,8 +17,17 @@ class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeo object DeathWatchSpec { def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { context.watch(target) - def receive = { case x ⇒ testActor forward x } + def receive = { + case t: Terminated ⇒ testActor forward WrappedTerminated(t) + case x ⇒ testActor forward x + } }) + + /** + * Forwarding `Terminated` to non-watching testActor is not possible, + * and therefore the `Terminated` message is wrapped. + */ + case class WrappedTerminated(t: Terminated) } trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ @@ -32,7 +40,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { - case Terminated(`actorRef`) ⇒ true + case WrappedTerminated(Terminated(`actorRef`)) ⇒ true } "notify with one Terminated message when an Actor is stopped" in { @@ -77,7 +85,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout context.unwatch(terminal) def receive = { case "ping" ⇒ sender ! "pong" - case t: Terminated ⇒ testActor ! t + case t: Terminated ⇒ testActor ! WrappedTerminated(t) } })) @@ -137,9 +145,9 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { - case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed ⇒ 1 + case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed ⇒ 1 case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother ⇒ 2 - case Terminated(`brother`) ⇒ 3 + case WrappedTerminated(Terminated(`brother`)) ⇒ 3 } testActor.isTerminated must not be true result must be(Seq(1, 2, 3)) @@ -165,6 +173,18 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout parent ! "NKOTB" expectMsg("GREEN") } + + "only notify when watching" in { + val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) + val observer = system.actorOf(Props(new Actor { + context.watch(subject) + def receive = { case x ⇒ testActor forward x } + })) + + subject ! PoisonPill + // the testActor is not watching subject and will discard Terminated msg + expectNoMsg + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 77e1add959..b5f2093f90 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -60,11 +60,26 @@ case object Kill extends Kill { } /** - * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. + * When Death Watch is used, the watcher will receive a Terminated(watched) + * message when watched is terminated. + * Terminated message can't be forwarded to another actor, since that actor + * might not be watching the subject. Instead, if you need to forward Terminated + * to another actor you should send the information in your own message. */ @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)(@BeanProperty val existenceConfirmed: Boolean) extends AutoReceivedMessage +/** + * INTERNAL API + * + * Used for remote death watch. Failure detector publish this to the + * `eventStream` when a remote node is detected to be unreachable. + * The watcher ([[akka.actor.DeathWatch]]) subscribes to the `eventStream` + * and translates this event to [[akka.actor.Terminated]], which is sent itself. + */ +@SerialVersionUID(1L) +private[akka] case class AddressTerminated(address: Address) extends AutoReceivedMessage + abstract class ReceiveTimeout extends PossiblyHarmful /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 1699a27505..19c6d9785c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -125,6 +125,8 @@ trait ActorContext extends ActorRefFactory { /** * Registers this actor as a Monitor for the provided ActorRef. + * This actor will receive a Terminated(watched) message when watched + * is terminated. * @return the provided ActorRef */ def watch(subject: ActorRef): ActorRef @@ -316,9 +318,9 @@ private[akka] class ActorCell( @tailrec final def systemInvoke(message: SystemMessage): Unit = { /* * When recreate/suspend/resume are received while restarting (i.e. between - * preRestart and postRestart, waiting for children to terminate), these + * preRestart and postRestart, waiting for children to terminate), these * must not be executed immediately, but instead queued and released after - * finishRecreate returns. This can only ever be triggered by + * finishRecreate returns. This can only ever be triggered by * ChildTerminated, and ChildTerminated is not one of the queued message * types (hence the overwrite further down). Mailbox sets message.next=null * before systemInvoke, so this will only be non-null during such a replay. @@ -375,13 +377,14 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ watchedActorTerminated(t) + case AddressTerminated(address) ⇒ addressTerminated(address) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala index 031019f3f6..f994e956c6 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -4,7 +4,7 @@ package akka.actor.cell -import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor } +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor, Address, AddressTerminated } import akka.dispatch.{ Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal @@ -17,8 +17,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && !watching.contains(a)) { - a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching += a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } } a } @@ -26,13 +28,24 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def unwatch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && watching.contains(a)) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a + maintainAddressTerminatedSubscription(a) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } } a } - protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref + /** + * When this actor is watching the subject of [[akka.actor.Terminated]] message + * it will be propagated to user's receive. + */ + protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) { + maintainAddressTerminatedSubscription(t.actor) { + watching -= t.actor + } + receiveMessage(t) + } protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { @@ -56,7 +69,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) } } - } finally watching = ActorCell.emptyActorRefSet + } finally { + watching = ActorCell.emptyActorRefSet + unsubscribeAddressTerminated() + } } } @@ -65,7 +81,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { + if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy += watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) } @@ -81,7 +97,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ val watcherSelf = watcher == self if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) { + if (watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) { watchedBy -= watcher if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) } @@ -92,4 +108,48 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } + protected def addressTerminated(address: Address): Unit = { + // cleanup watchedBy since we know they are dead + maintainAddressTerminatedSubscription() { + for (a ← watchedBy; if a.path.address == address) watchedBy -= a + } + + // send Terminated to self for all matching subjects + // existenceConfirmed = false because we could have been watching a + // non-local ActorRef that had never resolved before the other node went down + for (a ← watching; if a.path.address == address) { + self ! Terminated(a)(existenceConfirmed = false) + } + } + + /** + * Starts subscription to AddressTerminated if not already subscribing and the + * block adds a non-local ref to watching or watchedBy. + * Ends subscription to AddressTerminated if subscribing and the + * block removes the last non-local ref from watching and watchedBy. + */ + private def maintainAddressTerminatedSubscription[T](change: ActorRef = null)(block: ⇒ T): T = { + def isNonLocal(ref: ActorRef) = ref match { + case null ⇒ true + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + + if (isNonLocal(change)) { + def hasNonLocalAddress: Boolean = ((watching exists isNonLocal) || (watchedBy exists isNonLocal)) + val had = hasNonLocalAddress + val result = block + val has = hasNonLocalAddress + if (had && !has) unsubscribeAddressTerminated() + else if (!had && has) subscribeAddressTerminated() + result + } else { + block + } + } + + private def unsubscribeAddressTerminated(): Unit = system.eventStream.unsubscribe(self, classOf[AddressTerminated]) + + private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated]) + } \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index ae1bd6b6ac..4bb0105413 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -5,11 +5,11 @@ package akka.cluster import language.postfixOps import scala.collection.immutable.SortedSet - import akka.actor.{ Actor, ActorLogging, ActorRef, Address } import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream +import akka.actor.AddressTerminated /** * Domain events published to the event bus. @@ -200,7 +200,14 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto def publishChanges(oldGossip: Gossip, newGossip: Gossip): Unit = { // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - diff(oldGossip, newGossip) foreach { eventStream publish } + diff(oldGossip, newGossip) foreach { event ⇒ + eventStream publish event + // notify DeathWatch about unreachable node + event match { + case MemberUnreachable(m) ⇒ eventStream publish AddressTerminated(m.address) + case _ ⇒ + } + } } def publishInternalStats(currentStats: CurrentInternalStats): Unit = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala new file mode 100644 index 0000000000..8fb7357289 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterDeathWatchSpec.scala @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Props +import akka.actor.Actor +import akka.actor.Address +import akka.actor.RootActorPath +import akka.actor.Terminated +import akka.actor.Address + +object ClusterDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class ClusterDeathWatchMultiJvmNode1 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode2 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode3 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy +class ClusterDeathWatchMultiJvmNode4 extends ClusterDeathWatchSpec with FailureDetectorPuppetStrategy + +abstract class ClusterDeathWatchSpec + extends MultiNodeSpec(ClusterDeathWatchMultiJvmSpec) + with MultiNodeClusterSpec { + + import ClusterDeathWatchMultiJvmSpec._ + + "An actor watching a remote actor in the cluster" must { + "receive Terminated when watched node becomes unreachable" taggedAs LongRunningTest in { + awaitClusterUp(roles: _*) + enterBarrier("cluster-up") + + runOn(first) { + enterBarrier("subjected-started") + + val path2 = RootActorPath(second) / "user" / "subject" + val path3 = RootActorPath(third) / "user" / "subject" + val watchEstablished = TestLatch(1) + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path2)) + context.watch(context.actorFor(path3)) + watchEstablished.countDown + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer1") + + watchEstablished.await + enterBarrier("watch-established") + expectMsg(path2) + expectNoMsg + enterBarrier("second-terminated") + + markNodeAsUnavailable(third) + expectMsg(path3) + enterBarrier("third-terminated") + + } + + runOn(second, third, fourth) { + system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), name = "subject") + enterBarrier("subjected-started") + enterBarrier("watch-established") + runOn(third) { + markNodeAsUnavailable(second) + } + enterBarrier("second-terminated") + enterBarrier("third-terminated") + } + + enterBarrier("after-1") + + } + + "receive Terminated when watched node is unknown host" taggedAs LongRunningTest in { + runOn(first) { + val path = RootActorPath(Address("akka", system.name, "unknownhost", 2552)) / "user" / "subject" + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path)) + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer2") + + expectMsg(path) + } + + enterBarrier("after-2") + } + + "receive Terminated when watched path doesn't exist" taggedAs LongRunningTest in { + runOn(first) { + val path = RootActorPath(second) / "user" / "non-existing" + system.actorOf(Props(new Actor { + context.watch(context.actorFor(path)) + def receive = { + case t: Terminated ⇒ testActor ! t.actor.path + } + }), name = "observer3") + + expectMsg(path) + } + + enterBarrier("after-3") + } + + } +} diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 22f2c12fdd..2f0ac2d15f 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -287,3 +287,26 @@ stashed messages are put into the dead letters when the actor stops, make sure y super.postStop if you override it. +Forward of Terminated message +============================= + +Forward of ``Terminated`` message is no longer supported. Instead, if you forward +``Terminated`` you should send the information in you own message. + +v2.0:: + + context.watch(subject) + + def receive = { + case t @ Terminated => someone forward t + } + +v2.1:: + + case class MyTerminated(subject: ActorRef) + + context.watch(subject) + + def receive = { + case Terminated(s) => someone forward MyTerminated(s) + } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index b118279ae1..c517fa8a14 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -235,6 +235,7 @@ private[akka] class RemoteActorRef private[akka] ( catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + provider.deadLetters ! message } override def !(message: Any)(implicit sender: ActorRef = null): Unit = @@ -242,6 +243,7 @@ private[akka] class RemoteActorRef private[akka] ( catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send")) + provider.deadLetters ! message } def suspend(): Unit = sendSystemMessage(Suspend()) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 9d88c7b111..f033df302f 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -96,6 +96,12 @@ object TestActorRefSpec { } } + /** + * Forwarding `Terminated` to non-watching testActor is not possible, + * and therefore the `Terminated` message is wrapped. + */ + case class WrappedTerminated(t: Terminated) + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -169,11 +175,14 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA val a = TestActorRef(Props[WorkerActor]) val forwarder = system.actorOf(Props(new Actor { context.watch(a) - def receive = { case x ⇒ testActor forward x } + def receive = { + case t: Terminated ⇒ testActor forward WrappedTerminated(t) + case x ⇒ testActor forward x + } })) a.!(PoisonPill)(testActor) expectMsgPF(5 seconds) { - case Terminated(`a`) ⇒ true + case WrappedTerminated(Terminated(`a`)) ⇒ true } a.isTerminated must be(true) assertThread