Merge pull request #1197 from drewhk/wip-3036-newremoteactor-failed-drewhk

Hardened flushing code in Remoting #3036
This commit is contained in:
drewhk 2013-02-26 02:13:44 -08:00
commit 3ce5c3999e
5 changed files with 34 additions and 6 deletions

View file

@ -36,8 +36,16 @@ trait GracefulStopSupport {
*
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
* is completed with failure [[akka.pattern.AskTimeoutException]].
*
* If you want to invoke specalized stopping logic on your target actor instead of PoisonPill, you can pass your
* stop command as a parameter:
* {{{
* gracefulStop(someChild, timeout, MyStopGracefullyMessage).onComplete {
* // Do something after someChild being stopped
* }
* }}}
*/
def gracefulStop(target: ActorRef, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Boolean] = {
def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) Future successful true
else system match {
case e: ExtendedActorSystem
@ -50,7 +58,7 @@ trait GracefulStopSupport {
case Success(Terminated(`target`)) ()
case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
}
target ! PoisonPill
target ! stopMessage
f map {
case Terminated(`target`) true
case _ false

View file

@ -104,6 +104,7 @@ private[remote] object EndpointWriter {
*/
case class TakeOver(handle: AssociationHandle)
case object BackoffTimer
case object FlushAndStop
sealed trait State
case object Initializing extends State
@ -202,6 +203,10 @@ private[remote] class EndpointWriter(
stay()
case Event(BackoffTimer, _) goto(Writing)
case Event(FlushAndStop, _)
stash() // Flushing is postponed after the pending writes
stay()
}
when(Writing) {
@ -221,6 +226,9 @@ private[remote] class EndpointWriter(
case NonFatal(e: EndpointException) publishAndThrow(e)
case NonFatal(e) publishAndThrow(new EndpointException("Failed to write message to the transport", e))
}
// We are in Writing state, so stash is emtpy, safe to stop here
case Event(FlushAndStop, _) stop()
}
when(Handoff) {
@ -245,6 +253,8 @@ private[remote] class EndpointWriter(
reader foreach context.stop
handle = Some(newHandle)
goto(Handoff)
case Event(FlushAndStop, _)
stop()
}
onTransition {

View file

@ -454,7 +454,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
// Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully)
val sys = context.system // Avoid closing over context
Future sequence endpoints.allEndpoints.map {
gracefulStop(_, settings.FlushWait)(sys)
gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop)(sys)
} map { _.foldLeft(true) { _ && _ } } pipeTo sender
// Ignore all other writes
context.become(flushing)

View file

@ -199,7 +199,13 @@ private[netty] abstract class ClientHandler(protected final val transport: Netty
private[transport] object NettyTransport {
// 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler.
val FrameLengthFieldLength = 4
def gracefulClose(channel: Channel): Unit = channel.disconnect().addListener(ChannelFutureListener.CLOSE)
def gracefulClose(channel: Channel)(implicit ec: ExecutionContext): Unit = {
def always(c: ChannelFuture) = NettyFutureBridge(c) recover { case _ c.getChannel }
for {
_ always { channel.write(ChannelBuffers.buffer(0)) } // Force flush by waiting on a final dummy write
_ always { channel.disconnect() }
} channel.close()
}
val uniqueIdCounter = new AtomicInteger(0)

View file

@ -35,7 +35,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
remoteSocketAddress: InetSocketAddress): Unit = ChannelLocalActor.set(channel, Some(listener))
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
new TcpAssociationHandle(localAddress, remoteAddress, channel)
new TcpAssociationHandle(localAddress, remoteAddress, transport, channel)
override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
notifyListener(e.getChannel, Disassociated)
@ -76,8 +76,12 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress
/**
* INTERNAL API
*/
private[remote] class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel)
private[remote] class TcpAssociationHandle(val localAddress: Address,
val remoteAddress: Address,
val transport: NettyTransport,
private val channel: Channel)
extends AssociationHandle {
import transport.executionContext
override val readHandlerPromise: Promise[HandleEventListener] = Promise()