diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 009d2abd2e..9d1d0e4a1d 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -126,26 +126,23 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc endpointManager match { case Some(manager) ⇒ implicit val timeout = ShutdownTimeout - val stopped: Future[Boolean] = (manager ? ShutdownAndFlush).mapTo[Boolean] def finalize(): Unit = { eventPublisher.notifyListeners(RemotingShutdownEvent) endpointManager = None } - stopped.onComplete { + (manager ? ShutdownAndFlush).mapTo[Boolean].andThen { case Success(flushSuccessful) ⇒ if (!flushSuccessful) - log.warning("Shutdown finished, but flushing timed out. Some messages might not have been sent. " + + log.warning("Shutdown finished, but flushing might not have been successful and some messages might have been dropped. " + "Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this.") finalize() case Failure(e) ⇒ notifyError("Failure during shutdown of remoting.", e) finalize() - } - - stopped map { _ ⇒ () } // RARP needs only type Unit, not a boolean + } map { _ ⇒ () } // RARP needs only type Unit, not a boolean case None ⇒ log.warning("Remoting is not running. Ignoring shutdown attempt.") Future successful (()) @@ -532,12 +529,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpoints.prune() case ShutdownAndFlush ⇒ // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) - val sys = context.system // Avoid closing over context - Future sequence endpoints.allEndpoints.map { - gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop) - } map { _.foldLeft(true) { _ && _ } } recover { - case _: AskTimeoutException ⇒ false - } pipeTo sender + + def shutdownAll[T](resources: TraversableOnce[T])(shutdown: T ⇒ Future[Boolean]): Future[Boolean] = { + (Future sequence resources.map(shutdown(_))) map { _.foldLeft(true) { _ && _ } } recover { + case NonFatal(_) ⇒ false + } + } + + (for { + // The construction of the future for shutdownStatus has to happen after the flushStatus future has been finished + // so that endpoints are shut down before transports. + flushStatus ← shutdownAll(endpoints.allEndpoints)(gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop)) + shutdownStatus ← shutdownAll(transportMapping.values)(_.shutdown()) + } yield flushStatus && shutdownStatus) pipeTo sender + // Ignore all other writes context.become(flushing) } @@ -629,12 +634,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends override def postStop(): Unit = { pruneTimerCancellable.foreach { _.cancel() } - transportMapping.values foreach { transport ⇒ - try transport.shutdown() catch { - case NonFatal(e) ⇒ - log.error(e, s"Unable to shut down the underlying transport: [$transport]") - } - } } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index f7b8193214..d09f18c737 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -4,7 +4,7 @@ package akka.remote.transport import akka.actor._ -import akka.pattern.{ ask, pipe } +import akka.pattern.{ ask, pipe, gracefulStop } import akka.remote.Remoting.RegisterTransportActor import akka.remote.transport.Transport._ import akka.remote.RARP @@ -97,7 +97,7 @@ abstract class AbstractTransportAdapter(protected val wrappedTransport: Transpor statusPromise.future } - override def shutdown(): Unit = wrappedTransport.shutdown() + override def shutdown(): Future[Boolean] = wrappedTransport.shutdown() } @@ -159,7 +159,11 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS override def interceptAssociate(remoteAddress: Address, statusPromise: Promise[AssociationHandle]): Unit = manager ! AssociateUnderlying(remoteAddress, statusPromise) - override def shutdown(): Unit = manager ! PoisonPill + override def shutdown(): Future[Boolean] = + for { + stopResult ← gracefulStop(manager, RARP(system).provider.remoteSettings.FlushWait) + wrappedStopResult ← wrappedTransport.shutdown() + } yield stopResult && wrappedStopResult } abstract class ActorTransportAdapterManager extends Actor diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 3b6c83981b..115ac04777 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -141,10 +141,6 @@ private[transport] class AkkaProtocolManager( private def createTransportFailureDetector(): FailureDetector = FailureDetectorLoader(settings.TransportFailureDetectorImplementationClass, settings.TransportFailureDetectorConfig) - override def postStop() { - wrappedTransport.shutdown() - } - } private[remote] class AkkaProtocolHandle( diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index 2505cef5a6..d591e55564 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -87,7 +87,7 @@ class TestTransport( (localHandle, remoteHandle) } - private def defaultShutdown: Future[Unit] = Future.successful(()) + private def defaultShutdown: Future[Boolean] = Future.successful(true) /** * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the listen() method. @@ -106,13 +106,13 @@ class TestTransport( /** * The [[akka.remote.transport.TestTransport.SwitchableLoggedBehavior]] for the shutdown() method. */ - val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Unit]( + val shutdownBehavior = new SwitchableLoggedBehavior[Unit, Boolean]( (_) ⇒ defaultShutdown, (_) ⇒ registry.logActivity(ShutdownAttempt(localAddress))) override def listen: Future[(Address, Promise[AssociationEventListener])] = listenBehavior(()) override def associate(remoteAddress: Address): Future[AssociationHandle] = associateBehavior(remoteAddress) - override def shutdown(): Unit = shutdownBehavior(()) + override def shutdown(): Future[Boolean] = shutdownBehavior(()) private def defaultWrite(params: (TestAssociationHandle, ByteString)): Future[Boolean] = { registry.getRemoteReadHandlerFor(params._1) match { diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index df4a96238b..0132c0f86c 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -198,8 +198,6 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A private def nakedAddress(address: Address): Address = address.copy(protocol = "", system = "") - override def postStop(): Unit = wrappedTransport.shutdown() - override def ready: Receive = { case InboundAssociation(handle) ⇒ val wrappedHandle = wrapHandle(handle, associationListener, inbound = true) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 88510caa31..3673c2a0fc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -119,12 +119,14 @@ trait Transport { def associate(remoteAddress: Address): Future[AssociationHandle] /** - * Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous, may be - * called multiple times and does not return a success indication. + * Shuts down the transport layer and releases all the corresponding resources. Shutdown is asynchronous signalling + * the end of the shutdown by completing the returned future. * * The transport SHOULD try flushing pending writes before becoming completely closed. + * @return + * Future signalling the completion of shutdown */ - def shutdown(): Unit + def shutdown(): Future[Boolean] /** * This method allows upper layers to send management commands to the transport. It is the responsibility of the diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index de9c2b2b23..936bc1eb0a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -417,14 +417,15 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA } } - override def shutdown(): Unit = { - def always(c: ChannelGroupFuture) = NettyFutureBridge(c) recover { case _ ⇒ c.getGroup } + override def shutdown(): Future[Boolean] = { + def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ ⇒ true) recover { case _ ⇒ false } for { // Force flush by trying to write an empty buffer and wait for success - _ ← always(channelGroup.write(ChannelBuffers.buffer(0))) - _ ← always({ channelGroup.unbind(); channelGroup.disconnect() }) - _ ← always(channelGroup.close()) - } { + lastWriteStatus ← always(channelGroup.write(ChannelBuffers.buffer(0))) + unbindStatus ← always(channelGroup.unbind()) + disconnectStatus ← always(channelGroup.disconnect()) + closeStatus ← always(channelGroup.close()) + } yield { // Release the selectors, but don't try to kill the dispatcher if (UseDispatcherForIo.isDefined) { clientChannelFactory.shutdown() @@ -433,6 +434,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA clientChannelFactory.releaseExternalResources() serverChannelFactory.releaseExternalResources() } + lastWriteStatus && unbindStatus && disconnectStatus && closeStatus } }