diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 521175fe60..dba1cd6461 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -36,8 +36,16 @@ trait GracefulStopSupport { * * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. + * + * If you want to invoke specalized stopping logic on your target actor instead of PoisonPill, you can pass your + * stop command as a parameter: + * {{{ + * gracefulStop(someChild, timeout, MyStopGracefullyMessage).onComplete { + * // Do something after someChild being stopped + * } + * }}} */ - def gracefulStop(target: ActorRef, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Boolean] = { + def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill)(implicit system: ActorSystem): Future[Boolean] = { if (target.isTerminated) Future successful true else system match { case e: ExtendedActorSystem ⇒ @@ -50,7 +58,7 @@ trait GracefulStopSupport { case Success(Terminated(`target`)) ⇒ () case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) } - target ! PoisonPill + target ! stopMessage f map { case Terminated(`target`) ⇒ true case _ ⇒ false diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 15bf4a8011..b5759409c9 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -104,6 +104,7 @@ private[remote] object EndpointWriter { */ case class TakeOver(handle: AssociationHandle) case object BackoffTimer + case object FlushAndStop sealed trait State case object Initializing extends State @@ -202,6 +203,10 @@ private[remote] class EndpointWriter( stay() case Event(BackoffTimer, _) ⇒ goto(Writing) + + case Event(FlushAndStop, _) ⇒ + stash() // Flushing is postponed after the pending writes + stay() } when(Writing) { @@ -221,6 +226,9 @@ private[remote] class EndpointWriter( case NonFatal(e: EndpointException) ⇒ publishAndThrow(e) case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) } + + // We are in Writing state, so stash is emtpy, safe to stop here + case Event(FlushAndStop, _) ⇒ stop() } when(Handoff) { @@ -245,6 +253,8 @@ private[remote] class EndpointWriter( reader foreach context.stop handle = Some(newHandle) goto(Handoff) + case Event(FlushAndStop, _) ⇒ + stop() } onTransition { diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index a26ac79733..1be99f6f86 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -454,7 +454,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) val sys = context.system // Avoid closing over context Future sequence endpoints.allEndpoints.map { - gracefulStop(_, settings.FlushWait)(sys) + gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop)(sys) } map { _.foldLeft(true) { _ && _ } } pipeTo sender // Ignore all other writes context.become(flushing) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index d1ce2a265d..9b9179a0e0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -199,7 +199,13 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty private[transport] object NettyTransport { // 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler. val FrameLengthFieldLength = 4 - def gracefulClose(channel: Channel): Unit = channel.disconnect().addListener(ChannelFutureListener.CLOSE) + def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = { + def always(c: ChannelFuture) = NettyFutureBridge(c) recover { case _ ⇒ c.getChannel } + for { + _ ← always { channel.write(ChannelBuffers.buffer(0)) } // Force flush by waiting on a final dummy write + _ ← always { channel.disconnect() } + } channel.close() + } val uniqueIdCounter = new AtomicInteger(0) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index fb40b7d966..663568b2ba 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -35,7 +35,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener)) override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = - new TcpAssociationHandle(localAddress, remoteAddress, channel) + new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = notifyListener(e.getChannel, Disassociated) @@ -76,8 +76,12 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress /** * INTERNAL API */ -private[remote] class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel) +private[remote] class TcpAssociationHandle(val localAddress: Address, + val remoteAddress: Address, + val transport: NettyTransport, + private val channel: Channel) extends AssociationHandle { + import transport.executionContext override val readHandlerPromise: Promise[HandleEventListener] = Promise()