Merge pull request #25923 from jrudolph/jr/25733-selector-woes-on-jdk11

#25733 close channels from SelectionHandler to make sure they are properly flushed from Selector
This commit is contained in:
Johannes Rudolph 2018-12-04 15:52:53 +01:00 committed by GitHub
commit 9e739ea2f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 184 additions and 75 deletions

View file

@ -29,8 +29,11 @@ import java.util.Random
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.nio.file.Files import java.nio.file.Files
import akka.testkit.WithLogCapturing
import com.google.common.jimfs.{ Configuration, Jimfs } import com.google.common.jimfs.{ Configuration, Jimfs }
import scala.util.Try
object TcpConnectionSpec { object TcpConnectionSpec {
case class Ack(i: Int) extends Event case class Ack(i: Int) extends Event
object Ack extends Ack(0) object Ack extends Ack(0)
@ -38,9 +41,12 @@ object TcpConnectionSpec {
} }
class TcpConnectionSpec extends AkkaSpec(""" class TcpConnectionSpec extends AkkaSpec("""
akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.io.tcp.trace-logging = on
akka.io.tcp.register-timeout = 500ms akka.io.tcp.register-timeout = 500ms
akka.actor.serialize-creators = on akka.actor.serialize-creators = on
""") { thisSpecs """) with WithLogCapturing { thisSpecs
import TcpConnectionSpec._ import TcpConnectionSpec._
// Helper to avoid Windows localization specific differences // Helper to avoid Windows localization specific differences
@ -153,6 +159,8 @@ class TcpConnectionSpec extends AkkaSpec("""
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
userHandler.send(connectionActor, Register(userHandler.ref)) userHandler.send(connectionActor, Register(userHandler.ref))
interestCallReceiver.expectMsg(OP_READ)
selector.send(connectionActor, ChannelReadable)
userHandler.expectMsgType[Received].data.decodeString("ASCII") should ===("immediatedata") userHandler.expectMsgType[Received].data.decodeString("ASCII") should ===("immediatedata")
ignoreWindowsWorkaroundForTicket15766() ignoreWindowsWorkaroundForTicket15766()
interestCallReceiver.expectMsg(OP_READ) interestCallReceiver.expectMsg(OP_READ)
@ -915,9 +923,11 @@ class TcpConnectionSpec extends AkkaSpec("""
new ChannelRegistration { new ChannelRegistration {
def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op
def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op
def cancel(): Unit = () def cancelAndClose(andThen: () Unit): Unit = onCancelAndClose(andThen)
} }
protected def onCancelAndClose(andThen: () Unit): Unit = andThen()
def createConnectionActorWithoutRegistration( def createConnectionActorWithoutRegistration(
serverAddress: InetSocketAddress = serverAddress, serverAddress: InetSocketAddress = serverAddress,
options: immutable.Seq[SocketOption] = Nil, options: immutable.Seq[SocketOption] = Nil,
@ -1029,6 +1039,15 @@ class TcpConnectionSpec extends AkkaSpec("""
(sel, key) (sel, key)
} }
override protected def onCancelAndClose(andThen: () Unit): Unit =
try {
if (clientSideChannel.isOpen) clientSideChannel.close()
if (nioSelector.isOpen) {
nioSelector.selectNow()
nioSelector.selectedKeys().clear()
}
} finally Try(andThen())
/** /**
* Tries to simultaneously act on client and server side to read from the server all pending data from the client. * Tries to simultaneously act on client and server side to read from the server all pending data from the client.
*/ */
@ -1105,7 +1124,8 @@ class TcpConnectionSpec extends AkkaSpec("""
log.debug("setSoLinger(true, 0) failed with {}", e) log.debug("setSoLinger(true, 0) failed with {}", e)
} }
channel.close() channel.close()
if (Helpers.isWindows) nioSelector.select(10) // Windows needs this nioSelector.selectNow()
nioSelector.selectedKeys().clear()
} }
} }

View file

