Fixed TcpConnetionSpec to be l10n aware #3089

This commit is contained in:
Endre Sándor Varga 2013-02-25 15:27:24 +01:00
parent 52e1bb3d92
commit cbe582454c

View file

@ -35,7 +35,40 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} }
} }
lazy val ConnectionResetByPeerMessage: String = {
val serverSocket = ServerSocketChannel.open()
serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0))
try {
val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort))
val clientSocketOnServer = acceptServerSideConnection(serverSocket)
clientSocketOnServer.socket.setSoLinger(true, 0)
clientSocketOnServer.close()
clientSocket.write(ByteBuffer.allocate(1))
null
} catch {
case NonFatal(e) e.getMessage
}
}
lazy val ConnectionRefusedMessagePrefix: String = {
val serverSocket = ServerSocketChannel.open()
serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0))
try {
serverSocket.close()
val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort))
clientSocket.finishConnect()
clientSocket.write(ByteBuffer.allocate(1))
null
} catch {
case NonFatal(e) e.getMessage.substring(0, 15)
}
}
"An outgoing connection" must { "An outgoing connection" must {
info("Connecition reset by peer message expected is " + ConnectionResetByPeerMessage)
info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix)
// common behavior // common behavior
"set socket options before connecting" in withLocalServer() { localServer "set socket options before connecting" in withLocalServer() { localServer
@ -98,11 +131,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup
import unregisteredSetup._ import unregisteredSetup._
@volatile var serverSideChannel: SocketChannel = null val serverSideChannel = acceptServerSideConnection(localServer)
awaitCond {
serverSideChannel = localServer.accept()
serverSideChannel != null
}
serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII")))
serverSideChannel.configureBlocking(false) serverSideChannel.configureBlocking(false)
@ -262,21 +291,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
connectionHandler.expectNoMsg(500.millis) connectionHandler.expectNoMsg(500.millis)
} }
"abort the connection and reply with `Aborted` upon reception of an `Abort` command (simplified)" in withEstablishedConnection() { setup
import setup._
connectionHandler.send(connectionActor, Abort)
connectionHandler.expectMsg(Aborted)
assertThisConnectionActorTerminated()
val buffer = ByteBuffer.allocate(1)
val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException]
}
"abort the connection and reply with `Aborted` upon reception of an `Abort` command" in withEstablishedConnection() { setup "abort the connection and reply with `Aborted` upon reception of an `Abort` command" in withEstablishedConnection() { setup
info("Temporarily disabled due to l10n problems")
pending
import setup._ import setup._
connectionHandler.send(connectionActor, Abort) connectionHandler.send(connectionActor, Abort)
@ -286,7 +301,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
val buffer = ByteBuffer.allocate(1) val buffer = ByteBuffer.allocate(1)
val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException]
thrown.getMessage must be("Connection reset by peer") thrown.getMessage must be(ConnectionResetByPeerMessage)
} }
/* /*
@ -374,30 +389,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertThisConnectionActorTerminated() assertThisConnectionActorTerminated()
} }
"report when peer aborted the connection (simplified)" in withEstablishedConnection() { setup
import setup._
EventFilter[IOException](occurrences = 1) intercept {
abortClose(serverSideChannel)
selector.send(connectionActor, ChannelReadable)
val err = connectionHandler.expectMsgType[ErrorClosed]
}
// wait a while
connectionHandler.expectNoMsg(200.millis)
assertThisConnectionActorTerminated()
}
"report when peer aborted the connection" in withEstablishedConnection() { setup "report when peer aborted the connection" in withEstablishedConnection() { setup
import setup._ import setup._
info("Temporarily disabled due to l10n problems")
pending
EventFilter[IOException](occurrences = 1) intercept { EventFilter[IOException](occurrences = 1) intercept {
abortClose(serverSideChannel) abortClose(serverSideChannel)
selector.send(connectionActor, ChannelReadable) selector.send(connectionActor, ChannelReadable)
val err = connectionHandler.expectMsgType[ErrorClosed] val err = connectionHandler.expectMsgType[ErrorClosed]
err.cause must be("Connection reset by peer") err.cause must be(ConnectionResetByPeerMessage)
} }
// wait a while // wait a while
connectionHandler.expectNoMsg(200.millis) connectionHandler.expectNoMsg(200.millis)
@ -424,44 +424,25 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
// no TCP level connection has been established with the client does not hold. // no TCP level connection has been established with the client does not hold.
"report failed connection attempt while not accepted" in withUnacceptedConnection() { setup "report failed connection attempt while not accepted" in withUnacceptedConnection() { setup
import setup._ import setup._
info("Temporarily disabled due to l10n problems") ignoreIfWindows
pending
// close instead of accept // close instead of accept
localServer.close() localServer.close()
EventFilter[SocketException](occurrences = 1) intercept { EventFilter[SocketException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
val err = userHandler.expectMsgType[ErrorClosed] val err = userHandler.expectMsgType[ErrorClosed]
err.cause must be("Connection reset by peer") err.cause must be(ConnectionResetByPeerMessage)
} }
verifyActorTermination(connectionActor) verifyActorTermination(connectionActor)
} }
val UnboundAddress = temporaryServerAddress() val UnboundAddress = temporaryServerAddress()
"report failed connection attempt when target is unreachable (simplified)" in
withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup
import setup._
val sel = SelectorProvider.provider().openSelector()
val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
// This timeout should be large enough to work on Windows
sel.select(3000)
key.isConnectable must be(true)
EventFilter[ConnectException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable)
val err = userHandler.expectMsgType[ErrorClosed]
}
verifyActorTermination(connectionActor)
}
"report failed connection attempt when target is unreachable" in "report failed connection attempt when target is unreachable" in
withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup
import setup._ import setup._
info("Temporarily disabled due to l10n problems")
pending
val sel = SelectorProvider.provider().openSelector() val sel = SelectorProvider.provider().openSelector()
val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
@ -472,7 +453,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
EventFilter[ConnectException](occurrences = 1) intercept { EventFilter[ConnectException](occurrences = 1) intercept {
selector.send(connectionActor, ChannelConnectable) selector.send(connectionActor, ChannelConnectable)
val err = userHandler.expectMsgType[ErrorClosed] val err = userHandler.expectMsgType[ErrorClosed]
err.cause must be("Connection refused") err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true)
} }
verifyActorTermination(connectionActor) verifyActorTermination(connectionActor)
@ -513,6 +494,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} }
} }
def acceptServerSideConnection(localServer: ServerSocketChannel): SocketChannel = {
@volatile var serverSideChannel: SocketChannel = null
awaitCond {
serverSideChannel = localServer.accept()
serverSideChannel != null
}
serverSideChannel
}
def withLocalServer(setServerSocketOptions: ServerSocketChannel Unit = _ ())(body: ServerSocketChannel Any): Unit = { def withLocalServer(setServerSocketOptions: ServerSocketChannel Unit = _ ())(body: ServerSocketChannel Any): Unit = {
val localServer = ServerSocketChannel.open() val localServer = ServerSocketChannel.open()
try { try {
@ -673,11 +663,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup
import unregisteredSetup._ import unregisteredSetup._
@volatile var serverSideChannel: SocketChannel = null val serverSideChannel = acceptServerSideConnection(localServer)
awaitCond {
serverSideChannel = localServer.accept()
serverSideChannel != null
}
serverSideChannel.configureBlocking(false) serverSideChannel.configureBlocking(false)
serverSideChannel must not be (null) serverSideChannel must not be (null)