diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index ff5a9cdaba..466f4d8805 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -47,13 +47,13 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private var registration: Option[ChannelRegistration] = None def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration) - def signDeathPact(actor: ActorRef): Unit = { + protected def signDeathPact(actor: ActorRef): Unit = { unsignDeathPact() watchedActor = actor context.watch(watchedActor) } - def unsignDeathPact(): Unit = + protected def unsignDeathPact(): Unit = if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor) def writePending = pendingWrite ne EmptyPendingWrite @@ -65,10 +65,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case Register(handler, keepOpenOnPeerClosed, useResumeWriting) => // up to this point we've been watching the commander, // but since registration is now complete we only need to watch the handler from here on - if (handler != commander) { - context.unwatch(commander) - context.watch(handler) - } + if (handler != commander) + signDeathPact(handler) // will unsign death pact with commander automatically + if (TraceLogging) log.debug("[{}] registered as connection handler", handler) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) @@ -364,6 +363,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = { closedMessage = Some(closeInfo) + unsignDeathPact() if (closeInfo.closedEvent == Aborted || shouldAbort) prepareAbort()