=rem #17555: Quarantine should clear pending connections
(cherry picked from commit 010074d)
This commit is contained in:
parent
bc568e2a1b
commit
cbd4fc7d66
2 changed files with 110 additions and 0 deletions
|
|
@ -707,6 +707,87 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
|
||||
}
|
||||
|
||||
"should properly quarantine stashed inbound connections" in {
|
||||
val localAddress = Address("akka.test", "system1", "localhost", 1)
|
||||
val rawLocalAddress = localAddress.copy(protocol = "test")
|
||||
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
|
||||
val rawRemoteAddress = remoteAddress.copy(protocol = "test")
|
||||
val remoteUID = 16
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.remote.enabled-transports = ["akka.remote.test"]
|
||||
akka.remote.retry-gate-closed-for = 5s
|
||||
akka.remote.log-lifecylce-events = on
|
||||
|
||||
akka.remote.test {
|
||||
registry-key = JMeMndLLsw
|
||||
local-address = "test://${localAddress.system}@${localAddress.host.get}:${localAddress.port.get}"
|
||||
}
|
||||
""").withFallback(remoteSystem.settings.config)
|
||||
val thisSystem = ActorSystem("this-system", config)
|
||||
muteSystem(thisSystem)
|
||||
|
||||
try {
|
||||
|
||||
// Set up a mock remote system using the test transport
|
||||
val registry = AssociationRegistry.get("JMeMndLLsw")
|
||||
val remoteTransport = new TestTransport(rawRemoteAddress, registry)
|
||||
val remoteTransportProbe = TestProbe()
|
||||
|
||||
registry.registerTransport(remoteTransport, associationEventListenerFuture = Future.successful(new Transport.AssociationEventListener {
|
||||
override def notify(ev: Transport.AssociationEvent): Unit = remoteTransportProbe.ref ! ev
|
||||
}))
|
||||
|
||||
val outboundHandle = new TestAssociationHandle(rawLocalAddress, rawRemoteAddress, remoteTransport, inbound = false)
|
||||
|
||||
// Hijack associations through the test transport
|
||||
awaitCond(registry.transportsReady(rawLocalAddress, rawRemoteAddress))
|
||||
val testTransport = registry.transportFor(rawLocalAddress).get._1
|
||||
testTransport.writeBehavior.pushConstant(true)
|
||||
|
||||
// Force an outbound associate on the real system (which we will hijack)
|
||||
// we send no handshake packet, so this remains a pending connection
|
||||
val dummySelection = thisSystem.actorSelection(ActorPath.fromString(remoteAddress.toString + "/user/noonethere"))
|
||||
dummySelection.tell("ping", system.deadLetters)
|
||||
|
||||
val remoteHandle = remoteTransportProbe.expectMsgType[Transport.InboundAssociation]
|
||||
remoteHandle.association.readHandlerPromise.success(new HandleEventListener {
|
||||
override def notify(ev: HandleEvent): Unit = ()
|
||||
})
|
||||
|
||||
// Now we initiate an emulated inbound connection to the real system
|
||||
val inboundHandleProbe = TestProbe()
|
||||
val inboundHandle = Await.result(remoteTransport.associate(rawLocalAddress), 3.seconds)
|
||||
inboundHandle.readHandlerPromise.success(new AssociationHandle.HandleEventListener {
|
||||
override def notify(ev: HandleEvent): Unit = inboundHandleProbe.ref ! ev
|
||||
})
|
||||
|
||||
awaitAssert {
|
||||
registry.getRemoteReadHandlerFor(inboundHandle.asInstanceOf[TestAssociationHandle]).get
|
||||
}
|
||||
|
||||
val handshakePacket = AkkaPduProtobufCodec.constructAssociate(HandshakeInfo(rawRemoteAddress, uid = remoteUID, cookie = None))
|
||||
|
||||
// Finish the inbound handshake so now it is handed up to Remoting
|
||||
inboundHandle.write(handshakePacket)
|
||||
|
||||
// No disassociation now, the connection is still stashed
|
||||
inboundHandleProbe.expectNoMsg(1.second)
|
||||
|
||||
// Quarantine unrelated connection
|
||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(-1))
|
||||
inboundHandleProbe.expectNoMsg(1.second)
|
||||
|
||||
// Quarantine the connection
|
||||
RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID))
|
||||
|
||||
// Even though the connection is stashed it will be disassociated
|
||||
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
|
||||
|
||||
} finally shutdown(thisSystem)
|
||||
|
||||
}
|
||||
|
||||
"be able to connect to system even if it's not there at first" in {
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue