From 838e8ffbc13b4e9ea95e68f7630bddcbe6a48951 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Fri, 15 Nov 2013 08:59:46 +0100 Subject: [PATCH] =rem #3633 Fix race between EndpointWriter Terminated and TakeOver --- akka-remote/src/main/resources/reference.conf | 7 +++++ .../src/main/scala/akka/remote/Endpoint.scala | 2 ++ .../src/main/scala/akka/remote/Remoting.scala | 9 ++++++ .../FailureInjectorTransportAdapter.scala | 30 ++++++++++++------- .../scala/akka/remote/RemoteConfigSpec.scala | 5 ++++ 5 files changed, 43 insertions(+), 10 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 22387f0b1f..1cd3a31af1 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -475,6 +475,13 @@ akka { } } + ### Default configuration for the failure injector transport adapter + + gremlin { + # Enable debug logging of the failure injector transport adapter + debug = off + } + } } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 89c52ea35a..5025bfa1ce 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -437,6 +437,7 @@ private[remote] object EndpointWriter { * @param handle Handle of the new inbound association. */ case class TakeOver(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded + case class TookOver(writer: ActorRef, handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded case object BackoffTimer case object FlushAndStop case object AckIdleCheckTimer @@ -641,6 +642,7 @@ private[remote] class EndpointWriter( // Shutdown old reader handle foreach { _.disassociate() } handle = Some(newHandle) + sender ! TookOver(self, newHandle) goto(Handoff) case Event(FlushAndStop, _) ⇒ stopReason = AssociationHandle.Shutdown diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index deae9e63d0..fb194e7a47 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -528,6 +528,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case Some(endpoint) ⇒ + pendingReadHandoffs.get(endpoint) foreach (_.disassociate()) + pendingReadHandoffs += endpoint -> handle endpoint ! EndpointWriter.TakeOver(handle) case None ⇒ if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) @@ -566,6 +568,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Terminated(endpoint) ⇒ acceptPendingReader(takingOverFrom = endpoint) endpoints.unregisterEndpoint(endpoint) + case EndpointWriter.TookOver(endpoint, handle) ⇒ + removePendingReader(takingOverFrom = endpoint, withHandle = handle) case Prune ⇒ endpoints.prune() case ShutdownAndFlush ⇒ @@ -659,6 +663,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } } + private def removePendingReader(takingOverFrom: ActorRef, withHandle: AkkaProtocolHandle): Unit = { + if (pendingReadHandoffs.get(takingOverFrom).exists(handle ⇒ handle == withHandle)) + pendingReadHandoffs -= takingOverFrom + } + private def createEndpoint(remoteAddress: Address, localAddress: Address, transport: Transport, diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index ed1846dd47..59a339a214 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace +import scala.util.Try @SerialVersionUID(1L) case class FailureInjectorException(msg: String) extends AkkaException(msg) with NoStackTrace @@ -57,6 +58,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor private def rng = ThreadLocalRandom.current() private val log = Logging(extendedSystem, "FailureInjector (gremlin)") + private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("akka.remote.gremlin.debug") @volatile private var upstreamListener: Option[AssociationEventListener] = None private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]() @@ -90,7 +92,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = { // Association is simulated to be failed if there was either an inbound or outbound message drop - if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress)) + if (shouldDropInbound(remoteAddress, Unit, "interceptAssociate") || shouldDropOutbound(remoteAddress, Unit, "interceptAssociate")) statusPromise.failure(new FailureInjectorException("Simulated failure of association to " + remoteAddress)) else statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { handle ⇒ @@ -100,7 +102,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor } def notify(ev: AssociationEvent): Unit = ev match { - case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress) ⇒ //Ignore + case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress, ev, "notify") ⇒ //Ignore case _ ⇒ upstreamListener match { case Some(listener) ⇒ listener notify interceptInboundAssociation(ev) case None ⇒ @@ -112,14 +114,22 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor case _ ⇒ ev } - def shouldDropInbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match { - case PassThru ⇒ false - case Drop(_, inboundDropP) ⇒ rng.nextDouble() <= inboundDropP + def shouldDropInbound(remoteAddress: Address, instance: Any, debugMessage: String): Boolean = chaosMode(remoteAddress) match { + case PassThru ⇒ false + case Drop(_, inboundDropP) ⇒ + if (rng.nextDouble() <= inboundDropP) { + if (shouldDebugLog) log.debug("Dropping inbound [{}] for [{}] {}", instance.getClass, remoteAddress, debugMessage) + true + } else false } - def shouldDropOutbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match { - case PassThru ⇒ false - case Drop(outboundDropP, _) ⇒ rng.nextDouble() <= outboundDropP + def shouldDropOutbound(remoteAddress: Address, instance: Any, debugMessage: String): Boolean = chaosMode(remoteAddress) match { + case PassThru ⇒ false + case Drop(outboundDropP, _) ⇒ + if (rng.nextDouble() <= outboundDropP) { + if (shouldDebugLog) log.debug("Dropping outbound [{}] for [{}] {}", instance.getClass, remoteAddress, debugMessage) + true + } else false } def chaosMode(remoteAddress: Address): GremlinMode = { @@ -147,13 +157,13 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand } override def write(payload: ByteString): Boolean = - if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) wrappedHandle.write(payload) + if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress, payload, "handler.write")) wrappedHandle.write(payload) else true override def disassociate(): Unit = wrappedHandle.disassociate() override def notify(ev: HandleEvent): Unit = - if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress)) + if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress, ev, "handler.notify")) upstreamListener notify ev } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 5d905c2470..508d97bb38 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -131,5 +131,10 @@ class RemoteConfigSpec extends AkkaSpec( sslSettings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA")) sslSettings.SSLRandomNumberGenerator must be(None) } + + "have debug logging of the failure injector turned off in reference.conf" in { + val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.gremlin") + c.getBoolean("debug") must be(false) + } } }