From cbe582454cfd37334f9da37f98859d60f52ee93f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 25 Feb 2013 15:27:24 +0100 Subject: [PATCH] Fixed TcpConnetionSpec to be l10n aware #3089 --- .../scala/akka/io/TcpConnectionSpec.scala | 114 ++++++++---------- 1 file changed, 50 insertions(+), 64 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 605e43755f..ad276a9015 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -35,7 +35,40 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } } + lazy val ConnectionResetByPeerMessage: String = { + val serverSocket = ServerSocketChannel.open() + serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0)) + + try { + val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort)) + val clientSocketOnServer = acceptServerSideConnection(serverSocket) + clientSocketOnServer.socket.setSoLinger(true, 0) + clientSocketOnServer.close() + clientSocket.write(ByteBuffer.allocate(1)) + null + } catch { + case NonFatal(e) ⇒ e.getMessage + } + } + + lazy val ConnectionRefusedMessagePrefix: String = { + val serverSocket = ServerSocketChannel.open() + serverSocket.socket.bind(new InetSocketAddress("127.0.0.1", 0)) + + try { + serverSocket.close() + val clientSocket = SocketChannel.open(new InetSocketAddress("127.0.0.1", serverSocket.socket().getLocalPort)) + clientSocket.finishConnect() + clientSocket.write(ByteBuffer.allocate(1)) + null + } catch { + case NonFatal(e) ⇒ e.getMessage.substring(0, 15) + } + } + "An outgoing connection" must { + info("Connecition reset by peer message expected is " + ConnectionResetByPeerMessage) + info("Connection refused message prefix expected is " + ConnectionRefusedMessagePrefix) // common behavior "set socket options before connecting" in withLocalServer() { localServer ⇒ @@ -98,11 +131,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ import unregisteredSetup._ - @volatile var serverSideChannel: SocketChannel = null - awaitCond { - serverSideChannel = localServer.accept() - serverSideChannel != null - } + val serverSideChannel = acceptServerSideConnection(localServer) serverSideChannel.write(ByteBuffer.wrap("immediatedata".getBytes("ASCII"))) serverSideChannel.configureBlocking(false) @@ -262,21 +291,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") connectionHandler.expectNoMsg(500.millis) } - "abort the connection and reply with `Aborted` upon reception of an `Abort` command (simplified)" in withEstablishedConnection() { setup ⇒ - import setup._ - - connectionHandler.send(connectionActor, Abort) - connectionHandler.expectMsg(Aborted) - - assertThisConnectionActorTerminated() - - val buffer = ByteBuffer.allocate(1) - val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] - } - "abort the connection and reply with `Aborted` upon reception of an `Abort` command" in withEstablishedConnection() { setup ⇒ - info("Temporarily disabled due to l10n problems") - pending import setup._ connectionHandler.send(connectionActor, Abort) @@ -286,7 +301,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val buffer = ByteBuffer.allocate(1) val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] - thrown.getMessage must be("Connection reset by peer") + thrown.getMessage must be(ConnectionResetByPeerMessage) } /* @@ -374,30 +389,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } - "report when peer aborted the connection (simplified)" in withEstablishedConnection() { setup ⇒ - import setup._ - - EventFilter[IOException](occurrences = 1) intercept { - abortClose(serverSideChannel) - selector.send(connectionActor, ChannelReadable) - val err = connectionHandler.expectMsgType[ErrorClosed] - } - // wait a while - connectionHandler.expectNoMsg(200.millis) - - assertThisConnectionActorTerminated() - } "report when peer aborted the connection" in withEstablishedConnection() { setup ⇒ import setup._ - info("Temporarily disabled due to l10n problems") - pending EventFilter[IOException](occurrences = 1) intercept { abortClose(serverSideChannel) selector.send(connectionActor, ChannelReadable) val err = connectionHandler.expectMsgType[ErrorClosed] - err.cause must be("Connection reset by peer") + err.cause must be(ConnectionResetByPeerMessage) } // wait a while connectionHandler.expectNoMsg(200.millis) @@ -424,44 +424,25 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // no TCP level connection has been established with the client does not hold. "report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒ import setup._ - info("Temporarily disabled due to l10n problems") - pending + ignoreIfWindows + // close instead of accept localServer.close() EventFilter[SocketException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) val err = userHandler.expectMsgType[ErrorClosed] - err.cause must be("Connection reset by peer") + err.cause must be(ConnectionResetByPeerMessage) } verifyActorTermination(connectionActor) } val UnboundAddress = temporaryServerAddress() - "report failed connection attempt when target is unreachable (simplified)" in - withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup ⇒ - import setup._ - - val sel = SelectorProvider.provider().openSelector() - val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) - // This timeout should be large enough to work on Windows - sel.select(3000) - - key.isConnectable must be(true) - EventFilter[ConnectException](occurrences = 1) intercept { - selector.send(connectionActor, ChannelConnectable) - val err = userHandler.expectMsgType[ErrorClosed] - } - - verifyActorTermination(connectionActor) - } "report failed connection attempt when target is unreachable" in withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup ⇒ import setup._ - info("Temporarily disabled due to l10n problems") - pending val sel = SelectorProvider.provider().openSelector() val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) @@ -472,7 +453,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") EventFilter[ConnectException](occurrences = 1) intercept { selector.send(connectionActor, ChannelConnectable) val err = userHandler.expectMsgType[ErrorClosed] - err.cause must be("Connection refused") + err.cause.startsWith(ConnectionRefusedMessagePrefix) must be(true) } verifyActorTermination(connectionActor) @@ -513,6 +494,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } } + def acceptServerSideConnection(localServer: ServerSocketChannel): SocketChannel = { + @volatile var serverSideChannel: SocketChannel = null + awaitCond { + serverSideChannel = localServer.accept() + serverSideChannel != null + } + serverSideChannel + } + def withLocalServer(setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ ())(body: ServerSocketChannel ⇒ Any): Unit = { val localServer = ServerSocketChannel.open() try { @@ -673,11 +663,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") clientSocketOptions: immutable.Seq[SocketOption] = Nil)(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions, createConnectionActor(options = clientSocketOptions)) { unregisteredSetup ⇒ import unregisteredSetup._ - @volatile var serverSideChannel: SocketChannel = null - awaitCond { - serverSideChannel = localServer.accept() - serverSideChannel != null - } + val serverSideChannel = acceptServerSideConnection(localServer) serverSideChannel.configureBlocking(false) serverSideChannel must not be (null)