From bbc7ae7132954d1c078fc908230e2c6112ac882e Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 22 Feb 2013 12:20:27 +0100 Subject: [PATCH 1/2] Fix TcpListener frequently not accepting new connections --- .../test/scala/akka/io/TcpListenerSpec.scala | 43 ++++++++++++------- .../src/main/scala/akka/io/TcpListener.scala | 19 ++++---- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 09ed457959..4c0652664f 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -5,13 +5,13 @@ package akka.io import java.net.Socket +import java.nio.channels.SocketChannel import scala.concurrent.duration._ import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props } import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } import Tcp._ import akka.testkit.EventFilter import akka.io.SelectionHandler._ -import java.nio.channels.SelectionKey._ import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { @@ -32,27 +32,35 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { attemptConnectionToEndpoint() attemptConnectionToEndpoint() - def expectWorkerForCommand: Unit = { - selectorRouter.expectMsgPF() { - case WorkerForCommand(RegisterIncoming(chan), commander, _) ⇒ - chan.isOpen must be(true) - commander must be === listener - } - } - // since the batch-accept-limit is 2 we must only receive 2 accepted connections listener ! ChannelAcceptable - parent.expectMsg(AcceptInterest) expectWorkerForCommand expectWorkerForCommand selectorRouter.expectNoMsg(100.millis) + parent.expectMsg(AcceptInterest) // and pick up the last remaining connection on the next ChannelAcceptable listener ! ChannelAcceptable expectWorkerForCommand } + "continue to accept connections after a previous accept" in new TestSetup { + bindListener() + + attemptConnectionToEndpoint() + listener ! ChannelAcceptable + expectWorkerForCommand + selectorRouter.expectNoMsg(100.millis) + parent.expectMsg(AcceptInterest) + + attemptConnectionToEndpoint() + listener ! ChannelAcceptable + expectWorkerForCommand + selectorRouter.expectNoMsg(100.millis) + parent.expectMsg(AcceptInterest) + } + "react to Unbind commands by replying with Unbound and stopping itself" in new TestSetup { bindListener() @@ -69,12 +77,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { attemptConnectionToEndpoint() listener ! ChannelAcceptable - val channel = selectorRouter.expectMsgPF() { - case WorkerForCommand(RegisterIncoming(chan), commander, _) ⇒ - chan.isOpen must be(true) - commander must be === listener - chan - } + val channel = expectWorkerForCommand EventFilter.warning(pattern = "selector capacity limit", occurrences = 1) intercept { listener ! FailedRegisterIncoming(channel) @@ -105,6 +108,14 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { def listener = parentRef.underlyingActor.listener + def expectWorkerForCommand: SocketChannel = + selectorRouter.expectMsgPF() { + case WorkerForCommand(RegisterIncoming(chan), commander, _) ⇒ + chan.isOpen must be(true) + commander must be === listener + chan + } + private class ListenerParent extends Actor { val listener = context.actorOf( props = Props(new TcpListener(selectorRouter.ref, Tcp(system), bindCommander.ref, Bind(handler.ref, endpoint, 100, Nil))), diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 3c7dcf43e4..55e0efe640 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -84,20 +84,21 @@ private[io] class TcpListener(val selectorRouter: ActorRef, context.stop(self) } - @tailrec final def acceptAllPending(limit: Int): Unit = - if (limit > 0) { - val socketChannel = + @tailrec final def acceptAllPending(limit: Int): Unit = { + val socketChannel = + if (limit > 0) { try channel.accept() catch { case NonFatal(e) ⇒ log.error(e, "Accept error: could not accept new connection due to {}", e); null } - if (socketChannel != null) { - log.debug("New connection accepted") - socketChannel.configureBlocking(false) - selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options))) - acceptAllPending(limit - 1) - } + } else null + if (socketChannel != null) { + log.debug("New connection accepted") + socketChannel.configureBlocking(false) + selectorRouter ! WorkerForCommand(RegisterIncoming(socketChannel), self, Props(new TcpIncomingConnection(socketChannel, tcp, handler, options))) + acceptAllPending(limit - 1) } else context.parent ! AcceptInterest + } override def postStop() { try { From fd3f69c7622a63f102a37ee4598b91c36af2a27b Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 22 Feb 2013 12:20:41 +0100 Subject: [PATCH 2/2] Fix typos in TcpConnectionSpec --- .../src/test/scala/akka/io/TcpConnectionSpec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 19e504883a..ae03ae3b1d 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -259,7 +259,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") connectionHandler.expectNoMsg(500.millis) } - "abort the connection and reply with `Aborted` upong reception of an `Abort` command (simplified)" in withEstablishedConnection() { setup ⇒ + "abort the connection and reply with `Aborted` upon reception of an `Abort` command (simplified)" in withEstablishedConnection() { setup ⇒ import setup._ connectionHandler.send(connectionActor, Abort) @@ -271,7 +271,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val thrown = evaluating { serverSideChannel.read(buffer) } must produce[IOException] } - "abort the connection and reply with `Aborted` upong reception of an `Abort` command" in withEstablishedConnection() { setup ⇒ + "abort the connection and reply with `Aborted` upon reception of an `Abort` command" in withEstablishedConnection() { setup ⇒ ignoreIfWindows() import setup._ @@ -297,7 +297,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") * has hit the network medium. The only exception is when you disable the Winsock buffering by setting * SO_SNDBUF to 0." */ - "close the connection and reply with `ConfirmedClosed` upong reception of an `ConfirmedClose` command (simplified)" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ + "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command (simplified)" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ import setup._ // we should test here that a pending write command is properly finished first @@ -326,7 +326,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") assertThisConnectionActorTerminated() } - "close the connection and reply with `ConfirmedClosed` upong reception of an `ConfirmedClose` command" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ + "close the connection and reply with `ConfirmedClosed` upon reception of an `ConfirmedClose` command" in withEstablishedConnection(setSmallRcvBuffer) { setup ⇒ ignoreIfWindows() import setup._