=act #26300 try not to get stopped by death pact before Unregistration is complete

Otherwise, there will be noisy DeadLetter logging and also the ordering guarantee
that we try to hold up that the connection actor is not closed before the channel
is violated.
This commit is contained in:
Johannes Rudolph 2019-01-29 14:29:43 +01:00
parent 2c531f0422
commit b908f01b59
No known key found for this signature in database
GPG key ID: 4D293A24CCD39E19

View file

@ -45,13 +45,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private var registration: Option[ChannelRegistration] = None private var registration: Option[ChannelRegistration] = None
def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration) def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration)
def signDeathPact(actor: ActorRef): Unit = { protected def signDeathPact(actor: ActorRef): Unit = {
unsignDeathPact() unsignDeathPact()
watchedActor = actor watchedActor = actor
context.watch(watchedActor) context.watch(watchedActor)
} }
def unsignDeathPact(): Unit = protected def unsignDeathPact(): Unit =
if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor) if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor)
def writePending = pendingWrite ne EmptyPendingWrite def writePending = pendingWrite ne EmptyPendingWrite
@ -63,10 +63,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
case Register(handler, keepOpenOnPeerClosed, useResumeWriting) case Register(handler, keepOpenOnPeerClosed, useResumeWriting)
// up to this point we've been watching the commander, // up to this point we've been watching the commander,
// but since registration is now complete we only need to watch the handler from here on // but since registration is now complete we only need to watch the handler from here on
if (handler != commander) { if (handler != commander)
context.unwatch(commander) signDeathPact(handler) // will unsign death pact with commander automatically
context.watch(handler)
}
if (TraceLogging) log.debug("[{}] registered as connection handler", handler) if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
@ -359,6 +358,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = { def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = {
closedMessage = Some(closeInfo) closedMessage = Some(closeInfo)
unsignDeathPact()
if (closeInfo.closedEvent == Aborted || shouldAbort) if (closeInfo.closedEvent == Aborted || shouldAbort)
prepareAbort() prepareAbort()