diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala index 09f9e1f338..ff7eca00d4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala @@ -49,10 +49,18 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig { class RemoteChild extends Actor { import context.dispatcher - context.system.scheduler.scheduleOnce(500.millis, self, "boom") + def receive = { + case "hello" ⇒ + context.system.scheduler.scheduleOnce(2.seconds, self, "boom") + sender ! "hello" case "boom" ⇒ throw new SimulatedException - case x ⇒ sender ! x + } + } + + class Echo extends Actor { + def receive = { + case m ⇒ sender ! m } } @@ -70,8 +78,8 @@ class SurviveNetworkInstabilityMultiJvmNode8 extends SurviveNetworkInstabilitySp abstract class SurviveNetworkInstabilitySpec extends MultiNodeSpec(SurviveNetworkInstabilityMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender { + with MultiNodeClusterSpec + with ImplicitSender { import SurviveNetworkInstabilityMultiJvmSpec._ @@ -85,15 +93,31 @@ abstract class SurviveNetworkInstabilitySpec awaitAssert(clusterView.unreachableMembers.map(_.address) should be(expected)) } + system.actorOf(Props[Echo], "echo") + + def assertCanTalk(alive: RoleName*): Unit = { + runOn(alive: _*) { + for (to ← alive) { + val sel = system.actorSelection(node(to) / "user" / "echo") + awaitAssert { + sel ! "ping" + expectMsg(1.second, "ping") + } + } + } + enterBarrier("ping-ok") + } + "A network partition tolerant cluster" must { "reach initial convergence" taggedAs LongRunningTest in { awaitClusterUp(first, second, third, fourth, fifth) enterBarrier("after-1") + assertCanTalk(first, second, third, fourth, fifth) } - "heal after a broken pair" taggedAs LongRunningTest in within(30.seconds) { + "heal after a broken pair" taggedAs LongRunningTest in within(45.seconds) { runOn(first) { testConductor.blackhole(first, second, Direction.Both).await } @@ -119,9 +143,10 @@ abstract class SurviveNetworkInstabilitySpec awaitAllReachable() enterBarrier("after-2") + assertCanTalk(first, second, third, fourth, fifth) } - "heal after one isolated node" taggedAs LongRunningTest in within(30.seconds) { + "heal after one isolated node" taggedAs LongRunningTest in within(45.seconds) { val others = Vector(second, third, fourth, fifth) runOn(first) { for (other ← others) { @@ -145,9 +170,10 @@ abstract class SurviveNetworkInstabilitySpec enterBarrier("repair-3") awaitAllReachable() enterBarrier("after-3") + assertCanTalk((others :+ first): _*) } - "heal two isolated islands" taggedAs LongRunningTest in within(30.seconds) { + "heal two isolated islands" taggedAs LongRunningTest in within(45.seconds) { val island1 = Vector(first, second) val island2 = Vector(third, fourth, fifth) runOn(first) { @@ -175,9 +201,10 @@ abstract class SurviveNetworkInstabilitySpec enterBarrier("repair-4") awaitAllReachable() enterBarrier("after-4") + assertCanTalk((island1 ++ island2): _*) } - "heal after unreachable when ring is changed" taggedAs LongRunningTest in within(45.seconds) { + "heal after unreachable when ring is changed" taggedAs LongRunningTest in within(60.seconds) { val joining = Vector(sixth, seventh) val others = Vector(second, third, fourth, fifth) runOn(first) { @@ -220,9 +247,10 @@ abstract class SurviveNetworkInstabilitySpec awaitMembersUp(roles.size 
- 1) }
 
       enterBarrier("after-5")
+      assertCanTalk((joining ++ others): _*)
     }
 
-    "down and remove quarantined node" taggedAs LongRunningTest in within(45.seconds) {
+    "down and remove quarantined node" taggedAs LongRunningTest in within(60.seconds) {
       val others = Vector(first, third, fourth, fifth, sixth, seventh)
 
       runOn(second) {
@@ -269,9 +297,10 @@ abstract class SurviveNetworkInstabilitySpec
       }
 
       enterBarrier("after-6")
+      assertCanTalk(others: _*)
     }
 
-    "continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(45.seconds) {
+    "continue and move Joining to Up after downing of one half" taggedAs LongRunningTest in within(60.seconds) {
       // note that second is already removed in previous step
       val side1 = Vector(first, third, fourth)
       val side1AfterJoin = side1 :+ eighth
@@ -331,8 +360,9 @@ abstract class SurviveNetworkInstabilitySpec
       }
 
       enterBarrier("after-7")
+      assertCanTalk(side1AfterJoin: _*)
     }
   }
-}
+}
\ No newline at end of file
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index d9d695c25e..1c377f00fd 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -359,7 +359,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       enterBarrier("after-7")
     }
 
-    "rebalance to nodes with less shards" in within(30 seconds) {
+    "rebalance to nodes with less shards" in within(60 seconds) {
 
       runOn(fourth) {
         // third, fourth and fifth are still alive
diff --git a/akka-docs/rst/images/association_lifecycle.png b/akka-docs/rst/images/association_lifecycle.png
new file mode 100644
index 0000000000..22c3a386da
Binary files /dev/null and b/akka-docs/rst/images/association_lifecycle.png differ
diff --git a/akka-docs/rst/images/association_lifecycle.svg b/akka-docs/rst/images/association_lifecycle.svg
new file mode 100644
index 0000000000..31b0cc54c1
--- /dev/null
+++ b/akka-docs/rst/images/association_lifecycle.svg
@@ -0,0 +1,447 @@
+[SVG markup omitted -- association lifecycle state diagram with four states:
+ Idle; Active (Connecting or Connected: messages are delivered or buffered if
+ needed); Gated (all outbound messages destined to the gated system are
+ dropped); Quarantined (all outbound and inbound messages arriving from the
+ quarantined system are dropped; the remote system must be restarted to be
+ able to establish communication again). Transitions: Idle to Active on a
+ message send to the remote system or a successful inbound connection; Active
+ to Gated on communication failure (failed TCP connection, transport FD
+ trigger, name lookup failure, remote system shutdown); Gated to Idle when the
+ gate time elapses; Gated to Active on a successful inbound connection; Active
+ to Quarantined on catastrophic communication failure (remote DeathWatch
+ trigger, system message delivery failure, cluster MemberRemoved event);
+ Quarantined is left only via a successful inbound or outbound connection
+ from/to the restarted system.]
diff --git a/akka-docs/rst/java/remoting.rst b/akka-docs/rst/java/remoting.rst
index 0aafdadfd2..745abcb4ab 100644
--- a/akka-docs/rst/java/remoting.rst
+++ b/akka-docs/rst/java/remoting.rst
@@ -149,6 +149,30 @@ you can advise the system to create a child on that remote node like so:
 
 .. includecode:: code/docs/remoting/RemoteDeploymentDocTest.java#deploy
 
+
+Lifecycle and Failure Recovery Model
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. image:: ../images/association_lifecycle.png
+   :align: center
+   :width: 620
+
+Each link with a remote system can be in one of four states, as illustrated above. Before any communication
+happens with a remote system at a given ``Address`` the state of the association is ``Idle``. The first time a
+message is sent to the remote system, or an inbound connection is accepted, the state of the link transitions to
+``Active``, denoting that the two systems have messages to send or receive and that no failures have been
+encountered so far. When a communication failure happens and the connection is lost, the link becomes ``Gated``.
+
+In this state the system will not attempt to connect to the remote host and all outbound messages will be dropped.
+The time the link spends in the ``Gated`` state is controlled by the setting ``akka.remote.retry-gate-closed-for``:
+after this time elapses the link state transitions to ``Idle`` again. Gating is one-sided in the sense that whenever
+a successful *inbound* connection is accepted from the remote system while the link is gated, the link automatically
+transitions to ``Active`` and communication resumes immediately.
+
+In the face of communication failures that are unrecoverable because the state of the participating systems is
+inconsistent, the remote system becomes ``Quarantined``. Unlike gating, quarantining is permanent and lasts until
+one of the systems is restarted. After a restart, communication can be resumed and the link can become ``Active``
+again.
+
 Watching Remote Actors
 ^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst
index a371f1c25e..bc0048a9df 100644
--- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst
+++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst
@@ -29,6 +29,8 @@ configured time of unreachability. This feature is disabled by default, as it al
 
 During the deprecation phase ``akka.cluster.auto-down=on`` is interpreted as instant auto-down.
 
+
+
 =======
 Routers
 =======
@@ -99,6 +101,43 @@ Changed cluster expected-response-after configuration
 Configuration property ``akka.cluster.failure-detector.heartbeat-request.expected-response-after``
 has been renamed to ``akka.cluster.failure-detector.expected-response-after``.
 
+Removed automatic retry feature from Remoting in favor of retry-gate
+====================================================================
+
+The retry-gate feature is now the only failure handling strategy in Remoting. This change means that when remoting
+detects a faulty connection it goes into a gated state, where all buffered and subsequent remote messages are dropped
+until the configurable time defined by the configuration key ``akka.remote.retry-gate-closed-for`` has elapsed after
+the failure event. This behavior prevents reconnect storms and unbounded buffer growth during network instabilities.
+After the configured time elapses the gate is lifted and a new connection will be attempted when there are new remote
+messages to be delivered.
+
+In concert with this change, all settings related to the old reconnect behavior (``akka.remote.retry-window`` and
+``akka.remote.maximum-retries-in-window``) were removed.
+
+The timeout setting ``akka.remote.gate-invalid-addresses-for`` that controlled the gate interval for certain failure
+events is also removed; all gating intervals are now controlled by the ``akka.remote.retry-gate-closed-for`` setting
+instead.
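+
+For example, the single remaining gate interval can be adjusted in ``application.conf`` (5 seconds is the new
+default from ``reference.conf``; much shorter values risk a storm of reconnect attempts)::
+
+  akka.remote {
+    # How long a failed outbound connection stays gated before a new
+    # connection attempt is allowed; messages sent while gated are
+    # delivered to dead letters.
+    retry-gate-closed-for = 5 s
+  }
+
+The removed keys (``retry-window``, ``maximum-retries-in-window`` and ``gate-invalid-addresses-for``) can simply be
+deleted from existing configuration files.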
+
+Reduced default sensitivity settings for transport failure detector in Remoting
+===============================================================================
+
+Since the most commonly used transport with Remoting is TCP, which provides proper connection termination events,
+the failure detector sensitivity setting ``akka.remote.transport-failure-detector.acceptable-heartbeat-pause`` now
+defaults to 10 seconds to reduce load-induced false-positive failure detection events in remoting. In case a
+non-connection-oriented protocol is used, it is recommended to change this and the
+``akka.remote.transport-failure-detector.heartbeat-interval`` setting to more sensitive values.
+
+Quarantine is now permanent
+===========================
+
+The setting that controlled the length of quarantine, ``akka.remote.quarantine-systems-for``, has been removed. The
+only setting available now is ``akka.remote.prune-quarantine-marker-after``, which influences how long quarantine
+tombstones are kept around to avoid long-term memory leaks. This new setting defaults to 5 days.
+
+Remoting uses a dedicated dispatcher by default
+===============================================
+
+The default value of ``akka.remote.use-dispatcher`` has been changed from the empty string (which ran remoting on the
+default dispatcher) to the dedicated ``akka.remote.default-remote-dispatcher``.
+
 Dataflow is Deprecated
 ======================
 
diff --git a/akka-docs/rst/scala/remoting.rst b/akka-docs/rst/scala/remoting.rst
index d215d8586a..d637c40a53 100644
--- a/akka-docs/rst/scala/remoting.rst
+++ b/akka-docs/rst/scala/remoting.rst
@@ -156,6 +156,29 @@ you can advise the system to create a child on that remote node like so:
 
 .. includecode:: code/docs/remoting/RemoteDeploymentDocSpec.scala#deploy
 
+Lifecycle and Failure Recovery Model
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. image:: ../images/association_lifecycle.png
+   :align: center
+   :width: 620
+
+Each link with a remote system can be in one of four states, as illustrated above. Before any communication
+happens with a remote system at a given ``Address`` the state of the association is ``Idle``. The first time a
+message is sent to the remote system, or an inbound connection is accepted, the state of the link transitions to
+``Active``, denoting that the two systems have messages to send or receive and that no failures have been
+encountered so far. When a communication failure happens and the connection is lost, the link becomes ``Gated``.
+
+In this state the system will not attempt to connect to the remote host and all outbound messages will be dropped.
+The time the link spends in the ``Gated`` state is controlled by the setting ``akka.remote.retry-gate-closed-for``:
+after this time elapses the link state transitions to ``Idle`` again. Gating is one-sided in the sense that whenever
+a successful *inbound* connection is accepted from the remote system while the link is gated, the link automatically
+transitions to ``Active`` and communication resumes immediately.
+
+In the face of communication failures that are unrecoverable because the state of the participating systems is
+inconsistent, the remote system becomes ``Quarantined``. Unlike gating, quarantining is permanent and lasts until
+one of the systems is restarted. After a restart, communication can be resumed and the link can become ``Active``
+again.
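+
+Both timed aspects of this lifecycle are configurable; the values below are the defaults from ``reference.conf``
+and can be overridden in ``application.conf``::
+
+  akka.remote {
+    # How long a link stays Gated after a failure before transitioning
+    # back to Idle, where a new connection attempt may be made.
+    retry-gate-closed-for = 5 s
+
+    # How long Quarantined markers are kept around. Do NOT lower this to
+    # re-enable communication with a quarantined system; that is not
+    # supported and the resulting behavior is undefined.
+    prune-quarantine-marker-after = 5 d
+  }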
+ Watching Remote Actors ^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index c4064a3b39..c1814a8298 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -28,7 +28,9 @@ object RemoteNodeDeathWatchMultiJvmSpec extends MultiNodeConfig { ConfigFactory.parseString(""" akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off - """))) + ## Use a tighter setting than the default, otherwise it takes 20s for DeathWatch to trigger + akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3 s + """))) case class WatchIt(watchee: ActorRef) case class UnwatchIt(watchee: ActorRef) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala index d1fddef7b5..8c54a62cfb 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala @@ -30,7 +30,9 @@ object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig { ConfigFactory.parseString(""" akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off - """))) + akka.remote.transport-failure-detector.heartbeat-interval = 1 s + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s + """))) testTransport(on = true) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala index e12782101d..a368dbd5a3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala @@ -26,7 +26,9 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig { ConfigFactory.parseString(""" akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = INFO - #akka.remote.retry-gate-closed-for = 0.5 s + ## Keep it tight, otherwise reestablishing a connection takes too much time + akka.remote.transport-failure-detector.heartbeat-interval = 1 s + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s akka.remote.gate-invalid-addresses-for = 0.5 s """))) @@ -87,13 +89,9 @@ abstract class RemoteNodeShutdownAndComesBackSpec // Trigger reconnect attempt and also queue up a system message to be in limbo state (UID of remote system // is unknown, and system message is pending) system.stop(subject) - subject ! "hello" - subject ! "hello" - subject ! "hello" // Get rid of old system -- now SHUTDOWN is lost testConductor.shutdown(second).await - expectTerminated(subject, 10.seconds) // At this point the second node is restarting, while the first node is trying to reconnect without resetting // the system message send state @@ -102,8 +100,10 @@ abstract class RemoteNodeShutdownAndComesBackSpec within(30.seconds) { // retry because the Subject actor might not be started yet awaitAssert { - system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! 
"echo" - expectMsg(1.second, "echo") + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject") + expectMsgPF(1 second) { + case ActorIdentity("subject", Some(ref)) ⇒ true + } } } @@ -115,7 +115,10 @@ abstract class RemoteNodeShutdownAndComesBackSpec watch(subjectNew) subjectNew ! "shutdown" - expectTerminated(subjectNew) + fishForMessage(5.seconds) { + case _: ActorIdentity ⇒ false + case Terminated(subjectNew) ⇒ true + } } runOn(second) { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index fdea9fb36f..5a078fef86 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -90,7 +90,7 @@ akka { # that since remoting can load arbitrary 3rd party drivers (see # "enabled-transport" and "adapters" entries) it is not guaranteed that # every module will respect this setting. - use-dispatcher = "" + use-dispatcher = "akka.remote.default-remote-dispatcher" ### Security settings @@ -157,7 +157,7 @@ akka { implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. - heartbeat-interval = 1 s + heartbeat-interval = 4 s # Defines the failure detector threshold. # A low threshold is prone to generate many wrong suspicions but ensures @@ -181,7 +181,7 @@ akka { # This margin is important to be able to survive sudden, occasional, # pauses in heartbeat arrivals, due to for example garbage collect or # network drop. - acceptable-heartbeat-pause = 3 s + acceptable-heartbeat-pause = 10 s } # Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf @@ -219,7 +219,7 @@ akka { # This margin is important to be able to survive sudden, occasional, # pauses in heartbeat arrivals, due to for example garbage collect or # network drop. - acceptable-heartbeat-pause = 4 s + acceptable-heartbeat-pause = 10 s # How often to check for nodes marked as unreachable by the failure @@ -237,35 +237,20 @@ akka { # address as failed. This configuration option controls how much time should # be elapsed before reattempting a new connection. While the address is # gated, all messages sent to the address are delivered to dead-letters. - # If this setting is 0, the remoting will always immediately reattempt - # to establish a failed outbound connection and will buffer writes until - # it succeeds. - retry-gate-closed-for = 0 s + # Since this setting limits the rate of reconnects setting it to a + # very short interval (i.e. less than a second) may result in a storm of + # reconnect attempts. + retry-gate-closed-for = 5 s - # If the retry gate function is disabled (see retry-gate-closed-for) the - # remoting subsystem will always attempt to reestablish failed outbound - # connections. The settings below together control the maximum number of - # reattempts in a given time window. The number of reattempts during - # a window of "retry-window" will be maximum "maximum-retries-in-window". - retry-window = 60 s - maximum-retries-in-window = 3 - - # The length of time to gate an address whose name lookup has failed - # or has explicitly signalled that it will not accept connections - # (remote system is shutting down or the requesting system is quarantined). - # No connection attempts will be made to an address while it remains - # gated. Any messages sent to a gated address will be directed to dead - # letters instead. 
-    # is typically large, therefore this setting should be a value in the
-    # order of seconds or minutes.
-    gate-invalid-addresses-for = 60 s
-
-    # This settings controls how long a system will be quarantined after
-    # catastrophic communication failures that result in the loss of system
-    # messages. Quarantining prevents communication with the remote system
-    # of a given UID. This function can be disabled by setting the value
-    # to "off".
-    quarantine-systems-for = 60s
+    # After catastrophic communication failures that result in the loss of system
+    # messages, or after the remote DeathWatch triggers, the remote system gets
+    # quarantined to prevent inconsistent behavior.
+    # This setting controls how long the Quarantine marker will be kept around
+    # before being removed to avoid long-term memory leaks.
+    # WARNING: DO NOT change this to a small value to re-enable communication with
+    # quarantined nodes. Such a feature is not supported and any behavior between
+    # the affected systems after lifting the quarantine is undefined.
+    prune-quarantine-marker-after = 5 d
 
     # This setting defines the maximum number of unacknowledged system messages
     # allowed for a remote system. If this limit is reached the remote system is
@@ -280,11 +265,12 @@ akka {
     # an individual ack.
     system-message-ack-piggyback-timeout = 0.3 s
 
-    # This setting defines the time after messages that have not been
+    # This setting defines the time after which internal management signals
+    # between actors (used for DeathWatch and supervision) that have not been
     # explicitly acknowledged or negatively acknowledged are resent.
     # Messages that were negatively acknowledged are always immediately
     # resent.
-    resend-interval = 1 s
+    resend-interval = 2 s
 
 ### Transports and adapters
 
@@ -488,6 +474,19 @@ akka {
       debug = off
     }
 
+    ### Default dispatcher for the remoting subsystem
+
+    default-remote-dispatcher {
+      type = Dispatcher
+      executor = "fork-join-executor"
+      fork-join-executor {
+        # Min number of threads to cap factor-based parallelism number to
+        parallelism-min = 2
+        parallelism-max = 2
+      }
+    }
+
+
   }
 }
diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
index 013a9c84b0..2d105154dc 100644
--- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala
+++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala
@@ -187,12 +187,10 @@ private[remote] class ReliableDeliverySupervisor(
   val transport: AkkaProtocolTransport,
   val settings: RemoteSettings,
   val codec: AkkaPduCodec,
-  val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor {
+  val receiveBuffers: ConcurrentHashMap[Link, ResendState]) extends Actor with ActorLogging {
 
   import ReliableDeliverySupervisor._
   import context.dispatcher
 
-  def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
-
   var autoResendTimer: Option[Cancellable] = None
 
   def scheduleAutoResend(): Unit = if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
@@ -206,20 +204,18 @@ private[remote] class ReliableDeliverySupervisor(
     scheduleAutoResend()
   }
 
-  override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
+  override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
     case e @ (_: AssociationProblem) ⇒ Escalate
     case NonFatal(e) ⇒
+      log.warning("Association with remote system [{}] has failed, address is now gated for [{}] ms. 
Reason is: [{}].", + remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage) uidConfirmed = false // Need confirmation of UID again - if (retryGateEnabled) { - context.become(gated) - context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) - context.unwatch(writer) - currentHandle = None - context.parent ! StoppedReading(self) - Stop - } else { - Restart - } + context.become(gated) + context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) + context.unwatch(writer) + currentHandle = None + context.parent ! StoppedReading(self) + Stop } var currentHandle: Option[AkkaProtocolHandle] = handleOrActive diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 4035423980..b447353aa1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -60,20 +60,8 @@ final class RemoteSettings(val config: Config) { config.getMillisDuration("akka.remote.retry-gate-closed-for") } requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0") - val UnknownAddressGateClosedFor: FiniteDuration = { - config.getMillisDuration("akka.remote.gate-invalid-addresses-for") - } requiring (_ > Duration.Zero, "gate-invalid-addresses-for must be > 0") - val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections") - val MaximumRetriesInWindow: Int = { - getInt("akka.remote.maximum-retries-in-window") - } requiring (_ > 0, "maximum-retries-in-window must be > 0") - - val RetryWindow: FiniteDuration = { - config.getMillisDuration("akka.remote.retry-window") - } requiring (_ > Duration.Zero, "retry-window must be > 0") - val BackoffPeriod: FiniteDuration = { config.getMillisDuration("akka.remote.backoff-interval") } requiring (_ > Duration.Zero, "backoff-interval must be > 0") @@ -90,10 +78,9 @@ final class RemoteSettings(val config: Config) { getInt("akka.remote.system-message-buffer-size") } requiring (_ > 0, "system-message-buffer-size must be > 0") - val QuarantineDuration: Duration = { - if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Undefined - else config.getMillisDuration("akka.remote.quarantine-systems-for").requiring(_ > Duration.Zero, - "quarantine-systems-for must be > 0 or off") + val QuarantineDuration: FiniteDuration = { + Duration(getMilliseconds("akka.remote.prune-quarantine-marker-after"), MILLISECONDS).requiring(_ > Duration.Zero, + "prune-quarantine-marker-after must be > 0 ms") } val CommandAckTimeout: Timeout = { diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 89ec8b6380..2370155743 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -395,16 +395,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒ log.warning("Tried to associate with unreachable remote address [{}]. " + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. 
Reason: {}", - remoteAddress, settings.UnknownAddressGateClosedFor.toMillis, reason.getMessage) - endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) + remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage) + endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop case ShutDownAssociation(localAddress, remoteAddress, _) ⇒ log.debug("Remote system with address [{}] has shut down. " + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", - remoteAddress, settings.UnknownAddressGateClosedFor.toMillis) - endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop @@ -419,13 +419,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Stop case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ - settings.QuarantineDuration match { - case d: FiniteDuration ⇒ - log.warning("Association to [{}] with unknown UID is irrecoverably failed. " + - "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) - endpoints.markAsFailed(sender, Deadline.now + d) - case _ ⇒ - } + log.warning("Association to [{}] with unknown UID is irrecoverably failed. " + + "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender, Deadline.now + settings.RetryGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) Stop @@ -482,22 +479,18 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender case Quarantine(address, uid) ⇒ - settings.QuarantineDuration match { - case d: FiniteDuration ⇒ - // Stop writers - endpoints.writableEndpointWithPolicyFor(address) match { - case Some(Pass(endpoint)) ⇒ context.stop(endpoint) - case _ ⇒ // nothing to stop - } - // Stop inbound read-only associations - endpoints.readOnlyEndpointFor(address) match { - case Some(endpoint) ⇒ context.stop(endpoint) - case _ ⇒ // nothing to stop - } - endpoints.markAsQuarantined(address, uid, Deadline.now + d) - eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) - case _ ⇒ // Ignore + // Stop writers + endpoints.writableEndpointWithPolicyFor(address) match { + case Some(Pass(endpoint)) ⇒ context.stop(endpoint) + case _ ⇒ // nothing to stop } + // Stop inbound read-only associations + endpoints.readOnlyEndpointFor(address) match { + case Some(endpoint) ⇒ context.stop(endpoint) + case _ ⇒ // nothing to stop + } + endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration) + eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 4f54ee05dc..29b7439d63 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ 
b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -453,8 +453,6 @@ private[transport] class ThrottledAssociation( sender ! SetThrottleAck stay() case Event(Disassociated(info), _) ⇒ - if (upstreamListener ne null) upstreamListener notify Disassociated(info) - originalHandle.disassociate() stop() } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index e7ce152b57..7cb9359b7f 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -380,7 +380,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA // TODO: This should be factored out to an async (or thread-isolated) name lookup service #2960 def addressToSocketAddress(addr: Address): Future[InetSocketAddress] = addr match { - case Address(_, _, Some(host), Some(port)) ⇒ Future { new InetSocketAddress(InetAddress.getByName(host), port) } + case Address(_, _, Some(host), Some(port)) ⇒ Future { blocking { new InetSocketAddress(InetAddress.getByName(host), port) } } case _ ⇒ Future.failed(new IllegalArgumentException(s"Address [$addr] does not contain host or port information.")) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index deae34743d..076208af95 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -19,7 +19,7 @@ class RemoteConfigSpec extends AkkaSpec( akka.remote.netty.tcp.port = 0 """) { - "Remoting" must { + "Remoting" should { "contain correct configuration values in reference.conf" in { val remoteSettings = RARP(system).provider.remoteSettings @@ -33,17 +33,14 @@ class RemoteConfigSpec extends AkkaSpec( ShutdownTimeout.duration should be(10 seconds) FlushWait should be(2 seconds) StartupTimeout.duration should be(10 seconds) - RetryGateClosedFor should be(Duration.Zero) - UnknownAddressGateClosedFor should be(1 minute) - Dispatcher should equal("") + RetryGateClosedFor should be(5 seconds) + Dispatcher should equal("akka.remote.default-remote-dispatcher") UsePassiveConnections should be(true) - MaximumRetriesInWindow should be(3) - RetryWindow should be(60 seconds) BackoffPeriod should be(10 millis) SysMsgAckTimeout should be(0.3 seconds) - SysResendTimeout should be(1 seconds) + SysResendTimeout should be(2 seconds) SysMsgBufferSize should be(1000) - QuarantineDuration should be(60 seconds) + QuarantineDuration should be(5 days) CommandAckTimeout.duration should be(30 seconds) Transports.size should be(1) Transports.head._1 should be(classOf[akka.remote.transport.netty.NettyTransport].getName) @@ -58,7 +55,7 @@ class RemoteConfigSpec extends AkkaSpec( WatchUnreachableReaperInterval should be(1 second) WatchFailureDetectorConfig.getDouble("threshold") should be(10.0 +- 0.0001) WatchFailureDetectorConfig.getInt("max-sample-size") should be(200) - WatchFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(4 seconds) + WatchFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(10 seconds) WatchFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis) remoteSettings.config.getString("akka.remote.log-frame-size-exceeding") should be("off") @@ -72,10 +69,10 @@ class RemoteConfigSpec 
extends AkkaSpec(
 
       SecureCookie should equal(None)
 
       TransportFailureDetectorImplementationClass should be(classOf[PhiAccrualFailureDetector].getName)
-      TransportHeartBeatInterval should equal(1.seconds)
+      TransportHeartBeatInterval should equal(4.seconds)
       TransportFailureDetectorConfig.getDouble("threshold") should be(7.0 +- 0.0001)
       TransportFailureDetectorConfig.getInt("max-sample-size") should be(100)
-      TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(3 seconds)
+      TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(10 seconds)
       TransportFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
     }
 
diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala
index 024f8f83d5..c8ecd7e6a7 100644
--- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala
+++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala
@@ -14,6 +14,7 @@ object AkkaProtocolStressTest {
   val configA: Config = ConfigFactory parseString ("""
     akka {
       #loglevel = DEBUG
+      actor.serialize-messages = off
       actor.provider = "akka.remote.RemoteActorRefProvider"
       remote.log-remote-lifecycle-events = on
 
@@ -22,13 +23,12 @@ object AkkaProtocolStressTest {
         threshold = 1.0
         max-sample-size = 2
         min-std-deviation = 1 ms
-        acceptable-heartbeat-pause = 0.01 s
+        ## We want lots of lost connections in this test, keep it sensitive
+        heartbeat-interval = 1 s
+        acceptable-heartbeat-pause = 1 s
       }
-      remote.retry-window = 1 s
-      # This test drops messages, but dropping too much will make it fail. The reason is that this test
-      # expects at least a few of the final messages to arrive to prove that the Remoting does not get stuck
-      # in an irrecoverable state. The retry limit enabled case is covered by the SystemMessageDelivery tests.
-      remote.maximum-retries-in-window = 100
+      ## Keep the gate duration in this test at a reasonably low value, otherwise too many messages are dropped
+      remote.retry-gate-closed-for = 1 s
 
       remote.netty.tcp {
         applied-adapters = ["gremlin"]
@@ -38,6 +38,8 @@ object AkkaProtocolStressTest {
     }
                                      """)
 
+  object ResendFinal
+
   class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor {
     import context.dispatcher
 
@@ -58,13 +60,25 @@ object AkkaProtocolStressTest {
       if (seq > maxSeq) {
         losses += seq - maxSeq - 1
         maxSeq = seq
-        if (seq > limit * 0.9) {
+        // Due to the (bursty) lossiness of the gate, we are happy with receiving at least one message from the
+        // upper half (> 50000). Since messages are sent in bursts of 2000, 0.5 seconds apart, this is reasonable.
+        // The purpose of this test is not reliable delivery (there is a gremlin with 30% loss anyway) but
+        // respecting the proper ordering.
+        if (seq > limit * 0.5) {
           controller ! ((maxSeq, losses))
+          context.system.scheduler.schedule(1.second, 1.second, self, ResendFinal)
+          context.become(done)
         }
       } else {
        controller ! s"Received out of order message. Previous: ${maxSeq} Received: ${seq}"
       }
     }
+
+    // Make sure the other side eventually "gets the message"
+    def done: Receive = {
+      case ResendFinal ⇒
+        controller ! 
((maxSeq, losses)) + } } } diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index a245d3ccd9..1043ff4c5b 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -36,17 +36,19 @@ object SystemMessageDeliveryStressTest { akka { #loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" + actor.serialize-messages = off remote.log-remote-lifecycle-events = on - remote.failure-detector { + remote.transport-failure-detector { threshold = 1.0 max-sample-size = 2 min-std-deviation = 1 ms - acceptable-heartbeat-pause = 0.01 s + heartbeat-interval = 500 ms + acceptable-heartbeat-pause = 2 s } - remote.retry-window = 1 s - remote.maximum-retries-in-window = 2 + ## Keep this setting tight, otherwise the test takes a long time or times out + remote.resend-interval = 0.5 s remote.use-passive-connections = on remote.netty.tcp { @@ -142,12 +144,9 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) } -class SystemMessageDeliveryDefault extends SystemMessageDeliveryStressTest("retry gate off, passive connections on", "") -class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections on", +class SystemMessageDeliveryRetryGate extends SystemMessageDeliveryStressTest("passive connections on", "akka.remote.retry-gate-closed-for = 0.5 s") -class SystemMessageDeliveryNoPassive extends SystemMessageDeliveryStressTest("retry gate off, passive connections off", - "akka.remote.use-passive-connections = off") -class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("retry gate on, passive connections off", +class SystemMessageDeliveryNoPassiveRetryGate extends SystemMessageDeliveryStressTest("passive connections off", """ akka.remote.use-passive-connections = off akka.remote.retry-gate-closed-for = 0.5 s diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala index 7b88de25a2..2716fbcb17 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala @@ -19,6 +19,9 @@ object ThrottlerTransportAdapterSpec { remote.netty.tcp.hostname = "localhost" remote.log-remote-lifecycle-events = off + remote.retry-gate-closed-for = 1 s + remote.transport-failure-detector.heartbeat-interval = 1 s + remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s remote.netty.tcp.applied-adapters = ["trttl"] remote.netty.tcp.port = 0 @@ -115,7 +118,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende here ! "Blackhole 3" false } - }, 5.seconds) + }, 15.seconds) here ! "Cleanup" fishForMessage(5.seconds) {