@ -6,19 +6,23 @@ package akka.io
import akka.actor.{ ActorRef, PoisonPill } import akka.actor.{ ActorRef, PoisonPill }
import akka.io.Tcp._ import akka.io.Tcp._
import akka.testkit.{ TestProbe, AkkaSpec } import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.ByteString import akka.util.ByteString
import java.io.IOException import java.io.IOException
import java.net.{ ServerSocket, InetSocketAddress } import java.net.{ InetSocketAddress, ServerSocket }
import org.scalatest.concurrent.Timeouts
import scala.concurrent.duration._
import akka.testkit.WithLogCapturing
import org.scalatest.concurrent.Timeouts
import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
class TcpIntegrationSpec extends AkkaSpec(""" class TcpIntegrationSpec extends AkkaSpec("""
akka.loglevel = INFO akka.loglevel = debug
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.io.tcp.trace-logging = on
akka.actor.serialize-creators = on akka.actor.serialize-creators = on
""") with TcpIntegrationSpecSupport with Timeouts { """) with TcpIntegrationSpecSupport with Timeouts with WithLogCapturing {
def verifyActorTermination(actor: ActorRef): Unit = { def verifyActorTermination(actor: ActorRef): Unit = {
watch(actor) watch(actor)
@ -152,6 +156,13 @@ class TcpIntegrationSpec extends AkkaSpec("""
override def bindOptions = List(SO.SendBufferSize(1024)) override def bindOptions = List(SO.SendBufferSize(1024))
override def connectOptions = List(SO.ReceiveBufferSize(1024)) override def connectOptions = List(SO.ReceiveBufferSize(1024))
serverHandler.send(serverConnection, Close)
serverHandler.expectMsg(Closed)
clientHandler.expectMsg(PeerClosed)
verifyActorTermination(clientConnection)
verifyActorTermination(serverConnection)
} }
"don't report Connected when endpoint isn't responding" in { "don't report Connected when endpoint isn't responding" in {

View file

@ -7,13 +7,15 @@ package akka.io
import java.net.Socket import java.net.Socket
import java.nio.channels.{ SelectableChannel, SocketChannel } import java.nio.channels.{ SelectableChannel, SocketChannel }
import java.nio.channels.SelectionKey.OP_ACCEPT import java.nio.channels.SelectionKey.OP_ACCEPT
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec, EventFilter } import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } import akka.io.TcpListener.{ FailedRegisterIncoming, RegisterIncoming }
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.testkit.SocketUtil import akka.testkit.SocketUtil
import Tcp._ import Tcp._
import akka.io.TcpListenerSpec.RegisterChannel
class TcpListenerSpec extends AkkaSpec(""" class TcpListenerSpec extends AkkaSpec("""
akka.io.tcp.batch-accept-limit = 2 akka.io.tcp.batch-accept-limit = 2
@ -28,7 +30,7 @@ class TcpListenerSpec extends AkkaSpec("""
listener ! new ChannelRegistration { listener ! new ChannelRegistration {
def disableInterest(op: Int) = () def disableInterest(op: Int) = ()
def enableInterest(op: Int) = () def enableInterest(op: Int) = ()
def cancel() = () def cancelAndClose(andThen: () Unit): Unit = ()
} }
bindCommander.expectMsgType[Bound] bindCommander.expectMsgType[Bound]
} }
@ -143,13 +145,18 @@ class TcpListenerSpec extends AkkaSpec("""
private val parentRef = TestActorRef(new ListenerParent(pullMode)) private val parentRef = TestActorRef(new ListenerParent(pullMode))
registerCallReceiver.expectMsg(if (pullMode) 0 else OP_ACCEPT) val register = registerCallReceiver.expectMsgType[RegisterChannel]
register.initialOps should ===(if (pullMode) 0 else OP_ACCEPT)
def bindListener(): Unit = { def bindListener(): Unit = {
listener ! new ChannelRegistration { listener ! new ChannelRegistration {
def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op
def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op
def cancel(): Unit = () def cancelAndClose(andThen: () Unit): Unit = {
register.channel.close()
require(!register.channel.isRegistered)
andThen()
}
} }
bindCommander.expectMsgType[Bound] bindCommander.expectMsgType[Bound]
} }
@ -178,8 +185,11 @@ class TcpListenerSpec extends AkkaSpec("""
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit =
registerCallReceiver.ref.tell(initialOps, channelActor) registerCallReceiver.ref.tell(RegisterChannel(channel, initialOps), channelActor)
} }
} }
} }
object TcpListenerSpec {
final case class RegisterChannel(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded
}

View file

@ -46,4 +46,12 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.dns.UnknownRecord
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.UnknownRecord.ttlInSeconds") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.UnknownRecord.ttlInSeconds")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.dns.UnknownRecord.ttl") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.dns.UnknownRecord.ttl")
# Changes to internal implementation classes
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.stopWith")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.closedMessage_=")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.closedMessage")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this")

