Merge pull request #20594 from akka/wip-20531-patriknw

gating should not overwrite quarantine, #20531
This commit is contained in:
Patrik Nordwall 2016-06-02 17:41:12 +02:00
commit c1f36d8a43
4 changed files with 56 additions and 19 deletions

View file

@ -286,9 +286,12 @@ private[remote] object EndpointManager {
final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy {
override def isTombstone: Boolean = false
}
final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy {
final case class Gated(timeOfRelease: Deadline, refuseUid: Option[Int]) extends EndpointPolicy {
override def isTombstone: Boolean = true
}
final case class WasGated(refuseUid: Option[Int]) extends EndpointPolicy {
override def isTombstone: Boolean = false
}
final case class Quarantined(uid: Int, timeOfRelease: Deadline) extends EndpointPolicy {
override def isTombstone: Boolean = true
}
@ -320,6 +323,8 @@ private[remote] object EndpointManager {
def registerWritableEndpointRefuseUid(remoteAddress: Address, refuseUid: Int): Unit = {
addressToWritable.get(remoteAddress) match {
case Some(Pass(ep, uid, _)) addressToWritable += remoteAddress -> Pass(ep, uid, Some(refuseUid))
case Some(g: Gated) addressToWritable += remoteAddress -> g.copy(refuseUid = Some(refuseUid))
case Some(w: WasGated) addressToWritable += remoteAddress -> w.copy(refuseUid = Some(refuseUid))
case other
}
}
@ -348,8 +353,8 @@ private[remote] object EndpointManager {
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match {
case Some(Pass(_, _, _)) true
case _ false
case Some(_: Pass | _: WasGated) true
case _ false
}
def readOnlyEndpointFor(address: Address): Option[(ActorRef, Int)] = addressToReadonly.get(address)
@ -370,7 +375,9 @@ private[remote] object EndpointManager {
// known fact that it is quarantined.
case Some(Quarantined(uid, _)) Some(uid)
case Some(Pass(_, _, refuseUid)) refuseUid
case _ None
case Some(Gated(_, refuseUid)) refuseUid
case Some(WasGated(refuseUid)) refuseUid
case None None
}
/**
@ -379,8 +386,20 @@ private[remote] object EndpointManager {
*/
def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit =
if (isWritable(endpoint)) {
addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease)
writableToAddress -= endpoint
val address = writableToAddress(endpoint)
addressToWritable.get(address) match {
case Some(Quarantined(_, _)) // don't overwrite Quarantined with Gated
case Some(Pass(_, _, refuseUid))
addressToWritable += address -> Gated(timeOfRelease, refuseUid)
writableToAddress -= endpoint
case Some(WasGated(refuseUid))
addressToWritable += address -> Gated(timeOfRelease, refuseUid)
writableToAddress -= endpoint
case Some(Gated(_, _)) // already gated
case None
addressToWritable += address -> Gated(timeOfRelease, refuseUid = None)
writableToAddress -= endpoint
}
} else if (isReadOnly(endpoint)) {
addressToReadonly -= readonlyToAddress(endpoint)
readonlyToAddress -= endpoint
@ -395,10 +414,14 @@ private[remote] object EndpointManager {
def allEndpoints: collection.Iterable[ActorRef] = writableToAddress.keys ++ readonlyToAddress.keys
def prune(): Unit = {
addressToWritable = addressToWritable.filter {
case (_, Gated(timeOfRelease)) timeOfRelease.hasTimeLeft
case (_, Quarantined(_, timeOfRelease)) timeOfRelease.hasTimeLeft
case _ true
addressToWritable = addressToWritable.collect {
case entry @ (key, Gated(timeOfRelease, refuseUid))
if (timeOfRelease.hasTimeLeft) entry
else (key -> WasGated(refuseUid))
case entry @ (_, Quarantined(_, timeOfRelease)) if timeOfRelease.hasTimeLeft
// Querantined removed when no time left
entry
case entry @ (_, _: Pass | _: WasGated) entry
}
}
}
@ -567,12 +590,15 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.registerWritableEndpointRefuseUid(address, quarantineUid)
case _ //the quarantine uid has lost the race with some failure, do nothing
}
case (Some(WasGated(refuseUidOption)), Some(quarantineUid))
if (!refuseUidOption.contains(quarantineUid))
endpoints.registerWritableEndpointRefuseUid(address, quarantineUid)
case (Some(Quarantined(uid, _)), Some(quarantineUid)) if uid == quarantineUid // the UID to be quarantined already exists, do nothing
case (_, Some(quarantineUid))
// the current state is gated or quarantined, and we know the UID, update
endpoints.markAsQuarantined(address, quarantineUid, Deadline.now + settings.QuarantineDuration)
eventPublisher.notifyListeners(QuarantinedEvent(address, quarantineUid))
case _ // the current state is gated or quarantined, and we don't know the UID, do nothing.
case _ // the current state is Gated, WasGated or Quarantined, and we don't know the UID, do nothing.
}
// Stop inbound read-only associations
@ -630,9 +656,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.writableEndpointWithPolicyFor(recipientAddress) match {
case Some(Pass(endpoint, _, _))
endpoint ! s
case Some(Gated(timeOfRelease))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s
case Some(Gated(timeOfRelease, refuseUid))
if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid) ! s
else extendedSystem.deadLetters ! s
case Some(WasGated(refuseUid))
createAndRegisterWritingEndpoint(refuseUid) ! s
case Some(Quarantined(uid, _))
// timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have
// the Quarantined tombstone and we know what UID we don't want to accept, so use it.
@ -661,6 +689,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
context.stop(endpoint)
} else endpoints.registerWritableEndpointUid(remoteAddress, uid)
handleStashedInbound(sender(), writerIsIdle = false)
case Some(WasGated(refuseUidOption))
if (refuseUidOption.contains(uid)) {
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration)
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid))
} else endpoints.registerWritableEndpointUid(remoteAddress, uid)
handleStashedInbound(sender(), writerIsIdle = false)
case _ // the GotUid might have lost the race with some failure
}
case ReliableDeliverySupervisor.Idle

