From 7899708ca684ce696fd3aa306cf56b396637b632 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 14 Nov 2018 15:32:30 +0100 Subject: [PATCH] =act #25733 close channels from SelectionHandler and force flushing from selector This is needed because starting from JDK 11 all platforms will only finally close a channel when it is flushed from a selector. Fixes #25733. --- .../scala/akka/io/TcpConnectionSpec.scala | 26 +++++- .../scala/akka/io/TcpIntegrationSpec.scala | 23 ++++-- .../test/scala/akka/io/TcpListenerSpec.scala | 22 +++-- .../mima-filters/2.5.18.backwards.excludes | 8 ++ .../main/scala/akka/io/SelectionHandler.scala | 59 +++++++++----- .../main/scala/akka/io/TcpConnection.scala | 81 ++++++++++++------- .../src/main/scala/akka/io/TcpListener.scala | 12 +-- .../scala/akka/io/TcpOutgoingConnection.scala | 4 +- .../src/main/scala/akka/io/UdpListener.scala | 17 ++-- .../test/scala/akka/stream/io/TcpSpec.scala | 7 +- 10 files changed, 184 insertions(+), 75 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 43ec81ae7c..c2efa0c187 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -29,8 +29,11 @@ import java.util.Random import java.net.SocketTimeoutException import java.nio.file.Files +import akka.testkit.WithLogCapturing import com.google.common.jimfs.{ Configuration, Jimfs } +import scala.util.Try + object TcpConnectionSpec { case class Ack(i: Int) extends Event object Ack extends Ack(0) @@ -38,9 +41,12 @@ object TcpConnectionSpec { } class TcpConnectionSpec extends AkkaSpec(""" + akka.loglevel = DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.io.tcp.trace-logging = on akka.io.tcp.register-timeout = 500ms akka.actor.serialize-creators = on - """) { thisSpecs ⇒ + """) with WithLogCapturing { thisSpecs ⇒ import TcpConnectionSpec._ // Helper to avoid Windows localization specific differences @@ -153,6 +159,8 @@ class TcpConnectionSpec extends AkkaSpec(""" userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) userHandler.send(connectionActor, Register(userHandler.ref)) + interestCallReceiver.expectMsg(OP_READ) + selector.send(connectionActor, ChannelReadable) userHandler.expectMsgType[Received].data.decodeString("ASCII") should ===("immediatedata") ignoreWindowsWorkaroundForTicket15766() interestCallReceiver.expectMsg(OP_READ) @@ -915,9 +923,11 @@ class TcpConnectionSpec extends AkkaSpec(""" new ChannelRegistration { def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op - def cancel(): Unit = () + def cancelAndClose(andThen: () ⇒ Unit): Unit = onCancelAndClose(andThen) } + protected def onCancelAndClose(andThen: () ⇒ Unit): Unit = andThen() + def createConnectionActorWithoutRegistration( serverAddress: InetSocketAddress = serverAddress, options: immutable.Seq[SocketOption] = Nil, @@ -1029,6 +1039,15 @@ class TcpConnectionSpec extends AkkaSpec(""" (sel, key) } + override protected def onCancelAndClose(andThen: () ⇒ Unit): Unit = + try { + if (clientSideChannel.isOpen) clientSideChannel.close() + if (nioSelector.isOpen) { + nioSelector.selectNow() + nioSelector.selectedKeys().clear() + } + } finally Try(andThen()) + /** * Tries to simultaneously act on client and server side to read from the server all pending data from the client. */ @@ -1105,7 +1124,8 @@ class TcpConnectionSpec extends AkkaSpec(""" log.debug("setSoLinger(true, 0) failed with {}", e) } channel.close() - if (Helpers.isWindows) nioSelector.select(10) // Windows needs this + nioSelector.selectNow() + nioSelector.selectedKeys().clear() } } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index dcda3196ea..f4faf35770 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -6,19 +6,23 @@ package akka.io import akka.actor.{ ActorRef, PoisonPill } import akka.io.Tcp._ -import akka.testkit.{ TestProbe, AkkaSpec } +import akka.testkit.{ AkkaSpec, TestProbe } import akka.util.ByteString import java.io.IOException -import java.net.{ ServerSocket, InetSocketAddress } -import org.scalatest.concurrent.Timeouts -import scala.concurrent.duration._ +import java.net.{ InetSocketAddress, ServerSocket } +import akka.testkit.WithLogCapturing +import org.scalatest.concurrent.Timeouts + +import scala.concurrent.duration._ import scala.language.postfixOps class TcpIntegrationSpec extends AkkaSpec(""" - akka.loglevel = INFO + akka.loglevel = debug + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.io.tcp.trace-logging = on akka.actor.serialize-creators = on - """) with TcpIntegrationSpecSupport with Timeouts { + """) with TcpIntegrationSpecSupport with Timeouts with WithLogCapturing { def verifyActorTermination(actor: ActorRef): Unit = { watch(actor) @@ -152,6 +156,13 @@ class TcpIntegrationSpec extends AkkaSpec(""" override def bindOptions = List(SO.SendBufferSize(1024)) override def connectOptions = List(SO.ReceiveBufferSize(1024)) + + serverHandler.send(serverConnection, Close) + serverHandler.expectMsg(Closed) + clientHandler.expectMsg(PeerClosed) + + verifyActorTermination(clientConnection) + verifyActorTermination(serverConnection) } "don't report Connected when endpoint isn't responding" in { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index c26cef0788..9cc5e097a0 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -7,13 +7,15 @@ package akka.io import java.net.Socket import java.nio.channels.{ SelectableChannel, SocketChannel } import java.nio.channels.SelectionKey.OP_ACCEPT + import scala.concurrent.duration._ import akka.actor._ -import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec, EventFilter } -import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } +import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } +import akka.io.TcpListener.{ FailedRegisterIncoming, RegisterIncoming } import akka.io.SelectionHandler._ import akka.testkit.SocketUtil import Tcp._ +import akka.io.TcpListenerSpec.RegisterChannel class TcpListenerSpec extends AkkaSpec(""" akka.io.tcp.batch-accept-limit = 2 @@ -28,7 +30,7 @@ class TcpListenerSpec extends AkkaSpec(""" listener ! new ChannelRegistration { def disableInterest(op: Int) = () def enableInterest(op: Int) = () - def cancel() = () + def cancelAndClose(andThen: () ⇒ Unit): Unit = () } bindCommander.expectMsgType[Bound] } @@ -143,13 +145,18 @@ class TcpListenerSpec extends AkkaSpec(""" private val parentRef = TestActorRef(new ListenerParent(pullMode)) - registerCallReceiver.expectMsg(if (pullMode) 0 else OP_ACCEPT) + val register = registerCallReceiver.expectMsgType[RegisterChannel] + register.initialOps should ===(if (pullMode) 0 else OP_ACCEPT) def bindListener(): Unit = { listener ! new ChannelRegistration { def enableInterest(op: Int): Unit = interestCallReceiver.ref ! op def disableInterest(op: Int): Unit = interestCallReceiver.ref ! -op - def cancel(): Unit = () + def cancelAndClose(andThen: () ⇒ Unit): Unit = { + register.channel.close() + require(!register.channel.isRegistered) + andThen() + } } bindCommander.expectMsgType[Bound] } @@ -178,8 +185,11 @@ class TcpListenerSpec extends AkkaSpec(""" override def supervisorStrategy = SupervisorStrategy.stoppingStrategy def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = - registerCallReceiver.ref.tell(initialOps, channelActor) + registerCallReceiver.ref.tell(RegisterChannel(channel, initialOps), channelActor) } } } +object TcpListenerSpec { + final case class RegisterChannel(channel: SelectableChannel, initialOps: Int) extends NoSerializationVerificationNeeded +} diff --git a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes index 977f3e2703..df53d665f4 100644 --- a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes @@ -46,4 +46,12 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.dns.UnknownRecord ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.dns.UnknownRecord.ttlInSeconds") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.dns.UnknownRecord.ttl") +# Changes to internal implementation classes +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.stopWith") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.closedMessage_=") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.closedMessage") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this") diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 88d2781b46..86b37c018e 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -6,15 +6,17 @@ package akka.io import java.util.{ Iterator ⇒ JIterator } import java.util.concurrent.atomic.AtomicBoolean -import java.nio.channels.{ SelectableChannel, SelectionKey, CancelledKeyException } +import java.nio.channels.{ CancelledKeyException, SelectableChannel, SelectionKey } import java.nio.channels.SelectionKey._ import java.nio.channels.spi.SelectorProvider + import com.typesafe.config.Config + import scala.annotation.tailrec import scala.util.control.NonFatal import scala.concurrent.ExecutionContext import akka.event.LoggingAdapter -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.Helpers.Requiring import akka.util.SerializedSuspendableExecutionContext import akka.actor._ @@ -22,6 +24,8 @@ import akka.routing.RandomPool import akka.event.Logging import java.nio.channels.ClosedChannelException +import scala.util.Try + abstract class SelectionHandlerSettings(config: Config) { import config._ @@ -60,11 +64,10 @@ private[io] trait ChannelRegistration extends NoSerializationVerificationNeeded def disableInterest(op: Int): Unit /** - * Explicitly cancel the registration - * - * This wakes up the selector to make sure the cancellation takes effect immediately. + * Explicitly cancel the registration and close the underlying channel. Then run the given `andThen` method. + * The `andThen` method is run from another thread so make sure it's safe to execute from there. */ - def cancel(): Unit + def cancelAndClose(andThen: () ⇒ Unit): Unit } private[io] object SelectionHandler { @@ -117,7 +120,7 @@ private[io] object SelectionHandler { } else super.logFailure(context, child, cause, decision) } - private class ChannelRegistryImpl(executionContext: ExecutionContext, log: LoggingAdapter) extends ChannelRegistry { + private class ChannelRegistryImpl(executionContext: ExecutionContext, settings: SelectionHandlerSettings, log: LoggingAdapter) extends ChannelRegistry { private[this] val selector = SelectorProvider.provider.openSelector private[this] val wakeUp = new AtomicBoolean(false) @@ -164,21 +167,19 @@ private[io] object SelectionHandler { executionContext.execute(select) // start selection "loop" - def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = + def register(channel: SelectableChannel, initialOps: Int)(implicit channelActor: ActorRef): Unit = { + if (settings.TraceLogging) log.debug(s"Scheduling Registering channel $channel with initialOps $initialOps") execute { new Task { def tryRun(): Unit = try { + if (settings.TraceLogging) log.debug(s"Registering channel $channel with initialOps $initialOps") val key = channel.register(selector, initialOps, channelActor) channelActor ! new ChannelRegistration { def enableInterest(ops: Int): Unit = enableInterestOps(key, ops) + def disableInterest(ops: Int): Unit = disableInterestOps(key, ops) - def cancel(): Unit = { - // On Windows the selector does not effectively cancel the registration until after the - // selector has woken up. Because here the registration is explicitly cancelled, the selector - // will be woken up which makes sure the cancellation (e.g. sending a RST packet for a cancelled TCP connection) - // is performed immediately. - cancelKey(key) - } + + def cancelAndClose(andThen: () ⇒ Unit): Unit = cancelKeyAndClose(key, andThen) } } catch { case _: ClosedChannelException ⇒ @@ -186,6 +187,7 @@ private[io] object SelectionHandler { } } } + } def shutdown(): Unit = execute { @@ -208,6 +210,7 @@ private[io] object SelectionHandler { execute { new Task { def tryRun(): Unit = { + if (settings.TraceLogging) log.debug(s"Enabling $ops on $key") val currentOps = key.interestOps val newOps = currentOps | ops if (newOps != currentOps) key.interestOps(newOps) @@ -215,10 +218,30 @@ private[io] object SelectionHandler { } } - private def cancelKey(key: SelectionKey): Unit = + private def cancelKeyAndClose(key: SelectionKey, andThen: () ⇒ Unit): Unit = execute { new Task { - def tryRun(): Unit = key.cancel() + def tryRun(): Unit = { + Try(key.cancel()) + Try(key.channel().close()) + + // In JDK 11 (and for Windows also in previous JDKs), it is necessary to completely flush a cancelled / closed channel + // from the selector to close a channel completely on the OS level. + // We want select to be called before we call the thunk, so we schedule the thunk here which will run it + // after the next select call. + // (It's tempting to just call `selectNow` here, instead of registering the thunk, but that can mess up + // the wakeUp state of the selector leading to selection operations being stuck behind the next selection + // until that returns regularly the next time.) + + runThunk(andThen) + } + } + } + + private def runThunk(andThen: () ⇒ Unit): Unit = + execute { + new Task { + def tryRun(): Unit = andThen() } } @@ -262,7 +285,7 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A private[this] var childCount = 0 private[this] val registry = { val dispatcher = context.system.dispatchers.lookup(SelectorDispatcher) - new ChannelRegistryImpl(SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher), log) + new ChannelRegistryImpl(SerializedSuspendableExecutionContext(dispatcher.throughput)(dispatcher), settings, log) } def receive: Receive = { diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 7171563c4f..0b0b8a507d 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -40,10 +40,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha private[this] var writingSuspended = false private[this] var readingSuspended = pullMode private[this] var interestedInResume: Option[ActorRef] = None - var closedMessage: CloseInformation = _ // for ConnectionClosed message in postStop + private[this] var closedMessage: Option[CloseInformation] = None // for ConnectionClosed message in postStop private var watchedActor: ActorRef = context.system.deadLetters private var registration: Option[ChannelRegistration] = None + def setRegistration(registration: ChannelRegistration): Unit = this.registration = Some(registration) def signDeathPact(actor: ActorRef): Unit = { unsignDeathPact() watchedActor = actor @@ -70,9 +71,9 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha val info = ConnectionInfo(registration, handler, keepOpenOnPeerClosed, useResumeWriting) - // if we have resumed reading from pullMode while waiting for Register then register OP_READ interest - if (pullMode && !readingSuspended) resumeReading(info) - doRead(info, None) // immediately try reading, pullMode is handled by readingSuspended + // if we are in push mode or already have resumed reading in pullMode while waiting for Register + // then register OP_READ interest + if (!pullMode || ( /*pullMode && */ !readingSuspended)) resumeReading(info) context.setReceiveTimeout(Duration.Undefined) context.become(connected(info)) @@ -190,6 +191,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha case WriteFileFailed(e) ⇒ handleError(info.handler, e) // rethrow exception from dispatcher task } + /** stopWith sets this state while waiting for the SelectionHandler to execute the `cancelAndClose` thunk */ + def unregistering: Receive = { + case Unregistered ⇒ context.stop(self) // postStop will notify interested parties + } + // AUXILIARIES and IMPLEMENTATION /** used in subclasses to start the common machinery above once a channel is connected */ @@ -227,6 +233,11 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha info.registration.enableInterest(OP_READ) } + /** + * Read from the channel and potentially send out `Received` message to handler. + * + * In some cases, this method will change the state with `context.become`. + */ def doRead(info: ConnectionInfo, closeCommander: Option[ActorRef]): Unit = if (!readingSuspended) { @tailrec def innerRead(buffer: ByteBuffer, remainingLimit: Int): ReadResult = @@ -305,8 +316,6 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha } def doCloseConnection(handler: ActorRef, closeCommander: Option[ActorRef], closedEvent: ConnectionClosed): Unit = { - if (closedEvent == Aborted) abort() - else channel.close() stopWith(CloseInformation(Set(handler) ++ closeCommander, closedEvent)) } @@ -314,6 +323,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha log.debug("Closing connection due to IO error {}", exception) stopWith(CloseInformation(Set(handler), ErrorClosed(extractMsg(exception)))) } + def safeShutdownOutput(): Boolean = try { channel.socket().shutdownOutput() @@ -331,7 +341,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha } } - def abort(): Unit = { + def prepareAbort(): Unit = { try channel.socket.setSoLinger(true, 0) // causes the following close() to send TCP RST catch { case NonFatal(e) ⇒ @@ -339,36 +349,52 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha // (also affected: OS/X Java 1.6.0_37) if (TraceLogging) log.debug("setSoLinger(true, 0) failed with [{}]", e) } - channel.close() + // Actual channel closing is done in stopWith or postStop by calling registration.cancelAndClose() + // which makes sure the channel is flushed from the selector as well. - // On linux, closing the channel directly triggers a RST as a side effect of `preClose` - // called from `sun.nio.ch.SocketChannelImpl#implCloseSelectableChannel`. - - // On windows, however, the connection is merely added to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`, + // This is necessary because on Windows (and all platforms starting with JDK 11) the connection is merely added + // to the `cancelledKeys` of the `java.nio.channels.spi.AbstractSelector`, // and `sun.nio.ch.SelectorImpl` will kill those from `processDeregisterQueue` after the select poll has returned. - - // We don't want to have to wait for that, hence explicitly triggering the cancellation: - registration.foreach(_.cancel()) } - def stopWith(closeInfo: CloseInformation): Unit = { - closedMessage = closeInfo - context.stop(self) + def stopWith(closeInfo: CloseInformation, shouldAbort: Boolean = false): Unit = { + closedMessage = Some(closeInfo) + + if (closeInfo.closedEvent == Aborted || shouldAbort) + prepareAbort() + + registration match { + case None ⇒ + context.stop(self) + case Some(reg) ⇒ + context.become(unregistering) + reg.cancelAndClose(() ⇒ self ! Unregistered) + } } override def postStop(): Unit = { - if (channel.isOpen) - abort() - if (writePending) pendingWrite.release() - if (closedMessage != null) { - val interestedInClose = - if (writePending) closedMessage.notificationsTo + pendingWrite.commander - else closedMessage.notificationsTo + val interestedInClose: Set[ActorRef] = + (if (writePending) Set(pendingWrite.commander) else Set.empty) ++ + closedMessage.toSet[CloseInformation].flatMap(_.notificationsTo) - interestedInClose.foreach(_ ! closedMessage.closedEvent) - } + if (channel.isOpen) // if channel is still open here, we didn't go through stopWith => unexpected actor termination + prepareAbort() + + def isCommandFailed: Boolean = closedMessage.exists(_.closedEvent.isInstanceOf[CommandFailed]) + def notifyInterested(): Unit = + for { + msg ← closedMessage + ref ← interestedInClose + } ref ! msg.closedEvent + + if (!channel.isOpen || isCommandFailed || registration.isEmpty) + // if channel was already closed we can send out notification directly + notifyInterested() + else + // otherwise, we unregister and notify afterwards + registration.foreach(_.cancelAndClose(() ⇒ notifyInterested())) } override def postRestart(reason: Throwable): Unit = @@ -506,6 +532,7 @@ private[io] object TcpConnection { final case class UpdatePendingWriteAndThen(remainingWrite: PendingWrite, work: () ⇒ Unit) extends NoSerializationVerificationNeeded final case class WriteFileFailed(e: IOException) + case object Unregistered sealed abstract class PendingWrite { def commander: ActorRef diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 6238f2c86c..0613eefa28 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -13,7 +13,6 @@ import akka.actor._ import akka.io.SelectionHandler._ import akka.io.Tcp._ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import akka.util.Helpers /** * INTERNAL API @@ -98,10 +97,13 @@ private[io] class TcpListener( case Unbind ⇒ log.debug("Unbinding endpoint {}", localAddress) - channel.close() - // see https://github.com/akka/akka/issues/20282 - if (Helpers.isWindows) registration.enableInterest(1) - sender() ! Unbound + registration.cancelAndClose { () ⇒ self ! Unbound } + + context.become(unregistering(sender())) + } + def unregistering(requester: ActorRef): Receive = { + case Unbound ⇒ + requester ! Unbound log.debug("Unbound endpoint {}, stopping listener", localAddress) context.stop(self) } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 1cd6636857..ded40beb02 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -39,7 +39,8 @@ private[io] class TcpOutgoingConnection( channelRegistry.register(channel, 0) timeout foreach context.setReceiveTimeout //Initiate connection timeout if supplied - private def stop(cause: Throwable): Unit = stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause))) + private def stop(cause: Throwable): Unit = + stopWith(CloseInformation(Set(commander), connect.failureMessage.withCause(cause)), shouldAbort = true) private def reportConnectFailure(thunk: ⇒ Unit): Unit = { try { @@ -53,6 +54,7 @@ private[io] class TcpOutgoingConnection( def receive: Receive = { case registration: ChannelRegistration ⇒ + setRegistration(registration) reportConnectFailure { if (remoteAddress.isUnresolved) { log.debug("Resolving {} before connecting", remoteAddress.getHostName) diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index 43e82fd3e6..e83657e473 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -12,7 +12,7 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ Actor, ActorLogging, ActorRef } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import akka.util.{ ByteString, Helpers } +import akka.util.ByteString import akka.io.Inet.DatagramChannelCreator import akka.io.SelectionHandler._ import akka.io.Udp._ @@ -75,12 +75,15 @@ private[io] class UdpListener( case Unbind ⇒ log.debug("Unbinding endpoint [{}]", bind.localAddress) - try { - channel.close() - if (Helpers.isWindows) registration.enableInterest(OP_READ) - sender() ! Unbound - log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress) - } finally context.stop(self) + registration.cancelAndClose(() ⇒ self ! Unbound) + context.become(unregistering(sender())) + } + + def unregistering(requester: ActorRef): Receive = { + case Unbound ⇒ + log.debug("Unbound endpoint [{}], stopping listener", bind.localAddress) + requester ! Unbound + context.stop(self) } def doReceive(registration: ChannelRegistration, handler: ActorRef): Unit = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 9f0856d46e..a79095daa3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -19,6 +19,7 @@ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit._ import akka.testkit.{ EventFilter, TestKit, TestLatch, TestProbe } import akka.testkit.SocketUtil.temporaryServerAddress +import akka.testkit.WithLogCapturing import akka.util.ByteString import akka.{ Done, NotUsed } import com.typesafe.config.ConfigFactory @@ -31,9 +32,11 @@ import scala.concurrent.{ Await, ExecutionContext, Future, Promise } import scala.util.control.NonFatal class TcpSpec extends StreamSpec(""" - akka.loglevel = info + akka.loglevel = debug + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.io.tcp.trace-logging = true akka.stream.materializer.subscription-timeout.timeout = 2s - """) with TcpHelper { + """) with TcpHelper with WithLogCapturing { "Outgoing TCP stream" must {