refactor tests to reuse common connection setup

This commit is contained in:
Johannes Rudolph 2013-01-16 16:59:55 +01:00
parent 27d111b1f5
commit e22c80655d

View file

@ -32,7 +32,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor =
createConnectionActor(selector.ref, userHandler.ref, options = Vector(SO.ReuseAddress(true)))
createConnectionActor(options = Vector(SO.ReuseAddress(true)))(selector.ref, userHandler.ref)
val clientChannel = connectionActor.underlyingActor.channel
clientChannel.socket.getReuseAddress must be(true)
}
@ -41,7 +41,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor =
createConnectionActor(selector.ref, userHandler.ref, options = Vector(SO.KeepAlive(true)))
createConnectionActor(options = Vector(SO.KeepAlive(true)))(selector.ref, userHandler.ref)
val clientChannel = connectionActor.underlyingActor.channel
clientChannel.socket.getKeepAlive must be(false) // only set after connection is established
selector.send(connectionActor, ChannelConnectable)
@ -228,15 +228,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
}
// error conditions
"report failed connection attempt while not registered" in withLocalServer() { localServer
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
val clientSideChannel = connectionActor.underlyingActor.channel
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
"report failed connection attempt while not accepted" in withUnacceptedConnection() { setup
import setup._
// close instead of accept
localServer.close()
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsgPF() {
case ErrorClose(e) e.getMessage must be("Connection reset by peer")
@ -245,31 +241,27 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertActorTerminated(connectionActor)
}
"report failed connection attempt when target is unreachable" in {
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor = createConnectionActor(selector.ref, userHandler.ref, serverAddress = new InetSocketAddress("127.0.0.1", 63186))
val clientSideChannel = connectionActor.underlyingActor.channel
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
val sel = SelectorProvider.provider().openSelector()
val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
sel.select(200)
val UnknownAddress = new InetSocketAddress("127.0.0.1", 63186)
"report failed connection attempt when target is unreachable" in
withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnknownAddress)) { setup
import setup._
key.isConnectable must be(true)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsgPF() {
case ErrorClose(e) e.getMessage must be("Connection refused")
val sel = SelectorProvider.provider().openSelector()
val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
sel.select(200)
key.isConnectable must be(true)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsgPF() {
case ErrorClose(e) e.getMessage must be("Connection refused")
}
assertActorTerminated(connectionActor)
}
assertActorTerminated(connectionActor)
}
"time out when Connected isn't answered with Register" in withUnacceptedConnection() { setup
import setup._
"time out when Connected isn't answered with Register" in withLocalServer() { localServer
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
val clientSideChannel = connectionActor.underlyingActor.channel
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
localServer.accept()
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
@ -277,15 +269,12 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
assertActorTerminated(connectionActor)
}
"close the connection when user handler dies while connecting" in withLocalServer() { localServer
val userHandler = system.actorOf(Props(new Actor {
def receive = PartialFunction.empty
}))
val selector = TestProbe()
val connectionActor = createConnectionActor(selector.ref, userHandler)
val clientSideChannel = connectionActor.underlyingActor.channel
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
system.stop(userHandler)
"close the connection when user handler dies while connecting" in withUnacceptedConnection() { setup
import setup._
// simulate death of userHandler test probe
userHandler.send(connectionActor, akka.actor.Terminated(userHandler.ref)(false, false))
assertActorTerminated(connectionActor)
}
@ -309,14 +298,22 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
} finally localServer.close()
}
case class Setup(
case class UnacceptedSetup(
localServer: ServerSocketChannel,
userHandler: TestProbe,
connectionHandler: TestProbe,
selector: TestProbe,
connectionActor: TestActorRef[TcpOutgoingConnection],
clientSideChannel: SocketChannel,
clientSideChannel: SocketChannel)
case class RegisteredSetup(
unregisteredSetup: UnacceptedSetup,
connectionHandler: TestProbe,
serverSideChannel: SocketChannel) {
def userHandler: TestProbe = unregisteredSetup.userHandler
def selector: TestProbe = unregisteredSetup.selector
def connectionActor: TestActorRef[TcpOutgoingConnection] = unregisteredSetup.connectionActor
def clientSideChannel: SocketChannel = unregisteredSetup.clientSideChannel
val buffer = ByteBuffer.allocate(TestSize)
@tailrec final def pullFromServerSide(remaining: Int): Unit =
if (remaining > 0) {
@ -339,14 +336,29 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
clientSideChannel must not be ('open)
}
}
def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel Unit = _ ())(body: Setup Any): Unit = withLocalServer(setServerSocketOptions) { localServer
val userHandler = TestProbe()
val connectionHandler = TestProbe()
val selector = TestProbe()
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
val clientSideChannel = connectionActor.underlyingActor.channel
def withUnacceptedConnection(
setServerSocketOptions: ServerSocketChannel Unit = _ (),
connectionActorCons: (ActorRef, ActorRef) TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup Any): Unit =
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
withLocalServer(setServerSocketOptions) { localServer
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor = connectionActorCons(selector.ref, userHandler.ref)
val clientSideChannel = connectionActor.underlyingActor.channel
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
body {
UnacceptedSetup(
localServer,
userHandler,
selector,
connectionActor,
clientSideChannel)
}
}
def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel Unit = _ ())(body: RegisteredSetup Any): Unit = withUnacceptedConnection(setServerSocketOptions) { unregisteredSetup
import unregisteredSetup._
localServer.configureBlocking(true)
val serverSideChannel = localServer.accept()
@ -354,16 +366,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
serverSideChannel must not be (null)
selector.send(connectionActor, ChannelConnectable)
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
val connectionHandler = TestProbe()
userHandler.send(connectionActor, Register(connectionHandler.ref))
selector.expectMsg(ReadInterest)
body {
Setup(
userHandler,
RegisteredSetup(
unregisteredSetup,
connectionHandler,
selector,
connectionActor,
clientSideChannel,
serverSideChannel)
}
}
@ -377,11 +388,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
channel.socket.setReceiveBufferSize(1024)
def createConnectionActor(
selector: ActorRef,
commander: ActorRef,
serverAddress: InetSocketAddress = serverAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Seq[Tcp.SocketOption] = Nil): TestActorRef[TcpOutgoingConnection] = {
options: immutable.Seq[Tcp.SocketOption] = Nil)(
selector: ActorRef,
commander: ActorRef): TestActorRef[TcpOutgoingConnection] = {
TestActorRef(
new TcpOutgoingConnection(selector, commander, serverAddress, localAddress, options) {