View file

@ -6,15 +6,17 @@ package akka.io
import java.util.{ Iterator JIterator } import java.util.{ Iterator JIterator }
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.nio.channels.{ SelectableChannel, SelectionKey, CancelledKeyException } import java.nio.channels.{ CancelledKeyException, SelectableChannel, SelectionKey }
import java.nio.channels.SelectionKey._ import java.nio.channels.SelectionKey._
import java.nio.channels.spi.SelectorProvider import java.nio.channels.spi.SelectorProvider
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.Helpers.Requiring import akka.util.Helpers.Requiring
import akka.util.SerializedSuspendableExecutionContext import akka.util.SerializedSuspendableExecutionContext
import akka.actor._ import akka.actor._
@ -22,6 +24,8 @@ import akka.routing.RandomPool
import akka.event.Logging import akka.event.Logging
import java.nio.channels.ClosedChannelException import java.nio.channels.ClosedChannelException
import scala.util.Try
abstract class SelectionHandlerSettings(config: Config) { abstract class SelectionHandlerSettings(config: Config) {
import config._ import config._
@ -60,11 +64,10 @@ private[io] trait ChannelRegistration extends NoSerializationVerificationNeeded
def disableInterest(op: Int): Unit def disableInterest(op: Int): Unit
/** /**
* Explicitly cancel the registration * Explicitly cancel the registration and close the underlying channel. Then run the given `andThen` method.
* * The `andThen` method is run from another thread so make sure it's safe to execute from there.
* This wakes up the selector to make sure the cancellation takes effect immediately.
*/ */
def cancel(): Unit def cancelAndClose(andThen: () Unit): Unit
} }
private[io] object SelectionHandler { private[io] object SelectionHandler {
@ -117,7 +120,7 @@ private[io] object SelectionHandler {
} else super.logFailure(context, child, cause, decision) } else super.logFailure(context, child, cause, decision)
} }
private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry { private class ChannelRegistryImpl(executionContext: ExecutionContext, settings: SelectionHandlerSettings, log: LoggingAdapter) extends ChannelRegistry {
private[this] val selector = SelectorProvider.provider.openSelector private[this] val selector = SelectorProvider.provider.openSelector
private[this] val wakeUp = new AtomicBoolean(false) private[this] val wakeUp = new AtomicBoolean(false)
@ -164,21 +167,19 @@ private[io] object SelectionHandler {
executionContext.execute(select) // start selection "loop" executionContext.execute(select) // start selection "loop"
def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = {
if (settings.TraceLogging) log.debug(s"Scheduling Registering channel $channel with initialOps $initialOps")
execute { execute {
new Task { new Task {
def tryRun(): Unit = try { def tryRun(): Unit = try {
if (settings.TraceLogging) log.debug(s"Registering channel $channel with initialOps $initialOps")
val key = channel.register(selector, initialOps, channelActor) val key = channel.register(selector, initialOps, channelActor)
channelActor ! new ChannelRegistration { channelActor ! new ChannelRegistration {
def enableInterest(ops: Int): Unit = enableInterestOps(key, ops) def enableInterest(ops: Int): Unit = enableInterestOps(key, ops)
def disableInterest(ops: Int): Unit = disableInterestOps(key, ops) def disableInterest(ops: Int): Unit = disableInterestOps(key, ops)
def cancel(): Unit = {
// On Windows the selector does not effectively cancel the registration until after the def cancelAndClose(andThen: () Unit): Unit = cancelKeyAndClose(key, andThen)
// selector has woken up. Because here the registration is explicitly cancelled, the selector
// will be woken up which makes sure the cancellation (e.g. sending a RST packet for a cancelled TCP connection)
// is performed immediately.
cancelKey(key)
}
} }
} catch { } catch {
case _: ClosedChannelException case _: ClosedChannelException
@ -186,6 +187,7 @@ private[io] object SelectionHandler {
} }
} }
} }
}
def shutdown(): Unit = def shutdown(): Unit =
execute { execute {
@ -208,6 +210,7 @@ private[io] object SelectionHandler {
execute { execute {
new Task { new Task {
def tryRun(): Unit = { def tryRun(): Unit = {
if (settings.TraceLogging) log.debug(s"Enabling $ops on $key")
val currentOps = key.interestOps val currentOps = key.interestOps
val newOps = currentOps | ops val newOps = currentOps | ops
if (newOps != currentOps) key.interestOps(newOps) if (newOps != currentOps) key.interestOps(newOps)
@ -215,10 +218,30 @@ private[io] object SelectionHandler {
} }
} }
private def cancelKey(key: SelectionKey): Unit = private def cancelKeyAndClose(key: SelectionKey, andThen: () Unit): Unit =
execute { execute {
new Task { new Task {
def tryRun(): Unit = key.cancel() def tryRun(): Unit = {
Try(key.cancel())
Try(key.channel().close())
// In JDK 11 (and for Windows also in previous JDKs), it is necessary to completely flush a cancelled / closed channel
// from the selector to close a channel completely on the OS level.
// We want select to be called before we call the thunk, so we schedule the thunk here which will run it
// after the next select call.
// (It's tempting to just call `selectNow` here, instead of registering the thunk, but that can mess up
// the wakeUp state of the selector leading to selection operations being stuck behind the next selection
// until that returns regularly the next time.)
runThunk(andThen)
}
}
}
private def runThunk(andThen: () Unit): Unit =
execute {
new Task {
def tryRun(): Unit = andThen()
} }
} }
@ -262,7 +285,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A
private[this] var childCount = 0 private[this] var childCount = 0
private[this] val registry = { private[this] val registry = {
val dispatcher = context.system.dispatchers.lookup(SelectorDispatcher) val dispatcher = context.system.dispatchers.lookup(SelectorDispatcher)
new ChannelRegistryImpl(SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher), log) new ChannelRegistryImpl(SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher), settings, log)
} }
def receive: Receive = { def receive: Receive = {

View file

@ -40,10 +40,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
private[this] var writingSuspended = false private[this] var writingSuspended = false
private[this] var readingSuspended = pullMode private[this] var readingSuspended = pullMode
private[this] var interestedInResume: Option[ActorRef] = None private[this] var interestedInResume: Option[ActorRef] = None
var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop private[this] var closedMessage: Option[CloseInformation] = None // for ConnectionClosed message in postStop
private var watchedActor: ActorRef = context.system.deadLetters private var watchedActor: ActorRef = context.system.deadLetters
private var registration: Option[ChannelRegistration] = None private var registration: Option[ChannelRegistration] = None
def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration)
def signDeathPact(actor: ActorRef): Unit = { def signDeathPact(actor: ActorRef): Unit = {
unsignDeathPact() unsignDeathPact()
watchedActor = actor watchedActor = actor
@ -70,9 +71,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting)
// if we have resumed reading from pullMode while waiting for Register then register OP_READ interest // if we are in push mode or already have resumed reading in pullMode while waiting for Register
if (pullMode && !readingSuspended) resumeReading(info) // then register OP_READ interest
doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended if (!pullMode || ( /*pullMode && */ !readingSuspended)) resumeReading(info)
context.setReceiveTimeout(Duration.Undefined) context.setReceiveTimeout(Duration.Undefined)
context.become(connected(info)) context.become(connected(info))
@ -190,6 +191,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task case WriteFileFailed(e) handleError(info.handler, e) // rethrow exception from dispatcher task
} }
/** stopWith sets this state while waiting for the SelectionHandler to execute the `cancelAndClose` thunk */
def unregistering: Receive = {
case Unregistered context.stop(self) // postStop will notify interested parties
}
// AUXILIARIES and IMPLEMENTATION // AUXILIARIES and IMPLEMENTATION
/** used in subclasses to start the common machinery above once a channel is connected */ /** used in subclasses to start the common machinery above once a channel is connected */
@ -227,6 +233,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
info.registration.enableInterest(OP_READ) info.registration.enableInterest(OP_READ)
} }
/**
* Read from the channel and potentially send out `Received` message to handler.
*
* In some cases, this method will change the state with `context.become`.
*/
def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit =
if (!readingSuspended) { if (!readingSuspended) {
@tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult = @tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult =
@ -305,8 +316,6 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
} }
def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = { def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = {
if (closedEvent == Aborted) abort()
else channel.close()
stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent)) stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent))
} }
@ -314,6 +323,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
log.debug("Closing connection due to IO error {}", exception) log.debug("Closing connection due to IO error {}", exception)
stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))) stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception))))
} }
def safeShutdownOutput(): Boolean = def safeShutdownOutput(): Boolean =
try { try {
channel.socket().shutdownOutput() channel.socket().shutdownOutput()
@ -331,7 +341,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
} }
} }
def abort(): Unit = { def prepareAbort(): Unit = {
try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST
catch { catch {
case NonFatal(e) case NonFatal(e)
@ -339,36 +349,52 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
// (also affected: OS/X Java 1.6.0_37) // (also affected: OS/X Java 1.6.0_37)
if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e) if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e)
} }
channel.close() // Actual channel closing is done in stopWith or postStop by calling registration.cancelAndClose()
// which makes sure the channel is flushed from the selector as well.
// On linux, closing the channel directly triggers a RST as a side effect of `preClose` // This is necessary because on Windows (and all platforms starting with JDK 11) the connection is merely added
// called from `sun.nio.ch.SocketChannelImpl#implCloseSelectableChannel`. // to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`,
// On windows, however, the connection is merely added to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`,
// and `sun.nio.ch.SelectorImpl` will kill those from `processDeregisterQueue` after the select poll has returned. // and `sun.nio.ch.SelectorImpl` will kill those from `processDeregisterQueue` after the select poll has returned.
// We don't want to have to wait for that, hence explicitly triggering the cancellation:
registration.foreach(_.cancel())
} }
def stopWith(closeInfo: CloseInformation): Unit = { def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = {
closedMessage = closeInfo closedMessage = Some(closeInfo)
if (closeInfo.closedEvent == Aborted || shouldAbort)
prepareAbort()
registration match {
case None
context.stop(self) context.stop(self)
case Some(reg)
context.become(unregistering)
reg.cancelAndClose(() self ! Unregistered)
}
} }
override def postStop(): Unit = { override def postStop(): Unit = {
if (channel.isOpen)
abort()
if (writePending) pendingWrite.release() if (writePending) pendingWrite.release()
if (closedMessage != null) { val interestedInClose: Set[ActorRef] =
val interestedInClose = (if (writePending) Set(pendingWrite.commander) else Set.empty) ++
if (writePending) closedMessage.notificationsTo + pendingWrite.commander closedMessage.toSet[CloseInformation].flatMap(_.notificationsTo)
else closedMessage.notificationsTo
interestedInClose.foreach(_ ! closedMessage.closedEvent) if (channel.isOpen) // if channel is still open here, we didn't go through stopWith => unexpected actor termination
} prepareAbort()
def isCommandFailed: Boolean = closedMessage.exists(_.closedEvent.isInstanceOf[CommandFailed])
def notifyInterested(): Unit =
for {
msg closedMessage
ref interestedInClose
} ref ! msg.closedEvent
if (!channel.isOpen || isCommandFailed || registration.isEmpty)
// if channel was already closed we can send out notification directly
notifyInterested()
else
// otherwise, we unregister and notify afterwards
registration.foreach(_.cancelAndClose(() notifyInterested()))
} }
override def postRestart(reason: Throwable): Unit = override def postRestart(reason: Throwable): Unit =
@ -506,6 +532,7 @@ private[io] object TcpConnection {
final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () Unit) extends NoSerializationVerificationNeeded final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () Unit) extends NoSerializationVerificationNeeded
final case class WriteFileFailed(e: IOException) final case class WriteFileFailed(e: IOException)
case object Unregistered
sealed abstract class PendingWrite { sealed abstract class PendingWrite {
def commander: ActorRef def commander: ActorRef

View file

@ -13,7 +13,6 @@ import akka.actor._
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.io.Tcp._ import akka.io.Tcp._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.Helpers
/** /**
* INTERNAL API * INTERNAL API
@ -98,10 +97,13 @@ private[io] class TcpListener(
case Unbind case Unbind
log.debug("Unbinding endpoint {}", localAddress) log.debug("Unbinding endpoint {}", localAddress)
channel.close() registration.cancelAndClose { () self ! Unbound }
// see https://github.com/akka/akka/issues/20282
if (Helpers.isWindows) registration.enableInterest(1) context.become(unregistering(sender()))
sender() ! Unbound }
def unregistering(requester: ActorRef): Receive = {
case Unbound
requester ! Unbound
log.debug("Unbound endpoint {}, stopping listener", localAddress) log.debug("Unbound endpoint {}, stopping listener", localAddress)
context.stop(self) context.stop(self)
} }

View file

@ -39,7 +39,8 @@ private[io] class TcpOutgoingConnection(
channelRegistry.register(channel, 0) channelRegistry.register(channel, 0)
timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied
private def stop(cause: Throwable): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause))) private def stop(cause: Throwable): Unit =
stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause)), shouldAbort = true)
private def reportConnectFailure(thunk: Unit): Unit = { private def reportConnectFailure(thunk: Unit): Unit = {
try { try {
@ -53,6 +54,7 @@ private[io] class TcpOutgoingConnection(
def receive: Receive = { def receive: Receive = {
case registration: ChannelRegistration case registration: ChannelRegistration
setRegistration(registration)
reportConnectFailure { reportConnectFailure {
if (remoteAddress.isUnresolved) { if (remoteAddress.isUnresolved) {
log.debug("Resolving {} before connecting", remoteAddress.getHostName) log.debug("Resolving {} before connecting", remoteAddress.getHostName)

View file

@ -12,7 +12,7 @@ import scala.annotation.tailrec
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef } import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.{ ByteString, Helpers } import akka.util.ByteString
import akka.io.Inet.DatagramChannelCreator import akka.io.Inet.DatagramChannelCreator
import akka.io.SelectionHandler._ import akka.io.SelectionHandler._
import akka.io.Udp._ import akka.io.Udp._
@ -75,12 +75,15 @@ private[io] class UdpListener(
case Unbind case Unbind
log.debug("Unbinding endpoint [{}]", bind.localAddress) log.debug("Unbinding endpoint [{}]", bind.localAddress)
try { registration.cancelAndClose(() self ! Unbound)
channel.close() context.become(unregistering(sender()))
if (Helpers.isWindows) registration.enableInterest(OP_READ) }
sender() ! Unbound
def unregistering(requester: ActorRef): Receive = {
case Unbound
log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress) log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress)
} finally context.stop(self) requester ! Unbound
context.stop(self)
} }
def doReceive(registration: ChannelRegistration, handler: ActorRef): Unit = { def doReceive(registration: ChannelRegistration, handler: ActorRef): Unit = {

View file

@ -19,6 +19,7 @@ import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit._ import akka.stream.testkit._
import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe } import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe }
import akka.testkit.SocketUtil.temporaryServerAddress import akka.testkit.SocketUtil.temporaryServerAddress
import akka.testkit.WithLogCapturing
import akka.util.ByteString import akka.util.ByteString
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
@ -31,9 +32,11 @@ import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.util.control.NonFatal import scala.util.control.NonFatal
class TcpSpec extends StreamSpec(""" class TcpSpec extends StreamSpec("""
akka.loglevel = info akka.loglevel = debug
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.io.tcp.trace-logging = true
akka.stream.materializer.subscription-timeout.timeout = 2s akka.stream.materializer.subscription-timeout.timeout = 2s
""") with TcpHelper { """) with TcpHelper with WithLogCapturing {
"Outgoing TCP stream" must { "Outgoing TCP stream" must {