From d660f8971efe134d807728994af970e71d5762ed Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 11 Sep 2017 10:10:04 +0200 Subject: [PATCH] Keep the refuseUid in a better way, #22156 * The scenario described in the issue can cause the quarantine marker to be lost when creating a new endpoint for that address. Then when later creating another endpoint from an inbound connection the uid is considered confirmed and Ack message is accepted, triggering the unexpected seq number issue. * The refuseUid was kept in the endpoint policy markers, but that is just very complicated and as illustrated by this issue not always safe. * Instead, keep the refuseUid separately so it's not lost when registering new endpoint. * The purpose of WasGated was only to try to keep the refuseUid (as far as I know), and that is not needed any longer. mima filter --- .../mima-filters/2.5.4.backwards.excludes | 3 + .../src/main/scala/akka/remote/Endpoint.scala | 13 +- .../src/main/scala/akka/remote/Remoting.scala | 198 +++++++++--------- .../akka/remote/EndpointRegistrySpec.scala | 49 +++-- 4 files changed, 149 insertions(+), 114 deletions(-) diff --git a/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes index 7315019cb2..0ee5bba2dd 100644 --- a/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes @@ -7,3 +7,6 @@ ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash$") ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.StringValueFieldOffset") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.UsAscii") + +#22156 lost refuseUid +ProblemFilters.exclude[Problem]("akka.remote.EndpointManager*") diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index c44f6261a5..ac8783cf6e 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -260,7 +260,12 @@ private[remote] class ReliableDeliverySupervisor( // it serves a separator. // If we already have an inbound handle then UID is initially confirmed. // (This actor is never restarted) - var uidConfirmed: Boolean = uid.isDefined + var uidConfirmed: Boolean = uid.isDefined && (uid != refuseUid) + + if (uid.isDefined && (uid == refuseUid)) + throw new HopelessAssociation(localAddress, remoteAddress, uid, + new IllegalStateException( + s"The remote system [$remoteAddress] has a UID [${uid.get}] that has been quarantined. Association aborted.")) override def postStop(): Unit = { // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence @@ -321,6 +326,8 @@ private[remote] class ReliableDeliverySupervisor( case s: EndpointWriter.StopReading ⇒ writer forward s + + case Ungate ⇒ // ok, not gated } def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = { @@ -377,6 +384,7 @@ private[remote] class ReliableDeliverySupervisor( case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w, replyTo) ⇒ replyTo ! EndpointWriter.StoppedReading(w) + case Ungate ⇒ // ok, not gated } private def goToIdle(): Unit = { @@ -867,11 +875,12 @@ private[remote] class EndpointWriter( } private def trySendPureAck(): Unit = - for (h ← handle; ack ← lastAck) + for (h ← handle; ack ← lastAck) { if (h.write(codec.constructPureAck(ack))) { ackDeadline = newAckDeadline lastAck = None } + } private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = { val newReader = diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 2455aea394..7f2e8356ee 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -26,6 +26,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString.UTF_8 import akka.util.OptionVal import scala.collection.immutable +import akka.actor.ActorInitializationException /** * INTERNAL API @@ -292,50 +293,45 @@ private[remote] object EndpointManager { */ def isTombstone: Boolean } - final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy { + final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = false } - final case class Gated(timeOfRelease: Deadline, refuseUid: Option[Int]) extends EndpointPolicy { + final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } - final case class WasGated(refuseUid: Option[Int]) extends EndpointPolicy { - override def isTombstone: Boolean = false - } final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } // Not threadsafe -- only to be used in HeadActor class EndpointRegistry { + private var addressToRefuseUid = HashMap[Address, (Int, Deadline)]() private var addressToWritable = HashMap[Address, EndpointPolicy]() private var writableToAddress = HashMap[ActorRef, Address]() private var addressToReadonly = HashMap[Address, (ActorRef, Int)]() private var readonlyToAddress = HashMap[ActorRef, Address]() - def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef = + def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { - case Some(Pass(e, _, _)) ⇒ + case Some(Pass(e, _)) ⇒ throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ⇒ - addressToWritable += address → Pass(endpoint, uid, refuseUid) + // note that this overwrites Quarantine marker, + // but that is ok since we keep the quarantined uid in addressToRefuseUid + addressToWritable += address → Pass(endpoint, uid) writableToAddress += endpoint → address endpoint } def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = { addressToWritable.get(remoteAddress) match { - case Some(Pass(ep, _, refuseUid)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid), refuseUid) - case other ⇒ + case Some(Pass(ep, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid)) + case other ⇒ } } - def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int): Unit = { - addressToWritable.get(remoteAddress) match { - case Some(Pass(ep, uid, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, uid, Some(refuseUid)) - case Some(g: Gated) ⇒ addressToWritable += remoteAddress → g.copy(refuseUid = Some(refuseUid)) - case Some(w: WasGated) ⇒ addressToWritable += remoteAddress → w.copy(refuseUid = Some(refuseUid)) - case other ⇒ - } + def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int, timeOfRelease: Deadline): Unit = { + addressToRefuseUid = addressToRefuseUid.updated(remoteAddress, (refuseUid, timeOfRelease)) } def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = { @@ -352,6 +348,7 @@ private[remote] object EndpointManager { case _ ⇒ addressToWritable -= address } writableToAddress -= endpoint + // leave the refuseUid } else if (isReadOnly(endpoint)) { addressToReadonly -= readonlyToAddress(endpoint) readonlyToAddress -= endpoint @@ -362,8 +359,8 @@ private[remote] object EndpointManager { def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(_: Pass | _: WasGated) ⇒ true - case _ ⇒ false + case Some(_: Pass) ⇒ true + case _ ⇒ false } def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address) @@ -376,17 +373,15 @@ private[remote] object EndpointManager { // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the // known fact that it is quarantined. case Some(Quarantined(`uid`, _)) ⇒ true - case _ ⇒ false + case _ ⇒ + addressToRefuseUid.get(address).exists { case (refuseUid, _) ⇒ refuseUid == uid } } def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match { // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the // known fact that it is quarantined. - case Some(Quarantined(uid, _)) ⇒ Some(uid) - case Some(Pass(_, _, refuseUid)) ⇒ refuseUid - case Some(Gated(_, refuseUid)) ⇒ refuseUid - case Some(WasGated(refuseUid)) ⇒ refuseUid - case None ⇒ None + case Some(Quarantined(uid, _)) ⇒ Some(uid) + case _ ⇒ addressToRefuseUid.get(address).map { case (refuseUid, _) ⇒ refuseUid } } /** @@ -398,15 +393,12 @@ private[remote] object EndpointManager { val address = writableToAddress(endpoint) addressToWritable.get(address) match { case Some(Quarantined(_, _)) ⇒ // don't overwrite Quarantined with Gated - case Some(Pass(_, _, refuseUid)) ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid) + case Some(Pass(_, _)) ⇒ + addressToWritable += address → Gated(timeOfRelease) writableToAddress -= endpoint - case Some(WasGated(refuseUid)) ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid) - writableToAddress -= endpoint - case Some(Gated(_, _)) ⇒ // already gated + case Some(Gated(_)) ⇒ // already gated case None ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid = None) + addressToWritable += address → Gated(timeOfRelease) writableToAddress -= endpoint } } else if (isReadOnly(endpoint)) { @@ -414,8 +406,10 @@ private[remote] object EndpointManager { readonlyToAddress -= endpoint } - def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = + def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = { addressToWritable += address → Quarantined(uid, timeOfRelease) + addressToRefuseUid = addressToRefuseUid.updated(address, (uid, timeOfRelease)) + } def removePolicy(address: Address): Unit = addressToWritable -= address @@ -424,13 +418,19 @@ private[remote] object EndpointManager { def prune(): Unit = { addressToWritable = addressToWritable.collect { - case entry @ (key, Gated(timeOfRelease, refuseUid)) ⇒ - if (timeOfRelease.hasTimeLeft) entry - else (key → WasGated(refuseUid)) - case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ - // Querantined removed when no time left + case entry @ (_, Gated(timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // Gated removed when no time left + entry + case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // Quarantined removed when no time left + entry + case entry @ (_, _: Pass) ⇒ entry + } + + addressToRefuseUid = addressToRefuseUid.collect { + case entry @ (_, (_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // // Quarantined/refuseUid removed when no time left entry - case entry @ (_, _: Pass | _: WasGated) ⇒ entry } } } @@ -479,7 +479,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case None ⇒ body } - override val supervisorStrategy = + override val supervisorStrategy = { + def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match { + case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒ + log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", + remoteAddress, uid) + settings.QuarantineDuration match { + case d: FiniteDuration ⇒ + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) + case _ ⇒ // disabled + } + Stop + + case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ + keepQuarantinedOr(remoteAddress) { + log.warning( + "Association to [{}] with unknown UID is irrecoverably failed. " + + "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + } + Stop + } + OneForOneStrategy(loggingEnabled = false) { case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) ⇒ keepQuarantinedOr(remoteAddress) { @@ -508,26 +531,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } Stop - case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒ - log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", - remoteAddress, uid) - settings.QuarantineDuration match { - case d: FiniteDuration ⇒ - endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) - case _ ⇒ // disabled - } - Stop + case e: HopelessAssociation ⇒ + hopeless(e) - case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ - keepQuarantinedOr(remoteAddress) { - log.warning( - "Association to [{}] with unknown UID is irrecoverably failed. " + - "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", - remoteAddress, settings.RetryGateClosedFor.toMillis) - endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) - } - Stop + case e: ActorInitializationException if e.getCause.isInstanceOf[HopelessAssociation] ⇒ + hopeless(e.getCause.asInstanceOf[HopelessAssociation]) case NonFatal(e) ⇒ e match { @@ -537,6 +545,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) Stop } + } // Structure for saving reliable delivery state across restarts of Endpoints val receiveBuffers = new ConcurrentHashMap[Link, ResendState]() @@ -583,28 +592,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Quarantine(address, uidToQuarantineOption) ⇒ // Stop writers (endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match { - case (Some(Pass(endpoint, _, _)), None) ⇒ + case (Some(Pass(endpoint, _)), None) ⇒ context.stop(endpoint) log.warning( "Association to [{}] with unknown UID is reported as quarantined, but " + "address cannot be quarantined without knowing the UID, gating instead for {} ms.", address, settings.RetryGateClosedFor.toMillis) endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor) - case (Some(Pass(endpoint, uidOption, refuseUidOption)), Some(quarantineUid)) ⇒ + case (Some(Pass(endpoint, uidOption)), Some(quarantineUid)) ⇒ uidOption match { case Some(`quarantineUid`) ⇒ endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid)) context.stop(endpoint) // or it does not match with the UID to be quarantined - case None if !refuseUidOption.contains(quarantineUid) ⇒ + case None if !endpoints.refuseUid(address).contains(quarantineUid) ⇒ // the quarantine uid may be got fresh by cluster gossip, so update refuseUid for late handle when the writer got uid - endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) + endpoints.registerWritableEndpointRefuseUid(address, quarantineUid, Deadline.now + settings.QuarantineDuration) case _ ⇒ //the quarantine uid has lost the race with some failure, do nothing } - case (Some(WasGated(refuseUidOption)), Some(quarantineUid)) ⇒ - if (!refuseUidOption.contains(quarantineUid)) - endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid ⇒ // the UID to be quarantined already exists, do nothing case (_, Some(quarantineUid)) ⇒ // the current state is gated or quarantined, and we know the UID, update @@ -651,34 +657,31 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address - def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = + def createAndRegisterWritingEndpoint(): ActorRef = { endpoints.registerWritableEndpoint( recipientAddress, uid = None, - refuseUid, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, handleOption = None, - writing = true, - refuseUid)) + writing = true)) + } endpoints.writableEndpointWithPolicyFor(recipientAddress) match { - case Some(Pass(endpoint, _, _)) ⇒ + case Some(Pass(endpoint, _)) ⇒ endpoint ! s - case Some(Gated(timeOfRelease, refuseUid)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid) ! s + case Some(Gated(timeOfRelease)) ⇒ + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s else extendedSystem.deadLetters ! s - case Some(WasGated(refuseUid)) ⇒ - createAndRegisterWritingEndpoint(refuseUid) ! s case Some(Quarantined(uid, _)) ⇒ // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don't want to accept, so use it. - createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s + createAndRegisterWritingEndpoint() ! s case None ⇒ - createAndRegisterWritingEndpoint(refuseUid = None) ! s + createAndRegisterWritingEndpoint() ! s } @@ -693,20 +696,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case EndpointWriter.TookOver(endpoint, handle) ⇒ removePendingReader(takingOverFrom = endpoint, withHandle = handle) case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒ + val refuseUidOption = endpoints.refuseUid(remoteAddress) endpoints.writableEndpointWithPolicyFor(remoteAddress) match { - case Some(Pass(endpoint, _, refuseUidOption)) ⇒ + case Some(Pass(endpoint, _)) ⇒ if (refuseUidOption.contains(uid)) { endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) context.stop(endpoint) } else endpoints.registerWritableEndpointUid(remoteAddress, uid) handleStashedInbound(sender(), writerIsIdle = false) - case Some(WasGated(refuseUidOption)) ⇒ - if (refuseUidOption.contains(uid)) { - endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) - } else endpoints.registerWritableEndpointUid(remoteAddress, uid) - handleStashedInbound(sender(), writerIsIdle = false) case _ ⇒ // the GotUid might have lost the race with some failure } case ReliableDeliverySupervisor.Idle ⇒ @@ -749,22 +747,22 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends pendingReadHandoffs += endpoint → handle endpoint ! EndpointWriter.TakeOver(handle, self) endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep, _, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate - case _ ⇒ + case Some(Pass(ep, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate + case _ ⇒ } case None ⇒ if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate(AssociationHandle.Quarantined) else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep, None, _)) ⇒ + case Some(Pass(ep, None)) ⇒ // Idle writer will never send a GotUid or a Terminated so we need to "provoke it" // to get an unstash event if (!writerIsIdle) { ep ! ReliableDeliverySupervisor.IsIdle stashedInbound += ep → (stashedInbound.getOrElse(ep, Vector.empty) :+ ia) } else - createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) - case Some(Pass(ep, Some(uid), _)) ⇒ + createAndRegisterEndpoint(handle) + case Some(Pass(ep, Some(uid))) ⇒ if (handle.handshakeInfo.uid == uid) { pendingReadHandoffs.get(ep) foreach (_.disassociate("the existing writable association was replaced by a new incoming one", log)) pendingReadHandoffs += ep → handle @@ -774,27 +772,29 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.stop(ep) endpoints.unregisterEndpoint(ep) pendingReadHandoffs -= ep - createAndRegisterEndpoint(handle, refuseUid = Some(uid)) + endpoints.markAsQuarantined(handle.remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + createAndRegisterEndpoint(handle) } case state ⇒ - createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) + createAndRegisterEndpoint(handle) } } } - private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = { + private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = { val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) + val endpoint = createEndpoint( handle.remoteAddress, handle.localAddress, transportMapping(handle.localAddress), settings, Some(handle), - writing, - refuseUid = refuseUid) + writing) + if (writing) - endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint) + endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint) else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) if (!endpoints.hasWritableEndpointFor(handle.remoteAddress)) @@ -854,14 +854,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val handle = pendingReadHandoffs(takingOverFrom) pendingReadHandoffs -= takingOverFrom eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) + val endpoint = createEndpoint( handle.remoteAddress, handle.localAddress, transportMapping(handle.localAddress), settings, Some(handle), - writing = false, - refuseUid = None) + writing = false) endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) } } @@ -877,11 +877,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport: AkkaProtocolTransport, endpointSettings: RemoteSettings, handleOption: Option[AkkaProtocolHandle], - writing: Boolean, - refuseUid: Option[Int]): ActorRef = { + writing: Boolean): ActorRef = { require(transportMapping contains localAddress, "Transport mapping is not defined for the address") // refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed // quarantine checks + val refuseUid = endpoints.refuseUid(remoteAddress) if (writing) context.watch(context.actorOf( RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props( diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index 06a9633921..e66399f791 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.registerWritableEndpoint(address1, None, None, actorA) should ===(actorA) + reg.registerWritableEndpoint(address1, None, actorA) should ===(actorA) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None))) reg.readOnlyEndpointFor(address1) should ===(None) reg.isWritable(actorA) should ===(true) reg.isReadOnly(actorA) should ===(false) @@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(None) reg.registerReadOnlyEndpoint(address1, actorA, 1) should ===(actorA) - reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB) + reg.registerWritableEndpoint(address1, None, actorB) should ===(actorB) reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 1))) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None))) reg.isWritable(actorA) should ===(false) reg.isWritable(actorB) should ===(true) @@ -66,10 +66,10 @@ class EndpointRegistrySpec extends AkkaSpec { val reg = new EndpointRegistry reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.registerWritableEndpoint(address1, None, None, actorA) + reg.registerWritableEndpoint(address1, None, actorA) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) reg.isReadOnly(actorA) should ===(false) reg.isWritable(actorA) should ===(false) } @@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec { "keep tombstones when removing an endpoint" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, None, actorA) - reg.registerWritableEndpoint(address2, None, None, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.markAsQuarantined(address2, 42, deadline) @@ -94,7 +94,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.unregisterEndpoint(actorA) reg.unregisterEndpoint(actorB) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) reg.writableEndpointWithPolicyFor(address2) should ===(Some(Quarantined(42, deadline))) } @@ -102,15 +102,15 @@ class EndpointRegistrySpec extends AkkaSpec { "prune outdated Gated directives properly" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, None, actorA) - reg.registerWritableEndpoint(address2, None, None, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) reg.markAsFailed(actorA, Deadline.now) val farInTheFuture = Deadline.now + Duration(60, SECONDS) reg.markAsFailed(actorB, farInTheFuture) reg.prune() - reg.writableEndpointWithPolicyFor(address1) should ===(Some(WasGated(None))) - reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(None) + reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture))) } "be able to register Quarantined policy for an address" in { @@ -124,6 +124,29 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline))) } + "keep refuseUid after register new endpoint" in { + val reg = new EndpointRegistry + val deadline = Deadline.now + 30.minutes + + reg.registerWritableEndpoint(address1, None, actorA) + reg.markAsQuarantined(address1, 42, deadline) + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + + reg.unregisterEndpoint(actorA) + // Quarantined marker is kept so far + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline))) + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + + reg.registerWritableEndpoint(address1, None, actorB) + // Quarantined marker is gone + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None))) + // but we still have the refuseUid + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + } + } }