diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 2f3df73be6..edcc3e664a 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -129,7 +129,7 @@ trait Conductor { this: TestConductorExt ⇒ } private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) - throw new ConfigurationException("To use this feature you must activate the failure injector adapters "+ + throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + "(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.") /** diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala index 549f74895d..09f936d07e 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Player.scala @@ -215,13 +215,13 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) import settings.QueryTimeout import context.dispatcher // FIXME is this the right EC for the future below? val mode = if (t.rateMBit < 0.0f) Unthrottled - else if (t.rateMBit == 0.0f) Blackhole - else TokenBucket(500, t.rateMBit * 125000.0, 0, 0) + else if (t.rateMBit == 0.0f) Blackhole + else TokenBucket(500, t.rateMBit * 125000.0, 0, 0) val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode)) cmdFuture onSuccess { case b: Boolean ⇒ self ! ToServer(Done) - case _ => throw new RuntimeException("Throttle was requested from the TestConductor, but no transport "+ + case _ ⇒ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " + "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig") } stay diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 5964820adc..add971bf1a 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -279,9 +279,12 @@ private[remote] object EndpointManager { endpoint } + // FIXME: Temporary hack to verify the bug + def isPassive(endpoint: ActorRef): Boolean = addressToPassive.contains(endpointToAddress(endpoint)) + def markFailed(endpoint: ActorRef, timeOfFailure: Long): Unit = { addressToEndpointAndPolicy += endpointToAddress(endpoint) -> Gated(timeOfFailure) - endpointToAddress = endpointToAddress - endpoint + if (!isPassive(endpoint)) endpointToAddress = endpointToAddress - endpoint } def markQuarantine(address: Address, reason: Throwable): Unit = @@ -289,12 +292,13 @@ private[remote] object EndpointManager { def removeIfNotGated(endpoint: ActorRef): Unit = { endpointToAddress.get(endpoint) foreach { address ⇒ - addressToEndpointAndPolicy.get(address) foreach { - case Pass(_) ⇒ - addressToEndpointAndPolicy = addressToEndpointAndPolicy - address - endpointToAddress = endpointToAddress - endpoint - case _ ⇒ + addressToEndpointAndPolicy.get(address) foreach { + case Pass(_) ⇒ addressToEndpointAndPolicy = addressToEndpointAndPolicy - address + case _ ⇒ } + + endpointToAddress = endpointToAddress - endpoint + addressToPassive = addressToPassive - address } } } @@ -472,6 +476,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends .withDispatcher("akka.remoting.writer-dispatcher"), "endpointWriter-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + endpointId.next())) + context.watch(endpoint) + } private def retryGateOpen(timeOfFailure: Long): Boolean = (timeOfFailure + settings.RetryGateClosedFor) < System.nanoTime() diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 4a5d756199..77c311fe60 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -125,10 +125,9 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A import context.dispatcher private val ids = Iterator from 0 - var localAddress: Address = _ private var associationListener: AssociationEventListener = _ private var throttlingModes = Map[Address, (ThrottleMode, Direction)]() - private var handleTable = Map[Address, ThrottlerHandle]() + private var handleTable = List[(Address, ThrottlerHandle)]() private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "") @@ -136,7 +135,6 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A def receive: Receive = { case ListenUnderlying(listenAddress, upstreamListenerFuture) ⇒ - localAddress = listenAddress upstreamListenerFuture.future.map { ListenerRegistered(_) } pipeTo self case ListenerRegistered(listener) ⇒ @@ -159,20 +157,20 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val inMode = getInboundMode(nakedAddress(remoteAddress)) wrappedHandle.outboundThrottleMode.set(getOutboundMode(nakedAddress(remoteAddress))) wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor - + handleTable ::= nakedAddress(remoteAddress) -> wrappedHandle statusPromise.success(Ready(wrappedHandle)) case s @ _ ⇒ statusPromise.complete(s) } case s @ SetThrottle(address, direction, mode) ⇒ val naked = nakedAddress(address) throttlingModes += naked -> (mode, direction) - handleTable.get(naked) match { - case Some(handle) ⇒ setMode(handle, mode, direction) - case None ⇒ + handleTable.foreach { + case (addr, handle) ⇒ + if (addr == naked) setMode(handle, mode, direction) } case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) - handleTable += naked -> handle + handleTable ::= naked -> handle setMode(naked, handle) } @@ -207,9 +205,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val managerRef = self val throttlerActor = context.actorOf(Props(new ThrottledAssociation(managerRef, listener, originalHandle, inbound)), "throttler" + ids.next()) - val handle = ThrottlerHandle(originalHandle, throttlerActor) - handleTable += nakedAddress(originalHandle.remoteAddress) -> handle - handle + ThrottlerHandle(originalHandle, throttlerActor) } }