From 4bdccd416fd220be0b4d2f0490521fce1919b37d Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 1 Jun 2017 00:32:58 -0700 Subject: [PATCH] Make sure connections are aborted correctly on Windows #19611 --- .../scala/akka/io/TcpConnectionSpec.scala | 1 + .../test/scala/akka/io/TcpListenerSpec.scala | 2 ++ .../main/scala/akka/io/SelectionHandler.scala | 25 +++++++++++++++++-- .../main/scala/akka/io/TcpConnection.scala | 12 +++++++++ project/MiMa.scala | 4 ++- 5 files changed, 41 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 11847c112b..6b90705288 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -911,6 +911,7 @@ class TcpConnectionSpec extends AkkaSpec(""" new ChannelRegistration { def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op + def cancel(): Unit = () } def createConnectionActorWithoutRegistration( diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index e27025b583..897ee8a7ad 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -28,6 +28,7 @@ class TcpListenerSpec extends AkkaSpec(""" listener ! new ChannelRegistration { def disableInterest(op: Int) = () def enableInterest(op: Int) = () + def cancel() = () } bindCommander.expectMsgType[Bound] } @@ -148,6 +149,7 @@ class TcpListenerSpec extends AkkaSpec(""" listener ! new ChannelRegistration { def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op + def cancel(): Unit = () } bindCommander.expectMsgType[Bound] } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 341d35c3ff..cdd4516b7b 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -56,8 +56,15 @@ private[io] trait ChannelRegistry { * Enables a channel actor to directly schedule interest setting tasks to the selector management dispatcher. */ private[io] trait ChannelRegistration extends NoSerializationVerificationNeeded { - def enableInterest(op: Int) - def disableInterest(op: Int) + def enableInterest(op: Int): Unit + def disableInterest(op: Int): Unit + + /** + * Explicitly cancel the registration + * + * This wakes up the selector to make sure the cancellation takes effect immediately. + */ + def cancel(): Unit } private[io] object SelectionHandler { @@ -159,6 +166,13 @@ private[io] object SelectionHandler { channelActor ! new ChannelRegistration { def enableInterest(ops: Int): Unit = enableInterestOps(key, ops) def disableInterest(ops: Int): Unit = disableInterestOps(key, ops) + def cancel(): Unit = { + // On Windows the selector does not effectively cancel the registration until after the + // selector has woken up. Because here the registration is explicitly cancelled, the selector + // will be woken up which makes sure the cancellation (e.g. sending a RST packet for a cancelled TCP connection) + // is performed immediately. + cancelKey(key) + } } } catch { case _: ClosedChannelException ⇒ @@ -195,6 +209,13 @@ private[io] object SelectionHandler { } } + private def cancelKey(key: SelectionKey): Unit = + execute { + new Task { + def tryRun(): Unit = key.cancel() + } + } + private def disableInterestOps(key: SelectionKey, ops: Int): Unit = execute { new Task { diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 2401fb6fa5..a11e7db131 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -41,6 +41,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private[this] var interestedInResume: Option[ActorRef] = None var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop private var watchedActor: ActorRef = context.system.deadLetters + private var registration: Option[ChannelRegistration] = None def signDeathPact(actor: ActorRef): Unit = { unsignDeathPact() @@ -193,6 +194,8 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha /** used in subclasses to start the common machinery above once a channel is connected */ def completeConnect(registration: ChannelRegistration, commander: ActorRef, options: immutable.Traversable[SocketOption]): Unit = { + this.registration = Some(registration) + // Turn off Nagle's algorithm by default try channel.socket.setTcpNoDelay(true) catch { case e: SocketException ⇒ @@ -336,6 +339,15 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e) } channel.close() + + // On linux, closing the channel directly triggers a RST as a side effect of `preClose` + // called from `sun.nio.ch.SocketChannelImpl#implCloseSelectableChannel`. + + // On windows, however, the connection is merely added to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`, + // and `sun.nio.ch.SelectorImpl` will kill those from `processDeregisterQueue` after the select poll has returned. + + // We don't want to have to wait for that, hence explicitly triggering the cancellation: + registration.foreach(_.cancel()) } def stopWith(closeInfo: CloseInformation): Unit = { diff --git a/project/MiMa.scala b/project/MiMa.scala index 7b8135ef97..d2b513fa8f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1192,13 +1192,15 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async") ), "2.5.1" -> Seq( - // #22794 watchWith ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ActorContext.watchWith"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.watchWith"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.DeathWatch.akka$actor$dungeon$DeathWatch$$watching_="), + // #22881 Make sure connections are aborted correctly on Windows + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancel"), + // #22868 store shards ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"),