Shutdown of transports are now synchronous with system shutdown #3368

This commit is contained in:
Endre Sándor Varga 2013-05-23 13:43:42 +02:00
parent 37421efe2e
commit 4c3853a999
7 changed files with 40 additions and 39 deletions

View file

@ -126,26 +126,23 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager match { endpointManager match {
case Some(manager) case Some(manager)
implicit val timeout = ShutdownTimeout implicit val timeout = ShutdownTimeout
val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean]
def finalize(): Unit = { def finalize(): Unit = {
eventPublisher.notifyListeners(RemotingShutdownEvent) eventPublisher.notifyListeners(RemotingShutdownEvent)
endpointManager = None endpointManager = None
} }
stopped.onComplete { (manager ? ShutdownAndFlush).mapTo[Boolean].andThen {
case Success(flushSuccessful) case Success(flushSuccessful)
if (!flushSuccessful) if (!flushSuccessful)
log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + log.warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " +
"Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.") "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.")
finalize() finalize()
case Failure(e) case Failure(e)
notifyError("Failure during shutdown of remoting.", e) notifyError("Failure during shutdown of remoting.", e)
finalize() finalize()
} } map { _ () } // RARP needs only type Unit, not a boolean
stopped map { _ () } // RARP needs only type Unit, not a boolean
case None case None
log.warning("Remoting is not running. Ignoring shutdown attempt.") log.warning("Remoting is not running. Ignoring shutdown attempt.")
Future successful (()) Future successful (())
@ -526,12 +523,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpoints.prune() endpoints.prune()
case ShutdownAndFlush case ShutdownAndFlush
// Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully)
val sys = context.system // Avoid closing over context
Future sequence endpoints.allEndpoints.map { def shutdownAll[T](resources: TraversableOnce[T])(shutdown: T Future[Boolean]): Future[Boolean] = {
gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop) (Future sequence resources.map(shutdown(_))) map { _.foldLeft(true) { _ && _ } } recover {
} map { _.foldLeft(true) { _ && _ } } recover { case NonFatal(_) false
case _: AskTimeoutException false }
} pipeTo sender }
(for {
// The construction of the future for shutdownStatus has to happen after the flushStatus future has been finished
// so that endpoints are shut down before transports.
flushStatus shutdownAll(endpoints.allEndpoints)(gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop))
shutdownStatus shutdownAll(transportMapping.values)(_.shutdown())
} yield flushStatus && shutdownStatus) pipeTo sender
// Ignore all other writes // Ignore all other writes
context.become(flushing) context.become(flushing)
} }
@ -623,12 +628,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override def postStop(): Unit = { override def postStop(): Unit = {
pruneTimerCancellable.foreach { _.cancel() } pruneTimerCancellable.foreach { _.cancel() }
transportMapping.values foreach { transport
try transport.shutdown() catch {
case NonFatal(e)
log.error(e, s"Unable to shut down the underlying transport: [$transport]")
}
}
} }
} }

View file

@ -4,7 +4,7 @@
package akka.remote.transport package akka.remote.transport
import akka.actor._ import akka.actor._
import akka.pattern.{ ask, pipe } import akka.pattern.{ ask, pipe, gracefulStop }
import akka.remote.Remoting.RegisterTransportActor import akka.remote.Remoting.RegisterTransportActor
import akka.remote.transport.Transport._ import akka.remote.transport.Transport._
import akka.remote.RARP import akka.remote.RARP
@ -97,7 +97,7 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor
statusPromise.future statusPromise.future
} }
override def shutdown(): Unit = wrappedTransport.shutdown() override def shutdown(): Future[Boolean] = wrappedTransport.shutdown()
} }
@ -159,7 +159,11 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit =
manager ! AssociateUnderlying(remoteAddress, statusPromise) manager ! AssociateUnderlying(remoteAddress, statusPromise)
override def shutdown(): Unit = manager ! PoisonPill override def shutdown(): Future[Boolean] =
for {
stopResult gracefulStop(manager, RARP(system).provider.remoteSettings.FlushWait)
wrappedStopResult wrappedTransport.shutdown()
} yield stopResult && wrappedStopResult
} }
abstract class ActorTransportAdapterManager extends Actor abstract class ActorTransportAdapterManager extends Actor

