diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 8743d364d4..18c9d35cac 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -213,7 +213,7 @@ private[remote] class ReliableDeliverySupervisor( if (bufferWasInUse) { if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty) bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout) - context.become(gated) + context.become(gated(writerTerminated = false, earlyUngateRequested = false)) currentHandle = None context.parent ! StoppedReading(self) Stop @@ -312,12 +312,19 @@ private[remote] class ReliableDeliverySupervisor( writer forward s } - def gated: Receive = { - case Terminated(_) ⇒ - context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) + def gated(writerTerminated: Boolean, earlyUngateRequested: Boolean): Receive = { + case Terminated(_) if !writerTerminated ⇒ + if (earlyUngateRequested) + self ! Ungate + else + context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) + context.become(gated(writerTerminated = true, earlyUngateRequested)) case IsIdle ⇒ sender() ! Idle case Ungate ⇒ - if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { + if (!writerTerminated) { + // Ungate was sent from EndpointManager, but we must wait for Terminated first. + context.become(gated(writerTerminated = false, earlyUngateRequested = true)) + } else if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { // If we talk to a system we have not talked to before (or has given up talking to in the past) stop // system delivery attempts after the specified time. This act will drop the pending system messages and gate the // remote address at the EndpointManager level stopping this actor. In case the remote system becomes reachable diff --git a/project/MiMa.scala b/project/MiMa.scala index 7d81c9cdc1..1b0550b70b 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -582,7 +582,10 @@ object MiMa extends AutoPlugin { FilterAnyProblem("akka.cluster.sharding.DDataShardCoordinator"), // #18328 optimize VersionVector for size 1 - FilterAnyProblem("akka.cluster.ddata.VersionVector") + FilterAnyProblem("akka.cluster.ddata.VersionVector"), + + // #19133 change in internal actor + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated") ) ) }