diff --git a/akka-io/src/main/resources/reference.conf b/akka-io/src/main/resources/reference.conf index a907706b02..33927a9d45 100644 --- a/akka-io/src/main/resources/reference.conf +++ b/akka-io/src/main/resources/reference.conf @@ -27,8 +27,8 @@ akka { # no intrinsic general limit, this setting is meant to enable DoS # protection by limiting the number of concurrently connected clients. # Also note that this is a "soft" limit; in certain cases the implementation - # will accept a few connections more than the number configured here. - # Set to 0 for "unlimited". + # will accept a few connections more or a few less than the number configured + # here. Set to 0 for "unlimited". max-channels = 256000 # The select loop can be used in two modes: diff --git a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala index fcaecc0bc4..dfd5e93771 100644 --- a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -7,7 +7,6 @@ package akka.io import java.net.InetSocketAddress import java.io.IOException import java.nio.channels.SocketChannel -import scala.collection.immutable import akka.actor.ActorRef import Tcp._ @@ -34,7 +33,6 @@ class TcpOutgoingConnection(_selector: ActorRef, else { selector ! RegisterOutgoingConnection(channel) context.become(connecting(commander, options)) - Seq(1,2,3) } def receive: Receive = PartialFunction.empty diff --git a/akka-io/src/main/scala/akka/io/TcpSelector.scala b/akka-io/src/main/scala/akka/io/TcpSelector.scala index 48f1c5ab36..69f5ff89c9 100644 --- a/akka-io/src/main/scala/akka/io/TcpSelector.scala +++ b/akka-io/src/main/scala/akka/io/TcpSelector.scala @@ -28,6 +28,8 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin case ReadInterest ⇒ execute(enableInterest(OP_READ, sender)) case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender)) + case StopReading ⇒ execute(disableInterest(OP_READ, sender)) + case cmd: RegisterIncomingConnection ⇒ handleIncomingConnection(cmd, SelectorAssociationRetries) @@ -148,7 +150,7 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin } } - // TODO: evaluate whether we could run this on the TcpSelector actor itself rather than + // TODO: evaluate whether we could run the following two tasks directly on the TcpSelector actor itself rather than // on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap // rather than an unsynchronized one, but since switching interest ops is so frequent // the change might be beneficial, provided the underlying implementation really is thread-safe @@ -161,6 +163,14 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin } } + def disableInterest(op: Int, connection: ActorRef) = + new Task { + def tryRun() { + val key = childrenKeys(connection.path.name) + key.interestOps(key.interestOps & ~op) + } + } + def unregister(child: ActorRef) = new Task { def tryRun() { @@ -182,6 +192,7 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin while (iterator.hasNext) { val key = iterator.next if (key.isValid) { + key.interestOps(0) // prevent immediate reselection by always clearing val connection = key.attachment.asInstanceOf[ActorRef] key.readyOps match { case OP_READ ⇒ connection ! ChannelReadable @@ -191,7 +202,6 @@ class TcpSelector(manager: ActorRef, tcp: TcpExt) extends Actor with ActorLoggin case x if (x & OP_CONNECT) > 0 ⇒ connection ! ChannelConnectable case x ⇒ log.warning("Invalid readyOps: {}", x) } - key.interestOps(0) // prevent immediate reselection by always clearing } else log.warning("Invalid selection key: {}", key) } keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected diff --git a/akka-io/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-io/src/test/scala/akka/io/CapacityLimitSpec.scala new file mode 100644 index 0000000000..f7dec60449 --- /dev/null +++ b/akka-io/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import akka.testkit.{ TestProbe, AkkaSpec } +import Tcp._ +import TestUtils._ + +class CapacityLimitSpec extends AkkaSpec("akka.loglevel = ERROR\nakka.io.tcp.max-channels = 4") + with IntegrationSpecSupport { + + "The TCP transport implementation" should { + + "reply with CommandFailed to a Bind or Connect command if max-channels capacity has been reached" in new TestSetup { + establishNewClientConnection() + + // we now have three channels registered: a listener, a server connection and a client connection + // so register one more channel + val bindCommander = TestProbe() + bindCommander.send(IO(Tcp), Bind(bindHandler.ref, temporaryServerAddress())) + bindCommander.expectMsg(Bound) + + // we are now at the configured max-channel capacity of 4 + val bindToFail = Bind(bindHandler.ref, temporaryServerAddress()) + bindCommander.send(IO(Tcp), bindToFail) + bindCommander.expectMsgType[CommandFailed].cmd must be theSameInstanceAs (bindToFail) + } + + } + +} diff --git a/akka-io/src/test/scala/akka/io/IntegrationSpec.scala b/akka-io/src/test/scala/akka/io/IntegrationSpec.scala index 93c78ecab2..87df405611 100644 --- a/akka-io/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-io/src/test/scala/akka/io/IntegrationSpec.scala @@ -4,24 +4,17 @@ package akka.io -import akka.testkit.{ TestProbe, AkkaSpec } -import akka.actor.ActorRef +import akka.testkit.AkkaSpec import akka.util.ByteString import Tcp._ import TestUtils._ -import collection.immutable -import annotation.tailrec -class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { +class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationSpecSupport { "The TCP transport implementation" should { "properly bind a test server" in new TestSetup - "allow connecting to the test server" in new TestSetup { - establishNewClientConnection() - } - "allow connecting to and disconnecting from the test server" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() clientHandler.send(clientConnection, Close) @@ -43,12 +36,12 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { "properly complete one client/server request/response cycle" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), 42)) - clientHandler.expectMsg(42) + clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), 'Aye)) + clientHandler.expectMsg('Aye) serverHandler.expectMsgType[Received].data.decodeString("ASCII") must be("Captain on the bridge!") - serverHandler.send(serverConnection, Write(ByteString("For the king!"), 4242)) - serverHandler.expectMsg(4242) + serverHandler.send(serverConnection, Write(ByteString("For the king!"), 'Yes)) + serverHandler.expectMsg('Yes) clientHandler.expectMsgType[Received].data.decodeString("ASCII") must be("For the king!") serverHandler.send(serverConnection, Close) @@ -59,62 +52,17 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") { verifyActorTermination(serverConnection) } - "waiting for writes works with backpressure" in new TestSetup { + "support waiting for writes with backpressure" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), 4223)) - serverHandler.expectMsg(4223) + serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), 'Ack)) + serverHandler.expectMsg('Ack) expectReceivedData(clientHandler, 100000) - override def bindOptions: immutable.Traversable[SocketOption] = - List(SO.SendBufferSize(1024)) - - override def connectOptions: immutable.Traversable[SocketOption] = - List(SO.ReceiveBufferSize(1024)) + override def bindOptions = List(SO.SendBufferSize(1024)) + override def connectOptions = List(SO.ReceiveBufferSize(1024)) } - ////////////////////////////////////// - ///////// more tests to come ///////// - ////////////////////////////////////// - } - - class TestSetup { - val bindHandler = TestProbe() - val endpoint = TemporaryServerAddress() - - bindServer() - - def bindServer(): Unit = { - val bindCommander = TestProbe() - bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions)) - bindCommander.expectMsg(Bound) - } - - def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = { - val connectCommander = TestProbe() - connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions)) - val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected] - val clientHandler = TestProbe() - connectCommander.sender ! Register(clientHandler.ref) - - val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected] - val serverHandler = TestProbe() - bindHandler.sender ! Register(serverHandler.ref) - - (clientHandler, connectCommander.sender, serverHandler, bindHandler.sender) - } - - @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit = - if (remaining > 0) { - val recv = handler.expectMsgType[Received] - expectReceivedData(handler, remaining - recv.data.size) - } - - /** allow overriding socket options for server side channel */ - def bindOptions: collection.immutable.Traversable[SocketOption] = Nil - - /** allow overriding socket options for client side channel */ - def connectOptions: collection.immutable.Traversable[SocketOption] = Nil } } diff --git a/akka-io/src/test/scala/akka/io/IntegrationSpecSupport.scala b/akka-io/src/test/scala/akka/io/IntegrationSpecSupport.scala new file mode 100644 index 0000000000..ea93b7809a --- /dev/null +++ b/akka-io/src/test/scala/akka/io/IntegrationSpecSupport.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import scala.annotation.tailrec +import akka.testkit.{ AkkaSpec, TestProbe } +import akka.actor.ActorRef +import Tcp._ +import TestUtils._ + +trait IntegrationSpecSupport { _: AkkaSpec ⇒ + + class TestSetup { + val bindHandler = TestProbe() + val endpoint = temporaryServerAddress() + + bindServer() + + def bindServer(): Unit = { + val bindCommander = TestProbe() + bindCommander.send(IO(Tcp), Bind(bindHandler.ref, endpoint, options = bindOptions)) + bindCommander.expectMsg(Bound) + } + + def establishNewClientConnection(): (TestProbe, ActorRef, TestProbe, ActorRef) = { + val connectCommander = TestProbe() + connectCommander.send(IO(Tcp), Connect(endpoint, options = connectOptions)) + val Connected(`endpoint`, localAddress) = connectCommander.expectMsgType[Connected] + val clientHandler = TestProbe() + connectCommander.sender ! Register(clientHandler.ref) + + val Connected(`localAddress`, `endpoint`) = bindHandler.expectMsgType[Connected] + val serverHandler = TestProbe() + bindHandler.sender ! Register(serverHandler.ref) + + (clientHandler, connectCommander.sender, serverHandler, bindHandler.sender) + } + + @tailrec final def expectReceivedData(handler: TestProbe, remaining: Int): Unit = + if (remaining > 0) { + val recv = handler.expectMsgType[Received] + expectReceivedData(handler, remaining - recv.data.size) + } + + /** allow overriding socket options for server side channel */ + def bindOptions: Traversable[SocketOption] = Nil + + /** allow overriding socket options for client side channel */ + def connectOptions: Traversable[SocketOption] = Nil + } + +} diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 659add2f83..7cdcc4f202 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -21,7 +21,7 @@ import TestUtils._ import Tcp._ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { - val serverAddress = TemporaryServerAddress() + val serverAddress = temporaryServerAddress() "An outgoing connection" must { // common behavior @@ -265,7 +265,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") verifyActorTermination(connectionActor) } - val UnboundAddress = TemporaryServerAddress() + val UnboundAddress = temporaryServerAddress() "report failed connection attempt when target is unreachable" in withUnacceptedConnection(connectionActorCons = createConnectionActor(serverAddress = UnboundAddress)) { setup ⇒ import setup._ diff --git a/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala index 5d238f28e9..8db5cb0d74 100644 --- a/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala @@ -5,9 +5,7 @@ package akka.io import java.net.Socket -import scala.annotation.tailrec import scala.concurrent.duration._ -import org.scalatest.exceptions.TestFailedException import akka.actor.{ Terminated, SupervisorStrategy, Actor, Props } import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } import Tcp._ @@ -64,9 +62,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { listener ! CommandFailed(RegisterIncomingConnection(channel, handler.ref, Nil)) - within(1.second) { - channel.isOpen must be(false) - } + awaitCond(!channel.isOpen) } } @@ -78,7 +74,7 @@ class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { val handlerRef = handler.ref val bindCommander = TestProbe() val parent = TestProbe() - val endpoint = TemporaryServerAddress() + val endpoint = TestUtils.temporaryServerAddress() private val parentRef = TestActorRef(new ListenerParent) def bindListener() { diff --git a/akka-io/src/test/scala/akka/io/TemporaryServerAddress.scala b/akka-io/src/test/scala/akka/io/TemporaryServerAddress.scala deleted file mode 100644 index 20b4e3c16b..0000000000 --- a/akka-io/src/test/scala/akka/io/TemporaryServerAddress.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.io - -import java.nio.channels.ServerSocketChannel -import java.net.InetSocketAddress - -object TemporaryServerAddress { - - def apply(address: String = "127.0.0.1"): InetSocketAddress = { - val serverSocket = ServerSocketChannel.open() - serverSocket.socket.bind(new InetSocketAddress(address, 0)) - val port = serverSocket.socket.getLocalPort - serverSocket.close() - new InetSocketAddress(address, port) - } -} diff --git a/akka-io/src/test/scala/akka/io/TestUtils.scala b/akka-io/src/test/scala/akka/io/TestUtils.scala index 339599d9cb..d084060dc1 100644 --- a/akka-io/src/test/scala/akka/io/TestUtils.scala +++ b/akka-io/src/test/scala/akka/io/TestUtils.scala @@ -4,34 +4,25 @@ package akka.io -import scala.annotation.tailrec -import scala.concurrent.duration._ -import org.scalatest.exceptions.TestFailedException -import akka.actor.{ ActorSystem, ActorRef, Terminated } +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel +import akka.actor.{ Terminated, ActorSystem, ActorRef } import akka.testkit.TestProbe object TestUtils { + def temporaryServerAddress(address: String = "127.0.0.1"): InetSocketAddress = { + val serverSocket = ServerSocketChannel.open() + serverSocket.socket.bind(new InetSocketAddress(address, 0)) + val port = serverSocket.socket.getLocalPort + serverSocket.close() + new InetSocketAddress(address, port) + } + def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = { val watcher = TestProbe() watcher.watch(actor) assert(watcher.expectMsgType[Terminated].actor == actor) } - // why doesn't scalatest provide this by default? (specs2 does with its `eventually` matcher modifier!) - // Due to its general utility, should this be moved to the testkit? - @tailrec final def within(timeLeft: Duration, retrySpan: Duration = 10.millis)(test: ⇒ Unit): Unit = { - val start = System.currentTimeMillis - lazy val end = System.currentTimeMillis - val tryAgain = try { - test - false - } catch { - case (_: TestFailedException | _: AssertionError) if timeLeft.toMillis + start - end > 0 ⇒ true - } - if (tryAgain) { - Thread.sleep(retrySpan.toMillis) - within(timeLeft - Duration(end - start, MILLISECONDS) - retrySpan)(test) - } - } }