View file

@ -141,10 +141,6 @@ private[transport] class AkkaProtocolManager(
private def createTransportFailureDetector(): FailureDetector = private def createTransportFailureDetector(): FailureDetector =
FailureDetectorLoader(settings.TransportFailureDetectorImplementationClass, settings.TransportFailureDetectorConfig) FailureDetectorLoader(settings.TransportFailureDetectorImplementationClass, settings.TransportFailureDetectorConfig)
override def postStop() {
wrappedTransport.shutdown()
}
} }
private[remote] class AkkaProtocolHandle( private[remote] class AkkaProtocolHandle(

View file

@ -86,7 +86,7 @@ class TestTransport(
(localHandle, remoteHandle) (localHandle, remoteHandle)
} }
private def defaultShutdown: Future[Unit] = Future.successful(()) private def defaultShutdown: Future[Boolean] = Future.successful(true)
/** /**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method. * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method.
@ -105,13 +105,13 @@ class TestTransport(
/** /**
* The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method. * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method.
*/ */
val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit]( val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Boolean](
(_) defaultShutdown, (_) defaultShutdown,
(_) registry.logActivity(ShutdownAttempt(localAddress))) (_) registry.logActivity(ShutdownAttempt(localAddress)))
override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(()) override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(())
override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress) override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress)
override def shutdown(): Unit = shutdownBehavior(()) override def shutdown(): Future[Boolean] = shutdownBehavior(())
private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = {
registry.getRemoteReadHandlerFor(params._1) match { registry.getRemoteReadHandlerFor(params._1) match {

View file

@ -198,8 +198,6 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "") private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "")
override def postStop(): Unit = wrappedTransport.shutdown()
override def ready: Receive = { override def ready: Receive = {
case InboundAssociation(handle) case InboundAssociation(handle)
val wrappedHandle = wrapHandle(handle, associationListener, inbound = true) val wrappedHandle = wrapHandle(handle, associationListener, inbound = true)

View file

@ -119,12 +119,14 @@ trait Transport {
def associate(remoteAddress: Address): Future[AssociationHandle] def associate(remoteAddress: Address): Future[AssociationHandle]
/** /**
* Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous, may be * Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous signalling
* called multiple times and does not return a success indication. * the end of the shutdown by completing the returned future.
* *
* The transport SHOULD try flushing pending writes before becoming completely closed. * The transport SHOULD try flushing pending writes before becoming completely closed.
* @return
* Future signalling the completion of shutdown
*/ */
def shutdown(): Unit def shutdown(): Future[Boolean]
/** /**
* This method allows upper layers to send management commands to the transport. It is the responsibility of the * This method allows upper layers to send management commands to the transport. It is the responsibility of the

View file

@ -417,14 +417,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
} }
} }
override def shutdown(): Unit = { override def shutdown(): Future[Boolean] = {
def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ c.getGroup } def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ true) recover { case _ false }
for { for {
// Force flush by trying to write an empty buffer and wait for success // Force flush by trying to write an empty buffer and wait for success
_ always(channelGroup.write(ChannelBuffers.buffer(0))) lastWriteStatus always(channelGroup.write(ChannelBuffers.buffer(0)))
_ always({ channelGroup.unbind(); channelGroup.disconnect() }) unbindStatus always(channelGroup.unbind())
_ always(channelGroup.close()) disconnectStatus always(channelGroup.disconnect())
} { closeStatus always(channelGroup.close())
} yield {
// Release the selectors, but don't try to kill the dispatcher // Release the selectors, but don't try to kill the dispatcher
if (UseDispatcherForIo.isDefined) { if (UseDispatcherForIo.isDefined) {
clientChannelFactory.shutdown() clientChannelFactory.shutdown()
@ -433,6 +434,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
clientChannelFactory.releaseExternalResources() clientChannelFactory.releaseExternalResources()
serverChannelFactory.releaseExternalResources() serverChannelFactory.releaseExternalResources()
} }
lastWriteStatus && unbindStatus && disconnectStatus && closeStatus
} }
} }