From 94896e8e7539ca6875576db69810d66ce62d3202 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 23 Sep 2015 14:31:26 +0200 Subject: [PATCH] =rem #18339 Use explicit handshake timeout * instead of using transport failure detector * add a new config property akka.remote.handshake-timeout, but for netty.tcp and netty.ssl the existing netty.tcp.connection-timeout setting will be used * add test of the timeouts * mima filter for internal ProtocolStateActor --- .../SurviveNetworkInstabilitySpec.scala | 5 +- akka-remote/src/main/resources/reference.conf | 5 ++ .../transport/AkkaProtocolTransport.scala | 50 +++++++++++++++++-- .../scala/akka/remote/RemoteConfigSpec.scala | 2 + .../test/scala/akka/remote/RemotingSpec.scala | 2 +- .../remote/transport/AkkaProtocolSpec.scala | 48 +++++++++++++++++- project/MiMa.scala | 3 +- 7 files changed, 106 insertions(+), 9 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala index 6f8381c4bf..2988827714 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SurviveNetworkInstabilitySpec.scala @@ -34,7 +34,10 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig { val eighth = role("eighth") commonConfig(debugConfig(on = false).withFallback( - ConfigFactory.parseString("akka.remote.system-message-buffer-size=100")). + ConfigFactory.parseString(""" + akka.remote.system-message-buffer-size=100 + akka.remote.netty.tcp.connection-timeout = 10s + """)). withFallback(MultiNodeClusterSpec.clusterConfig)) testTransport(on = true) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 56c3b6d6e7..19e2bab821 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -98,6 +98,11 @@ akka { # Acknowledgment timeout of management commands sent to the transport stack. command-ack-timeout = 30 s + + # The timeout for outbound associations to perform the handshake. + # If the transport is akka.remote.netty.tcp or akka.remote.netty.ssl + # the configured connection-timeout for the transport will be used instead. + handshake-timeout = 15 s # If set to a nonempty string remoting will use the given dispatcher for # its internal actors otherwise the default dispatcher is used. Please note diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 1c619f0c48..97fe0410be 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -3,6 +3,7 @@ */ package akka.remote.transport +import java.util.concurrent.TimeoutException import akka.ConfigurationException import akka.actor.SupervisorStrategy.Stop import akka.actor._ @@ -43,6 +44,17 @@ private[remote] class AkkaProtocolSettings(config: Config) { val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie") val SecureCookie: Option[String] = if (RequireCookie) Some(getString("akka.remote.secure-cookie")) else None + + val HandshakeTimeout: FiniteDuration = { + val enabledTransports = config.getStringList("akka.remote.enabled-transports") + if (enabledTransports.contains("akka.remote.netty.tcp")) + config.getMillisDuration("akka.remote.netty.tcp.connection-timeout") + else if (enabledTransports.contains("akka.remote.netty.ssl")) + config.getMillisDuration("akka.remote.netty.ssl.connection-timeout") + else + config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero, + "handshake-timeout must be > 0") + } } private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead? @@ -213,6 +225,8 @@ private[transport] object ProtocolStateActor { case object HeartbeatTimer extends NoSerializationVerificationNeeded + case object HandshakeTimer extends NoSerializationVerificationNeeded + final case class Handle(handle: AssociationHandle) extends NoSerializationVerificationNeeded final case class HandleListenerRegistered(listener: HandleEventListener) extends NoSerializationVerificationNeeded @@ -301,6 +315,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } val localAddress = localHandshakeInfo.origin + val handshakeTimerKey = "handshake-timer" initialData match { case d: OutboundUnassociated ⇒ @@ -309,9 +324,12 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case d: InboundUnassociated ⇒ d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self)) + initHandshakeTimer() startWith(WaitHandshake, d) } + initHandshakeTimer() + when(Closed) { // Transport layer events for outbound associations @@ -323,7 +341,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self)) if (sendAssociate(wrappedHandle, localHandshakeInfo)) { failureDetector.heartbeat() - initTimers() + initHeartbeatTimer() goto(WaitHandshake) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle) } else { @@ -335,11 +353,17 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Event(DisassociateUnderlying(_), _) ⇒ stop() + case Event(HandshakeTimer, OutboundUnassociated(_, statusPromise, _)) ⇒ + val errMsg = "No response from remote for outbound association. Associate timed out after " + + s"[${settings.HandshakeTimeout.toMillis} ms]." + statusPromise.failure(new TimeoutException(errMsg)) + stop(FSM.Failure(TimeoutReason(errMsg))) + case _ ⇒ stay() } - // Timeout of this state is implicitly handled by the failure detector + // Timeout of this state is handled by the HandshakeTimer when(WaitHandshake) { case Event(Disassociated(info), _) ⇒ stop(FSM.Failure(info)) @@ -352,6 +376,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Associate(handshakeInfo) ⇒ failureDetector.heartbeat() + cancelTimer(handshakeTimerKey) goto(Open) using AssociatedWaitHandler( notifyOutboundHandler(wrappedHandle, handshakeInfo, statusPromise), wrappedHandle, @@ -381,7 +406,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat if (!settings.RequireCookie || info.cookie == settings.SecureCookie) { sendAssociate(wrappedHandle, localHandshakeInfo) failureDetector.heartbeat() - initTimers() + initHeartbeatTimer() + cancelTimer(handshakeTimerKey) goto(Open) using AssociatedWaitHandler( notifyInboundHandler(wrappedHandle, info, associationHandler), wrappedHandle, @@ -402,6 +428,16 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } + case Event(HandshakeTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒ + sendDisassociate(wrappedHandle, Unknown) + stop(FSM.Failure(TimeoutReason("No response from remote for outbound association. Handshake timed out after " + + s"[${settings.HandshakeTimeout.toMillis} ms]."))) + + case Event(HandshakeTimer, InboundUnassociated(_, wrappedHandle)) ⇒ + sendDisassociate(wrappedHandle, Unknown) + stop(FSM.Failure(TimeoutReason("No response from remote for inbound association. Handshake timed out after " + + s"[${settings.HandshakeTimeout.toMillis} ms]."))) + } when(Open) { @@ -452,10 +488,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat stay() using ListenerReady(listener, wrappedHandle) } - private def initTimers(): Unit = { + private def initHeartbeatTimer(): Unit = { setTimer("heartbeat-timer", HeartbeatTimer, settings.TransportHeartBeatInterval, repeat = true) } + private def initHandshakeTimer(): Unit = { + setTimer(handshakeTimerKey, HandshakeTimer, settings.HandshakeTimeout, repeat = false) + } + private def handleTimers(wrappedHandle: AssociationHandle): State = { if (failureDetector.isAvailable) { sendHeartbeat(wrappedHandle) @@ -464,7 +504,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // send disassociate just to be sure sendDisassociate(wrappedHandle, Unknown) stop(FSM.Failure(TimeoutReason(s"No response from remote. " + - s"Handshake timed out or transport failure detector triggered. (internal state was $stateName)"))) + s"Transport failure detector triggered. (internal state was $stateName)"))) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index e0575a17df..db5d1402a1 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -82,6 +82,8 @@ class RemoteConfigSpec extends AkkaSpec( import s._ ConnectionTimeout should ===(15.seconds) + ConnectionTimeout should ===(new AkkaProtocolSettings(RARP(system).provider.remoteSettings.config) + .HandshakeTimeout) WriteBufferHighWaterMark should ===(None) WriteBufferLowWaterMark should ===(None) SendBufferSize should ===(Some(256000)) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index f2298cd4f2..d0fd090cf3 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -8,7 +8,7 @@ import akka.event.AddressTerminatedTopic import akka.pattern.ask import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload, HandleEvent } import akka.remote.transport._ -import akka.remote.transport.Transport.{ AssociationEvent, InvalidAssociationException } +import akka.remote.transport.Transport.InvalidAssociationException import akka.testkit._ import akka.util.ByteString import com.typesafe.config._ diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 871af1dd73..daecb84094 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -14,6 +14,7 @@ import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.{ Await, Promise } import akka.actor.Deploy +import java.util.concurrent.TimeoutException object AkkaProtocolSpec { @@ -56,7 +57,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re use-passive-connections = on } - """) + """).withFallback(system.settings.config) val localAddress = Address("test", "testsystem", "testhost", 1234) val localAkkaAddress = Address("akka.test", "testsystem", "testhost", 1234) @@ -445,6 +446,51 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re } + "give up outbound after connection timeout" in { + val (failureDetector, registry, transport, handle) = collaborators + handle.writable = false // nothing will be written + transport.associateBehavior.pushConstant(handle) + + val statusPromise: Promise[AssociationHandle] = Promise() + + val conf2 = ConfigFactory.parseString("akka.remote.netty.tcp.connection-timeout = 500 ms"). + withFallback(conf) + + val stateActor = system.actorOf(ProtocolStateActor.outboundProps( + HandshakeInfo(origin = localAddress, uid = 42, cookie = None), + remoteAddress, + statusPromise, + transport, + new AkkaProtocolSettings(conf2), + codec, + failureDetector, + refuseUid = None)) + + watch(stateActor) + intercept[TimeoutException] { + Await.result(statusPromise.future, 5.seconds) + } + expectTerminated(stateActor) + } + + "give up inbound after connection timeout" in { + val (failureDetector, registry, _, handle) = collaborators + + val conf2 = ConfigFactory.parseString("akka.remote.netty.tcp.connection-timeout = 500 ms"). + withFallback(conf) + + val reader = system.actorOf(ProtocolStateActor.inboundProps( + HandshakeInfo(origin = localAddress, uid = 42, cookie = None), + handle, + ActorAssociationEventListener(testActor), + new AkkaProtocolSettings(conf2), + codec, + failureDetector)) + + watch(reader) + expectTerminated(reader) + } + } } diff --git a/project/MiMa.scala b/project/MiMa.scala index f46ae52aa4..348dc4e213 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -34,7 +34,8 @@ object MiMa extends AutoPlugin { val mimaIgnoredProblems = { import com.typesafe.tools.mima.core._ - Seq() + Seq( + FilterAnyProblem("akka.remote.transport.ProtocolStateActor")) // FIXME somehow we must use different filters when akkaPreviousArtifact is 2.3.x /* Below are the filters we used when comparing to 2.3.x