From eb3a0de01b3617f79b5aa0bbe7a2d97342b49333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 13 Dec 2012 14:27:34 +0100 Subject: [PATCH] Added flushing shutdown to Remoting --- .../src/main/scala/akka/remote/Remoting.scala | 84 +++++++++++++------ 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 8524d2f524..2ddc141a46 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -4,22 +4,21 @@ import scala.language.postfixOps import akka.actor.SupervisorStrategy._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } -import akka.pattern.{ gracefulStop, pipe } -import akka.remote.EndpointManager.{ StartupFinished, ManagementCommand, Listen, Send } -import akka.remote.transport.Transport.{ AssociationEventListener, InboundAssociation } +import akka.japi.Util.immutableSeq +import akka.pattern.{ gracefulStop, pipe, ask } +import akka.remote.EndpointManager._ +import akka.remote.Remoting.TransportSupervisor +import akka.remote.transport.Transport.AssociationEventListener +import akka.remote.transport.Transport.InboundAssociation import akka.remote.transport._ import akka.util.Timeout -import com.typesafe.config.{ ConfigFactory, Config } +import com.typesafe.config.Config +import java.net.URLEncoder +import java.util.concurrent.TimeoutException import scala.collection.immutable.{ Seq, HashMap } import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await, Future } import scala.util.control.NonFatal -import java.net.URLEncoder -import java.util.concurrent.TimeoutException -import scala.util.{ Failure, Success } -import scala.collection.immutable -import akka.japi.Util.immutableSeq -import akka.remote.Remoting.{ TransportSupervisor, RegisterTransportActor } class RemotingSettings(val config: Config) { @@ -30,6 +29,8 @@ class RemotingSettings(val config: Config) { val ShutdownTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.shutdown-timeout"), MILLISECONDS) + val FlushWait: FiniteDuration = Duration(getMilliseconds("akka.remoting.flush-wait-on-shutdown"), MILLISECONDS) + val StartupTimeout: FiniteDuration = Duration(getMilliseconds("akka.remoting.startup-timeout"), MILLISECONDS) val RetryGateClosedFor: Long = getNanoseconds("akka.remoting.retry-gate-closed-for") @@ -145,11 +146,13 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc endpointManager match { case Some(manager) ⇒ try { - val stopped: Future[Boolean] = gracefulStop(manager, settings.ShutdownTimeout)(system) + implicit val timeout = new Timeout(settings.ShutdownTimeout) + val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] - if (Await.result(stopped, settings.ShutdownTimeout)) { - eventPublisher.notifyListeners(RemotingShutdownEvent) - } + if (!Await.result(stopped, settings.ShutdownTimeout)) + log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + + "Increase akka.remoting.flush-wait-on-shutdown to a larger value to avoid this message.") + eventPublisher.notifyListeners(RemotingShutdownEvent) } catch { case e: TimeoutException ⇒ notifyError("Shutdown timed out.", e) @@ -231,16 +234,18 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private[remote] object EndpointManager { + // Messages between Remoting and EndpointManager sealed trait RemotingCommand case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand case object StartupFinished extends RemotingCommand - + case object ShutdownAndFlush extends RemotingCommand case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand { override def toString = s"Remote message $senderOption -> $recipient" } - case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand + // Messages internal to EndpointManager + case object Prune case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]], results: Seq[(Transport, Address, Promise[AssociationEventListener])]) @@ -249,8 +254,6 @@ private[remote] object EndpointManager { case class Gated(timeOfFailure: Long) extends EndpointPolicy case class Quarantined(reason: Throwable) extends EndpointPolicy - case object Prune - // Not threadsafe -- only to be used in HeadActor private[EndpointManager] class EndpointRegistry { private var addressToEndpointAndPolicy = HashMap[Address, EndpointPolicy]() @@ -311,6 +314,8 @@ private[remote] object EndpointManager { addressToPassive = addressToPassive - address } } + + def allEndpoints: Iterable[ActorRef] = endpointToAddress.keys } } @@ -358,7 +363,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def receive = { case Listen(addressesPromise) ⇒ listens map { ListensResult(addressesPromise, _) } pipeTo self - case ListensResult(addressesPromise, results) ⇒ transportMapping = results.groupBy { case (_, transportAddress, _) ⇒ transportAddress @@ -374,10 +378,13 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends transport -> address } addressesPromise.success(transportsAndAddresses) - - case ManagementCommand(_, statusPromise) ⇒ statusPromise.success(false) - - case StartupFinished ⇒ context.become(accepting) + case ManagementCommand(_, statusPromise) ⇒ + statusPromise.success(false) + case StartupFinished ⇒ + context.become(accepting) + case ShutdownAndFlush ⇒ + sender ! true + context.stop(self) // Nothing to flush at this point } val accepting: Receive = { @@ -398,8 +405,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends None) endpoints.registerActiveEndpoint(recipientAddress, endpoint) endpoint ! s - } else extendedSystem.deadLetters forward message - case Some(Quarantined(_)) ⇒ extendedSystem.deadLetters forward message + } else forwardToDeadLetters(s) + case Some(Quarantined(_)) ⇒ forwardToDeadLetters(s) case None ⇒ val endpoint = createEndpoint( recipientAddress, @@ -428,8 +435,31 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.registerPassiveEndpoint(handle.remoteAddress, endpoint) else handle.disassociate() } - case Terminated(endpoint) ⇒ endpoints.removeIfNotGated(endpoint) - case Prune ⇒ endpoints.prune(settings.RetryGateClosedFor) + case Terminated(endpoint) ⇒ + endpoints.removeIfNotGated(endpoint) + case Prune ⇒ + endpoints.prune(settings.RetryGateClosedFor) + case ShutdownAndFlush ⇒ + // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) + val sys = context.system // Avoid closing over context + Future sequence endpoints.allEndpoints.map { + gracefulStop(_, settings.FlushWait)(sys) + } map { _.foldLeft(true) { _ && _ } } pipeTo sender + // Ignore all other writes + context.become(flushing) + } + + def flushing: Receive = { + case s: Send ⇒ forwardToDeadLetters(s) + case InboundAssociation(h) ⇒ h.disassociate() + } + + private def forwardToDeadLetters(s: Send): Unit = { + val sender = s.senderOption match { + case Some(sender) ⇒ sender + case None ⇒ Actor.noSender + } + extendedSystem.deadLetters.tell(s.message, sender) } private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {