=rem #19133 avoid DeathPactException race condition

* DeathPactException could occur if the ReliableDeliverySupervisor
  was gated but not yet received Terminated and got an Ungate message
  from the EndpointManager and thereby entered idle state, followed by
  receiving the Terminated message, which is not handled in idle
This commit is contained in:
Patrik Nordwall 2015-12-14 20:33:36 +01:00
parent e851fc26a7
commit b6b498bd2c
2 changed files with 16 additions and 6 deletions

View file

@ -213,7 +213,7 @@ private[remote] class ReliableDeliverySupervisor(
if (bufferWasInUse) {
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
context.become(gated)
context.become(gated(writerTerminated = false, earlyUngateRequested = false))
currentHandle = None
context.parent ! StoppedReading(self)
Stop
@ -312,12 +312,19 @@ private[remote] class ReliableDeliverySupervisor(
writer forward s
}
def gated: Receive = {
case Terminated(_)
def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = {
case Terminated(_) if !writerTerminated
if (earlyUngateRequested)
self ! Ungate
else
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
context.become(gated(writerTerminated = true, earlyUngateRequested))
case IsIdle sender() ! Idle
case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
if (!writerTerminated) {
// Ungate was sent from EndpointManager, but we must wait for Terminated first.
context.become(gated(writerTerminated = false, earlyUngateRequested = true))
} else if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
// If we talk to a system we have not talked to before (or has given up talking to in the past) stop
// system delivery attempts after the specified time. This act will drop the pending system messages and gate the
// remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable

View file

@ -582,7 +582,10 @@ object MiMa extends AutoPlugin {
FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator"),
// #18328 optimize VersionVector for size 1
FilterAnyProblem("akka.cluster.ddata.VersionVector")
FilterAnyProblem("akka.cluster.ddata.VersionVector"),
// #19133 change in internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated")
)
)
}