diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index ad5b6a208b..22aad8ae09 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -22,7 +22,7 @@ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max val commander = TestProbe() val addresses = temporaryServerAddresses(2) commander.send(IO(Tcp), Bind(bindHandler.ref, addresses(0))) - commander.expectMsg(Bound) + commander.expectMsg(Bound(addresses(0))) // we are now at the configured max-channel capacity of 4 diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index f6029d4cc4..83974579f9 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -456,8 +456,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } - // This tets is disabled on windows, as the assumption that not calling accept on a server socket means that + // This test is disabled on windows, as the assumption that not calling accept on a server socket means that // no TCP level connection has been established with the client does not hold. + // RK: I think Windows is no different than any other OS in this regard, there was just a sleep() missing. "report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒ import setup._ ignoreIfWindows @@ -465,6 +466,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // close instead of accept localServer.close() + // must give the OS some time to send RST from server to client + Thread.sleep(100) + EventFilter[SocketException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(CommandFailed(Connect(serverAddress))) @@ -679,6 +683,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val userHandler = TestProbe() val selector = TestProbe() val connectionActor = connectionActorCons(selector.ref, userHandler.ref) + // calling .underlyingActor ensures that the actor is actually created at this point val clientSideChannel = connectionActor.underlyingActor.channel selector.expectMsg(RegisterChannel(clientSideChannel, OP_CONNECT)) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index 358b1d614a..bc1c3efa30 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -24,7 +24,7 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ def bindServer(): Unit = { val bindCommander = TestProbe() bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions)) - bindCommander.expectMsg(Bound) + bindCommander.expectMsg(Bound(endpoint)) } def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 2504d48d7f..6bb12241b8 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -23,7 +23,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { "let the Bind commander know when binding is completed" in new TestSetup { listener ! ChannelRegistered - bindCommander.expectMsg(Bound) + bindCommander.expectMsgType[Bound] } "accept acceptable connections and register them with its parent" in new TestSetup { @@ -102,7 +102,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { def bindListener() { listener ! ChannelRegistered - bindCommander.expectMsg(Bound) + bindCommander.expectMsgType[Bound] } def attemptConnectionToEndpoint(): Unit = new Socket(endpoint.getHostName, endpoint.getPort) diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index aa1e871d00..04f973ed16 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -17,7 +17,7 @@ class UdpConnectedIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() commander.send(IO(Udp), Udp.Bind(handler, address)) - commander.expectMsg(Udp.Bound) + commander.expectMsg(Udp.Bound(address)) commander.sender } diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index 6dba329661..080cc8fbad 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -17,7 +17,7 @@ class UdpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitS def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() commander.send(IO(Udp), Bind(handler, address)) - commander.expectMsg(Bound) + commander.expectMsg(Bound(address)) commander.sender } diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index b1a9bb62f2..52cc39358d 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -117,8 +117,7 @@ object Tcp extends ExtensionKey[TcpExt] { case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - sealed trait Bound extends Event - case object Bound extends Bound + case class Bound(localAddress: InetSocketAddress) extends Event sealed trait Unbound extends Event case object Unbound extends Unbound diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 30baaa4c06..9036d190f3 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -11,6 +11,7 @@ import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ import akka.io.Tcp._ import akka.io.IO.HasFailureMessage +import java.net.InetSocketAddress /** * INTERNAL API @@ -42,8 +43,11 @@ private[io] class TcpListener(val selectorRouter: ActorRef, serverSocketChannel.configureBlocking(false) val socket = serverSocketChannel.socket options.foreach(_.beforeServerSocketBind(socket)) - try socket.bind(endpoint, backlog) - catch { + try { + socket.bind(endpoint, backlog) + require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], + s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") + } catch { case NonFatal(e) ⇒ bindCommander ! bind.failureMessage log.error(e, "Bind failed for TCP channel on endpoint [{}]", endpoint) @@ -58,7 +62,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, def receive: Receive = { case ChannelRegistered ⇒ - bindCommander ! Bound + bindCommander ! Bound(channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]) context.become(bound) } diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 11c3402ed4..d6c6e0589a 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -54,8 +54,7 @@ object Udp extends ExtensionKey[UdpExt] { case class Received(data: ByteString, sender: InetSocketAddress) extends Event case class CommandFailed(cmd: Command) extends Event - sealed trait Bound extends Event - case object Bound extends Bound + case class Bound(localAddress: InetSocketAddress) extends Event sealed trait SimpleSendReady extends Event case object SimpleSendReady extends SimpleSendReady diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index 827903a6d3..570d0af209 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -34,8 +34,11 @@ private[io] class UdpListener(val udp: UdpExt, datagramChannel.configureBlocking(false) val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) - try socket.bind(endpoint) - catch { + try { + socket.bind(endpoint) + require(socket.getLocalSocketAddress.isInstanceOf[InetSocketAddress], + s"bound to unknown SocketAddress [${socket.getLocalSocketAddress}]") + } catch { case NonFatal(e) ⇒ bindCommander ! CommandFailed(bind) log.error(e, "Failed to bind UDP channel to endpoint [{}]", endpoint) @@ -48,7 +51,7 @@ private[io] class UdpListener(val udp: UdpExt, def receive: Receive = { case ChannelRegistered ⇒ - bindCommander ! Bound + bindCommander ! Bound(channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]) context.become(readHandlers orElse sendHandlers, discardOld = true) }