Merge pull request #18542 from akka/wip-18339-handshake-timeout-patriknw

=rem #18339 Use explicit handshake timeout
This commit is contained in:
Patrik Nordwall 2015-10-19 16:04:51 +02:00
commit 82f67f82f9
7 changed files with 106 additions and 9 deletions

View file

@ -34,7 +34,10 @@ object SurviveNetworkInstabilityMultiJvmSpec extends MultiNodeConfig {
val eighth = role("eighth")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("akka.remote.system-message-buffer-size=100")).
ConfigFactory.parseString("""
akka.remote.system-message-buffer-size=100
akka.remote.netty.tcp.connection-timeout = 10s
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)

View file

@ -98,6 +98,11 @@ akka {
# Acknowledgment timeout of management commands sent to the transport stack.
command-ack-timeout = 30 s
# The timeout for outbound associations to perform the handshake.
# If the transport is akka.remote.netty.tcp or akka.remote.netty.ssl
# the configured connection-timeout for the transport will be used instead.
handshake-timeout = 15 s
# If set to a nonempty string remoting will use the given dispatcher for
# its internal actors otherwise the default dispatcher is used. Please note

View file

@ -3,6 +3,7 @@
*/
package akka.remote.transport
import java.util.concurrent.TimeoutException
import akka.ConfigurationException
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
@ -43,6 +44,17 @@ private[remote] class AkkaProtocolSettings(config: Config) {
val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie")
val SecureCookie: Option[String] = if (RequireCookie) Some(getString("akka.remote.secure-cookie")) else None
val HandshakeTimeout: FiniteDuration = {
val enabledTransports = config.getStringList("akka.remote.enabled-transports")
if (enabledTransports.contains("akka.remote.netty.tcp"))
config.getMillisDuration("akka.remote.netty.tcp.connection-timeout")
else if (enabledTransports.contains("akka.remote.netty.ssl"))
config.getMillisDuration("akka.remote.netty.ssl.connection-timeout")
else
config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero,
"handshake-timeout must be > 0")
}
}
private[remote] object AkkaProtocolTransport { //Couldn't these go into the Remoting Extension/ RemoteSettings instead?
@ -213,6 +225,8 @@ private[transport] object ProtocolStateActor {
case object HeartbeatTimer extends NoSerializationVerificationNeeded
case object HandshakeTimer extends NoSerializationVerificationNeeded
final case class Handle(handle: AssociationHandle) extends NoSerializationVerificationNeeded
final case class HandleListenerRegistered(listener: HandleEventListener) extends NoSerializationVerificationNeeded
@ -301,6 +315,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
val localAddress = localHandshakeInfo.origin
val handshakeTimerKey = "handshake-timer"
initialData match {
case d: OutboundUnassociated
@ -309,9 +324,12 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case d: InboundUnassociated
d.wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(self))
initHandshakeTimer()
startWith(WaitHandshake, d)
}
initHandshakeTimer()
when(Closed) {
// Transport layer events for outbound associations
@ -323,7 +341,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
wrappedHandle.readHandlerPromise.trySuccess(ActorHandleEventListener(self))
if (sendAssociate(wrappedHandle, localHandshakeInfo)) {
failureDetector.heartbeat()
initTimers()
initHeartbeatTimer()
goto(WaitHandshake) using OutboundUnderlyingAssociated(statusPromise, wrappedHandle)
} else {
@ -335,11 +353,17 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Event(DisassociateUnderlying(_), _)
stop()
case Event(HandshakeTimer, OutboundUnassociated(_, statusPromise, _))
val errMsg = "No response from remote for outbound association. Associate timed out after " +
s"[${settings.HandshakeTimeout.toMillis} ms]."
statusPromise.failure(new TimeoutException(errMsg))
stop(FSM.Failure(TimeoutReason(errMsg)))
case _ stay()
}
// Timeout of this state is implicitly handled by the failure detector
// Timeout of this state is handled by the HandshakeTimer
when(WaitHandshake) {
case Event(Disassociated(info), _)
stop(FSM.Failure(info))
@ -352,6 +376,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Associate(handshakeInfo)
failureDetector.heartbeat()
cancelTimer(handshakeTimerKey)
goto(Open) using AssociatedWaitHandler(
notifyOutboundHandler(wrappedHandle, handshakeInfo, statusPromise),
wrappedHandle,
@ -381,7 +406,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
if (!settings.RequireCookie || info.cookie == settings.SecureCookie) {
sendAssociate(wrappedHandle, localHandshakeInfo)
failureDetector.heartbeat()
initTimers()
initHeartbeatTimer()
cancelTimer(handshakeTimerKey)
goto(Open) using AssociatedWaitHandler(
notifyInboundHandler(wrappedHandle, info, associationHandler),
wrappedHandle,
@ -402,6 +428,16 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
case Event(HandshakeTimer, OutboundUnderlyingAssociated(_, wrappedHandle))
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason("No response from remote for outbound association. Handshake timed out after " +
s"[${settings.HandshakeTimeout.toMillis} ms].")))
case Event(HandshakeTimer, InboundUnassociated(_, wrappedHandle))
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason("No response from remote for inbound association. Handshake timed out after " +
s"[${settings.HandshakeTimeout.toMillis} ms].")))
}
when(Open) {
@ -452,10 +488,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
stay() using ListenerReady(listener, wrappedHandle)
}
private def initTimers(): Unit = {
private def initHeartbeatTimer(): Unit = {
setTimer("heartbeat-timer", HeartbeatTimer, settings.TransportHeartBeatInterval, repeat = true)
}
private def initHandshakeTimer(): Unit = {
setTimer(handshakeTimerKey, HandshakeTimer, settings.HandshakeTimeout, repeat = false)
}
private def handleTimers(wrappedHandle: AssociationHandle): State = {
if (failureDetector.isAvailable) {
sendHeartbeat(wrappedHandle)
@ -464,7 +504,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
// send disassociate just to be sure
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason(s"No response from remote. " +
s"Handshake timed out or transport failure detector triggered. (internal state was $stateName)")))
s"Transport failure detector triggered. (internal state was $stateName)")))
}
}

View file

@ -82,6 +82,8 @@ class RemoteConfigSpec extends AkkaSpec(
import s._
ConnectionTimeout should ===(15.seconds)
ConnectionTimeout should ===(new AkkaProtocolSettings(RARP(system).provider.remoteSettings.config)
.HandshakeTimeout)
WriteBufferHighWaterMark should ===(None)
WriteBufferLowWaterMark should ===(None)
SendBufferSize should ===(Some(256000))

View file

@ -8,7 +8,7 @@ import akka.event.AddressTerminatedTopic
import akka.pattern.ask
import akka.remote.transport.AssociationHandle.{ HandleEventListener, InboundPayload, HandleEvent }
import akka.remote.transport._
import akka.remote.transport.Transport.{ AssociationEvent, InvalidAssociationException }
import akka.remote.transport.Transport.InvalidAssociationException
import akka.testkit._
import akka.util.ByteString
import com.typesafe.config._

View file

@ -14,6 +14,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.{ Await, Promise }
import akka.actor.Deploy
import java.util.concurrent.TimeoutException
object AkkaProtocolSpec {
@ -56,7 +57,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
use-passive-connections = on
}
""")
""").withFallback(system.settings.config)
val localAddress = Address("test", "testsystem", "testhost", 1234)
val localAkkaAddress = Address("akka.test", "testsystem", "testhost", 1234)
@ -445,6 +446,51 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
}
"give up outbound after connection timeout" in {
val (failureDetector, registry, transport, handle) = collaborators
handle.writable = false // nothing will be written
transport.associateBehavior.pushConstant(handle)
val statusPromise: Promise[AssociationHandle] = Promise()
val conf2 = ConfigFactory.parseString("akka.remote.netty.tcp.connection-timeout = 500 ms").
withFallback(conf)
val stateActor = system.actorOf(ProtocolStateActor.outboundProps(
HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
remoteAddress,
statusPromise,
transport,
new AkkaProtocolSettings(conf2),
codec,
failureDetector,
refuseUid = None))
watch(stateActor)
intercept[TimeoutException] {
Await.result(statusPromise.future, 5.seconds)
}
expectTerminated(stateActor)
}
"give up inbound after connection timeout" in {
val (failureDetector, registry, _, handle) = collaborators
val conf2 = ConfigFactory.parseString("akka.remote.netty.tcp.connection-timeout = 500 ms").
withFallback(conf)
val reader = system.actorOf(ProtocolStateActor.inboundProps(
HandshakeInfo(origin = localAddress, uid = 42, cookie = None),
handle,
ActorAssociationEventListener(testActor),
new AkkaProtocolSettings(conf2),
codec,
failureDetector))
watch(reader)
expectTerminated(reader)
}
}
}

View file

@ -34,7 +34,8 @@ object MiMa extends AutoPlugin {
val mimaIgnoredProblems = {
import com.typesafe.tools.mima.core._
Seq()
Seq(
FilterAnyProblem("akka.remote.transport.ProtocolStateActor"))
// FIXME somehow we must use different filters when akkaPreviousArtifact is 2.3.x
/* Below are the filters we used when comparing to 2.3.x