diff --git a/akka-io/src/test/scala/akka/io/IntegrationSpec.scala b/akka-io/src/test/scala/akka/io/IntegrationSpec.scala index f69b8347d0..93c78ecab2 100644 --- a/akka-io/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-io/src/test/scala/akka/io/IntegrationSpec.scala @@ -9,6 +9,8 @@ import akka.actor.ActorRef import akka.util.ByteString import Tcp._ import TestUtils._ +import collection.immutable +import annotation.tailrec class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { @@ -29,6 +31,15 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { verifyActorTermination(serverConnection) } + "properly handle connection abort from one side" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + clientHandler.send(clientConnection, Abort) + clientHandler.expectMsg(Aborted) + serverHandler.expectMsgType[ErrorClose] + verifyActorTermination(clientConnection) + verifyActorTermination(serverConnection) + } + "properly complete one client/server request/response cycle" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() @@ -48,6 +59,20 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { verifyActorTermination(serverConnection) } + "waiting for writes works with backpressure" in new TestSetup { + val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() + + serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), 4223)) + serverHandler.expectMsg(4223) + + expectReceivedData(clientHandler, 100000) + + override def bindOptions: immutable.Traversable[SocketOption] = + List(SO.SendBufferSize(1024)) + + override def connectOptions: immutable.Traversable[SocketOption] = + List(SO.ReceiveBufferSize(1024)) + } ////////////////////////////////////// ///////// more tests to come ///////// ////////////////////////////////////// @@ -61,13 +86,13 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { def bindServer(): Unit = { val bindCommander = TestProbe() - bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint)) + bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions)) bindCommander.expectMsg(Bound) } def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = { val connectCommander = TestProbe() - connectCommander.send(IO(Tcp), Connect(endpoint)) + connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions)) val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected] val clientHandler = TestProbe() connectCommander.sender ! Register(clientHandler.ref) @@ -78,6 +103,18 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { (clientHandler, connectCommander.sender, serverHandler, bindHandler.sender) } + + @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit = + if (remaining > 0) { + val recv = handler.expectMsgType[Received] + expectReceivedData(handler, remaining - recv.data.size) + } + + /** allow overriding socket options for server side channel */ + def bindOptions: collection.immutable.Traversable[SocketOption] = Nil + + /** allow overriding socket options for client side channel */ + def connectOptions: collection.immutable.Traversable[SocketOption] = Nil } -} \ No newline at end of file +}