diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 689df91265..4a611b6320 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -286,9 +286,12 @@ private[remote] object EndpointManager { final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = false } - final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { + final case class Gated(timeOfRelease: Deadline, refuseUid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = true } + final case class WasGated(refuseUid: Option[Int]) extends EndpointPolicy { + override def isTombstone: Boolean = false + } final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy { override def isTombstone: Boolean = true } @@ -320,6 +323,8 @@ private[remote] object EndpointManager { def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int): Unit = { addressToWritable.get(remoteAddress) match { case Some(Pass(ep, uid, _)) ⇒ addressToWritable += remoteAddress -> Pass(ep, uid, Some(refuseUid)) + case Some(g: Gated) ⇒ addressToWritable += remoteAddress -> g.copy(refuseUid = Some(refuseUid)) + case Some(w: WasGated) ⇒ addressToWritable += remoteAddress -> w.copy(refuseUid = Some(refuseUid)) case other ⇒ } } @@ -348,8 +353,8 @@ private[remote] object EndpointManager { def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(Pass(_, _, _)) ⇒ true - case _ ⇒ false + case Some(_: Pass | _: WasGated) ⇒ true + case _ ⇒ false } def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address) @@ -370,7 +375,9 @@ private[remote] object EndpointManager { // known fact that it is quarantined. case Some(Quarantined(uid, _)) ⇒ Some(uid) case Some(Pass(_, _, refuseUid)) ⇒ refuseUid - case _ ⇒ None + case Some(Gated(_, refuseUid)) ⇒ refuseUid + case Some(WasGated(refuseUid)) ⇒ refuseUid + case None ⇒ None } /** @@ -379,8 +386,20 @@ private[remote] object EndpointManager { */ def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = if (isWritable(endpoint)) { - addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease) - writableToAddress -= endpoint + val address = writableToAddress(endpoint) + addressToWritable.get(address) match { + case Some(Quarantined(_, _)) ⇒ // don't overwrite Quarantined with Gated + case Some(Pass(_, _, refuseUid)) ⇒ + addressToWritable += address -> Gated(timeOfRelease, refuseUid) + writableToAddress -= endpoint + case Some(WasGated(refuseUid)) ⇒ + addressToWritable += address -> Gated(timeOfRelease, refuseUid) + writableToAddress -= endpoint + case Some(Gated(_, _)) ⇒ // already gated + case None ⇒ + addressToWritable += address -> Gated(timeOfRelease, refuseUid = None) + writableToAddress -= endpoint + } } else if (isReadOnly(endpoint)) { addressToReadonly -= readonlyToAddress(endpoint) readonlyToAddress -= endpoint @@ -395,10 +414,14 @@ private[remote] object EndpointManager { def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys def prune(): Unit = { - addressToWritable = addressToWritable.filter { - case (_, Gated(timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft - case (_, Quarantined(_, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft - case _ ⇒ true + addressToWritable = addressToWritable.collect { + case entry @ (key, Gated(timeOfRelease, refuseUid)) ⇒ + if (timeOfRelease.hasTimeLeft) entry + else (key -> WasGated(refuseUid)) + case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft ⇒ + // Querantined removed when no time left + entry + case entry @ (_, _: Pass | _: WasGated) ⇒ entry } } } @@ -567,12 +590,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) case _ ⇒ //the quarantine uid has lost the race with some failure, do nothing } + case (Some(WasGated(refuseUidOption)), Some(quarantineUid)) ⇒ + if (!refuseUidOption.contains(quarantineUid)) + endpoints.registerWritableEndpointRefuseUid(address, quarantineUid) case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid ⇒ // the UID to be quarantined already exists, do nothing case (_, Some(quarantineUid)) ⇒ // the current state is gated or quarantined, and we know the UID, update endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration) eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid)) - case _ ⇒ // the current state is gated or quarantined, and we don't know the UID, do nothing. + case _ ⇒ // the current state is Gated, WasGated or Quarantined, and we don't know the UID, do nothing. } // Stop inbound read-only associations @@ -630,9 +656,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.writableEndpointWithPolicyFor(recipientAddress) match { case Some(Pass(endpoint, _, _)) ⇒ endpoint ! s - case Some(Gated(timeOfRelease)) ⇒ - if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s + case Some(Gated(timeOfRelease, refuseUid)) ⇒ + if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid) ! s else extendedSystem.deadLetters ! s + case Some(WasGated(refuseUid)) ⇒ + createAndRegisterWritingEndpoint(refuseUid) ! s case Some(Quarantined(uid, _)) ⇒ // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don't want to accept, so use it. @@ -661,6 +689,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.stop(endpoint) } else endpoints.registerWritableEndpointUid(remoteAddress, uid) handleStashedInbound(sender(), writerIsIdle = false) + case Some(WasGated(refuseUidOption)) ⇒ + if (refuseUidOption.contains(uid)) { + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid)) + } else endpoints.registerWritableEndpointUid(remoteAddress, uid) + handleStashedInbound(sender(), writerIsIdle = false) case _ ⇒ // the GotUid might have lost the race with some failure } case ReliableDeliverySupervisor.Idle ⇒ diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index f3434ccaa5..06a9633921 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -69,7 +69,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.registerWritableEndpoint(address1, None, None, actorA) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) reg.isReadOnly(actorA) should ===(false) reg.isWritable(actorA) should ===(false) } @@ -94,7 +94,7 @@ class EndpointRegistrySpec extends AkkaSpec { reg.unregisterEndpoint(actorA) reg.unregisterEndpoint(actorB) - reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None))) reg.writableEndpointWithPolicyFor(address2) should ===(Some(Quarantined(42, deadline))) } @@ -109,8 +109,8 @@ class EndpointRegistrySpec extends AkkaSpec { reg.markAsFailed(actorB, farInTheFuture) reg.prune() - reg.writableEndpointWithPolicyFor(address1) should ===(None) - reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture))) + reg.writableEndpointWithPolicyFor(address1) should ===(Some(WasGated(None))) + reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture, None))) } "be able to register Quarantined policy for an address" in { diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index 933c2747f0..a99fb5c3d3 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -109,7 +109,7 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) with DefaultTimeout { import SystemMessageDeliveryStressTest._ - override def expectedTestDuration: FiniteDuration = 120.seconds + override def expectedTestDuration: FiniteDuration = 200.seconds val systemA = system val systemB = ActorSystem("systemB", system.settings.config) diff --git a/project/MiMa.scala b/project/MiMa.scala index 38c20af245..9b09a7c6da 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -872,7 +872,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onPush"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onUpstreamFinish"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onPull"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.postStop") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.postStop"), + + // #20531 adding refuseUid to Gated + FilterAnyProblem("akka.remote.EndpointManager$Gated") ) ) }