View file

@ -69,7 +69,7 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.registerWritableEndpoint(address1, None, None, actorA)
val deadline = Deadline.now
reg.markAsFailed(actorA, deadline)
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline)))
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None)))
reg.isReadOnly(actorA) should ===(false)
reg.isWritable(actorA) should ===(false)
}
@ -94,7 +94,7 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.unregisterEndpoint(actorA)
reg.unregisterEndpoint(actorB)
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline)))
reg.writableEndpointWithPolicyFor(address1) should ===(Some(Gated(deadline, None)))
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Quarantined(42, deadline)))
}
@ -109,8 +109,8 @@ class EndpointRegistrySpec extends AkkaSpec {
reg.markAsFailed(actorB, farInTheFuture)
reg.prune()
reg.writableEndpointWithPolicyFor(address1) should ===(None)
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture)))
reg.writableEndpointWithPolicyFor(address1) should ===(Some(WasGated(None)))
reg.writableEndpointWithPolicyFor(address2) should ===(Some(Gated(farInTheFuture, None)))
}
"be able to register Quarantined policy for an address" in {

View file

@ -109,7 +109,7 @@ abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
with DefaultTimeout {
import SystemMessageDeliveryStressTest._
override def expectedTestDuration: FiniteDuration = 120.seconds
override def expectedTestDuration: FiniteDuration = 200.seconds
val systemA = system
val systemB = ActorSystem("systemB", system.settings.config)

View file

@ -872,7 +872,10 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onPush"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onUpstreamFinish"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.onPull"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.postStop")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Framing#LengthFieldFramingStage.postStop"),
// #20531 adding refuseUid to Gated
FilterAnyProblem("akka.remote.EndpointManager$Gated")
)
)
}