=act: TCP actor should unwatch handler/commander after Close

(cherry picked from commit 9578d46)
This commit is contained in:
Endre Sándor Varga 2015-07-06 14:09:06 +02:00
parent 403369a29e
commit b27e5fbb4e
3 changed files with 14 additions and 2 deletions

View file

@ -38,6 +38,16 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var readingSuspended = pullMode private[this] var readingSuspended = pullMode
private[this] var interestedInResume: Option[ActorRef] = None private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
private var watchedActor: ActorRef = context.system.deadLetters
def signDeathPact(actor: ActorRef): Unit = {
unsignDeathPact()
watchedActor = actor
context.watch(watchedActor)
}
def unsignDeathPact(): Unit =
if (watchedActor ne context.system.deadLetters) context.unwatch(watchedActor)
def writePending = pendingWrite ne EmptyPendingWrite def writePending = pendingWrite ne EmptyPendingWrite
@ -268,6 +278,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
peerClosed = true peerClosed = true
context.become(peerSentEOF(info)) context.become(peerSentEOF(info))
case _ if writePending // finish writing first case _ if writePending // finish writing first
// Our registered actor is now free to terminate cleanly
unsignDeathPact()
if (TraceLogging) log.debug("Got Close command but write is still pending.") if (TraceLogging) log.debug("Got Close command but write is still pending.")
context.become(closingWithPendingWrite(info, closeCommander, closedEvent)) context.become(closingWithPendingWrite(info, closeCommander, closedEvent))
case ConfirmedClosed // shutdown output and wait for confirmation case ConfirmedClosed // shutdown output and wait for confirmation

View file

@ -23,7 +23,7 @@ private[io] class TcpIncomingConnection(_tcp: TcpExt,
readThrottling: Boolean) readThrottling: Boolean)
extends TcpConnection(_tcp, _channel, readThrottling) { extends TcpConnection(_tcp, _channel, readThrottling) {
context.watch(bindHandler) // sign death pact signDeathPact(bindHandler)
registry.register(channel, initialOps = 0) registry.register(channel, initialOps = 0)

View file

@ -30,7 +30,7 @@ private[io] class TcpOutgoingConnection(_tcp: TcpExt,
import context._ import context._
import connect._ import connect._
context.watch(commander) // sign death pact signDeathPact(commander)
options.foreach(_.beforeConnect(channel.socket)) options.foreach(_.beforeConnect(channel.socket))
localAddress.foreach(channel.socket.bind) localAddress.foreach(channel.socket.bind)