diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 31da412b33..934cdf3fd8 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -32,7 +32,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val userHandler = TestProbe() val selector = TestProbe() val connectionActor = - createConnectionActor(selector.ref, userHandler.ref, options = Vector(SO.ReuseAddress(true))) + createConnectionActor(options = Vector(SO.ReuseAddress(true)))(selector.ref, userHandler.ref) val clientChannel = connectionActor.underlyingActor.channel clientChannel.socket.getReuseAddress must be(true) } @@ -41,7 +41,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val userHandler = TestProbe() val selector = TestProbe() val connectionActor = - createConnectionActor(selector.ref, userHandler.ref, options = Vector(SO.KeepAlive(true))) + createConnectionActor(options = Vector(SO.KeepAlive(true)))(selector.ref, userHandler.ref) val clientChannel = connectionActor.underlyingActor.channel clientChannel.socket.getKeepAlive must be(false) // only set after connection is established selector.send(connectionActor, ChannelConnectable) @@ -228,15 +228,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } // error conditions - "report failed connection attempt while not registered" in withLocalServer() { localServer ⇒ - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = createConnectionActor(selector.ref, userHandler.ref) - val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) - + "report failed connection attempt while not accepted" in withUnacceptedConnection() { setup ⇒ + import setup._ // close instead of accept localServer.close() + selector.send(connectionActor, ChannelConnectable) userHandler.expectMsgPF() { case ErrorClose(e) ⇒ e.getMessage must be("Connection reset by peer") @@ -245,31 +241,27 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertActorTerminated(connectionActor) } - "report failed connection attempt when target is unreachable" in { - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = createConnectionActor(selector.ref, userHandler.ref, serverAddress = new InetSocketAddress("127.0.0.1", 63186)) - val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) - val sel = SelectorProvider.provider().openSelector() - val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) - sel.select(200) + val UnknownAddress = new InetSocketAddress("127.0.0.1", 63186) + "report failed connection attempt when target is unreachable" in + withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnknownAddress)) { setup ⇒ + import setup._ - key.isConnectable must be(true) - selector.send(connectionActor, ChannelConnectable) - userHandler.expectMsgPF() { - case ErrorClose(e) ⇒ e.getMessage must be("Connection refused") + val sel = SelectorProvider.provider().openSelector() + val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) + sel.select(200) + + key.isConnectable must be(true) + selector.send(connectionActor, ChannelConnectable) + userHandler.expectMsgPF() { + case ErrorClose(e) ⇒ e.getMessage must be("Connection refused") + } + + assertActorTerminated(connectionActor) } - assertActorTerminated(connectionActor) - } + "time out when Connected isn't answered with Register" in withUnacceptedConnection() { setup ⇒ + import setup._ - "time out when Connected isn't answered with Register" in withLocalServer() { localServer ⇒ - val userHandler = TestProbe() - val selector = TestProbe() - val connectionActor = createConnectionActor(selector.ref, userHandler.ref) - val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) localServer.accept() selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) @@ -277,15 +269,12 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertActorTerminated(connectionActor) } - "close the connection when user handler dies while connecting" in withLocalServer() { localServer ⇒ - val userHandler = system.actorOf(Props(new Actor { - def receive = PartialFunction.empty - })) - val selector = TestProbe() - val connectionActor = createConnectionActor(selector.ref, userHandler) - val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) - system.stop(userHandler) + "close the connection when user handler dies while connecting" in withUnacceptedConnection() { setup ⇒ + import setup._ + + // simulate death of userHandler test probe + userHandler.send(connectionActor, akka.actor.Terminated(userHandler.ref)(false, false)) + assertActorTerminated(connectionActor) } @@ -309,14 +298,22 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } finally localServer.close() } - case class Setup( + case class UnacceptedSetup( + localServer: ServerSocketChannel, userHandler: TestProbe, - connectionHandler: TestProbe, selector: TestProbe, connectionActor: TestActorRef[TcpOutgoingConnection], - clientSideChannel: SocketChannel, + clientSideChannel: SocketChannel) + case class RegisteredSetup( + unregisteredSetup: UnacceptedSetup, + connectionHandler: TestProbe, serverSideChannel: SocketChannel) { + def userHandler: TestProbe = unregisteredSetup.userHandler + def selector: TestProbe = unregisteredSetup.selector + def connectionActor: TestActorRef[TcpOutgoingConnection] = unregisteredSetup.connectionActor + def clientSideChannel: SocketChannel = unregisteredSetup.clientSideChannel + val buffer = ByteBuffer.allocate(TestSize) @tailrec final def pullFromServerSide(remaining: Int): Unit = if (remaining > 0) { @@ -339,14 +336,29 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") clientSideChannel must not be ('open) } } - def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ ())(body: Setup ⇒ Any): Unit = withLocalServer(setServerSocketOptions) { localServer ⇒ - val userHandler = TestProbe() - val connectionHandler = TestProbe() - val selector = TestProbe() - val connectionActor = createConnectionActor(selector.ref, userHandler.ref) - val clientSideChannel = connectionActor.underlyingActor.channel + def withUnacceptedConnection( + setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (), + connectionActorCons: (ActorRef, ActorRef) ⇒ TestActorRef[TcpOutgoingConnection] = createConnectionActor())(body: UnacceptedSetup ⇒ Any): Unit = - selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) + withLocalServer(setServerSocketOptions) { localServer ⇒ + val userHandler = TestProbe() + val selector = TestProbe() + val connectionActor = connectionActorCons(selector.ref, userHandler.ref) + val clientSideChannel = connectionActor.underlyingActor.channel + + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) + + body { + UnacceptedSetup( + localServer, + userHandler, + selector, + connectionActor, + clientSideChannel) + } + } + def withEstablishedConnection(setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ ())(body: RegisteredSetup ⇒ Any): Unit = withUnacceptedConnection(setServerSocketOptions) { unregisteredSetup ⇒ + import unregisteredSetup._ localServer.configureBlocking(true) val serverSideChannel = localServer.accept() @@ -354,16 +366,15 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") serverSideChannel must not be (null) selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) + + val connectionHandler = TestProbe() userHandler.send(connectionActor, Register(connectionHandler.ref)) selector.expectMsg(ReadInterest) body { - Setup( - userHandler, + RegisteredSetup( + unregisteredSetup, connectionHandler, - selector, - connectionActor, - clientSideChannel, serverSideChannel) } } @@ -377,11 +388,11 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") channel.socket.setReceiveBufferSize(1024) def createConnectionActor( - selector: ActorRef, - commander: ActorRef, serverAddress: InetSocketAddress = serverAddress, localAddress: Option[InetSocketAddress] = None, - options: immutable.Seq[Tcp.SocketOption] = Nil): TestActorRef[TcpOutgoingConnection] = { + options: immutable.Seq[Tcp.SocketOption] = Nil)( + selector: ActorRef, + commander: ActorRef): TestActorRef[TcpOutgoingConnection] = { TestActorRef( new TcpOutgoingConnection(selector, commander, serverAddress, localAddress, options) {