Fixed handling of passive connections in the failureinjector and remoting

This commit is contained in:
Endre Sándor Varga 2012-12-12 12:29:36 +01:00
parent a7b78bf78b
commit f1177464ad
4 changed files with 23 additions and 21 deletions

View file

@ -129,7 +129,7 @@ trait Conductor { this: TestConductorExt ⇒
} }
private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters "+ throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.") "(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.")
/** /**

View file

@ -215,13 +215,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
import settings.QueryTimeout import settings.QueryTimeout
import context.dispatcher // FIXME is this the right EC for the future below? import context.dispatcher // FIXME is this the right EC for the future below?
val mode = if (t.rateMBit < 0.0f) Unthrottled val mode = if (t.rateMBit < 0.0f) Unthrottled
else if (t.rateMBit == 0.0f) Blackhole else if (t.rateMBit == 0.0f) Blackhole
else TokenBucket(500, t.rateMBit * 125000.0, 0, 0) else TokenBucket(500, t.rateMBit * 125000.0, 0, 0)
val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode)) val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode))
cmdFuture onSuccess { cmdFuture onSuccess {
case b: Boolean self ! ToServer(Done) case b: Boolean self ! ToServer(Done)
case _ => throw new RuntimeException("Throttle was requested from the TestConductor, but no transport "+ case _ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " +
"adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig") "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig")
} }
stay stay

View file

@ -279,9 +279,12 @@ private[remote] object EndpointManager {
endpoint endpoint
} }
// FIXME: Temporary hack to verify the bug
def isPassive(endpoint: ActorRef): Boolean = addressToPassive.contains(endpointToAddress(endpoint))
def markFailed(endpoint: ActorRef, timeOfFailure: Long): Unit = { def markFailed(endpoint: ActorRef, timeOfFailure: Long): Unit = {
addressToEndpointAndPolicy += endpointToAddress(endpoint) -> Gated(timeOfFailure) addressToEndpointAndPolicy += endpointToAddress(endpoint) -> Gated(timeOfFailure)
endpointToAddress = endpointToAddress - endpoint if (!isPassive(endpoint)) endpointToAddress = endpointToAddress - endpoint
} }
def markQuarantine(address: Address, reason: Throwable): Unit = def markQuarantine(address: Address, reason: Throwable): Unit =
@ -290,11 +293,12 @@ private[remote] object EndpointManager {
def removeIfNotGated(endpoint: ActorRef): Unit = { def removeIfNotGated(endpoint: ActorRef): Unit = {
endpointToAddress.get(endpoint) foreach { address endpointToAddress.get(endpoint) foreach { address
addressToEndpointAndPolicy.get(address) foreach { addressToEndpointAndPolicy.get(address) foreach {
case Pass(_) case Pass(_) addressToEndpointAndPolicy = addressToEndpointAndPolicy - address
addressToEndpointAndPolicy = addressToEndpointAndPolicy - address case _
endpointToAddress = endpointToAddress - endpoint
case _
} }
endpointToAddress = endpointToAddress - endpoint
addressToPassive = addressToPassive - address
} }
} }
} }
@ -472,6 +476,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
.withDispatcher("akka.remoting.writer-dispatcher"), .withDispatcher("akka.remoting.writer-dispatcher"),
"endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next())) "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next()))
context.watch(endpoint)
} }
private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime() private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime()

View file

@ -125,10 +125,9 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
import context.dispatcher import context.dispatcher
private val ids = Iterator from 0 private val ids = Iterator from 0
var localAddress: Address = _
private var associationListener: AssociationEventListener = _ private var associationListener: AssociationEventListener = _
private var throttlingModes = Map[Address, (ThrottleMode, Direction)]() private var throttlingModes = Map[Address, (ThrottleMode, Direction)]()
private var handleTable = Map[Address, ThrottlerHandle]() private var handleTable = List[(Address, ThrottlerHandle)]()
private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "") private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "")
@ -136,7 +135,6 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
def receive: Receive = { def receive: Receive = {
case ListenUnderlying(listenAddress, upstreamListenerFuture) case ListenUnderlying(listenAddress, upstreamListenerFuture)
localAddress = listenAddress
upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self
case ListenerRegistered(listener) case ListenerRegistered(listener)
@ -159,20 +157,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
val inMode = getInboundMode(nakedAddress(remoteAddress)) val inMode = getInboundMode(nakedAddress(remoteAddress))
wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress))) wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress)))
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(remoteAddress) -> wrappedHandle
statusPromise.success(Ready(wrappedHandle)) statusPromise.success(Ready(wrappedHandle))
case s @ _ statusPromise.complete(s) case s @ _ statusPromise.complete(s)
} }
case s @ SetThrottle(address, direction, mode) case s @ SetThrottle(address, direction, mode)
val naked = nakedAddress(address) val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction) throttlingModes += naked -> (mode, direction)
handleTable.get(naked) match { handleTable.foreach {
case Some(handle) setMode(handle, mode, direction) case (addr, handle)
case None if (addr == naked) setMode(handle, mode, direction)
} }
case Checkin(origin, handle) case Checkin(origin, handle)
val naked: Address = nakedAddress(origin) val naked: Address = nakedAddress(origin)
handleTable += naked -> handle handleTable ::= naked -> handle
setMode(naked, handle) setMode(naked, handle)
} }
@ -207,9 +205,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
val managerRef = self val managerRef = self
val throttlerActor = context.actorOf(Props(new ThrottledAssociation(managerRef, listener, originalHandle, inbound)), val throttlerActor = context.actorOf(Props(new ThrottledAssociation(managerRef, listener, originalHandle, inbound)),
"throttler" + ids.next()) "throttler" + ids.next())
val handle = ThrottlerHandle(originalHandle, throttlerActor) ThrottlerHandle(originalHandle, throttlerActor)
handleTable += nakedAddress(originalHandle.remoteAddress) -> handle
handle
} }
} }