From aa6bdd197eca5a5adbffd77d64c6fd08dbb03648 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 13 Feb 2014 11:27:40 +0100 Subject: [PATCH] =rem #3870 Stop re-delivery of system messages when watching non-existing system * We don't know the UID so we can't quarantine, but we can stop the endpoint writer and drop outstanding system messages --- .../akka/cluster/ClusterRemoteWatcher.scala | 2 +- .../remote/RemoteQuarantinePiercingSpec.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 5 ++-- .../scala/akka/remote/RemoteTransport.scala | 5 ++-- .../scala/akka/remote/RemoteWatcher.scala | 4 +-- .../src/main/scala/akka/remote/Remoting.scala | 23 ++++++++++----- .../RemoteConsistentHashingRouterSpec.scala | 4 +-- .../akka/remote/RemoteDeathWatchSpec.scala | 28 ++++++++++++++++++- .../scala/akka/remote/RemoteWatcherSpec.scala | 14 +++++----- 9 files changed, 62 insertions(+), 25 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 2fa54fe728..aeb8ab4d88 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -89,7 +89,7 @@ private[cluster] class ClusterRemoteWatcher( if (m.address != selfAddress) { clusterNodes -= m.address if (previousStatus == MemberStatus.Down) { - quarantine(m.address, m.uniqueAddress.uid) + quarantine(m.address, Some(m.uniqueAddress.uid)) } publishAddressTerminated(m.address) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index 295023bd69..f73e3980ce 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -67,7 +67,7 @@ abstract class
RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti enterBarrier("actor-identified") // Manually Quarantine the other system - RARP(system).provider.transport.quarantine(node(second).address, uidFirst) + RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst)) // Quarantine is up -- Cannot communicate with remote system any more system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2491bff977..f305448cb5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -417,9 +417,10 @@ private[akka] class RemoteActorRefProvider( /** * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout elapses. * @param address Address of the remote system to be quarantined - * @param uid UID of the remote system + * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but + * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Int): Unit = transport.quarantine(address: Address, uid: Int) + def quarantine(address: Address, uid: Option[Int]): Unit = transport.quarantine(address, uid) /** * INTERNAL API diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 37a14d7d3b..245519dae6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -85,9 +85,10 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va /** * Marks a remote system as out of sync and prevents reconnects until the quarantine timeout 
elapses. * @param address Address of the remote system to be quarantined - * @param uid UID of the remote system + * @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but + * the current endpoint writer will be stopped (dropping system messages) and the address will be gated */ - def quarantine(address: Address, uid: Int): Unit + def quarantine(address: Address, uid: Option[Int]): Unit /** * When this method returns true, some functionality will be turned off for security purposes. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index ee08458d90..69d84d7241 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -164,7 +164,7 @@ private[akka] class RemoteWatcher( watchingNodes foreach { a ⇒ if (!unreachable(a) && !failureDetector.isAvailable(a)) { log.warning("Detected unreachable: [{}]", a) - addressUids.get(a) foreach { uid ⇒ quarantine(a, uid) } + quarantine(a, addressUids.get(a)) publishAddressTerminated(a) unreachable += a } @@ -173,7 +173,7 @@ private[akka] class RemoteWatcher( def publishAddressTerminated(address: Address): Unit = context.system.eventStream.publish(AddressTerminated(address)) - def quarantine(address: Address, uid: Int): Unit = + def quarantine(address: Address, uid: Option[Int]): Unit = remoteProvider.quarantine(address, uid) def rewatchRemote(watchee: ActorRef, watcher: ActorRef): Unit = diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 699114c970..c4df325c04 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -211,7 +211,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ throw new 
RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null) } - override def quarantine(remoteAddress: Address, uid: Int): Unit = endpointManager match { + override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = endpointManager match { case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid) case _ ⇒ throw new RemoteTransportExceptionNoStackTrace( s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null) @@ -244,7 +244,7 @@ private[remote] object EndpointManager { // acknowledged delivery buffers def seq = seqOpt.get } - case class Quarantine(remoteAddress: Address, uid: Int) extends RemotingCommand + case class Quarantine(remoteAddress: Address, uid: Option[Int]) extends RemotingCommand case class ManagementCommand(cmd: Any) extends RemotingCommand case class ManagementCommandAck(status: Boolean) @@ -479,19 +479,28 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender() - case Quarantine(address, uid) ⇒ + case Quarantine(address, uidOption) ⇒ // Stop writers endpoints.writableEndpointWithPolicyFor(address) match { - case Some(Pass(endpoint)) ⇒ context.stop(endpoint) - case _ ⇒ // nothing to stop + case Some(Pass(endpoint)) ⇒ + context.stop(endpoint) + if (uidOption.isEmpty) { + log.warning("Association to [{}] with unknown UID is reported as quarantined, but " + + "address cannot be quarantined without knowing the UID, gating instead for {} ms.", + address, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor) + } + case _ ⇒ // nothing to stop } // Stop inbound read-only associations endpoints.readOnlyEndpointFor(address) match { case Some(endpoint) ⇒ context.stop(endpoint) case _ ⇒ // nothing to stop } - endpoints.markAsQuarantined(address, uid, Deadline.now + 
settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) + uidOption foreach { uid ⇒ + endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration) + eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) + } case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala index f844403951..e1ec1450ac 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConsistentHashingRouterSpec.scala @@ -27,8 +27,8 @@ class RemoteConsistentHashingRouterSpec extends AkkaSpec(""" val consistentHash1 = ConsistentHash(nodes1, 10) val consistentHash2 = ConsistentHash(nodes2, 10) val keys = List("A", "B", "C", "D", "E", "F", "G") - val result1 = keys collect { case k => consistentHash1.nodeFor(k).routee } - val result2 = keys collect { case k => consistentHash2.nodeFor(k).routee } + val result1 = keys collect { case k ⇒ consistentHash1.nodeFor(k).routee } + val result2 = keys collect { case k ⇒ consistentHash2.nodeFor(k).routee } result1 should be(result2) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 6c345ac7cd..bfa1072f72 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -8,6 +8,8 @@ import akka.actor._ import com.typesafe.config.ConfigFactory import akka.actor.RootActorPath import scala.concurrent.duration._ +import akka.TestUtils +import akka.event.Logging.Warning @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" @@ 
-37,7 +39,31 @@ akka { shutdown(other) } - override def expectedTestDuration: FiniteDuration = 90.seconds + override def expectedTestDuration: FiniteDuration = 120.seconds + + "receive Terminated when system of de-serialized ActorRef is not running" in { + val probe = TestProbe() + system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent]) + val rarp = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] + // pick an unused port + val port = TestUtils.temporaryServerAddress().getPort + // simulate de-serialized ActorRef + val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294") + system.actorOf(Props(new Actor { + context.watch(ref) + def receive = { + case Terminated(r) ⇒ testActor ! r + } + }).withDeploy(Deploy.local)) + + expectMsg(20.seconds, ref) + // we don't expect real quarantine when the UID is unknown, i.e. QuarantinedEvent is not published + probe.expectNoMsg(3.seconds) + // The following verifies ticket #3870, i.e. make sure that re-delivery of Watch message is stopped. + // It was observed as periodic logging of "address is now gated" when the gate was lifted. 
+ system.eventStream.subscribe(probe.ref, classOf[Warning]) + probe.expectNoMsg(rarp.remoteSettings.RetryGateClosedFor * 2) + } "receive Terminated when watched node is unknown host" in { val path = RootActorPath(Address("akka.tcp", system.name, "unknownhost", 2552)) / "user" / "subject" diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index cb03d9c1e4..312c66cf2d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -46,7 +46,7 @@ object RemoteWatcherSpec { object TestRemoteWatcher { case class AddressTerm(address: Address) - case class Quarantined(address: Address, uid: Int) + case class Quarantined(address: Address, uid: Option[Int]) } class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector, @@ -61,7 +61,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Int): Unit = { + override def quarantine(address: Address, uid: Option[Int]): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } @@ -200,7 +200,7 @@ class RemoteWatcherSpec extends AkkaSpec( // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) - q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid))) } } @@ -235,8 +235,8 @@ class RemoteWatcherSpec extends AkkaSpec( // but no HeartbeatRsp monitorA ! 
ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) - // no quarantine when missing first heartbeat, uid unknown - q.expectNoMsg(1 second) + // no real quarantine when missing first heartbeat, uid unknown + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, None)) } } @@ -273,7 +273,7 @@ class RemoteWatcherSpec extends AkkaSpec( // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) - q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, Some(remoteAddressUid))) } } @@ -318,7 +318,7 @@ class RemoteWatcherSpec extends AkkaSpec( // but no HeartbeatRsp monitorA ! ReapUnreachableTick p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address)) - q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid)) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, Some(remoteAddressUid))) } }