From 752e43a0e754e94a6ca9d01a4bb59dc0cd92f7b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 14 Dec 2012 16:09:38 +0100 Subject: [PATCH] Implemented strictly ordered shutdown between remoteDaemon and transport in RARP. --- .../akka/remote/RemoteActorRefProvider.scala | 106 ++++++++++++++---- .../main/scala/akka/remote/RemoteDaemon.scala | 5 +- .../scala/akka/remote/RemoteTransport.scala | 2 +- .../src/main/scala/akka/remote/Remoting.scala | 36 ++++-- .../remote/netty/NettyRemoteSupport.scala | 4 +- 5 files changed, 115 insertions(+), 38 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 944641403d..8467a96d73 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -9,8 +9,58 @@ import akka.dispatch._ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } +import akka.pattern.pipe import scala.concurrent.Future import scala.util.control.NonFatal +import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } + +object RemoteActorRefProvider { + private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) + + sealed trait TerminatorState + case object Uninitialized extends TerminatorState + case object Idle extends TerminatorState + case object WaitDaemonShutdown extends TerminatorState + case object WaitTransportShutdown extends TerminatorState + case object Finished extends TerminatorState + + private class RemotingTerminator extends Actor with FSM[TerminatorState, Option[Internals]] { + import context.dispatcher + val systemGuardian = context.system.asInstanceOf[ExtendedActorSystem].provider.systemGuardian + + startWith(Uninitialized, None) + + when(Uninitialized) { + case Event(i: Internals, _) ⇒ + systemGuardian.tell(RegisterTerminationHook, self) + goto(Idle) using Some(i) + } + + when(Idle) { + case Event(TerminationHook, Some(internals)) ⇒ + log.info("Shutting down remote daemon.") + internals.remoteDaemon ! TerminationHook + goto(WaitDaemonShutdown) + } + + // TODO: state timeout + when(WaitDaemonShutdown) { + case Event(TerminationHookDone, Some(internals)) ⇒ + log.info("Remote daemon shut down.") + log.info("Shutting down remoting.") + internals.transport.shutdown() pipeTo self + goto(WaitTransportShutdown) + } + + when(WaitTransportShutdown) { + case Event((), _) ⇒ + log.info("Remoting shut down.") + systemGuardian.tell(TerminationHookDone, self) + stop() + } + + } +} /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -21,6 +71,7 @@ class RemoteActorRefProvider( val eventStream: EventStream, val scheduler: Scheduler, val dynamicAccess: DynamicAccess) extends ActorRefProvider { + import RemoteActorRefProvider._ val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName) @@ -49,40 +100,54 @@ class RemoteActorRefProvider( override def tempContainer: VirtualPathContainer = local.tempContainer @volatile - private var _transport: RemoteTransport = _ - def transport: RemoteTransport = _transport + private var _internals: Internals = _ - @volatile - private var _serialization: Serialization = _ - def serialization: Serialization = _serialization + def transport: RemoteTransport = _internals.transport + def serialization: Serialization = _internals.serialization + def remoteDaemon: InternalActorRef = _internals.remoteDaemon + // This actor ensures the ordering of shutdown between remoteDaemon and the transport @volatile - private var _remoteDaemon: InternalActorRef = _ - def remoteDaemon: InternalActorRef = _remoteDaemon + private var remotingTerminator: ActorRef = _ def init(system: ActorSystemImpl): Unit = { local.init(system) - _remoteDaemon = new RemoteSystemDaemon(system, local.rootPath / "remote", rootGuardian, log, untrustedMode = remoteSettings.UntrustedMode) - local.registerExtraNames(Map(("remote", remoteDaemon))) + remotingTerminator = system.systemActorOf(Props[RemotingTerminator], "remoting-terminator") - _serialization = SerializationExtension(system) + val internals = Internals( + remoteDaemon = { + val d = new RemoteSystemDaemon( + system, + local.rootPath / "remote", + rootGuardian, + remotingTerminator, + log, + untrustedMode = remoteSettings.UntrustedMode) + local.registerExtraNames(Map(("remote", d))) + d + }, - _transport = { - val fqn = remoteSettings.RemoteTransport - val args = List( - classOf[ExtendedActorSystem] -> system, - classOf[RemoteActorRefProvider] -> this) + serialization = SerializationExtension(system), - system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({ - case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - }).get - } + transport = { + val fqn = remoteSettings.RemoteTransport + val args = List( + classOf[ExtendedActorSystem] -> system, + classOf[RemoteActorRefProvider] -> this) + + system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args).recover({ + case problem ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + }).get + }) + + remotingTerminator ! internals + _internals = internals _log = Logging(eventStream, "RemoteActorRefProvider") // this enables reception of remote requests - _transport.start() + transport.start() _rootPath = RootActorPath(local.rootPath.address.copy( protocol = transport.defaultAddress.protocol, @@ -99,7 +164,6 @@ class RemoteActorRefProvider( system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - system.registerOnTermination(transport.shutdown()) } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index ee8a6c5698..8d191c3bc5 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -26,6 +26,7 @@ private[akka] class RemoteSystemDaemon( system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, + terminator: ActorRef, _log: LoggingAdapter, val untrustedMode: Boolean) extends VirtualPathContainer(system.provider, _path, _parent, _log) { @@ -34,8 +35,6 @@ private[akka] class RemoteSystemDaemon( private val terminating = new Switch(false) - system.provider.systemGuardian.tell(RegisterTerminationHook, this) - system.eventStream.subscribe(this, classOf[AddressTerminated]) /** @@ -111,7 +110,7 @@ private[akka] class RemoteSystemDaemon( } def terminationHookDoneWhenNoChildren(): Unit = terminating.whileOn { - if (!hasChildren) system.provider.systemGuardian.tell(TerminationHookDone, this) + if (!hasChildren) terminator.tell(TerminationHookDone, this) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 1e78351ac2..0a580d84cf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -173,7 +173,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re /** * Shuts down the remoting */ - def shutdown(): Unit + def shutdown(): Future[Unit] /** * Address to be used in RootActorPath of refs generated for this transport. diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 3e86b9b138..1ad24f3622 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -19,6 +19,7 @@ import scala.collection.immutable.{ Seq, HashMap } import scala.concurrent.duration._ import scala.concurrent.{ Promise, Await, Future } import scala.util.control.NonFatal +import scala.util.{ Failure, Success } class RemotingSettings(val config: Config) { @@ -142,23 +143,34 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private def notifyError(msg: String, cause: Throwable): Unit = eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause))) - override def shutdown(): Unit = { + override def shutdown(): Future[Unit] = { + import scala.concurrent.ExecutionContext.Implicits.global endpointManager match { case Some(manager) ⇒ - try { - implicit val timeout = new Timeout(settings.ShutdownTimeout) - val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] + implicit val timeout = new Timeout(settings.ShutdownTimeout) + val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] - if (!Await.result(stopped, settings.ShutdownTimeout)) - log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + - "Increase akka.remoting.flush-wait-on-shutdown to a larger value to avoid this message.") + def finalize(): Unit = { eventPublisher.notifyListeners(RemotingShutdownEvent) + endpointManager = None + } - } catch { - case e: TimeoutException ⇒ notifyError("Shutdown timed out.", e) - case NonFatal(e) ⇒ notifyError("Shutdown failed.", e) - } finally endpointManager = None - case None ⇒ log.warning("Remoting is not running. Ignoring shutdown attempt.") + stopped.onComplete { + case Success(flushSuccessful) ⇒ + if (!flushSuccessful) + log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + + "Increase akka.remoting.flush-wait-on-shutdown to a larger value to avoid this.") + finalize() + + case Failure(e) ⇒ + notifyError("Failure during shutdown of remoting.", e) + finalize() + } + + stopped map { _ ⇒ () } // RARP needs only type Unit, not a boolean + case None ⇒ + log.warning("Remoting is not running. Ignoring shutdown attempt.") + Future successful () } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5fdaa23ba2..6c4a7b9c9b 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -25,6 +25,7 @@ import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefPr import scala.util.control.NonFatal import akka.actor.{ ExtendedActorSystem, Address, ActorRef } import com.google.protobuf.MessageLite +import scala.concurrent.Future private[akka] object ChannelAddress extends ChannelLocal[Option[Address]] { override def initialValue(ch: Channel): Option[Address] = None @@ -189,7 +190,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider notifyListeners(RemoteServerStarted(this)) } - def shutdown(): Unit = { + def shutdown(): Future[Unit] = { clientsLock.writeLock().lock() try { remoteClients foreach { @@ -210,6 +211,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider } } } + Future successful () } def send(