Keep the refuseUid in a better way, #22156
* The scenario described in the issue can cause the quarantine marker to be lost when creating a new endpoint for that address. Then when later creating another endpoint from an inbound connection the uid is considered confirmed and Ack message is accepted, triggering the unexpected seq number issue. * The refuseUid was kept in the endpoint policy markers, but that is just very complicated and as illustrated by this issue not always safe. * Instead, keep the refuseUid separately so it's not lost when registering new endpoint. * The purpose of WasGated was only to try to keep the refuseUid (as far as I know), and that is not needed any longer. mima filter
This commit is contained in:
parent
9df5d80268
commit
d660f8971e
4 changed files with 149 additions and 114 deletions
|
|
@ -7,3 +7,6 @@ ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash$")
|
|||
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.StringValueFieldOffset")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.UsAscii")
|
||||
|
||||
#22156 lost refuseUid
|
||||
ProblemFilters.exclude[Problem]("akka.remote.EndpointManager*")
|
||||
|
|
|
|||
|
|
@ -260,7 +260,12 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
// it serves a separator.
|
||||
// If we already have an inbound handle then UID is initially confirmed.
|
||||
// (This actor is never restarted)
|
||||
var uidConfirmed: Boolean = uid.isDefined
|
||||
var uidConfirmed: Boolean = uid.isDefined && (uid != refuseUid)
|
||||
|
||||
if (uid.isDefined && (uid == refuseUid))
|
||||
throw new HopelessAssociation(localAddress, remoteAddress, uid,
|
||||
new IllegalStateException(
|
||||
s"The remote system [$remoteAddress] has a UID [${uid.get}] that has been quarantined. Association aborted."))
|
||||
|
||||
override def postStop(): Unit = {
|
||||
// All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence
|
||||
|
|
@ -321,6 +326,8 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
|
||||
case s: EndpointWriter.StopReading ⇒
|
||||
writer forward s
|
||||
|
||||
case Ungate ⇒ // ok, not gated
|
||||
}
|
||||
|
||||
def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = {
|
||||
|
|
@ -377,6 +384,7 @@ private[remote] class ReliableDeliverySupervisor(
|
|||
case EndpointWriter.FlushAndStop ⇒ context.stop(self)
|
||||
case EndpointWriter.StopReading(w, replyTo) ⇒
|
||||
replyTo ! EndpointWriter.StoppedReading(w)
|
||||
case Ungate ⇒ // ok, not gated
|
||||
}
|
||||
|
||||
private def goToIdle(): Unit = {
|
||||
|
|
@ -867,11 +875,12 @@ private[remote] class EndpointWriter(
|
|||
}
|
||||
|
||||
private def trySendPureAck(): Unit =
|
||||
for (h ← handle; ack ← lastAck)
|
||||
for (h ← handle; ack ← lastAck) {
|
||||
if (h.write(codec.constructPureAck(ack))) {
|
||||
ackDeadline = newAckDeadline
|
||||
lastAck = None
|
||||
}
|
||||
}
|
||||
|
||||
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
|
||||
val newReader =
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
|||
import akka.util.ByteString.UTF_8
|
||||
import akka.util.OptionVal
|
||||
import scala.collection.immutable
|
||||
import akka.actor.ActorInitializationException
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -292,50 +293,45 @@ private[remote] object EndpointManager {
|
|||
*/
|
||||
def isTombstone: Boolean
|
||||
}
|
||||
final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy {
|
||||
final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy {
|
||||
override def isTombstone: Boolean = false
|
||||
}
|
||||
final case class Gated(timeOfRelease: Deadline, refuseUid: Option[Int]) extends EndpointPolicy {
|
||||
final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
|
||||
override def isTombstone: Boolean = true
|
||||
}
|
||||
final case class WasGated(refuseUid: Option[Int]) extends EndpointPolicy {
|
||||
override def isTombstone: Boolean = false
|
||||
}
|
||||
final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy {
|
||||
override def isTombstone: Boolean = true
|
||||
}
|
||||
|
||||
// Not threadsafe -- only to be used in HeadActor
|
||||
class EndpointRegistry {
|
||||
private var addressToRefuseUid = HashMap[Address, (Int, Deadline)]()
|
||||
private var addressToWritable = HashMap[Address, EndpointPolicy]()
|
||||
private var writableToAddress = HashMap[ActorRef, Address]()
|
||||
private var addressToReadonly = HashMap[Address, (ActorRef, Int)]()
|
||||
private var readonlyToAddress = HashMap[ActorRef, Address]()
|
||||
|
||||
def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef =
|
||||
def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef =
|
||||
addressToWritable.get(address) match {
|
||||
case Some(Pass(e, _, _)) ⇒
|
||||
case Some(Pass(e, _)) ⇒
|
||||
throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
|
||||
case _ ⇒
|
||||
addressToWritable += address → Pass(endpoint, uid, refuseUid)
|
||||
// note that this overwrites Quarantine marker,
|
||||
// but that is ok since we keep the quarantined uid in addressToRefuseUid
|
||||
addressToWritable += address → Pass(endpoint, uid)
|
||||
writableToAddress += endpoint → address
|
||||
endpoint
|
||||
}
|
||||
|
||||
def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = {
|
||||
addressToWritable.get(remoteAddress) match {
|
||||
case Some(Pass(ep, _, refuseUid)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid), refuseUid)
|
||||
case other ⇒
|
||||
case Some(Pass(ep, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid))
|
||||
case other ⇒
|
||||
}
|
||||
}
|
||||
|
||||
def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int): Unit = {
|
||||
addressToWritable.get(remoteAddress) match {
|
||||
case Some(Pass(ep, uid, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, uid, Some(refuseUid))
|
||||
case Some(g: Gated) ⇒ addressToWritable += remoteAddress → g.copy(refuseUid = Some(refuseUid))
|
||||
case Some(w: WasGated) ⇒ addressToWritable += remoteAddress → w.copy(refuseUid = Some(refuseUid))
|
||||
case other ⇒
|
||||
}
|
||||
def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int, timeOfRelease: Deadline): Unit = {
|
||||
addressToRefuseUid = addressToRefuseUid.updated(remoteAddress, (refuseUid, timeOfRelease))
|
||||
}
|
||||
|
||||
def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = {
|
||||
|
|
@ -352,6 +348,7 @@ private[remote] object EndpointManager {
|
|||
case _ ⇒ addressToWritable -= address
|
||||
}
|
||||
writableToAddress -= endpoint
|
||||
// leave the refuseUid
|
||||
} else if (isReadOnly(endpoint)) {
|
||||
addressToReadonly -= readonlyToAddress(endpoint)
|
||||
readonlyToAddress -= endpoint
|
||||
|
|
@ -362,8 +359,8 @@ private[remote] object EndpointManager {
|
|||
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
|
||||
|
||||
def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
|
||||
case Some(_: Pass | _: WasGated) ⇒ true
|
||||
case _ ⇒ false
|
||||
case Some(_: Pass) ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address)
|
||||
|
|
@ -376,17 +373,15 @@ private[remote] object EndpointManager {
|
|||
// timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
|
||||
// known fact that it is quarantined.
|
||||
case Some(Quarantined(`uid`, _)) ⇒ true
|
||||
case _ ⇒ false
|
||||
case _ ⇒
|
||||
addressToRefuseUid.get(address).exists { case (refuseUid, _) ⇒ refuseUid == uid }
|
||||
}
|
||||
|
||||
def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match {
|
||||
// timeOfRelease is only used for garbage collection. If an address is still probed, we should report the
|
||||
// known fact that it is quarantined.
|
||||
case Some(Quarantined(uid, _)) ⇒ Some(uid)
|
||||
case Some(Pass(_, _, refuseUid)) ⇒ refuseUid
|
||||
case Some(Gated(_, refuseUid)) ⇒ refuseUid
|
||||
case Some(WasGated(refuseUid)) ⇒ refuseUid
|
||||
case None ⇒ None
|
||||
case Some(Quarantined(uid, _)) ⇒ Some(uid)
|
||||
case _ ⇒ addressToRefuseUid.get(address).map { case (refuseUid, _) ⇒ refuseUid }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -398,15 +393,12 @@ private[remote] object EndpointManager {
|
|||
val address = writableToAddress(endpoint)
|
||||
addressToWritable.get(address) match {
|
||||
case Some(Quarantined(_, _)) ⇒ // don't overwrite Quarantined with Gated
|
||||
case Some(Pass(_, _, refuseUid)) ⇒
|
||||
addressToWritable += address → Gated(timeOfRelease, refuseUid)
|
||||
case Some(Pass(_, _)) ⇒
|
||||
addressToWritable += address → Gated(timeOfRelease)
|
||||
writableToAddress -= endpoint
|
||||
case Some(WasGated(refuseUid)) ⇒
|
||||
addressToWritable += address → Gated(timeOfRelease, refuseUid)
|
||||
writableToAddress -= endpoint
|
||||
case Some(Gated(_, _)) ⇒ // already gated
|
||||
case Some(Gated(_)) ⇒ // already gated
|
||||
case None ⇒
|
||||
addressToWritable += address → Gated(timeOfRelease, refuseUid = None)
|
||||
addressToWritable += address → Gated(timeOfRelease)
|
||||
writableToAddress -= endpoint
|
||||
}
|
||||
} else if (isReadOnly(endpoint)) {
|
||||
|
|
@ -414,8 +406,10 @@ private[remote] object EndpointManager {
|
|||
readonlyToAddress -= endpoint
|
||||
}
|
||||
|
||||
def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit =
|
||||
def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = {
|
||||
addressToWritable += address → Quarantined(uid, timeOfRelease)
|
||||
addressToRefuseUid = addressToRefuseUid.updated(address, (uid, timeOfRelease))
|
||||
}
|
||||
|
||||
def removePolicy(address: Address): Unit =
|
||||
addressToWritable -= address
|
||||
|
|
@ -424,13 +418,19 @@ private[remote] object EndpointManager {
|
|||
|
||||
def prune(): Unit = {
|
||||
addressToWritable = addressToWritable.collect {
|
||||
case entry @ (key, Gated(timeOfRelease, refuseUid)) ⇒
|
||||
if (timeOfRelease.hasTimeLeft) entry
|
||||
else (key → WasGated(refuseUid))
|
||||
case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒
|
||||
// Querantined removed when no time left
|
||||
case entry @ (_, Gated(timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒
|
||||
// Gated removed when no time left
|
||||
entry
|
||||
case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒
|
||||
// Quarantined removed when no time left
|
||||
entry
|
||||
case entry @ (_, _: Pass) ⇒ entry
|
||||
}
|
||||
|
||||
addressToRefuseUid = addressToRefuseUid.collect {
|
||||
case entry @ (_, (_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒
|
||||
// // Quarantined/refuseUid removed when no time left
|
||||
entry
|
||||
case entry @ (_, _: Pass | _: WasGated) ⇒ entry
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -479,7 +479,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case None ⇒ body
|
||||
}
|
||||
|
||||
override val supervisorStrategy =
|
||||
override val supervisorStrategy = {
|
||||
def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match {
|
||||
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒
|
||||
log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
|
||||
remoteAddress, uid)
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
|
||||
case _ ⇒ // disabled
|
||||
}
|
||||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
log.warning(
|
||||
"Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
}
|
||||
Stop
|
||||
}
|
||||
|
||||
OneForOneStrategy(loggingEnabled = false) {
|
||||
case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) ⇒
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
|
|
@ -508,26 +531,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
}
|
||||
Stop
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒
|
||||
log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.",
|
||||
remoteAddress, uid)
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
|
||||
case _ ⇒ // disabled
|
||||
}
|
||||
Stop
|
||||
case e: HopelessAssociation ⇒
|
||||
hopeless(e)
|
||||
|
||||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
keepQuarantinedOr(remoteAddress) {
|
||||
log.warning(
|
||||
"Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
remoteAddress, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
}
|
||||
Stop
|
||||
case e: ActorInitializationException if e.getCause.isInstanceOf[HopelessAssociation] ⇒
|
||||
hopeless(e.getCause.asInstanceOf[HopelessAssociation])
|
||||
|
||||
case NonFatal(e) ⇒
|
||||
e match {
|
||||
|
|
@ -537,6 +545,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor)
|
||||
Stop
|
||||
}
|
||||
}
|
||||
|
||||
// Structure for saving reliable delivery state across restarts of Endpoints
|
||||
val receiveBuffers = new ConcurrentHashMap[Link, ResendState]()
|
||||
|
|
@ -583,28 +592,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case Quarantine(address, uidToQuarantineOption) ⇒
|
||||
// Stop writers
|
||||
(endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match {
|
||||
case (Some(Pass(endpoint, _, _)), None) ⇒
|
||||
case (Some(Pass(endpoint, _)), None) ⇒
|
||||
context.stop(endpoint)
|
||||
log.warning(
|
||||
"Association to [{}] with unknown UID is reported as quarantined, but " +
|
||||
"address cannot be quarantined without knowing the UID, gating instead for {} ms.",
|
||||
address, settings.RetryGateClosedFor.toMillis)
|
||||
endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor)
|
||||
case (Some(Pass(endpoint, uidOption, refuseUidOption)), Some(quarantineUid)) ⇒
|
||||
case (Some(Pass(endpoint, uidOption)), Some(quarantineUid)) ⇒
|
||||
uidOption match {
|
||||
case Some(`quarantineUid`) ⇒
|
||||
endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid))
|
||||
context.stop(endpoint)
|
||||
// or it does not match with the UID to be quarantined
|
||||
case None if !refuseUidOption.contains(quarantineUid) ⇒
|
||||
case None if !endpoints.refuseUid(address).contains(quarantineUid) ⇒
|
||||
// the quarantine uid may be got fresh by cluster gossip, so update refuseUid for late handle when the writer got uid
|
||||
endpoints.registerWritableEndpointRefuseUid(address, quarantineUid)
|
||||
endpoints.registerWritableEndpointRefuseUid(address, quarantineUid, Deadline.now + settings.QuarantineDuration)
|
||||
case _ ⇒ //the quarantine uid has lost the race with some failure, do nothing
|
||||
}
|
||||
case (Some(WasGated(refuseUidOption)), Some(quarantineUid)) ⇒
|
||||
if (!refuseUidOption.contains(quarantineUid))
|
||||
endpoints.registerWritableEndpointRefuseUid(address, quarantineUid)
|
||||
case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid ⇒ // the UID to be quarantined already exists, do nothing
|
||||
case (_, Some(quarantineUid)) ⇒
|
||||
// the current state is gated or quarantined, and we know the UID, update
|
||||
|
|
@ -651,34 +657,31 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case s @ Send(message, senderOption, recipientRef, _) ⇒
|
||||
val recipientAddress = recipientRef.path.address
|
||||
|
||||
def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef =
|
||||
def createAndRegisterWritingEndpoint(): ActorRef = {
|
||||
endpoints.registerWritableEndpoint(
|
||||
recipientAddress,
|
||||
uid = None,
|
||||
refuseUid,
|
||||
createEndpoint(
|
||||
recipientAddress,
|
||||
recipientRef.localAddressToUse,
|
||||
transportMapping(recipientRef.localAddressToUse),
|
||||
settings,
|
||||
handleOption = None,
|
||||
writing = true,
|
||||
refuseUid))
|
||||
writing = true))
|
||||
}
|
||||
|
||||
endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
|
||||
case Some(Pass(endpoint, _, _)) ⇒
|
||||
case Some(Pass(endpoint, _)) ⇒
|
||||
endpoint ! s
|
||||
case Some(Gated(timeOfRelease, refuseUid)) ⇒
|
||||
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid) ! s
|
||||
case Some(Gated(timeOfRelease)) ⇒
|
||||
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s
|
||||
else extendedSystem.deadLetters ! s
|
||||
case Some(WasGated(refuseUid)) ⇒
|
||||
createAndRegisterWritingEndpoint(refuseUid) ! s
|
||||
case Some(Quarantined(uid, _)) ⇒
|
||||
// timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
|
||||
// the Quarantined tombstone and we know what UID we don't want to accept, so use it.
|
||||
createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s
|
||||
createAndRegisterWritingEndpoint() ! s
|
||||
case None ⇒
|
||||
createAndRegisterWritingEndpoint(refuseUid = None) ! s
|
||||
createAndRegisterWritingEndpoint() ! s
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -693,20 +696,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case EndpointWriter.TookOver(endpoint, handle) ⇒
|
||||
removePendingReader(takingOverFrom = endpoint, withHandle = handle)
|
||||
case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒
|
||||
val refuseUidOption = endpoints.refuseUid(remoteAddress)
|
||||
endpoints.writableEndpointWithPolicyFor(remoteAddress) match {
|
||||
case Some(Pass(endpoint, _, refuseUidOption)) ⇒
|
||||
case Some(Pass(endpoint, _)) ⇒
|
||||
if (refuseUidOption.contains(uid)) {
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
|
||||
context.stop(endpoint)
|
||||
} else endpoints.registerWritableEndpointUid(remoteAddress, uid)
|
||||
handleStashedInbound(sender(), writerIsIdle = false)
|
||||
case Some(WasGated(refuseUidOption)) ⇒
|
||||
if (refuseUidOption.contains(uid)) {
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
|
||||
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
|
||||
} else endpoints.registerWritableEndpointUid(remoteAddress, uid)
|
||||
handleStashedInbound(sender(), writerIsIdle = false)
|
||||
case _ ⇒ // the GotUid might have lost the race with some failure
|
||||
}
|
||||
case ReliableDeliverySupervisor.Idle ⇒
|
||||
|
|
@ -749,22 +747,22 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
pendingReadHandoffs += endpoint → handle
|
||||
endpoint ! EndpointWriter.TakeOver(handle, self)
|
||||
endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
|
||||
case Some(Pass(ep, _, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate
|
||||
case _ ⇒
|
||||
case Some(Pass(ep, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate
|
||||
case _ ⇒
|
||||
}
|
||||
case None ⇒
|
||||
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
|
||||
handle.disassociate(AssociationHandle.Quarantined)
|
||||
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
|
||||
case Some(Pass(ep, None, _)) ⇒
|
||||
case Some(Pass(ep, None)) ⇒
|
||||
// Idle writer will never send a GotUid or a Terminated so we need to "provoke it"
|
||||
// to get an unstash event
|
||||
if (!writerIsIdle) {
|
||||
ep ! ReliableDeliverySupervisor.IsIdle
|
||||
stashedInbound += ep → (stashedInbound.getOrElse(ep, Vector.empty) :+ ia)
|
||||
} else
|
||||
createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
|
||||
case Some(Pass(ep, Some(uid), _)) ⇒
|
||||
createAndRegisterEndpoint(handle)
|
||||
case Some(Pass(ep, Some(uid))) ⇒
|
||||
if (handle.handshakeInfo.uid == uid) {
|
||||
pendingReadHandoffs.get(ep) foreach (_.disassociate("the existing writable association was replaced by a new incoming one", log))
|
||||
pendingReadHandoffs += ep → handle
|
||||
|
|
@ -774,27 +772,29 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
context.stop(ep)
|
||||
endpoints.unregisterEndpoint(ep)
|
||||
pendingReadHandoffs -= ep
|
||||
createAndRegisterEndpoint(handle, refuseUid = Some(uid))
|
||||
endpoints.markAsQuarantined(handle.remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
|
||||
createAndRegisterEndpoint(handle)
|
||||
}
|
||||
case state ⇒
|
||||
createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
|
||||
createAndRegisterEndpoint(handle)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = {
|
||||
private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = {
|
||||
val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress)
|
||||
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
|
||||
|
||||
val endpoint = createEndpoint(
|
||||
handle.remoteAddress,
|
||||
handle.localAddress,
|
||||
transportMapping(handle.localAddress),
|
||||
settings,
|
||||
Some(handle),
|
||||
writing,
|
||||
refuseUid = refuseUid)
|
||||
writing)
|
||||
|
||||
if (writing)
|
||||
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint)
|
||||
endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint)
|
||||
else {
|
||||
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
|
||||
if (!endpoints.hasWritableEndpointFor(handle.remoteAddress))
|
||||
|
|
@ -854,14 +854,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
val handle = pendingReadHandoffs(takingOverFrom)
|
||||
pendingReadHandoffs -= takingOverFrom
|
||||
eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true))
|
||||
|
||||
val endpoint = createEndpoint(
|
||||
handle.remoteAddress,
|
||||
handle.localAddress,
|
||||
transportMapping(handle.localAddress),
|
||||
settings,
|
||||
Some(handle),
|
||||
writing = false,
|
||||
refuseUid = None)
|
||||
writing = false)
|
||||
endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid)
|
||||
}
|
||||
}
|
||||
|
|
@ -877,11 +877,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
transport: AkkaProtocolTransport,
|
||||
endpointSettings: RemoteSettings,
|
||||
handleOption: Option[AkkaProtocolHandle],
|
||||
writing: Boolean,
|
||||
refuseUid: Option[Int]): ActorRef = {
|
||||
writing: Boolean): ActorRef = {
|
||||
require(transportMapping contains localAddress, "Transport mapping is not defined for the address")
|
||||
// refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed
|
||||
// quarantine checks
|
||||
val refuseUid = endpoints.refuseUid(remoteAddress)
|
||||
|
||||
if (writing) context.watch(context.actorOf(
|
||||
RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props(
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
||||
|
||||
reg.registerWritableEndpoint(address1, None, None, actorA) should ===(actorA)
|
||||
reg.registerWritableEndpoint(address1, None, actorA) should ===(actorA)
|
||||
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None, None)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None)))
|
||||
reg.readOnlyEndpointFor(address1) should ===(None)
|
||||
reg.isWritable(actorA) should ===(true)
|
||||
reg.isReadOnly(actorA) should ===(false)
|
||||
|
|
@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
||||
|
||||
reg.registerReadOnlyEndpoint(address1, actorA, 1) should ===(actorA)
|
||||
reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB)
|
||||
reg.registerWritableEndpoint(address1, None, actorB) should ===(actorB)
|
||||
|
||||
reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 1)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None)))
|
||||
|
||||
reg.isWritable(actorA) should ===(false)
|
||||
reg.isWritable(actorB) should ===(true)
|
||||
|
|
@ -66,10 +66,10 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
val reg = new EndpointRegistry
|
||||
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
||||
reg.registerWritableEndpoint(address1, None, None, actorA)
|
||||
reg.registerWritableEndpoint(address1, None, actorA)
|
||||
val deadline = Deadline.now
|
||||
reg.markAsFailed(actorA, deadline)
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline)))
|
||||
reg.isReadOnly(actorA) should ===(false)
|
||||
reg.isWritable(actorA) should ===(false)
|
||||
}
|
||||
|
|
@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
"keep tombstones when removing an endpoint" in {
|
||||
val reg = new EndpointRegistry
|
||||
|
||||
reg.registerWritableEndpoint(address1, None, None, actorA)
|
||||
reg.registerWritableEndpoint(address2, None, None, actorB)
|
||||
reg.registerWritableEndpoint(address1, None, actorA)
|
||||
reg.registerWritableEndpoint(address2, None, actorB)
|
||||
val deadline = Deadline.now
|
||||
reg.markAsFailed(actorA, deadline)
|
||||
reg.markAsQuarantined(address2, 42, deadline)
|
||||
|
|
@ -94,7 +94,7 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
reg.unregisterEndpoint(actorA)
|
||||
reg.unregisterEndpoint(actorB)
|
||||
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline)))
|
||||
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Quarantined(42, deadline)))
|
||||
|
||||
}
|
||||
|
|
@ -102,15 +102,15 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
"prune outdated Gated directives properly" in {
|
||||
val reg = new EndpointRegistry
|
||||
|
||||
reg.registerWritableEndpoint(address1, None, None, actorA)
|
||||
reg.registerWritableEndpoint(address2, None, None, actorB)
|
||||
reg.registerWritableEndpoint(address1, None, actorA)
|
||||
reg.registerWritableEndpoint(address2, None, actorB)
|
||||
reg.markAsFailed(actorA, Deadline.now)
|
||||
val farInTheFuture = Deadline.now + Duration(60, SECONDS)
|
||||
reg.markAsFailed(actorB, farInTheFuture)
|
||||
reg.prune()
|
||||
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(WasGated(None)))
|
||||
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture, None)))
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(None)
|
||||
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture)))
|
||||
}
|
||||
|
||||
"be able to register Quarantined policy for an address" in {
|
||||
|
|
@ -124,6 +124,29 @@ class EndpointRegistrySpec extends AkkaSpec {
|
|||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline)))
|
||||
}
|
||||
|
||||
"keep refuseUid after register new endpoint" in {
|
||||
val reg = new EndpointRegistry
|
||||
val deadline = Deadline.now + 30.minutes
|
||||
|
||||
reg.registerWritableEndpoint(address1, None, actorA)
|
||||
reg.markAsQuarantined(address1, 42, deadline)
|
||||
reg.refuseUid(address1) should ===(Some(42))
|
||||
reg.isQuarantined(address1, 42) should ===(true)
|
||||
|
||||
reg.unregisterEndpoint(actorA)
|
||||
// Quarantined marker is kept so far
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline)))
|
||||
reg.refuseUid(address1) should ===(Some(42))
|
||||
reg.isQuarantined(address1, 42) should ===(true)
|
||||
|
||||
reg.registerWritableEndpoint(address1, None, actorB)
|
||||
// Quarantined marker is gone
|
||||
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None)))
|
||||
// but we still have the refuseUid
|
||||
reg.refuseUid(address1) should ===(Some(42))
|
||||
reg.isQuarantined(address1, 42) should ===(true)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue