=rem #3633 Fix race between EndpointWriter Terminated and TakeOver

This commit is contained in:
Björn Antonsson 2013-11-15 08:59:46 +01:00
parent e104e2a92c
commit 838e8ffbc1
5 changed files with 43 additions and 10 deletions

View file

@ -475,6 +475,13 @@ akka {
}
}
### Default configuration for the failure injector transport adapter
gremlin {
# Enable debug logging of the failure injector transport adapter
debug = off
}
}
}

View file

@ -437,6 +437,7 @@ private[remote] object EndpointWriter {
* @param handle Handle of the new inbound association.
*/
case class TakeOver(handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded
case class TookOver(writer: ActorRef, handle: AkkaProtocolHandle) extends NoSerializationVerificationNeeded
case object BackoffTimer
case object FlushAndStop
case object AckIdleCheckTimer
@ -641,6 +642,7 @@ private[remote] class EndpointWriter(
// Shutdown old reader
handle foreach { _.disassociate() }
handle = Some(newHandle)
sender ! TookOver(self, newHandle)
goto(Handoff)
case Event(FlushAndStop, _)
stopReason = AssociationHandle.Shutdown

View file

@ -528,6 +528,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint)
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
pendingReadHandoffs += endpoint -> handle
endpoint ! EndpointWriter.TakeOver(handle)
case None
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
@ -566,6 +568,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Terminated(endpoint)
acceptPendingReader(takingOverFrom = endpoint)
endpoints.unregisterEndpoint(endpoint)
case EndpointWriter.TookOver(endpoint, handle)
removePendingReader(takingOverFrom = endpoint, withHandle = handle)
case Prune
endpoints.prune()
case ShutdownAndFlush
@ -659,6 +663,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
}
private def removePendingReader(takingOverFrom: ActorRef, withHandle: AkkaProtocolHandle): Unit = {
if (pendingReadHandoffs.get(takingOverFrom).exists(handle handle == withHandle))
pendingReadHandoffs -= takingOverFrom
}
private def createEndpoint(remoteAddress: Address,
localAddress: Address,
transport: Transport,

View file

@ -14,6 +14,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.{ Future, Promise }
import scala.util.control.NoStackTrace
import scala.util.Try
@SerialVersionUID(1L)
case class FailureInjectorException(msg: String) extends AkkaException(msg) with NoStackTrace
@ -57,6 +58,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
private def rng = ThreadLocalRandom.current()
private val log = Logging(extendedSystem, "FailureInjector (gremlin)")
private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("akka.remote.gremlin.debug")
@volatile private var upstreamListener: Option[AssociationEventListener] = None
private[transport] val addressChaosTable = new ConcurrentHashMap[Address, GremlinMode]()
@ -90,7 +92,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
protected def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = {
// Association is simulated to be failed if there was either an inbound or outbound message drop
if (shouldDropInbound(remoteAddress) || shouldDropOutbound(remoteAddress))
if (shouldDropInbound(remoteAddress, Unit, "interceptAssociate") || shouldDropOutbound(remoteAddress, Unit, "interceptAssociate"))
statusPromise.failure(new FailureInjectorException("Simulated failure of association to " + remoteAddress))
else
statusPromise.completeWith(wrappedTransport.associate(remoteAddress).map { handle
@ -100,7 +102,7 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
}
def notify(ev: AssociationEvent): Unit = ev match {
case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress) //Ignore
case InboundAssociation(handle) if shouldDropInbound(handle.remoteAddress, ev, "notify") //Ignore
case _ upstreamListener match {
case Some(listener) listener notify interceptInboundAssociation(ev)
case None
@ -112,14 +114,22 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
case _ ev
}
def shouldDropInbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match {
def shouldDropInbound(remoteAddress: Address, instance: Any, debugMessage: String): Boolean = chaosMode(remoteAddress) match {
case PassThru false
case Drop(_, inboundDropP) rng.nextDouble() <= inboundDropP
case Drop(_, inboundDropP)
if (rng.nextDouble() <= inboundDropP) {
if (shouldDebugLog) log.debug("Dropping inbound [{}] for [{}] {}", instance.getClass, remoteAddress, debugMessage)
true
} else false
}
def shouldDropOutbound(remoteAddress: Address): Boolean = chaosMode(remoteAddress) match {
def shouldDropOutbound(remoteAddress: Address, instance: Any, debugMessage: String): Boolean = chaosMode(remoteAddress) match {
case PassThru false
case Drop(outboundDropP, _) rng.nextDouble() <= outboundDropP
case Drop(outboundDropP, _)
if (rng.nextDouble() <= outboundDropP) {
if (shouldDebugLog) log.debug("Dropping outbound [{}] for [{}] {}", instance.getClass, remoteAddress, debugMessage)
true
} else false
}
def chaosMode(remoteAddress: Address): GremlinMode = {
@ -147,13 +157,13 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand
}
override def write(payload: ByteString): Boolean =
if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress)) wrappedHandle.write(payload)
if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress, payload, "handler.write")) wrappedHandle.write(payload)
else true
override def disassociate(): Unit = wrappedHandle.disassociate()
override def notify(ev: HandleEvent): Unit =
if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress))
if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress, ev, "handler.notify"))
upstreamListener notify ev
}

View file

@ -131,5 +131,10 @@ class RemoteConfigSpec extends AkkaSpec(
sslSettings.SSLEnabledAlgorithms must be(Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
sslSettings.SSLRandomNumberGenerator must be(None)
}
"have debug logging of the failure injector turned off in reference.conf" in {
val c = RARP(system).provider.remoteSettings.config.getConfig("akka.remote.gremlin")
c.getBoolean("debug") must be(false)
}
}
}