Merge pull request #16880 from drewhk/wip-16224-gate-should-open-on-inbound-connect-drewhk

=rem #16224: Not terminate connections if quarantine id do not match
This commit is contained in:
drewhk 2015-03-26 18:59:04 +01:00
commit 213a5a4300
5 changed files with 403 additions and 28 deletions

View file

@ -292,7 +292,7 @@ private[remote] object EndpointManager {
class EndpointRegistry {
private var addressToWritable = HashMap[Address, EndpointPolicy]()
private var writableToAddress = HashMap[ActorRef, Address]()
private var addressToReadonly = HashMap[Address, ActorRef]()
private var addressToReadonly = HashMap[Address, (ActorRef, Int)]()
private var readonlyToAddress = HashMap[ActorRef, Address]()
def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef =
@ -312,8 +312,8 @@ private[remote] object EndpointManager {
}
}
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef): ActorRef = {
addressToReadonly += address -> endpoint
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = {
addressToReadonly += address -> ((endpoint, uid))
readonlyToAddress += endpoint -> address
endpoint
}
@ -338,7 +338,7 @@ private[remote] object EndpointManager {
case _ false
}
def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address)
def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address)
def isWritable(endpoint: ActorRef): Boolean = writableToAddress contains endpoint
@ -528,25 +528,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}
Future.fold(allStatuses)(true)(_ && _) map ManagementCommandAck pipeTo sender()
case Quarantine(address, uidOption)
case Quarantine(address, uidToQuarantineOption)
// Stop writers
endpoints.writableEndpointWithPolicyFor(address) match {
case Some(Pass(endpoint, _, _))
(endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match {
case (Some(Pass(endpoint, _, _)), None)
context.stop(endpoint)
if (uidOption.isEmpty) {
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
address, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
}
log.warning("Association to [{}] with unknown UID is reported as quarantined, but " +
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
address, settings.RetryGateClosedFor.toMillis)
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
case (Some(Pass(endpoint, Some(currentUid), _)), Some(quarantineUid)) if currentUid == quarantineUid
context.stop(endpoint)
case _
// Do nothing, because either:
// A: we don't know yet the UID of the writer, it will be checked against current quarantine state later
// B: we know the UID, but it does not match with the UID to be quarantined
}
// Stop inbound read-only associations
(endpoints.readOnlyEndpointFor(address), uidToQuarantineOption) match {
case (Some((endpoint, _)), None) context.stop(endpoint)
case (Some((endpoint, currentUid)), Some(quarantineUid)) if currentUid == quarantineUid context.stop(endpoint)
case _ // nothing to stop
}
// Stop inbound read-only associations
endpoints.readOnlyEndpointFor(address) match {
case Some(endpoint) context.stop(endpoint)
case _ // nothing to stop
}
uidOption foreach { uid
uidToQuarantineOption foreach { uid
endpoints.markAsQuarantined(address, uid, Deadline.now + settings.QuarantineDuration)
eventPublisher.notifyListeners(QuarantinedEvent(address, uid))
}
@ -630,10 +635,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some(endpoint)
case Some((endpoint, _))
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
pendingReadHandoffs += endpoint -> handle
endpoint ! EndpointWriter.TakeOver(handle, self)
endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(ep, _, _)) ep ! ReliableDeliverySupervisor.Ungate
case _
}
case None
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
handle.disassociate(AssociationHandle.Quarantined)
@ -651,6 +660,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
pendingReadHandoffs.get(ep) foreach (_.disassociate())
pendingReadHandoffs += ep -> handle
ep ! EndpointWriter.StopReading(ep, self)
ep ! ReliableDeliverySupervisor.Ungate
} else {
context.stop(ep)
endpoints.unregisterEndpoint(ep)
@ -677,7 +687,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
if (writing)
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
else {
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
endpoints.removePolicy(handle.remoteAddress)
}
@ -743,7 +753,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
Some(handle),
writing = false,
refuseUid = None)
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint)
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
}
}