=act #25733 close channels from SelectionHandler and force flushing from selector

This is needed because starting from JDK 11 all platforms will only finally
close a channel when it is flushed from a selector.

Fixes #25733.
This commit is contained in:
Johannes Rudolph 2018-11-14 15:32:30 +01:00
parent 8419671de2
commit 7899708ca6
No known key found for this signature in database
GPG key ID: 4D293A24CCD39E19
10 changed files with 184 additions and 75 deletions

View file

@ -40,10 +40,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var writingSuspended = false
private[this] var readingSuspended = pullMode
private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop
private[this] var closedMessage: Option[CloseInformation] = None // for ConnectionClosed message in postStop
private var watchedActor: ActorRef = context.system.deadLetters
private var registration: Option[ChannelRegistration] = None
def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration)
def signDeathPact(actor: ActorRef): Unit = {
unsignDeathPact()
watchedActor = actor
@ -70,9 +71,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
// if we have resumed reading from pullMode while waiting for Register then register OP_READ interest
if (pullMode && !readingSuspended) resumeReading(info)
doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended
// if we are in push mode or already have resumed reading in pullMode while waiting for Register
// then register OP_READ interest
if (!pullMode || ( /*pullMode && */ !readingSuspended)) resumeReading(info)
context.setReceiveTimeout(Duration.Undefined)
context.become(connected(info))
@ -190,6 +191,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
}
/** stopWith sets this state while waiting for the SelectionHandler to execute the `cancelAndClose` thunk */
def unregistering: Receive = {
case Unregistered context.stop(self) // postStop will notify interested parties
}
// AUXILIARIES and IMPLEMENTATION
/** used in subclasses to start the common machinery above once a channel is connected */
@ -227,6 +233,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
info.registration.enableInterest(OP_READ)
}
/**
* Read from the channel and potentially send out `Received` message to handler.
*
* In some cases, this method will change the state with `context.become`.
*/
def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit =
if (!readingSuspended) {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
@ -305,8 +316,6 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
}
def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = {
if (closedEvent == Aborted) abort()
else channel.close()
stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent))
}
@ -314,6 +323,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
log.debug("Closing connection due to IO error {}", exception)
stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))))
}
def safeShutdownOutput(): Boolean =
try {
channel.socket().shutdownOutput()
@ -331,7 +341,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
}
}
def abort(): Unit = {
def prepareAbort(): Unit = {
try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST
catch {
case NonFatal(e)
@ -339,36 +349,52 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
// (also affected: OS/X Java 1.6.0_37)
if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e)
}
channel.close()
// Actual channel closing is done in stopWith or postStop by calling registration.cancelAndClose()
// which makes sure the channel is flushed from the selector as well.
// On linux, closing the channel directly triggers a RST as a side effect of `preClose`
// called from `sun.nio.ch.SocketChannelImpl#implCloseSelectableChannel`.
// On windows, however, the connection is merely added to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`,
// This is necessary because on Windows (and all platforms starting with JDK 11) the connection is merely added
// to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`,
// and `sun.nio.ch.SelectorImpl` will kill those from `processDeregisterQueue` after the select poll has returned.
// We don't want to have to wait for that, hence explicitly triggering the cancellation:
registration.foreach(_.cancel())
}
def stopWith(closeInfo: CloseInformation): Unit = {
closedMessage = closeInfo
context.stop(self)
def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = {
closedMessage = Some(closeInfo)
if (closeInfo.closedEvent == Aborted || shouldAbort)
prepareAbort()
registration match {
case None
context.stop(self)
case Some(reg)
context.become(unregistering)
reg.cancelAndClose(() self ! Unregistered)
}
}
override def postStop(): Unit = {
if (channel.isOpen)
abort()
if (writePending) pendingWrite.release()
if (closedMessage != null) {
val interestedInClose =
if (writePending) closedMessage.notificationsTo + pendingWrite.commander
else closedMessage.notificationsTo
val interestedInClose: Set[ActorRef] =
(if (writePending) Set(pendingWrite.commander) else Set.empty) ++
closedMessage.toSet[CloseInformation].flatMap(_.notificationsTo)
interestedInClose.foreach(_ ! closedMessage.closedEvent)
}
if (channel.isOpen) // if channel is still open here, we didn't go through stopWith => unexpected actor termination
prepareAbort()
def isCommandFailed: Boolean = closedMessage.exists(_.closedEvent.isInstanceOf[CommandFailed])
def notifyInterested(): Unit =
for {
msg closedMessage
ref interestedInClose
} ref ! msg.closedEvent
if (!channel.isOpen || isCommandFailed || registration.isEmpty)
// if channel was already closed we can send out notification directly
notifyInterested()
else
// otherwise, we unregister and notify afterwards
registration.foreach(_.cancelAndClose(() notifyInterested()))
}
override def postRestart(reason: Throwable): Unit =
@ -506,6 +532,7 @@ private[io] object TcpConnection {
final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () Unit) extends NoSerializationVerificationNeeded
final case class WriteFileFailed(e: IOException)
case object Unregistered
sealed abstract class PendingWrite {
def commander: ActorRef