Problem with address already in use in udp tests, see #3084

* Subsequent bind to dynamic port might result in same port if
  the socket is closed inbetween
This commit is contained in:
Patrik Nordwall 2013-02-22 14:14:13 +01:00
parent 00b16997f2
commit 31f3100af9
4 changed files with 34 additions and 20 deletions

View file

@ -19,12 +19,13 @@ class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max
// we now have three channels registered: a listener, a server connection and a client connection
// so register one more channel
val commander = TestProbe()
commander.send(IO(Tcp), Bind(bindHandler.ref, temporaryServerAddress()))
val addresses = temporaryServerAddresses(2)
commander.send(IO(Tcp), Bind(bindHandler.ref, addresses(0)))
commander.expectMsg(Bound)
// we are now at the configured max-channel capacity of 4
val bindToFail = Bind(bindHandler.ref, temporaryServerAddress())
val bindToFail = Bind(bindHandler.ref, addresses(1))
commander.send(IO(Tcp), bindToFail)
commander.expectMsgType[CommandFailed].cmd must be theSameInstanceAs (bindToFail)

View file

@ -4,6 +4,7 @@
package akka.io
import scala.collection.immutable
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel
import akka.actor.{ Terminated, ActorSystem, ActorRef }
@ -11,13 +12,18 @@ import akka.testkit.TestProbe
object TestUtils {
def temporaryServerAddress(address: String = "127.0.0.1"): InetSocketAddress = {
val serverSocket = ServerSocketChannel.open()
try {
serverSocket.socket.bind(new InetSocketAddress(address, 0))
def temporaryServerAddress(address: String = "127.0.0.1"): InetSocketAddress =
temporaryServerAddresses(1, address).head
def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1"): immutable.IndexedSeq[InetSocketAddress] = {
val sockets = for (_ 1 to numberOfAddresses) yield {
val serverSocket = ServerSocketChannel.open()
serverSocket.socket.bind(new InetSocketAddress(hostname, 0))
val port = serverSocket.socket.getLocalPort
new InetSocketAddress(address, port)
} finally serverSocket.close()
(serverSocket, new InetSocketAddress(hostname, port))
}
sockets collect { case (socket, address) socket.close(); address }
}
def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = {

View file

@ -11,12 +11,13 @@ import akka.actor.ActorRef
class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
def bindUdp(handler: ActorRef): (InetSocketAddress, ActorRef) = {
val address = temporaryServerAddress()
val addresses = temporaryServerAddresses(3)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), UdpFF.Bind(handler, address))
commander.expectMsg(UdpFF.Bound)
(address, commander.sender)
commander.sender
}
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = {
@ -29,7 +30,8 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
"The UDP connection oriented implementation" must {
"be able to send and receive without binding" in {
val (serverAddress, server) = bindUdp(testActor)
val serverAddress = addresses(0)
val server = bindUdp(serverAddress, testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(localAddress = None, serverAddress, testActor) ! UdpConn.Send(data1)
@ -49,8 +51,9 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
}
"be able to send and receive with binding" in {
val clientAddress = temporaryServerAddress()
val (serverAddress, server) = bindUdp(testActor)
val serverAddress = addresses(1)
val clientAddress = addresses(2)
val server = bindUdp(serverAddress, testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConn.Send(data1)

View file

@ -12,12 +12,13 @@ import akka.actor.ActorRef
class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with ImplicitSender {
def bindUdp(handler: ActorRef): (InetSocketAddress, ActorRef) = {
val address = temporaryServerAddress()
val addresses = temporaryServerAddresses(3)
def bindUdp(address: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), Bind(handler, address))
commander.expectMsg(Bound)
(address, commander.sender)
commander.sender
}
val simpleSender: ActorRef = {
@ -30,7 +31,8 @@ class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Implici
"The UDP Fire-and-Forget implementation" must {
"be able to send without binding" in {
val (serverAddress, server) = bindUdp(testActor)
val serverAddress = addresses(0)
val server = bindUdp(serverAddress, testActor)
val data = ByteString("To infinity and beyond!")
simpleSender ! Send(data, serverAddress)
@ -39,8 +41,10 @@ class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Implici
}
"be able to send with binding" in {
val (serverAddress, server) = bindUdp(testActor)
val (clientAddress, client) = bindUdp(testActor)
val serverAddress = addresses(1)
val clientAddress = addresses(2)
val server = bindUdp(serverAddress, testActor)
val client = bindUdp(clientAddress, testActor)
val data = ByteString("Fly little packet!")
client ! Send(data, serverAddress)