=rem #3879: Stop sysmsg redelivery for unknown systems after a timeout

This commit is contained in:
Endre Sándor Varga 2014-02-19 11:42:41 +01:00
parent a3b8f51079
commit e69d068cc0
5 changed files with 42 additions and 1 deletions

View file

@ -272,6 +272,14 @@ akka {
# resent. # resent.
resend-interval = 2 s resend-interval = 2 s
# WARNING: this setting should not be not changed unless all of its consequences
# are properly understood which assumes experience with remoting internals
# or expert advice.
# This setting defines the time after redelivery attempts of internal management
# signals are stopped to a remote system that has been not confirmed to be alive by
# this system before.
initial-system-message-delivery-timeout = 3 m
### Transports and adapters ### Transports and adapters
# List of the transport drivers that will be loaded by the remoting. # List of the transport drivers that will be loaded by the remoting.

View file

@ -241,6 +241,7 @@ private[remote] class ReliableDeliverySupervisor(
var writer: ActorRef = createWriter() var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid } var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
val bailoutAt: Deadline = Deadline.now + settings.InitialSysMsgDeliveryTimeout
// Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the
// UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
// any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
@ -321,6 +322,14 @@ private[remote] class ReliableDeliverySupervisor(
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
case Ungate case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
// If we talk to a system we have not talked to before (or has given up talking to in the past) stop
// system delivery attempts after the specified time. This act will drop the pending system messages and gate the
// remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable
// again it will be immediately quarantined due to out-of-sync system message buffer and becomes quarantined.
// In other words, this action is safe.
if (!uidConfirmed && bailoutAt.isOverdue())
throw new InvalidAssociation(localAddress, remoteAddress,
new java.util.concurrent.TimeoutException("Delivery of system messages timed out and they were dropped."))
writer = createWriter() writer = createWriter()
// Resending will be triggered by the incoming GotUid message after the connection finished // Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive) context.become(receive)

View file

@ -78,6 +78,10 @@ final class RemoteSettings(val config: Config) {
getInt("akka.remote.system-message-buffer-size") getInt("akka.remote.system-message-buffer-size")
} requiring (_ > 0, "system-message-buffer-size must be > 0") } requiring (_ > 0, "system-message-buffer-size must be > 0")
val InitialSysMsgDeliveryTimeout: FiniteDuration = {
config.getMillisDuration("akka.remote.initial-system-message-delivery-timeout")
} requiring (_ > Duration.Zero, "initial-system-message-delivery-timeout must be > 0")
val QuarantineDuration: FiniteDuration = { val QuarantineDuration: FiniteDuration = {
config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero, config.getMillisDuration("akka.remote.prune-quarantine-marker-after").requiring(_ > Duration.Zero,
"prune-quarantine-marker-after must be > 0 ms") "prune-quarantine-marker-after must be > 0 ms")

View file

@ -40,6 +40,7 @@ class RemoteConfigSpec extends AkkaSpec(
SysMsgAckTimeout should be(0.3 seconds) SysMsgAckTimeout should be(0.3 seconds)
SysResendTimeout should be(2 seconds) SysResendTimeout should be(2 seconds)
SysMsgBufferSize should be(1000) SysMsgBufferSize should be(1000)
InitialSysMsgDeliveryTimeout should be(3 minutes)
QuarantineDuration should be(5 days) QuarantineDuration should be(5 days)
CommandAckTimeout.duration should be(30 seconds) CommandAckTimeout.duration should be(30 seconds)
Transports.size should be(1) Transports.size should be(1)

View file

@ -20,6 +20,8 @@ akka {
/watchers.remote = "akka.tcp://other@localhost:2666" /watchers.remote = "akka.tcp://other@localhost:2666"
} }
} }
remote.retry-gate-closed-for = 1 s
remote.initial-system-message-delivery-timeout = 3 s
remote.netty.tcp { remote.netty.tcp {
hostname = "localhost" hostname = "localhost"
port = 0 port = 0
@ -44,7 +46,7 @@ akka {
"receive Terminated when system of de-serialized ActorRef is not running" in { "receive Terminated when system of de-serialized ActorRef is not running" in {
val probe = TestProbe() val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent]) system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent])
val rarp = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] val rarp = RARP(system).provider
// pick an unused port // pick an unused port
val port = TestUtils.temporaryServerAddress().getPort val port = TestUtils.temporaryServerAddress().getPort
// simulate de-serialized ActorRef // simulate de-serialized ActorRef
@ -83,4 +85,21 @@ akka {
expectMsg(60.seconds, ActorIdentity(path, None)) expectMsg(60.seconds, ActorIdentity(path, None))
} }
"quarantine systems after unsuccessful system message delivery if have not communicated before" in {
// Synthesize an ActorRef to a remote system this one has never talked to before.
// This forces ReliableDeliverySupervisor to start with unknown remote system UID.
val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone"
val transport = RARP(system).provider.transport
val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address),
extinctPath, Nobody, props = None, deploy = None)
val probe = TestProbe()
probe.watch(extinctRef)
probe.unwatch(extinctRef)
probe.expectNoMsg(5.seconds)
system.eventStream.subscribe(probe.ref, classOf[Warning])
probe.expectNoMsg(RARP(system).provider.remoteSettings.RetryGateClosedFor * 2)
}
} }