From 389768b488fd390c47e9e4c4ee031d7236f2dead Mon Sep 17 00:00:00 2001 From: Roland Date: Wed, 30 Jan 2013 11:28:47 +0100 Subject: [PATCH] clean up test output and help aggregation to pass on my Mac --- .../test/scala/akka/io/IntegrationSpec.scala | 6 +- .../scala/akka/io/TcpConnectionSpec.scala | 86 ++++++++++++------- .../test/scala/akka/io/TcpListenerSpec.scala | 8 +- 3 files changed, 63 insertions(+), 37 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala index 0863707936..e4d53f5f9b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala @@ -8,6 +8,8 @@ import akka.testkit.AkkaSpec import akka.util.ByteString import Tcp._ import TestUtils._ +import akka.testkit.EventFilter +import java.io.IOException class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationSpecSupport { @@ -26,7 +28,9 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationS "properly handle connection abort from one side" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - clientHandler.send(clientConnection, Abort) + EventFilter[IOException](occurrences = 1) intercept { + clientHandler.send(clientConnection, Abort) + } clientHandler.expectMsg(Aborted) serverHandler.expectMsgType[ErrorClosed] verifyActorTermination(clientConnection) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index b9b02313af..c5d67a50be 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -4,24 +4,24 @@ package akka.io -import scala.annotation.tailrec - -import org.scalatest.matchers.{ MatchResult, BeMatcher } - -import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel } -import java.nio.ByteBuffer -import java.nio.channels.spi.SelectorProvider import java.io.IOException -import java.net._ +import java.net.{ ConnectException, InetSocketAddress, SocketException } +import java.nio.ByteBuffer +import java.nio.channels.{ SelectionKey, Selector, ServerSocketChannel, SocketChannel } +import java.nio.channels.spi.SelectorProvider +import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ import scala.util.control.NonFatal -import akka.actor.{ PoisonPill, ActorRef, Terminated } -import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } -import akka.util.ByteString -import TestUtils._ -import TcpSelector._ +import org.scalatest.matchers._ import Tcp._ +import TcpSelector._ +import TestUtils._ +import akka.actor.{ ActorRef, PoisonPill, Terminated } +import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } +import akka.util.ByteString +import akka.actor.DeathPactException +import akka.actor.DeathPactException class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { val serverAddress = temporaryServerAddress() @@ -45,8 +45,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") createConnectionActor(options = Vector(SO.KeepAlive(true)))(selector.ref, userHandler.ref) val clientChannel = connectionActor.underlyingActor.channel clientChannel.socket.getKeepAlive must be(false) // only set after connection is established - selector.send(connectionActor, ChannelConnectable) - clientChannel.socket.getKeepAlive must be(true) + EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + clientChannel.socket.getKeepAlive must be(true) + } } "send incoming data to the connection handler" in withEstablishedConnection() { setup ⇒ @@ -61,6 +63,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") expectReceivedString("testdata2testdata3") } + "bundle incoming Received messages as long as more data is available" in withEstablishedConnection( clientSocketOptions = List(SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through ) { setup ⇒ @@ -70,6 +73,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val bigData = new Array[Byte](DataSize) val buffer = ByteBuffer.wrap(bigData) + serverSideChannel.socket.setSendBufferSize(150000) val wrote = serverSideChannel.write(buffer) wrote must be > 140000 @@ -80,6 +84,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // 140000 is more than the direct buffer size connectionHandler.expectMsgType[Received].data.length must be > 140000 } + "receive data directly when the connection is established" in withUnacceptedConnection() { unregisteredSetup ⇒ import unregisteredSetup._ @@ -282,9 +287,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "report when peer aborted the connection" in withEstablishedConnection() { setup ⇒ import setup._ - abortClose(serverSideChannel) - selector.send(connectionActor, ChannelReadable) - connectionHandler.expectMsgType[ErrorClosed].cause must be("Connection reset by peer") + EventFilter[IOException](occurrences = 1) intercept { + abortClose(serverSideChannel) + selector.send(connectionActor, ChannelReadable) + connectionHandler.expectMsgType[ErrorClosed].cause must be("Connection reset by peer") + } // wait a while connectionHandler.expectNoMsg(200.millis) @@ -296,9 +303,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val writer = TestProbe() abortClose(serverSideChannel) - writer.send(connectionActor, Write(ByteString("testdata"))) - // bother writer and handler should get the message - writer.expectMsgType[ErrorClosed] + EventFilter[IOException](occurrences = 1) intercept { + writer.send(connectionActor, Write(ByteString("testdata"))) + // bother writer and handler should get the message + writer.expectMsgType[ErrorClosed] + } connectionHandler.expectMsgType[ErrorClosed] assertThisConnectionActorTerminated() @@ -310,8 +319,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // close instead of accept localServer.close() - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsgType[ErrorClosed].cause must be("Connection reset by peer") + EventFilter[SocketException](occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsgType[ErrorClosed].cause must be("Connection reset by peer") + } verifyActorTermination(connectionActor) } @@ -326,8 +337,10 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") sel.select(200) key.isConnectable must be(true) - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsgType[ErrorClosed].cause must be("Connection refused") + EventFilter[ConnectException](occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsgType[ErrorClosed].cause must be("Connection refused") + } verifyActorTermination(connectionActor) } @@ -336,27 +349,34 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") import setup._ localServer.accept() - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - verifyActorTermination(connectionActor) + EventFilter.warning(pattern = "registration timeout", occurrences = 1) intercept { + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + verifyActorTermination(connectionActor) + } } "close the connection when user handler dies while connecting" in withUnacceptedConnection() { setup ⇒ import setup._ - userHandler.ref ! PoisonPill + EventFilter[DeathPactException](occurrences = 1) intercept { + userHandler.ref ! PoisonPill - verifyActorTermination(connectionActor) + verifyActorTermination(connectionActor) + } } "close the connection when connection handler dies while connected" in withEstablishedConnection() { setup ⇒ import setup._ watch(connectionHandler.ref) watch(connectionActor) - system.stop(connectionHandler.ref) - expectMsgType[Terminated].actor must be(connectionHandler.ref) - expectMsgType[Terminated].actor must be(connectionActor) + EventFilter[DeathPactException](occurrences = 1) intercept { + system.stop(connectionHandler.ref) + expectMsgType[Terminated].actor must be(connectionHandler.ref) + expectMsgType[Terminated].actor must be(connectionActor) + } } } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 91902d0e5a..2a38c1547b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -10,6 +10,7 @@ import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props } import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } import TcpSelector._ import Tcp._ +import akka.testkit.EventFilter class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { @@ -61,9 +62,10 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { val channel = selectorRouter.expectMsgType[RegisterIncomingConnection].channel channel.isOpen must be(true) - listener ! CommandFailed(RegisterIncomingConnection(channel, handler.ref, Nil)) - - awaitCond(!channel.isOpen) + EventFilter.warning(pattern = "selector capacity limit", occurrences = 1) intercept { + listener ! CommandFailed(RegisterIncomingConnection(channel, handler.ref, Nil)) + awaitCond(!channel.isOpen) + } } }