Merge pull request #18391 from drewhk/wip-17555-forward-port-drewhk
=rem #17555: Quarantine should clear pending connections (forward port)
This commit is contained in:
commit
baa4399521
2 changed files with 110 additions and 0 deletions
|
|
@ -555,6 +555,35 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid ⇒ context.stop(endpoint)
|
||||
case _ ⇒ // nothing to stop
|
||||
}
|
||||
|
||||
def matchesQuarantine(handle: AkkaProtocolHandle): Boolean = {
|
||||
handle.remoteAddress == address &&
|
||||
uidToQuarantineOption.forall(_ == handle.handshakeInfo.uid)
|
||||
}
|
||||
|
||||
// Stop all matching pending read handoffs
|
||||
pendingReadHandoffs = pendingReadHandoffs.filter {
|
||||
case (pendingActor, pendingHandle) ⇒
|
||||
val drop = matchesQuarantine(pendingHandle)
|
||||
// Side-effecting here
|
||||
if (drop) {
|
||||
pendingHandle.disassociate()
|
||||
context.stop(pendingActor)
|
||||
}
|
||||
!drop
|
||||
}
|
||||
|
||||
// Stop all matching stashed connections
|
||||
stashedInbound = stashedInbound.map {
|
||||
case (writer, associations) ⇒
|
||||
writer -> associations.filter { assoc ⇒
|
||||
val handle = assoc.association.asInstanceOf[AkkaProtocolHandle]
|
||||
val drop = matchesQuarantine(handle)
|
||||
if (drop) handle.disassociate()
|
||||
!drop
|
||||
}
|
||||
}
|
||||
|
||||
uidToQuarantineOption foreach { uid ⇒
|
||||
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
|
||||
|
|
|
|||
|
|
@ -707,6 +707,87 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
|
||||
}
|
||||
|
||||
"should properly quarantine stashed inbound connections" in {
|
||||
val localAddress = Address("akka.test", "system1", "localhost", 1)
|
||||
val rawLocalAddress = localAddress.copy(protocol = "test")
|
||||
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
|
||||
val rawRemoteAddress = remoteAddress.copy(protocol = "test")
|
||||
val remoteUID = 16
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.remote.enabled-transports = ["akka.remote.test"]
|
||||
akka.remote.retry-gate-closed-for = 5s
|
||||
akka.remote.log-lifecylce-events = on
|
||||
|
||||
akka.remote.test {
|
||||
registry-key = JMeMndLLsw
|
||||
local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
|
||||
}
|
||||
""").withFallback(remoteSystem.settings.config)
|
||||
val thisSystem = ActorSystem("this-system", config)
|
||||
muteSystem(thisSystem)
|
||||
|
||||
try {
|
||||
|
||||
// Set up a mock remote system using the test transport
|
||||
val registry = AssociationRegistry.get("JMeMndLLsw")
|
||||
val remoteTransport = new TestTransport(rawRemoteAddress, registry)
|
||||
val remoteTransportProbe = TestProbe()
|
||||
|
||||
registry.registerTransport(remoteTransport, associationEventListenerFuture = Future.successful(new Transport.AssociationEventListener {
|
||||
override def notify(ev: Transport.AssociationEvent): Unit = remoteTransportProbe.ref ! ev
|
||||
}))
|
||||
|
||||
val outboundHandle = new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false)
|
||||
|
||||
// Hijack associations through the test transport
|
||||
awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress))
|
||||
val testTransport = registry.transportFor(rawLocalAddress).get._1
|
||||
testTransport.writeBehavior.pushConstant(true)
|
||||
|
||||
// Force an outbound associate on the real system (which we will hijack)
|
||||
// we send no handshake packet, so this remains a pending connection
|
||||
val dummySelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
|
||||
dummySelection.tell("ping", system.deadLetters)
|
||||
|
||||
val remoteHandle = remoteTransportProbe.expectMsgType[Transport.InboundAssociation]
|
||||
remoteHandle.association.readHandlerPromise.success(new HandleEventListener {
|
||||
override def notify(ev: HandleEvent): Unit = ()
|
||||
})
|
||||
|
||||
// Now we initiate an emulated inbound connection to the real system
|
||||
val inboundHandleProbe = TestProbe()
|
||||
val inboundHandle = Await.result(remoteTransport.associate(rawLocalAddress), 3.seconds)
|
||||
inboundHandle.readHandlerPromise.success(new AssociationHandle.HandleEventListener {
|
||||
override def notify(ev: HandleEvent): Unit = inboundHandleProbe.ref ! ev
|
||||
})
|
||||
|
||||
awaitAssert {
|
||||
registry.getRemoteReadHandlerFor(inboundHandle.asInstanceOf[TestAssociationHandle]).get
|
||||
}
|
||||
|
||||
val handshakePacket = AkkaPduProtobufCodec.constructAssociate(HandshakeInfo(rawRemoteAddress, uid = remoteUID, cookie = None))
|
||||
|
||||
// Finish the inbound handshake so now it is handed up to Remoting
|
||||
inboundHandle.write(handshakePacket)
|
||||
|
||||
// No disassociation now, the connection is still stashed
|
||||
inboundHandleProbe.expectNoMsg(1.second)
|
||||
|
||||
// Quarantine unrelated connection
|
||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1))
|
||||
inboundHandleProbe.expectNoMsg(1.second)
|
||||
|
||||
// Quarantine the connection
|
||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID))
|
||||
|
||||
// Even though the connection is stashed it will be disassociated
|
||||
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
|
||||
|
||||
} finally shutdown(thisSystem)
|
||||
|
||||
}
|
||||
|
||||
"be able to connect to system even if it's not there at first" in {
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue