From 637598a28bc5aafaca27b801f442cf9b20009807 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 19 Aug 2013 17:50:15 +0200 Subject: [PATCH] =rem #3527 Take actor system uid into consideration in remote watch * When actor system was restarted quickly the new system replied to heartbeats and Terminated was never triggered for actors in old system. * Solved by sending an extra Watch system message when first hearbeat is received for an address and when a change of system uid is detected. --- .../RemoteNodeRestartDeathWatchSpec.scala | 112 ++++++++++++++++++ .../src/main/scala/akka/remote/Endpoint.scala | 2 +- .../scala/akka/remote/RemoteWatcher.scala | 17 +++ 3 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala new file mode 100644 index 0000000000..e7519b8ed4 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.Props +import akka.actor.Terminated +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ExtendedActorSystem +import akka.actor.ActorSystem +import akka.actor.RootActorPath + +object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + """))) + + testTransport(on = true) + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ + sender ! "shutdown-ack" + context.system.shutdown() + case msg ⇒ sender ! msg + } + } + +} + +// Several different variations of the test + +class RemoteNodeRestartDeathWatchMultiJvmNode1 extends RemoteNodeRestartDeathWatchSpec +class RemoteNodeRestartDeathWatchMultiJvmNode2 extends RemoteNodeRestartDeathWatchSpec + +abstract class RemoteNodeRestartDeathWatchSpec + extends MultiNodeSpec(RemoteNodeRestartDeathWatchMultiJvmSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteNodeRestartDeathWatchMultiJvmSpec._ + + override def initialParticipants = roles.size + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + "RemoteNodeRestartDeathWatch" must { + + "receive Terminated when remote actor system is restarted" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + enterBarrier("actors-started") + + val subject = identify(second, "subject") + watch(subject) + subject ! "hello" + expectMsg("hello") + enterBarrier("watch-established") + + // simulate a hard shutdown, nothing sent from the shutdown node + testConductor.blackhole(second, first, Direction.Send).await + testConductor.shutdown(second).await + + expectTerminated(subject, 15.seconds) + + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown" + expectMsg("shutdown-ack") + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + system.actorOf(Props[Subject], "subject") + enterBarrier("actors-started") + + enterBarrier("watch-established") + + system.awaitTermination(30.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.netty.tcp { + hostname = ${addr.host.get} + port = ${addr.port.get} + } + """).withFallback(system.settings.config)) + freshSystem.actorOf(Props[Subject], "subject") + + freshSystem.awaitTermination(30.seconds) + } + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 17d58f5927..bee68f4cd1 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -560,7 +560,7 @@ private[remote] class EndpointWriter( unstashAll() goto(Writing) - case _ => + case _ ⇒ stash() stay() diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index f8904a2256..3fd334ebe2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -17,6 +17,7 @@ import akka.ConfigurationException import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.actor.InternalActorRef import akka.dispatch.sysmsg.DeathWatchNotification +import akka.dispatch.sysmsg.Watch import akka.actor.Deploy /** @@ -148,6 +149,8 @@ private[akka] class RemoteWatcher( log.debug("Received first heartbeat rsp from [{}]", from) if (watchingNodes(from) && !unreachable(from)) { + if (!addressUids.contains(from) || addressUids(from) != uid) + reWatch(from) addressUids += (from -> uid) failureDetector.heartbeat(from) } @@ -264,4 +267,18 @@ private[akka] class RemoteWatcher( failureDetector.heartbeat(address) } + /** + * To ensure that we receive heartbeat messages from the right actor system + * incarnation we send Watch again for the first HeartbeatRsp (containing + * the system UID) and if HeartbeatRsp contains a new system UID. + * Terminated will be triggered if the watchee (including correct Actor UID) + * does not exist. + */ + def reWatch(address: Address): Unit = + watching.foreach { + case (wee: InternalActorRef, wer: InternalActorRef) if wee.path.address == address ⇒ + log.debug("Re-watch [{} -> {}]", wer, wee) + wee.sendSystemMessage(Watch(wee, wer)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + } + } \ No newline at end of file