From cbd4fc7d66a0e0cba9844ba4cb92fe278641f027 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 26 Aug 2015 12:32:53 +0200 Subject: [PATCH] =rem #17555: Quarantine should clear pending connections (cherry picked from commit 010074d) --- .../src/main/scala/akka/remote/Remoting.scala | 29 +++++++ .../test/scala/akka/remote/RemotingSpec.scala | 81 +++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index dc5848fce0..858dcd40b8 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -555,6 +555,35 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid ⇒ context.stop(endpoint) case _ ⇒ // nothing to stop } + + def matchesQuarantine(handle: AkkaProtocolHandle): Boolean = { + handle.remoteAddress == address && + uidToQuarantineOption.forall(_ == handle.handshakeInfo.uid) + } + + // Stop all matching pending read handoffs + pendingReadHandoffs = pendingReadHandoffs.filter { + case (pendingActor, pendingHandle) ⇒ + val drop = matchesQuarantine(pendingHandle) + // Side-effecting here + if (drop) { + pendingHandle.disassociate() + context.stop(pendingActor) + } + !drop + } + + // Stop all matching stashed connections + stashedInbound = stashedInbound.map { + case (writer, associations) ⇒ + writer -> associations.filter { assoc ⇒ + val handle = assoc.association.asInstanceOf[AkkaProtocolHandle] + val drop = matchesQuarantine(handle) + if (drop) handle.disassociate() + !drop + } + } + uidToQuarantineOption foreach { uid ⇒ endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(address, uid)) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 2ab522e6e5..ea4b0966cd 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -707,6 +707,87 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } + "should properly quarantine stashed inbound connections" in { + val localAddress = Address("akka.test", "system1", "localhost", 1) + val rawLocalAddress = localAddress.copy(protocol = "test") + val remoteAddress = Address("akka.test", "system2", "localhost", 2) + val rawRemoteAddress = remoteAddress.copy(protocol = "test") + val remoteUID = 16 + + val config = ConfigFactory.parseString(s""" + akka.remote.enabled-transports = ["akka.remote.test"] + akka.remote.retry-gate-closed-for = 5s + akka.remote.log-lifecylce-events = on + + akka.remote.test { + registry-key = JMeMndLLsw + local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}" + } + """).withFallback(remoteSystem.settings.config) + val thisSystem = ActorSystem("this-system", config) + muteSystem(thisSystem) + + try { + + // Set up a mock remote system using the test transport + val registry = AssociationRegistry.get("JMeMndLLsw") + val remoteTransport = new TestTransport(rawRemoteAddress, registry) + val remoteTransportProbe = TestProbe() + + registry.registerTransport(remoteTransport, associationEventListenerFuture = Future.successful(new Transport.AssociationEventListener { + override def notify(ev: Transport.AssociationEvent): Unit = remoteTransportProbe.ref ! ev + })) + + val outboundHandle = new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false) + + // Hijack associations through the test transport + awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress)) + val testTransport = registry.transportFor(rawLocalAddress).get._1 + testTransport.writeBehavior.pushConstant(true) + + // Force an outbound associate on the real system (which we will hijack) + // we send no handshake packet, so this remains a pending connection + val dummySelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere")) + dummySelection.tell("ping", system.deadLetters) + + val remoteHandle = remoteTransportProbe.expectMsgType[Transport.InboundAssociation] + remoteHandle.association.readHandlerPromise.success(new HandleEventListener { + override def notify(ev: HandleEvent): Unit = () + }) + + // Now we initiate an emulated inbound connection to the real system + val inboundHandleProbe = TestProbe() + val inboundHandle = Await.result(remoteTransport.associate(rawLocalAddress), 3.seconds) + inboundHandle.readHandlerPromise.success(new AssociationHandle.HandleEventListener { + override def notify(ev: HandleEvent): Unit = inboundHandleProbe.ref ! ev + }) + + awaitAssert { + registry.getRemoteReadHandlerFor(inboundHandle.asInstanceOf[TestAssociationHandle]).get + } + + val handshakePacket = AkkaPduProtobufCodec.constructAssociate(HandshakeInfo(rawRemoteAddress, uid = remoteUID, cookie = None)) + + // Finish the inbound handshake so now it is handed up to Remoting + inboundHandle.write(handshakePacket) + + // No disassociation now, the connection is still stashed + inboundHandleProbe.expectNoMsg(1.second) + + // Quarantine unrelated connection + RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1)) + inboundHandleProbe.expectNoMsg(1.second) + + // Quarantine the connection + RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID)) + + // Even though the connection is stashed it will be disassociated + inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated] + + } finally shutdown(thisSystem) + + } + "be able to connect to system even if it's not there at first" in { val config = ConfigFactory.parseString(s""" akka.remote.enabled-transports = ["akka.remote.netty.tcp"]