diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ac9aae541a..51c6b4214d 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -272,6 +272,14 @@ akka { # resent. resend-interval = 2 s + # WARNING: this setting should not be not changed unless all of its consequences + # are properly understood which assumes experience with remoting internals + # or expert advice. + # This setting defines the time after redelivery attempts of internal management + # signals are stopped to a remote system that has been not confirmed to be alive by + # this system before. + initial-system-message-delivery-timeout = 3 m + ### Transports and adapters # List of the transport drivers that will be loaded by the remoting. diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 90a5c240ed..8a43138a74 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -241,6 +241,7 @@ private[remote] class ReliableDeliverySupervisor( var writer: ActorRef = createWriter() var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid } + val bailoutAt: Deadline = Deadline.now + settings.InitialSysMsgDeliveryTimeout // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore @@ -321,6 +322,14 @@ private[remote] class ReliableDeliverySupervisor( context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) case Ungate ⇒ if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { + // If we talk to a system we have not talked to before (or has given up talking to in the past) stop + // system delivery attempts after the specified time. This act will drop the pending system messages and gate the + // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable + // again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined. + // In other words, this action is safe. + if (!uidConfirmed && bailoutAt.isOverdue()) + throw new InvalidAssociation(localAddress, remoteAddress, + new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped.")) writer = createWriter() // Resending will be triggered by the incoming GotUid message after the connection finished context.become(receive) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index bdf2e6c836..930a8ebb2d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -78,6 +78,10 @@ final class RemoteSettings(val config: Config) { getInt("akka.remote.system-message-buffer-size") } requiring (_ > 0, "system-message-buffer-size must be > 0") + val InitialSysMsgDeliveryTimeout: FiniteDuration = { + config.getMillisDuration("akka.remote.initial-system-message-delivery-timeout") + } requiring (_ > Duration.Zero, "initial-system-message-delivery-timeout must be > 0") + val QuarantineDuration: FiniteDuration = { config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero, "prune-quarantine-marker-after must be > 0 ms") diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 27ec6a2001..f1b458c6b8 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -40,6 +40,7 @@ class RemoteConfigSpec extends AkkaSpec( SysMsgAckTimeout should be(0.3 seconds) SysResendTimeout should be(2 seconds) SysMsgBufferSize should be(1000) + InitialSysMsgDeliveryTimeout should be(3 minutes) QuarantineDuration should be(5 days) CommandAckTimeout.duration should be(30 seconds) Transports.size should be(1) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index bfa1072f72..ed4beec3ac 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -20,6 +20,8 @@ akka { /watchers.remote = "akka.tcp://other@localhost:2666" } } + remote.retry-gate-closed-for = 1 s + remote.initial-system-message-delivery-timeout = 3 s remote.netty.tcp { hostname = "localhost" port = 0 @@ -44,7 +46,7 @@ akka { "receive Terminated when system of de-serialized ActorRef is not running" in { val probe = TestProbe() system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent]) - val rarp = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] + val rarp = RARP(system).provider // pick an unused port val port = TestUtils.temporaryServerAddress().getPort // simulate de-serialized ActorRef @@ -83,4 +85,21 @@ akka { expectMsg(60.seconds, ActorIdentity(path, None)) } + "quarantine systems after unsuccessful system message delivery if have not communicated before" in { + // Synthesize an ActorRef to a remote system this one has never talked to before. + // This forces ReliableDeliverySupervisor to start with unknown remote system UID. + val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone" + val transport = RARP(system).provider.transport + val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address), + extinctPath, Nobody, props = None, deploy = None) + + val probe = TestProbe() + probe.watch(extinctRef) + probe.unwatch(extinctRef) + + probe.expectNoMsg(5.seconds) + system.eventStream.subscribe(probe.ref, classOf[Warning]) + probe.expectNoMsg(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2) + } + }