diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 2f34acda3a..90cc34edce 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -38,6 +38,16 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private[this] var readingSuspended = pullMode private[this] var interestedInResume: Option[ActorRef] = None var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop + private var watchedActor: ActorRef = context.system.deadLetters + + def signDeathPact(actor: ActorRef): Unit = { + unsignDeathPact() + watchedActor = actor + context.watch(watchedActor) + } + + def unsignDeathPact(): Unit = + if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor) def writePending = pendingWrite ne EmptyPendingWrite @@ -268,6 +278,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha peerClosed = true context.become(peerSentEOF(info)) case _ if writePending ⇒ // finish writing first + // Our registered actor is now free to terminate cleanly + unsignDeathPact() if (TraceLogging) log.debug("Got Close command but write is still pending.") context.become(closingWithPendingWrite(info, closeCommander, closedEvent)) case ConfirmedClosed ⇒ // shutdown output and wait for confirmation diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index 7153022400..a5f835fcfa 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -23,7 +23,7 @@ private[io] class TcpIncomingConnection(_tcp: TcpExt, readThrottling: Boolean) extends TcpConnection(_tcp, _channel, readThrottling) { - context.watch(bindHandler) // sign death pact + signDeathPact(bindHandler) registry.register(channel, initialOps = 0) diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 80015d5303..79e6426467 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -30,7 +30,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt, import context._ import connect._ - context.watch(commander) // sign death pact + signDeathPact(commander) options.foreach(_.beforeConnect(channel.socket)) localAddress.foreach(channel.socket.bind)