From b908f01b5952cc31d3928edbedd9189290d1128c Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 29 Jan 2019 14:29:43 +0100 Subject: [PATCH] =act #26300 try not to get stopped by death pact before Unregistration is complete Otherwise, there will be noisy DeadLetter logging and also the ordering guarantee that we try to hold up that the connection actor is not closed before the channel is violated. --- .../src/main/scala/akka/io/TcpConnection.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 026c549182..090fae5d3a 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -45,13 +45,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private var registration: Option[ChannelRegistration] = None def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration) - def signDeathPact(actor: ActorRef): Unit = { + protected def signDeathPact(actor: ActorRef): Unit = { unsignDeathPact() watchedActor = actor context.watch(watchedActor) } - def unsignDeathPact(): Unit = + protected def unsignDeathPact(): Unit = if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor) def writePending = pendingWrite ne EmptyPendingWrite @@ -63,10 +63,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case Register(handler, keepOpenOnPeerClosed, useResumeWriting) ⇒ // up to this point we've been watching the commander, // but since registration is now complete we only need to watch the handler from here on - if (handler != commander) { - context.unwatch(commander) - context.watch(handler) - } + if (handler != commander) + signDeathPact(handler) // will unsign death pact with commander automatically + if (TraceLogging) log.debug("[{}] registered as connection handler", handler) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) @@ -359,6 +358,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = { closedMessage = Some(closeInfo) + unsignDeathPact() if (closeInfo.closedEvent == Aborted || shouldAbort) prepareAbort()