add more integration tests
This commit is contained in:
parent
6d5458dfeb
commit
54c3d77db2
1 changed files with 40 additions and 3 deletions
|
|
@ -9,6 +9,8 @@ import akka.actor.ActorRef
|
|||
import akka.util.ByteString
|
||||
import Tcp._
|
||||
import TestUtils._
|
||||
import collection.immutable
|
||||
import annotation.tailrec
|
||||
|
||||
class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") {
|
||||
|
||||
|
|
@ -29,6 +31,15 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") {
|
|||
verifyActorTermination(serverConnection)
|
||||
}
|
||||
|
||||
"properly handle connection abort from one side" in new TestSetup {
|
||||
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
|
||||
clientHandler.send(clientConnection, Abort)
|
||||
clientHandler.expectMsg(Aborted)
|
||||
serverHandler.expectMsgType[ErrorClose]
|
||||
verifyActorTermination(clientConnection)
|
||||
verifyActorTermination(serverConnection)
|
||||
}
|
||||
|
||||
"properly complete one client/server request/response cycle" in new TestSetup {
|
||||
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
|
||||
|
||||
|
|
@ -48,6 +59,20 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") {
|
|||
verifyActorTermination(serverConnection)
|
||||
}
|
||||
|
||||
"waiting for writes works with backpressure" in new TestSetup {
|
||||
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
|
||||
|
||||
serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), 4223))
|
||||
serverHandler.expectMsg(4223)
|
||||
|
||||
expectReceivedData(clientHandler, 100000)
|
||||
|
||||
override def bindOptions: immutable.Traversable[SocketOption] =
|
||||
List(SO.SendBufferSize(1024))
|
||||
|
||||
override def connectOptions: immutable.Traversable[SocketOption] =
|
||||
List(SO.ReceiveBufferSize(1024))
|
||||
}
|
||||
//////////////////////////////////////
|
||||
///////// more tests to come /////////
|
||||
//////////////////////////////////////
|
||||
|
|
@ -61,13 +86,13 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") {
|
|||
|
||||
def bindServer(): Unit = {
|
||||
val bindCommander = TestProbe()
|
||||
bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint))
|
||||
bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions))
|
||||
bindCommander.expectMsg(Bound)
|
||||
}
|
||||
|
||||
def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = {
|
||||
val connectCommander = TestProbe()
|
||||
connectCommander.send(IO(Tcp), Connect(endpoint))
|
||||
connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions))
|
||||
val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected]
|
||||
val clientHandler = TestProbe()
|
||||
connectCommander.sender ! Register(clientHandler.ref)
|
||||
|
|
@ -78,6 +103,18 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") {
|
|||
|
||||
(clientHandler, connectCommander.sender, serverHandler, bindHandler.sender)
|
||||
}
|
||||
|
||||
@tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit =
|
||||
if (remaining > 0) {
|
||||
val recv = handler.expectMsgType[Received]
|
||||
expectReceivedData(handler, remaining - recv.data.size)
|
||||
}
|
||||
|
||||
/** allow overriding socket options for server side channel */
|
||||
def bindOptions: collection.immutable.Traversable[SocketOption] = Nil
|
||||
|
||||
/** allow overriding socket options for client side channel */
|
||||
def connectOptions: collection.immutable.Traversable[SocketOption] = Nil
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue