=rem #3870 Stop re-delivery of system messages when watching non-existing system

* We don't know the UID so we can't quarantine, but we can stop the endpoint
  writer and drop outstanding system messages
This commit is contained in:
Patrik Nordwall 2014-02-13 11:27:40 +01:00
parent f1edf78979
commit aa6bdd197e
9 changed files with 62 additions and 25 deletions

View file

@ -89,7 +89,7 @@ private[cluster] class ClusterRemoteWatcher(
if (m.address != selfAddress) { if (m.address != selfAddress) {
clusterNodes -= m.address clusterNodes -= m.address
if (previousStatus == MemberStatus.Down) { if (previousStatus == MemberStatus.Down) {
quarantine(m.address, m.uniqueAddress.uid) quarantine(m.address, Some(m.uniqueAddress.uid))
} }
publishAddressTerminated(m.address) publishAddressTerminated(m.address)
} }

View file

@ -67,7 +67,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti
enterBarrier("actor-identified") enterBarrier("actor-identified")
// Manually Quarantine the other system // Manually Quarantine the other system
RARP(system).provider.transport.quarantine(node(second).address, uidFirst) RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst))
// Quarantine is up -- Cannot communicate with remote system any more // Quarantine is up -- Cannot communicate with remote system any more
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify"

View file

