diff --git a/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes index 7315019cb2..0ee5bba2dd 100644 --- a/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.4.backwards.excludes @@ -7,3 +7,6 @@ ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash$") ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FastHash") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.StringValueFieldOffset") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EnvelopeBuffer.UsAscii") + +#22156 lost refuseUid +ProblemFilters.exclude[Problem]("akka.remote.EndpointManager*") diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index c44f6261a5..ac8783cf6e 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -260,7 +260,12 @@ private[remote] class ReliableDeliverySupervisor( // it serves a separator. // If we already have an inbound handle then UID is initially confirmed. // (This actor is never restarted) - var uidConfirmed: Boolean = uid.isDefined + var uidConfirmed: Boolean = uid.isDefined && (uid != refuseUid) + + if (uid.isDefined && (uid == refuseUid)) + throw new HopelessAssociation(localAddress, remoteAddress, uid, + new IllegalStateException( + s"The remote system [$remoteAddress] has a UID [${uid.get}] that has been quarantined. Association aborted.")) override def postStop(): Unit = { // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence @@ -321,6 +326,8 @@ private[remote] class ReliableDeliverySupervisor( case s: EndpointWriter.StopReading ⇒ writer forward s + + case Ungate ⇒ // ok, not gated } def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = { @@ -377,6 +384,7 @@ private[remote] class ReliableDeliverySupervisor( case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w, replyTo) ⇒ replyTo ! EndpointWriter.StoppedReading(w) + case Ungate ⇒ // ok, not gated } private def goToIdle(): Unit = { @@ -867,11 +875,12 @@ private[remote] class EndpointWriter( } private def trySendPureAck(): Unit = - for (h ← handle; ack ← lastAck) + for (h ← handle; ack ← lastAck) { if (h.write(codec.constructPureAck(ack))) { ackDeadline = newAckDeadline lastAck = None } + } private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = { val newReader = diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 2455aea394..7f2e8356ee 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -26,6 +26,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString.UTF_8 import akka.util.OptionVal import scala.collection.immutable +import akka.actor.ActorInitializationException /** * INTERNAL API @@ -292,50 +293,45 @@ private[remote] object EndpointManager { */ def isTombstone: Boolean } - final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy { + final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = false } - final case class Gated(timeOfRelease: Deadline, refuseUid: Option[Int]) extends EndpointPolicy { + final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } - final case class WasGated(refuseUid: Option[Int]) extends EndpointPolicy { - override def isTombstone: Boolean = false - } final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } // Not threadsafe -- only to be used in HeadActor class EndpointRegistry { + private var addressToRefuseUid = HashMap[Address, (Int, Deadline)]() private var addressToWritable = HashMap[Address, EndpointPolicy]() private var writableToAddress = HashMap[ActorRef, Address]() private var addressToReadonly = HashMap[Address, (ActorRef, Int)]() private var readonlyToAddress = HashMap[ActorRef, Address]() - def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef = + def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { - case Some(Pass(e, _, _)) ⇒ + case Some(Pass(e, _)) ⇒ throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ⇒ - addressToWritable += address → Pass(endpoint, uid, refuseUid) + // note that this overwrites Quarantine marker, + // but that is ok since we keep the quarantined uid in addressToRefuseUid + addressToWritable += address → Pass(endpoint, uid) writableToAddress += endpoint → address endpoint } def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = { addressToWritable.get(remoteAddress) match { - case Some(Pass(ep, _, refuseUid)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid), refuseUid) - case other ⇒ + case Some(Pass(ep, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, Some(uid)) + case other ⇒ } } - def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int): Unit = { - addressToWritable.get(remoteAddress) match { - case Some(Pass(ep, uid, _)) ⇒ addressToWritable += remoteAddress → Pass(ep, uid, Some(refuseUid)) - case Some(g: Gated) ⇒ addressToWritable += remoteAddress → g.copy(refuseUid = Some(refuseUid)) - case Some(w: WasGated) ⇒ addressToWritable += remoteAddress → w.copy(refuseUid = Some(refuseUid)) - case other ⇒ - } + def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int, timeOfRelease: Deadline): Unit = { + addressToRefuseUid = addressToRefuseUid.updated(remoteAddress, (refuseUid, timeOfRelease)) } def registerReadOnlyEndpoint(address: Address, endpoint: ActorRef, uid: Int): ActorRef = { @@ -352,6 +348,7 @@ private[remote] object EndpointManager { case _ ⇒ addressToWritable -= address } writableToAddress -= endpoint + // leave the refuseUid } else if (isReadOnly(endpoint)) { addressToReadonly -= readonlyToAddress(endpoint) readonlyToAddress -= endpoint @@ -362,8 +359,8 @@ private[remote] object EndpointManager { def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(_: Pass | _: WasGated) ⇒ true - case _ ⇒ false + case Some(_: Pass) ⇒ true + case _ ⇒ false } def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address) @@ -376,17 +373,15 @@ private[remote] object EndpointManager { // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the // known fact that it is quarantined. case Some(Quarantined(`uid`, _)) ⇒ true - case _ ⇒ false + case _ ⇒ + addressToRefuseUid.get(address).exists { case (refuseUid, _) ⇒ refuseUid == uid } } def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match { // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the // known fact that it is quarantined. - case Some(Quarantined(uid, _)) ⇒ Some(uid) - case Some(Pass(_, _, refuseUid)) ⇒ refuseUid - case Some(Gated(_, refuseUid)) ⇒ refuseUid - case Some(WasGated(refuseUid)) ⇒ refuseUid - case None ⇒ None + case Some(Quarantined(uid, _)) ⇒ Some(uid) + case _ ⇒ addressToRefuseUid.get(address).map { case (refuseUid, _) ⇒ refuseUid } } /** @@ -398,15 +393,12 @@ private[remote] object EndpointManager { val address = writableToAddress(endpoint) addressToWritable.get(address) match { case Some(Quarantined(_, _)) ⇒ // don't overwrite Quarantined with Gated - case Some(Pass(_, _, refuseUid)) ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid) + case Some(Pass(_, _)) ⇒ + addressToWritable += address → Gated(timeOfRelease) writableToAddress -= endpoint - case Some(WasGated(refuseUid)) ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid) - writableToAddress -= endpoint - case Some(Gated(_, _)) ⇒ // already gated + case Some(Gated(_)) ⇒ // already gated case None ⇒ - addressToWritable += address → Gated(timeOfRelease, refuseUid = None) + addressToWritable += address → Gated(timeOfRelease) writableToAddress -= endpoint } } else if (isReadOnly(endpoint)) { @@ -414,8 +406,10 @@ private[remote] object EndpointManager { readonlyToAddress -= endpoint } - def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = + def markAsQuarantined(address: Address, uid: Int, timeOfRelease: Deadline): Unit = { addressToWritable += address → Quarantined(uid, timeOfRelease) + addressToRefuseUid = addressToRefuseUid.updated(address, (uid, timeOfRelease)) + } def removePolicy(address: Address): Unit = addressToWritable -= address @@ -424,13 +418,19 @@ private[remote] object EndpointManager { def prune(): Unit = { addressToWritable = addressToWritable.collect { - case entry @ (key, Gated(timeOfRelease, refuseUid)) ⇒ - if (timeOfRelease.hasTimeLeft) entry - else (key → WasGated(refuseUid)) - case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ - // Querantined removed when no time left + case entry @ (_, Gated(timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // Gated removed when no time left + entry + case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // Quarantined removed when no time left + entry + case entry @ (_, _: Pass) ⇒ entry + } + + addressToRefuseUid = addressToRefuseUid.collect { + case entry @ (_, (_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // // Quarantined/refuseUid removed when no time left entry - case entry @ (_, _: Pass | _: WasGated) ⇒ entry } } } @@ -479,7 +479,30 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case None ⇒ body } - override val supervisorStrategy = + override val supervisorStrategy = { + def hopeless(e: HopelessAssociation): SupervisorStrategy.Directive = e match { + case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒ + log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", + remoteAddress, uid) + settings.QuarantineDuration match { + case d: FiniteDuration ⇒ + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) + case _ ⇒ // disabled + } + Stop + + case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ + keepQuarantinedOr(remoteAddress) { + log.warning( + "Association to [{}] with unknown UID is irrecoverably failed. " + + "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + } + Stop + } + OneForOneStrategy(loggingEnabled = false) { case e @ InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) ⇒ keepQuarantinedOr(remoteAddress) { @@ -508,26 +531,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } Stop - case HopelessAssociation(localAddress, remoteAddress, Some(uid), reason) ⇒ - log.error(reason, "Association to [{}] with UID [{}] irrecoverably failed. Quarantining address.", - remoteAddress, uid) - settings.QuarantineDuration match { - case d: FiniteDuration ⇒ - endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) - case _ ⇒ // disabled - } - Stop + case e: HopelessAssociation ⇒ + hopeless(e) - case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ - keepQuarantinedOr(remoteAddress) { - log.warning( - "Association to [{}] with unknown UID is irrecoverably failed. " + - "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", - remoteAddress, settings.RetryGateClosedFor.toMillis) - endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) - } - Stop + case e: ActorInitializationException if e.getCause.isInstanceOf[HopelessAssociation] ⇒ + hopeless(e.getCause.asInstanceOf[HopelessAssociation]) case NonFatal(e) ⇒ e match { @@ -537,6 +545,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) Stop } + } // Structure for saving reliable delivery state across restarts of Endpoints val receiveBuffers = new ConcurrentHashMap[Link, ResendState]() @@ -583,28 +592,25 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Quarantine(address, uidToQuarantineOption) ⇒ // Stop writers (endpoints.writableEndpointWithPolicyFor(address), uidToQuarantineOption) match { - case (Some(Pass(endpoint, _, _)), None) ⇒ + case (Some(Pass(endpoint, _)), None) ⇒ context.stop(endpoint) log.warning( "Association to [{}] with unknown UID is reported as quarantined, but " + "address cannot be quarantined without knowing the UID, gating instead for {} ms.", address, settings.RetryGateClosedFor.toMillis) endpoints.markAsFailed(endpoint, Deadline.now + settings.RetryGateClosedFor) - case (Some(Pass(endpoint, uidOption, refuseUidOption)), Some(quarantineUid)) ⇒ + case (Some(Pass(endpoint, uidOption)), Some(quarantineUid)) ⇒ uidOption match { case Some(`quarantineUid`) ⇒ endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid)) context.stop(endpoint) // or it does not match with the UID to be quarantined - case None if !refuseUidOption.contains(quarantineUid) ⇒ + case None if !endpoints.refuseUid(address).contains(quarantineUid) ⇒ // the quarantine uid may be got fresh by cluster gossip, so update refuseUid for late handle when the writer got uid - endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) + endpoints.registerWritableEndpointRefuseUid(address, quarantineUid, Deadline.now + settings.QuarantineDuration) case _ ⇒ //the quarantine uid has lost the race with some failure, do nothing } - case (Some(WasGated(refuseUidOption)), Some(quarantineUid)) ⇒ - if (!refuseUidOption.contains(quarantineUid)) - endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid ⇒ // the UID to be quarantined already exists, do nothing case (_, Some(quarantineUid)) ⇒ // the current state is gated or quarantined, and we know the UID, update @@ -651,34 +657,31 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case s @ Send(message, senderOption, recipientRef, _) ⇒ val recipientAddress = recipientRef.path.address - def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = + def createAndRegisterWritingEndpoint(): ActorRef = { endpoints.registerWritableEndpoint( recipientAddress, uid = None, - refuseUid, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, handleOption = None, - writing = true, - refuseUid)) + writing = true)) + } endpoints.writableEndpointWithPolicyFor(recipientAddress) match { - case Some(Pass(endpoint, _, _)) ⇒ + case Some(Pass(endpoint, _)) ⇒ endpoint ! s - case Some(Gated(timeOfRelease, refuseUid)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid) ! s + case Some(Gated(timeOfRelease)) ⇒ + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s else extendedSystem.deadLetters ! s - case Some(WasGated(refuseUid)) ⇒ - createAndRegisterWritingEndpoint(refuseUid) ! s case Some(Quarantined(uid, _)) ⇒ // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don't want to accept, so use it. - createAndRegisterWritingEndpoint(refuseUid = Some(uid)) ! s + createAndRegisterWritingEndpoint() ! s case None ⇒ - createAndRegisterWritingEndpoint(refuseUid = None) ! s + createAndRegisterWritingEndpoint() ! s } @@ -693,20 +696,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case EndpointWriter.TookOver(endpoint, handle) ⇒ removePendingReader(takingOverFrom = endpoint, withHandle = handle) case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒ + val refuseUidOption = endpoints.refuseUid(remoteAddress) endpoints.writableEndpointWithPolicyFor(remoteAddress) match { - case Some(Pass(endpoint, _, refuseUidOption)) ⇒ + case Some(Pass(endpoint, _)) ⇒ if (refuseUidOption.contains(uid)) { endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) context.stop(endpoint) } else endpoints.registerWritableEndpointUid(remoteAddress, uid) handleStashedInbound(sender(), writerIsIdle = false) - case Some(WasGated(refuseUidOption)) ⇒ - if (refuseUidOption.contains(uid)) { - endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) - eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) - } else endpoints.registerWritableEndpointUid(remoteAddress, uid) - handleStashedInbound(sender(), writerIsIdle = false) case _ ⇒ // the GotUid might have lost the race with some failure } case ReliableDeliverySupervisor.Idle ⇒ @@ -749,22 +747,22 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends pendingReadHandoffs += endpoint → handle endpoint ! EndpointWriter.TakeOver(handle, self) endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep, _, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate - case _ ⇒ + case Some(Pass(ep, _)) ⇒ ep ! ReliableDeliverySupervisor.Ungate + case _ ⇒ } case None ⇒ if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate(AssociationHandle.Quarantined) else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep, None, _)) ⇒ + case Some(Pass(ep, None)) ⇒ // Idle writer will never send a GotUid or a Terminated so we need to "provoke it" // to get an unstash event if (!writerIsIdle) { ep ! ReliableDeliverySupervisor.IsIdle stashedInbound += ep → (stashedInbound.getOrElse(ep, Vector.empty) :+ ia) } else - createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) - case Some(Pass(ep, Some(uid), _)) ⇒ + createAndRegisterEndpoint(handle) + case Some(Pass(ep, Some(uid))) ⇒ if (handle.handshakeInfo.uid == uid) { pendingReadHandoffs.get(ep) foreach (_.disassociate("the existing writable association was replaced by a new incoming one", log)) pendingReadHandoffs += ep → handle @@ -774,27 +772,29 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.stop(ep) endpoints.unregisterEndpoint(ep) pendingReadHandoffs -= ep - createAndRegisterEndpoint(handle, refuseUid = Some(uid)) + endpoints.markAsQuarantined(handle.remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + createAndRegisterEndpoint(handle) } case state ⇒ - createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) + createAndRegisterEndpoint(handle) } } } - private def createAndRegisterEndpoint(handle: AkkaProtocolHandle, refuseUid: Option[Int]): Unit = { + private def createAndRegisterEndpoint(handle: AkkaProtocolHandle): Unit = { val writing = settings.UsePassiveConnections && !endpoints.hasWritableEndpointFor(handle.remoteAddress) eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) + val endpoint = createEndpoint( handle.remoteAddress, handle.localAddress, transportMapping(handle.localAddress), settings, Some(handle), - writing, - refuseUid = refuseUid) + writing) + if (writing) - endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint) + endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint) else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) if (!endpoints.hasWritableEndpointFor(handle.remoteAddress)) @@ -854,14 +854,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val handle = pendingReadHandoffs(takingOverFrom) pendingReadHandoffs -= takingOverFrom eventPublisher.notifyListeners(AssociatedEvent(handle.localAddress, handle.remoteAddress, inbound = true)) + val endpoint = createEndpoint( handle.remoteAddress, handle.localAddress, transportMapping(handle.localAddress), settings, Some(handle), - writing = false, - refuseUid = None) + writing = false) endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint, handle.handshakeInfo.uid) } } @@ -877,11 +877,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport: AkkaProtocolTransport, endpointSettings: RemoteSettings, handleOption: Option[AkkaProtocolHandle], - writing: Boolean, - refuseUid: Option[Int]): ActorRef = { + writing: Boolean): ActorRef = { require(transportMapping contains localAddress, "Transport mapping is not defined for the address") // refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed // quarantine checks + val refuseUid = endpoints.refuseUid(remoteAddress) if (writing) context.watch(context.actorOf( RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props( diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index 06a9633921..e66399f791 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.registerWritableEndpoint(address1, None, None, actorA) should ===(actorA) + reg.registerWritableEndpoint(address1, None, actorA) should ===(actorA) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorA, None))) reg.readOnlyEndpointFor(address1) should ===(None) reg.isWritable(actorA) should ===(true) reg.isReadOnly(actorA) should ===(false) @@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(None) reg.registerReadOnlyEndpoint(address1, actorA, 1) should ===(actorA) - reg.registerWritableEndpoint(address1, None, None, actorB) should ===(actorB) + reg.registerWritableEndpoint(address1, None, actorB) should ===(actorB) reg.readOnlyEndpointFor(address1) should ===(Some((actorA, 1))) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None))) reg.isWritable(actorA) should ===(false) reg.isWritable(actorB) should ===(true) @@ -66,10 +66,10 @@ class EndpointRegistrySpec extends AkkaSpec { val reg = new EndpointRegistry reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.registerWritableEndpoint(address1, None, None, actorA) + reg.registerWritableEndpoint(address1, None, actorA) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) reg.isReadOnly(actorA) should ===(false) reg.isWritable(actorA) should ===(false) } @@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec { "keep tombstones when removing an endpoint" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, None, actorA) - reg.registerWritableEndpoint(address2, None, None, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.markAsQuarantined(address2, 42, deadline) @@ -94,7 +94,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.unregisterEndpoint(actorA) reg.unregisterEndpoint(actorB) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) reg.writableEndpointWithPolicyFor(address2) should ===(Some(Quarantined(42, deadline))) } @@ -102,15 +102,15 @@ class EndpointRegistrySpec extends AkkaSpec { "prune outdated Gated directives properly" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, None, actorA) - reg.registerWritableEndpoint(address2, None, None, actorB) + reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address2, None, actorB) reg.markAsFailed(actorA, Deadline.now) val farInTheFuture = Deadline.now + Duration(60, SECONDS) reg.markAsFailed(actorB, farInTheFuture) reg.prune() - reg.writableEndpointWithPolicyFor(address1) should ===(Some(WasGated(None))) - reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture, None))) + reg.writableEndpointWithPolicyFor(address1) should ===(None) + reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture))) } "be able to register Quarantined policy for an address" in { @@ -124,6 +124,29 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline))) } + "keep refuseUid after register new endpoint" in { + val reg = new EndpointRegistry + val deadline = Deadline.now + 30.minutes + + reg.registerWritableEndpoint(address1, None, actorA) + reg.markAsQuarantined(address1, 42, deadline) + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + + reg.unregisterEndpoint(actorA) + // Quarantined marker is kept so far + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Quarantined(42, deadline))) + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + + reg.registerWritableEndpoint(address1, None, actorB) + // Quarantined marker is gone + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Pass(actorB, None))) + // but we still have the refuseUid + reg.refuseUid(address1) should ===(Some(42)) + reg.isQuarantined(address1, 42) should ===(true) + } + } }