=rem #3527 Take actor system uid into consideration in remote watch

* When actor system was restarted quickly the new system replied to
  heartbeats and Terminated was never triggered for actors in old
  system.
* Solved by sending an extra Watch system message when the first heartbeat
  is received for an address and when a change of system uid is detected.
This commit is contained in:
Patrik Nordwall 2013-08-19 17:50:15 +02:00
parent d85039c1a6
commit 637598a28b
3 changed files with 130 additions and 1 deletions

View file

@ -0,0 +1,112 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.actor.Terminated
import akka.remote.testconductor.RoleName
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import akka.actor.RootActorPath
/**
 * Multi-node configuration for the remote-node-restart death watch spec.
 *
 * Declares the two roles (`first`, `second`), the configuration common to
 * both nodes, switches the test transport on, and hosts the `Subject` actor
 * that the spec deploys on the `second` node.
 */
object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")

  // The margin-stripped literal evaluates to exactly the same string as an
  // unindented triple-quoted block, it is only laid out to follow the code.
  commonConfig(debugConfig(on = false).withFallback(
    ConfigFactory.parseString(
      """
        |akka.loglevel = INFO
        |akka.remote.log-remote-lifecycle-events = off
        |""".stripMargin)))

  testTransport(on = true)

  /**
   * Test subject.
   *
   * On `"shutdown"` it replies `"shutdown-ack"` to the sender and then shuts
   * down its own actor system; every other message is echoed back unchanged.
   */
  class Subject extends Actor {
    def receive = {
      case "shutdown" =>
        // acknowledge first, then take the whole actor system down
        sender ! "shutdown-ack"
        context.system.shutdown()
      case msg => sender ! msg
    }
  }
}
// Two concrete spec classes with identical (empty) bodies; all behaviour is
// inherited from RemoteNodeRestartDeathWatchSpec.
// NOTE(review): presumably one class per participating JVM/node, selected by
// the `MultiJvmNode<N>` name suffix — confirm against the multi-jvm test setup.
class RemoteNodeRestartDeathWatchMultiJvmNode1 extends RemoteNodeRestartDeathWatchSpec
class RemoteNodeRestartDeathWatchMultiJvmNode2 extends RemoteNodeRestartDeathWatchSpec
/**
 * Checks that a watcher on the `first` node gets `Terminated` for an actor on
 * the `second` node after the second node's actor system has been shut down
 * and a new actor system has been started on the same host and port.
 */
abstract class RemoteNodeRestartDeathWatchSpec
  extends MultiNodeSpec(RemoteNodeRestartDeathWatchMultiJvmSpec)
  with STMultiNodeSpec with ImplicitSender {

  import RemoteNodeRestartDeathWatchMultiJvmSpec._

  override def initialParticipants = roles.size

  /**
   * Sends `Identify` to `/user/<actorName>` on the node of the given role and
   * returns the `ActorRef` carried by the `ActorIdentity` reply.
   *
   * Calls `.get` on the optional ref, so it throws if the reply has no ref.
   */
  def identify(role: RoleName, actorName: String): ActorRef = {
    val selection = system.actorSelection(node(role) / "user" / actorName)
    selection ! Identify(actorName)
    val identity = expectMsgType[ActorIdentity]
    identity.ref.get
  }

  "RemoteNodeRestartDeathWatch" must {

    "receive Terminated when remote actor system is restarted" taggedAs LongRunningTest in {

      runOn(first) {
        // captured before the barriers; used again after the second node was shut down
        val secondAddress = node(second).address
        enterBarrier("actors-started")

        val remoteSubject = identify(second, "subject")
        watch(remoteSubject)
        // round trip to make sure the subject is reachable before moving on
        remoteSubject ! "hello"
        expectMsg("hello")
        enterBarrier("watch-established")

        // simulate a hard shutdown, nothing sent from the shutdown node
        testConductor.blackhole(second, first, Direction.Send).await
        testConductor.shutdown(second).await

        expectTerminated(remoteSubject, 15.seconds)

        // address the subject by path on the same address and ask it to
        // shut its actor system down
        val subjectByPath = system.actorSelection(RootActorPath(secondAddress) / "user" / "subject")
        subjectByPath ! "shutdown"
        expectMsg("shutdown-ack")
      }

      runOn(second) {
        val ownAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
        system.actorOf(Props[Subject], "subject")
        enterBarrier("actors-started")
        enterBarrier("watch-established")

        // wait until the original actor system is gone
        system.awaitTermination(30.seconds)

        // start a new actor system with the same name, host and port as the
        // terminated one, everything else taken from the old system's config
        val restartConfig = ConfigFactory.parseString(
          s"""
             |akka.remote.netty.tcp {
             |hostname = ${ownAddress.host.get}
             |port = ${ownAddress.port.get}
             |}
             |""".stripMargin).withFallback(system.settings.config)
        val restartedSystem = ActorSystem(system.name, restartConfig)
        restartedSystem.actorOf(Props[Subject], "subject")

        // the Subject shuts its system down when it receives "shutdown"
        restartedSystem.awaitTermination(30.seconds)
      }
    }
  }
}

View file

@ -560,7 +560,7 @@ private[remote] class EndpointWriter(
unstashAll() unstashAll()
goto(Writing) goto(Writing)
case _ => case _
stash() stash()
stay() stay()

View file

@ -17,6 +17,7 @@ import akka.ConfigurationException
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.actor.InternalActorRef import akka.actor.InternalActorRef
import akka.dispatch.sysmsg.DeathWatchNotification import akka.dispatch.sysmsg.DeathWatchNotification
import akka.dispatch.sysmsg.Watch
import akka.actor.Deploy import akka.actor.Deploy
/** /**
@ -148,6 +149,8 @@ private[akka] class RemoteWatcher(
log.debug("Received first heartbeat rsp from [{}]", from) log.debug("Received first heartbeat rsp from [{}]", from)
if (watchingNodes(from) && !unreachable(from)) { if (watchingNodes(from) && !unreachable(from)) {
if (!addressUids.contains(from) || addressUids(from) != uid)
reWatch(from)
addressUids += (from -> uid) addressUids += (from -> uid)
failureDetector.heartbeat(from) failureDetector.heartbeat(from)
} }
@ -264,4 +267,18 @@ private[akka] class RemoteWatcher(
failureDetector.heartbeat(address) failureDetector.heartbeat(address)
} }
/**
 * To ensure that we receive heartbeat messages from the right actor system
 * incarnation we send Watch again for the first HeartbeatRsp (containing
 * the system UID) and if HeartbeatRsp contains a new system UID.
 * Terminated will be triggered if the watchee (including correct Actor UID)
 * does not exist.
 *
 * Only (watchee, watcher) pairs whose watchee path has the given address are
 * re-watched; pairs on other addresses are left untouched.
 *
 * @param address remote address whose watchees are sent a new Watch message
 */
def reWatch(address: Address): Unit =
  watching.foreach {
    case (wee: InternalActorRef, wer: InternalActorRef) =>
      // The address check lives in the case body, not in a pattern guard:
      // `foreach` takes a total function, so a guard that rejects a pair on
      // another address would surface as a scala.MatchError.
      if (wee.path.address == address) {
        log.debug("Re-watch [{} -> {}]", wer, wee)
        // a new Watch instance is created for every pair
        wee.sendSystemMessage(Watch(wee, wer)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
      }
  }
} }