@ -417,9 +417,10 @@ private[akka] class RemoteActorRefProvider(
/** /**
* Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses.
* @param address Address of the remote system to be quarantined * @param address Address of the remote system to be quarantined
* @param uid UID of the remote system * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/ */
def quarantine(address: Address, uid: Int): Unit = transport.quarantine(address: Address, uid: Int) def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid)
/** /**
* INTERNAL API * INTERNAL API

View file

@ -85,9 +85,10 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
/** /**
* Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses.
* @param address Address of the remote system to be quarantined * @param address Address of the remote system to be quarantined
* @param uid UID of the remote system * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/ */
def quarantine(address: Address, uid: Int): Unit def quarantine(address: Address, uid: Option[Int]): Unit
/** /**
* When this method returns true, some functionality will be turned off for security purposes. * When this method returns true, some functionality will be turned off for security purposes.

View file

@ -164,7 +164,7 @@ private[akka] class RemoteWatcher(
watchingNodes foreach { a watchingNodes foreach { a
if (!unreachable(a) && !failureDetector.isAvailable(a)) { if (!unreachable(a) && !failureDetector.isAvailable(a)) {
log.warning("Detected unreachable: [{}]", a) log.warning("Detected unreachable: [{}]", a)
addressUids.get(a) foreach { uid quarantine(a, uid) } quarantine(a, addressUids.get(a))
publishAddressTerminated(a) publishAddressTerminated(a)
unreachable += a unreachable += a
} }
@ -173,7 +173,7 @@ private[akka] class RemoteWatcher(
def publishAddressTerminated(address: Address): Unit = def publishAddressTerminated(address: Address): Unit =
context.system.eventStream.publish(AddressTerminated(address)) context.system.eventStream.publish(AddressTerminated(address))
def quarantine(address: Address, uid: Int): Unit = def quarantine(address: Address, uid: Option[Int]): Unit =
remoteProvider.quarantine(address, uid) remoteProvider.quarantine(address, uid)
def rewatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = def rewatchRemote(watchee: ActorRef, watcher: ActorRef): Unit =

View file

@ -211,7 +211,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
case None throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null) case None throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
} }
override def quarantine(remoteAddress: Address, uid: Int): Unit = endpointManager match { override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match {
case Some(manager) manager ! Quarantine(remoteAddress, uid) case Some(manager) manager ! Quarantine(remoteAddress, uid)
case _ throw new RemoteTransportExceptionNoStackTrace( case _ throw new RemoteTransportExceptionNoStackTrace(
s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null) s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
@ -244,7 +244,7 @@ private[remote] object EndpointManager {
// acknowledged delivery buffers // acknowledged delivery buffers
def seq = seqOpt.get def seq = seqOpt.get
} }
case class Quarantine(remoteAddress: Address, uid: Int) extends RemotingCommand case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand
case class ManagementCommand(cmd: Any) extends RemotingCommand case class ManagementCommand(cmd: Any) extends RemotingCommand
case class ManagementCommandAck(status: Boolean) case class ManagementCommandAck(status: Boolean)
@ -479,19 +479,28 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender() Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()
case Quarantine(address, uid) case Quarantine(address, uidOption)
// Stop writers // Stop writers
endpoints.writableEndpointWithPolicyFor(address) match { endpoints.writableEndpointWithPolicyFor(address) match {
case Some(Pass(endpoint)) context.stop(endpoint) case Some(Pass(endpoint))
case _ // nothing to stop context.stop(endpoint)
if (uidOption.isEmpty) {
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
address, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
}
case _ // nothing to stop
} }
// Stop inbound read-only associations // Stop inbound read-only associations
endpoints.readOnlyEndpointFor(address) match { endpoints.readOnlyEndpointFor(address) match {
case Some(endpoint) context.stop(endpoint) case Some(endpoint) context.stop(endpoint)
case _ // nothing to stop case _ // nothing to stop
} }
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration) uidOption foreach { uid
eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
}
case s @ Send(message, senderOption, recipientRef, _) case s @ Send(message, senderOption, recipientRef, _)
val recipientAddress = recipientRef.path.address val recipientAddress = recipientRef.path.address

View file

@ -27,8 +27,8 @@ class RemoteConsistentHashingRouterSpec extends AkkaSpec("""
val consistentHash1 = ConsistentHash(nodes1, 10) val consistentHash1 = ConsistentHash(nodes1, 10)
val consistentHash2 = ConsistentHash(nodes2, 10) val consistentHash2 = ConsistentHash(nodes2, 10)
val keys = List("A", "B", "C", "D", "E", "F", "G") val keys = List("A", "B", "C", "D", "E", "F", "G")
val result1 = keys collect { case k => consistentHash1.nodeFor(k).routee } val result1 = keys collect { case k ⇒ consistentHash1.nodeFor(k).routee }
val result2 = keys collect { case k => consistentHash2.nodeFor(k).routee } val result2 = keys collect { case k ⇒ consistentHash2.nodeFor(k).routee }
result1 should be(result2) result1 should be(result2)
} }

View file

@ -8,6 +8,8 @@ import akka.actor._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.RootActorPath import akka.actor.RootActorPath
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.TestUtils
import akka.event.Logging.Warning
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString("""
@ -37,7 +39,31 @@ akka {
shutdown(other) shutdown(other)
} }
override def expectedTestDuration: FiniteDuration = 90.seconds override def expectedTestDuration: FiniteDuration = 120.seconds
"receive Terminated when system of de-serialized ActorRef is not running" in {
  // Regression test for ticket #3870: watching a ref that points at a system which is not
  // running must produce Terminated, must not publish a QuarantinedEvent (the UID is unknown),
  // and must not keep re-delivering the Watch system message afterwards.
  val probe = TestProbe()
  system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent])
  val rarp = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]
  // pick an unused port
  val port = TestUtils.temporaryServerAddress().getPort
  // simulate de-serialized ActorRef
  val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294")
  // Locally deployed watcher: watches `ref` on construction and forwards the terminated ref
  // to testActor so the spec can assert on it.
  system.actorOf(Props(new Actor {
    context.watch(ref)
    def receive = {
      case Terminated(r) ⇒ testActor ! r
    }
  }).withDeploy(Deploy.local))

  expectMsg(20.seconds, ref)
  // we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published
  probe.expectNoMsg(3.seconds)
  // The following verifies ticket #3870, i.e. make sure that re-delivery of Watch message is stopped.
  // It was observed as periodic logging of "address is now gated" when the gate was lifted.
  // The probe now also receives Warning log events; silence for two gate periods means no retry.
  system.eventStream.subscribe(probe.ref, classOf[Warning])
  probe.expectNoMsg(rarp.remoteSettings.RetryGateClosedFor * 2)
}
"receive Terminated when watched node is unknown host" in { "receive Terminated when watched node is unknown host" in {
val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject"

View file

@ -46,7 +46,7 @@ object RemoteWatcherSpec {
object TestRemoteWatcher { object TestRemoteWatcher {
case class AddressTerm(address: Address) case class AddressTerm(address: Address)
case class Quarantined(address: Address, uid: Int) case class Quarantined(address: Address, uid: Option[Int])
} }
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector, class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
@ -61,7 +61,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background // that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
override def quarantine(address: Address, uid: Int): Unit = { override def quarantine(address: Address, uid: Option[Int]): Unit = {
// don't quarantine in remoting, but publish a testable message // don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
} }
@ -200,7 +200,7 @@ class RemoteWatcherSpec extends AkkaSpec(
// but no HeartbeatRsp // but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
} }
} }
@ -235,8 +235,8 @@ class RemoteWatcherSpec extends AkkaSpec(
// but no HeartbeatRsp // but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
// no quarantine when missing first heartbeat, uid unknown // no real quarantine when missing first heartbeat, uid unknown
q.expectNoMsg(1 second) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, None))
} }
} }
@ -273,7 +273,7 @@ class RemoteWatcherSpec extends AkkaSpec(
// but no HeartbeatRsp // but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid)))
} }
} }
@ -318,7 +318,7 @@ class RemoteWatcherSpec extends AkkaSpec(
// but no HeartbeatRsp // but no HeartbeatRsp
monitorA ! ReapUnreachableTick monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address)) p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, Some(remoteAddressUid)))
} }
} }