From b6b498bd2c38e6aa90e507ec083a6a21fb30e25f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 14 Dec 2015 20:33:36 +0100 Subject: [PATCH] =rem #19133 avoid DeathPactException race condition * DeathPactException could occur if the ReliableDeliverySupervisor was gated but had not yet received Terminated, then got an Ungate message from the EndpointManager and thereby entered the idle state, followed by receiving the Terminated message, which is not handled in the idle state --- .../src/main/scala/akka/remote/Endpoint.scala | 17 ++++++++++++----- project/MiMa.scala | 5 ++++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 8743d364d4..18c9d35cac 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -213,7 +213,7 @@ private[remote] class ReliableDeliverySupervisor( if (bufferWasInUse) { if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) - context.become(gated) + context.become(gated(writerTerminated = false, earlyUngateRequested = false)) currentHandle = None context.parent ! StoppedReading(self) Stop @@ -312,12 +312,19 @@ private[remote] class ReliableDeliverySupervisor( writer forward s } - def gated: Receive = { - case Terminated(_) ⇒ - context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) + def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = { + case Terminated(_) if !writerTerminated ⇒ + if (earlyUngateRequested) + self ! Ungate + else + context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) + context.become(gated(writerTerminated = true, earlyUngateRequested)) case IsIdle ⇒ sender() ! 
Idle case Ungate ⇒ - if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { + if (!writerTerminated) { + // Ungate was sent from EndpointManager, but we must wait for Terminated first. + context.become(gated(writerTerminated = false, earlyUngateRequested = true)) + } else if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { // If we talk to a system we have not talked to before (or has given up talking to in the past) stop // system delivery attempts after the specified time. This act will drop the pending system messages and gate the // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable diff --git a/project/MiMa.scala b/project/MiMa.scala index 7d81c9cdc1..1b0550b70b 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -582,7 +582,10 @@ object MiMa extends AutoPlugin { FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator"), // #18328 optimize VersionVector for size 1 - FilterAnyProblem("akka.cluster.ddata.VersionVector") + FilterAnyProblem("akka.cluster.ddata.VersionVector"), + + // #19133 change in internal actor + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated") ) ) }