Merge pull request #26301 from jrudolph/jr/26300-Unregistered-DeadLetter-logging
=act #26300 try not to get stopped by death pact before Unregistration is complete
This commit is contained in:
commit
c676735a3b
1 changed files with 6 additions and 6 deletions
|
|
@ -47,13 +47,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
private var registration: Option[ChannelRegistration] = None
|
private var registration: Option[ChannelRegistration] = None
|
||||||
|
|
||||||
def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration)
|
def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration)
|
||||||
def signDeathPact(actor: ActorRef): Unit = {
|
protected def signDeathPact(actor: ActorRef): Unit = {
|
||||||
unsignDeathPact()
|
unsignDeathPact()
|
||||||
watchedActor = actor
|
watchedActor = actor
|
||||||
context.watch(watchedActor)
|
context.watch(watchedActor)
|
||||||
}
|
}
|
||||||
|
|
||||||
def unsignDeathPact(): Unit =
|
protected def unsignDeathPact(): Unit =
|
||||||
if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor)
|
if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor)
|
||||||
|
|
||||||
def writePending = pendingWrite ne EmptyPendingWrite
|
def writePending = pendingWrite ne EmptyPendingWrite
|
||||||
|
|
@ -65,10 +65,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
case Register(handler, keepOpenOnPeerClosed, useResumeWriting) =>
|
case Register(handler, keepOpenOnPeerClosed, useResumeWriting) =>
|
||||||
// up to this point we've been watching the commander,
|
// up to this point we've been watching the commander,
|
||||||
// but since registration is now complete we only need to watch the handler from here on
|
// but since registration is now complete we only need to watch the handler from here on
|
||||||
if (handler != commander) {
|
if (handler != commander)
|
||||||
context.unwatch(commander)
|
signDeathPact(handler) // will unsign death pact with commander automatically
|
||||||
context.watch(handler)
|
|
||||||
}
|
|
||||||
if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
|
if (TraceLogging) log.debug("[{}] registered as connection handler", handler)
|
||||||
|
|
||||||
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
|
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
|
||||||
|
|
@ -364,6 +363,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
||||||
|
|
||||||
def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = {
|
def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = {
|
||||||
closedMessage = Some(closeInfo)
|
closedMessage = Some(closeInfo)
|
||||||
|
unsignDeathPact()
|
||||||
|
|
||||||
if (closeInfo.closedEvent == Aborted || shouldAbort)
|
if (closeInfo.closedEvent == Aborted || shouldAbort)
|
||||||
prepareAbort()
|
prepareAbort()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue