Merge pull request #19178 from akka/wip-19133-DeathPactException-patriknw

=rem #19133 avoid DeathPactException race condition
This commit is contained in:
Patrik Nordwall 2015-12-16 14:21:02 +01:00
commit a99fee96df
2 changed files with 16 additions and 6 deletions

View file

@ -213,7 +213,7 @@ private[remote] class ReliableDeliverySupervisor(
if (bufferWasInUse) {
if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
context.become(gated)
context.become(gated(writerTerminated = false, earlyUngateRequested = false))
currentHandle = None
context.parent ! StoppedReading(self)
Stop
@ -312,12 +312,19 @@ private[remote] class ReliableDeliverySupervisor(
writer forward s
}
def gated: Receive = {
case Terminated(_)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = {
case Terminated(_) if !writerTerminated
if (earlyUngateRequested)
self ! Ungate
else
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
context.become(gated(writerTerminated = true, earlyUngateRequested))
case IsIdle sender() ! Idle
case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
if (!writerTerminated) {
// Ungate was sent from EndpointManager, but we must wait for Terminated first.
context.become(gated(writerTerminated = false, earlyUngateRequested = true))
} else if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
// If we talk to a system we have not talked to before (or has given up talking to in the past) stop
// system delivery attempts after the specified time. This act will drop the pending system messages and gate the
// remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable

View file

@ -582,7 +582,10 @@ object MiMa extends AutoPlugin {
FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator"),
// #18328 optimize VersionVector for size 1
FilterAnyProblem("akka.cluster.ddata.VersionVector")
FilterAnyProblem("akka.cluster.ddata.VersionVector"),
// #19133 change in internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated")
)
)
}