From 27d111b1f5f679967e7da6896006a1d13666c2b1 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 16 Jan 2013 15:21:11 +0100 Subject: [PATCH] tcp selector and listener actors, extension of tcp manager actor, see #2885 and #2887 --- akka-io/src/main/resources/reference.conf | 31 +-- akka-io/src/main/scala/akka/io/Tcp.scala | 89 ++++---- .../src/main/scala/akka/io/TcpListener.scala | 76 +++++++ .../src/main/scala/akka/io/TcpManager.scala | 53 ++++- .../scala/akka/io/TcpOutgoingConnection.scala | 2 +- .../src/main/scala/akka/io/TcpSelector.scala | 205 ++++++++++++++++++ akka-io/src/test/scala/akka/io/IOSpec.scala | 11 - .../scala/akka/io/TcpConnectionSpec.scala | 10 +- .../test/scala/akka/io/TcpListenerSpec.scala | 62 ++++++ 9 files changed, 460 insertions(+), 79 deletions(-) create mode 100644 akka-io/src/main/scala/akka/io/TcpListener.scala create mode 100644 akka-io/src/main/scala/akka/io/TcpSelector.scala delete mode 100644 akka-io/src/test/scala/akka/io/IOSpec.scala create mode 100644 akka-io/src/test/scala/akka/io/TcpListenerSpec.scala diff --git a/akka-io/src/main/resources/reference.conf b/akka-io/src/main/resources/reference.conf index ebcd1fa2e9..6104ac96c5 100644 --- a/akka-io/src/main/resources/reference.conf +++ b/akka-io/src/main/resources/reference.conf @@ -38,11 +38,25 @@ akka { # - setting it to zero means polling, i.e. calling selectNow() select-timeout = infinite - # When trying to create a new connection but the chosen selector is at - # full capacity, retry this many times with different selectors before - # giving up + # When trying to assign a new connection to a selector and the chosen + # selector is at full capacity, retry selector choosing and assignment + # this many times before giving up selector-association-retries = 10 - + + # The maximum number of connection that are accepted in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + batch-accept-limit = 10 + + # The size of the thread-local direct buffers used to read or write + # network data from the kernel. Those buffer directly add to the footprint + # of the threads from the dispatcher tcp connection actors are using. + direct-buffer-size = 524288 + + # The duration a connection actor waits for a `Register` message from + # its commander before aborting the connection. + register-timeout = 5s + # Fully qualified config path which holds the dispatcher configuration # to be used for running the select() calls in the selectors selector-dispatcher = "akka.io.pinned-dispatcher" @@ -54,15 +68,6 @@ akka { # Fully qualified config path which holds the dispatcher configuration # for the selector management actors management-dispatcher = "akka.actor.default-dispatcher" - - # The size of the thread-local direct buffers used to read or write - # network data from the kernel. Those buffer directly add to the footprint - # of the threads from the dispatcher tcp connection actors are using. - direct-buffer-size = 524288 - - # The duration a connection actor waits for a `Register` message from - # its commander before aborting the connection. - register-timeout = 5s } } diff --git a/akka-io/src/main/scala/akka/io/Tcp.scala b/akka-io/src/main/scala/akka/io/Tcp.scala index 9d7e754b98..7d0421f42a 100644 --- a/akka-io/src/main/scala/akka/io/Tcp.scala +++ b/akka-io/src/main/scala/akka/io/Tcp.scala @@ -23,18 +23,6 @@ object Tcp extends ExtensionKey[TcpExt] { // Java API override def get(system: ActorSystem): TcpExt = system.extension(this) - /// COMMANDS - sealed trait Command - - case class Connect(remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress] = None) extends Command - case class Bind(handler: ActorRef, - address: InetSocketAddress, - backlog: Int = 100, - options: immutable.Seq[SocketOption] = Nil) extends Command - case object Unbind extends Command - case class Register(handler: ActorRef) extends Command - /** * SocketOption is a package of data (from the user) and associated * behavior (how to apply that to a socket). @@ -54,11 +42,12 @@ object Tcp extends ExtensionKey[TcpExt] { */ def afterConnect(s: Socket): Unit = () } + + // shared socket options object SO { - // shared socket options /** - * [[akka.io.Tcp.SO.SocketOption]] to set the SO_RCVBUF option + * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option * * For more information see [[java.net.Socket.setReceiveBufferSize]] */ @@ -71,7 +60,7 @@ object Tcp extends ExtensionKey[TcpExt] { // server socket options /** - * [[akka.io.Tcp.SO.SocketOption]] to enable or disable SO_REUSEADDR + * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR * * For more information see [[java.net.Socket.setReuseAddress]] */ @@ -83,7 +72,7 @@ object Tcp extends ExtensionKey[TcpExt] { // general socket options /** - * [[akka.io.Tcp.SO.SocketOption]] to enable or disable SO_KEEPALIVE + * [[akka.io.Tcp.SocketOption]] to enable or disable SO_KEEPALIVE * * For more information see [[java.net.Socket.setKeepAlive]] */ @@ -92,7 +81,7 @@ object Tcp extends ExtensionKey[TcpExt] { } /** - * [[akka.io.Tcp.SO.SocketOption]] to enable or disable OOBINLINE (receipt + * [[akka.io.Tcp.SocketOption]] to enable or disable OOBINLINE (receipt * of TCP urgent data) By default, this option is disabled and TCP urgent * data is silently discarded. * @@ -103,19 +92,19 @@ object Tcp extends ExtensionKey[TcpExt] { } /** - * [[akka.io.Tcp.SO.SocketOption]] to set the SO_SNDBUF option. + * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option. * * For more information see [[java.net.Socket.setSendBufferSize]] */ case class SendBufferSize(size: Int) extends SocketOption { - require(size > 0, "SendBufferSize must be > 0") + require(size > 0, "ReceiveBufferSize must be > 0") override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size) } // SO_LINGER is handled by the Close code /** - * [[akka.io.Tcp.SO.SocketOption]] to enable or disable TCP_NODELAY + * [[akka.io.Tcp.SocketOption]] to enable or disable TCP_NODELAY * (disable or enable Nagle's algorithm) * * For more information see [[java.net.Socket.setTcpNoDelay]] @@ -125,7 +114,7 @@ object Tcp extends ExtensionKey[TcpExt] { } /** - * [[akka.io.Tcp.SO.SocketOption]] to set the traffic class or + * [[akka.io.Tcp.SocketOption]] to set the traffic class or * type-of-service octet in the IP header for packets sent from this * socket. * @@ -137,9 +126,28 @@ object Tcp extends ExtensionKey[TcpExt] { } } - // TODO: what about close reasons? - sealed trait CloseCommand extends Command + case class Stats(channelsOpened: Long, channelsClosed: Long, selectorStats: Seq[SelectorStats]) { + def channelsOpen = channelsOpened - channelsClosed + } + case class SelectorStats(channelsOpened: Long, channelsClosed: Long) { + def channelsOpen = channelsOpened - channelsClosed + } + + /// COMMANDS + sealed trait Command + + case class Connect(remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress] = None, + options: immutable.Seq[SocketOption] = Nil) extends Command + case class Bind(handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int = 100, + options: immutable.Seq[SocketOption] = Nil) extends Command + case class Register(handler: ActorRef) extends Command + case object Unbind extends Command + + sealed trait CloseCommand extends Command case object Close extends CloseCommand case object ConfirmedClose extends CloseCommand case object Abort extends CloseCommand @@ -154,6 +162,8 @@ object Tcp extends ExtensionKey[TcpExt] { case object StopReading extends Command case object ResumeReading extends Command + case object GetStats extends Command + /// EVENTS sealed trait Event @@ -171,15 +181,12 @@ object Tcp extends ExtensionKey[TcpExt] { case class ErrorClose(cause: Throwable) extends ConnectionClosed /// INTERNAL - case class RegisterClientChannel(channel: SocketChannel) - case class RegisterServerChannel(channel: ServerSocketChannel) - case class CreateConnection(channel: SocketChannel) - case class Reject(command: Command, commander: ActorRef) - // Retry should be sent by Selector actors to their parent router with retriesLeft decremented. If retries are - // depleted, the selector actor must reply directly to the manager with a Reject (above). - case class Retry(command: Command, retriesLeft: Int, commander: ActorRef) { - require(retriesLeft >= 0, "The upper limit for retries must be nonnegative.") - } + case class RegisterOutgoingConnection(channel: SocketChannel) + case class RegisterServerSocketChannel(channel: ServerSocketChannel) + case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef, options: immutable.Seq[SocketOption]) + case class CreateConnection(channel: SocketChannel, handler: ActorRef, options: immutable.Seq[SocketOption]) + case class Reject(command: Command, retriesLeft: Int, commander: ActorRef) + case class Retry(command: Command, retriesLeft: Int, commander: ActorRef) case object ChannelConnectable case object ChannelAcceptable case object ChannelReadable @@ -197,21 +204,29 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { val NrOfSelectors = getInt("nr-of-selectors") val MaxChannels = getInt("max-channels") - val MaxChannelsPerSelector = MaxChannels / NrOfSelectors val SelectTimeout = if (getString("select-timeout") == "infinite") Duration.Inf else Duration(getMilliseconds("select-timeout"), MILLISECONDS) val SelectorAssociationRetries = getInt("selector-association-retries") - val SelectorDispatcher = getString("selector-dispatcher") - val WorkerDispatcher = getString("worker-dispatcher") - val ManagementDispatcher = getString("management-dispatcher") + val BatchAcceptLimit = getInt("batch-accept-limit") val DirectBufferSize = getInt("direct-buffer-size") val RegisterTimeout = if (getString("register-timeout") == "infinite") Duration.Undefined else Duration(getMilliseconds("register-timeout"), MILLISECONDS) + val SelectorDispatcher = getString("selector-dispatcher") + val WorkerDispatcher = getString("worker-dispatcher") + val ManagementDispatcher = getString("management-dispatcher") + + require(NrOfSelectors > 0, "nr-of-selectors must be > 0") + require(MaxChannels >= 0, "max-channels must be >= 0") + require(SelectTimeout >= Duration.Zero, "select-timeout must not be negative") + require(SelectorAssociationRetries >= 0, "selector-association-retries must be >= 0") + require(BatchAcceptLimit > 0, "batch-accept-limit must be > 0") + + val MaxChannelsPerSelector = MaxChannels / NrOfSelectors } val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props[TcpManager].withDispatcher(Settings.ManagementDispatcher), "IO-TCP") + Props.empty.withDispatcher(Settings.ManagementDispatcher), "IO-TCP") } diff --git a/akka-io/src/main/scala/akka/io/TcpListener.scala b/akka-io/src/main/scala/akka/io/TcpListener.scala new file mode 100644 index 0000000000..1f339be702 --- /dev/null +++ b/akka-io/src/main/scala/akka/io/TcpListener.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.io + +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel +import scala.annotation.tailrec +import scala.collection.immutable +import scala.util.control.NonFatal +import akka.actor.{ ActorLogging, ActorRef, Actor } +import Tcp._ + +class TcpListener(manager: ActorRef, + selector: ActorRef, + handler: ActorRef, + endpoint: InetSocketAddress, + backlog: Int, + bindCommander: ActorRef, + options: immutable.Seq[SocketOption]) extends Actor with ActorLogging { + + val batchAcceptLimit = Tcp(context.system).Settings.BatchAcceptLimit + val channel = { + val serverSocketChannel = ServerSocketChannel.open + serverSocketChannel.configureBlocking(false) + val socket = serverSocketChannel.socket + options.foreach(_.beforeBind(socket)) + socket.bind(endpoint, backlog) // will blow up the actor constructor if the bind fails + serverSocketChannel + } + selector ! RegisterServerSocketChannel(channel) + context.watch(bindCommander) // sign death pact + log.debug("Successfully bound to {}", endpoint) + + def receive: Receive = { + case Bound ⇒ + bindCommander ! Bound + context.become(bound) + } + + def bound: Receive = { + case ChannelAcceptable ⇒ + acceptAllPending(batchAcceptLimit) + + case Unbind ⇒ + log.debug("Unbinding endpoint {}", endpoint) + channel.close() + sender ! Unbound + log.debug("Unbound endpoint {}, stopping listener", endpoint) + context.stop(self) + } + + @tailrec final def acceptAllPending(limit: Int): Unit = + if (limit > 0) { + val socketChannel = + try channel.accept() + catch { + case NonFatal(e) ⇒ log.error(e, "Accept error: could not accept new connection due to {}", e); null + } + if (socketChannel != null) { + log.debug("New connection accepted") + manager ! RegisterIncomingConnection(socketChannel, handler, options) + selector ! AcceptInterest + acceptAllPending(limit - 1) + } + } + + override def postStop() { + try channel.close() + catch { + case NonFatal(e) ⇒ log.error(e, "Error closing ServerSocketChannel") + } + } + +} diff --git a/akka-io/src/main/scala/akka/io/TcpManager.scala b/akka-io/src/main/scala/akka/io/TcpManager.scala index 350a70eead..bba033dc9b 100644 --- a/akka-io/src/main/scala/akka/io/TcpManager.scala +++ b/akka-io/src/main/scala/akka/io/TcpManager.scala @@ -1,14 +1,21 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + package akka.io -import akka.actor.{ OneForOneStrategy, Actor, Props } -import akka.io.Tcp._ +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.actor.{ ActorLogging, Actor, Props } import akka.routing.RandomRouter -import akka.actor.SupervisorStrategy.Restart +import akka.util.Timeout +import akka.pattern.{ ask, pipe } +import Tcp._ /** * TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections. * - * TcpManager is obtainable by calling {{{ IO(TCP) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]]) + * TcpManager is obtainable by calling {{{ IO(Tcp) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]]) * * == Bind == * @@ -30,9 +37,9 @@ import akka.actor.SupervisorStrategy.Restart * To initiate a connection to a remote server, a [[akka.io.Tcp.Connect]] message must be sent to this actor. If the * connection succeeds, the sender will be notified with a [[akka.io.Tcp.Connected]] message. The sender of the * [[akka.io.Tcp.Connected]] message is the Connection actor (an internal actor representing the TCP connection). Before - * starting to use the connection, a handler should be registered to the Connection actor by sending a [[akka.io.Tcp.Register]] - * message. After a handler has been registered, all incoming data will be sent to the handler in the form of - * [[akka.io.Tcp.Received]] messages. To write data to the connection, a [[akka.io.Tcp.Write]] message should be sent + * starting to use the connection, a handler must be registered to the Connection actor by sending a [[akka.io.Tcp.Register]] + * command message. After a handler has been registered, all incoming data will be sent to the handler in the form of + * [[akka.io.Tcp.Received]] messages. To write data to the connection, a [[akka.io.Tcp.Write]] message must be sent * to the Connection actor. * * If the connect request is rejected because the Tcp system is not able to register more channels (see the nr-of-selectors @@ -40,14 +47,36 @@ import akka.actor.SupervisorStrategy.Restart * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. * */ -class TcpManager extends Actor { +class TcpManager extends Actor with ActorLogging { val settings = Tcp(context.system).Settings + val selectorNr = Iterator.from(0) - val selectorPool = context.actorOf(Props.empty.withRouter(RandomRouter(settings.NrOfSelectors))) + val selectorPool = context.actorOf( + props = Props(new TcpSelector(self)).withRouter(RandomRouter(settings.NrOfSelectors)), + name = selectorNr.next().toString) def receive = { - case c: Connect ⇒ selectorPool forward c - case b: Bind ⇒ selectorPool forward b - case Reject(command, commander) ⇒ commander ! CommandFailed(command) + case RegisterIncomingConnection(channel, handler, options) ⇒ + selectorPool ! CreateConnection(channel, handler, options) + + case c: Connect ⇒ + selectorPool forward c + + case b: Bind ⇒ + selectorPool forward b + + case Reject(command, 0, commander) ⇒ + log.warning("Command '{}' failed since all {} selectors are at capacity", command, context.children.size) + commander ! CommandFailed(command) + + case Reject(command, retriesLeft, commander) ⇒ + log.warning("Command '{}' rejected by {} with {} retries left, retrying...", command, sender, retriesLeft) + selectorPool ! Retry(command, retriesLeft - 1, commander) + + case GetStats ⇒ + import context.dispatcher + implicit val timeout: Timeout = 1 second span + val seqFuture = Future.traverse(context.children)(_.ask(GetStats).mapTo[SelectorStats]) + seqFuture.map(s ⇒ Stats(s.map(_.channelsOpen).sum, s.map(_.channelsClosed).sum, s.toSeq)) pipeTo sender } } diff --git a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala index 6df56c696d..cfdc7a2af0 100644 --- a/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-io/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -30,7 +30,7 @@ class TcpOutgoingConnection(_selector: ActorRef, if (channel.connect(remoteAddress)) completeConnect(commander, options) else { - selector ! RegisterClientChannel(channel) + selector ! RegisterOutgoingConnection(channel) context.become(connecting(commander, options)) } diff --git a/akka-io/src/main/scala/akka/io/TcpSelector.scala b/akka-io/src/main/scala/akka/io/TcpSelector.scala new file mode 100644 index 0000000000..189755c079 --- /dev/null +++ b/akka-io/src/main/scala/akka/io/TcpSelector.scala @@ -0,0 +1,205 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.io + +import java.lang.Runnable +import java.nio.channels.spi.SelectorProvider +import java.nio.channels.{ ServerSocketChannel, SelectionKey, SocketChannel } +import java.nio.channels.SelectionKey._ +import scala.util.control.NonFatal +import scala.collection.immutable.HashMap +import scala.concurrent.duration._ +import akka.actor._ +import Tcp._ + +class TcpSelector(manager: ActorRef) extends Actor with ActorLogging { + @volatile var childrenKeys = HashMap.empty[String, SelectionKey] + var channelsOpened = 0L + var channelsClosed = 0L + val sequenceNumber = Iterator.from(0) + val settings = Tcp(context.system).Settings + val selectorManagementDispatcher = context.system.dispatchers.lookup(settings.SelectorDispatcher) + val selector = SelectorProvider.provider.openSelector + val doSelect: () ⇒ Int = + settings.SelectTimeout match { + case Duration.Zero ⇒ () ⇒ selector.selectNow() + case Duration.Inf ⇒ () ⇒ selector.select() + case x ⇒ val millis = x.toMillis; () ⇒ selector.select(millis) + } + + selectorManagementDispatcher.execute(select) // start selection "loop" + + def receive: Receive = { + case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender)) + case ReadInterest ⇒ execute(enableInterest(OP_READ, sender)) + case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender)) + + case CreateConnection(channel, handler, options) ⇒ + val connection = context.actorOf( + props = Props( + creator = () ⇒ new TcpIncomingConnection(self, channel, handler, options), + dispatcher = settings.WorkerDispatcher), + name = nextName) + execute(registerIncomingConnection(channel, handler)) + context.watch(connection) + channelsOpened += 1 + + case cmd: Connect ⇒ + handleConnect(cmd, settings.SelectorAssociationRetries, sender) + + case Retry(cmd: Connect, retriesLeft, commander) ⇒ + handleConnect(cmd, retriesLeft, commander) + + case RegisterOutgoingConnection(channel) ⇒ + execute(registerOutgoingConnection(channel, sender)) + + case cmd: Bind ⇒ + handleBind(cmd, settings.SelectorAssociationRetries, sender) + + case Retry(cmd: Bind, retriesLeft, commander) ⇒ + handleBind(cmd, retriesLeft, commander) + + case RegisterServerSocketChannel(channel) ⇒ + execute(registerListener(channel, sender)) + + case Terminated(child) ⇒ + execute(unregister(child)) + channelsClosed += 1 + + case GetStats ⇒ + sender ! SelectorStats(channelsOpened, channelsClosed) + } + + override def postStop() { + try { + import scala.collection.JavaConverters._ + selector.keys.asScala.foreach(_.channel.close()) + selector.close() + } catch { + case NonFatal(e) ⇒ log.error(e, "Error closing selector or key") + } + } + + // we can never recover from failures of a connection or listener child + override def supervisorStrategy = SupervisorStrategy.stoppingStrategy + + def handleConnect(cmd: Connect, retriesLeft: Int, commander: ActorRef): Unit = { + log.debug("Executing {}", cmd) + if (canHandleMoreChannels) { + val connection = context.actorOf( + props = Props( + creator = () ⇒ new TcpOutgoingConnection(self, commander, cmd.remoteAddress, cmd.localAddress, cmd.options), + dispatcher = settings.WorkerDispatcher), + name = nextName) + context.watch(connection) + channelsOpened += 1 + } else sender ! Reject(cmd, retriesLeft, commander) + } + + def handleBind(cmd: Bind, retriesLeft: Int, commander: ActorRef): Unit = { + log.debug("Executing {}", cmd) + if (canHandleMoreChannels) { + val listener = context.actorOf( + props = Props( + creator = () ⇒ new TcpListener(manager, self, cmd.handler, cmd.endpoint, cmd.backlog, commander, cmd.options), + dispatcher = settings.WorkerDispatcher), + name = nextName) + context.watch(listener) + channelsOpened += 1 + } else sender ! Reject(cmd, retriesLeft, commander) + } + + def nextName = sequenceNumber.next().toString + + def canHandleMoreChannels = childrenKeys.size < settings.MaxChannelsPerSelector + + //////////////// Management Tasks scheduled via the selectorManagementDispatcher ///////////// + + def execute(task: Task): Unit = { + selectorManagementDispatcher.execute(task) + selector.wakeup() + } + + def updateKeyMap(child: ActorRef, key: SelectionKey): Unit = + childrenKeys = childrenKeys.updated(child.path.name, key) + + def registerOutgoingConnection(channel: SocketChannel, connection: ActorRef) = + new Task { + def tryRun() { + val key = channel.register(selector, OP_CONNECT, connection) + updateKeyMap(connection, key) + } + } + + def registerListener(channel: ServerSocketChannel, listener: ActorRef) = + new Task { + def tryRun() { + val key = channel.register(selector, OP_ACCEPT, listener) + updateKeyMap(listener, key) + listener ! Bound + } + } + + def registerIncomingConnection(channel: SocketChannel, connection: ActorRef) = + new Task { + def tryRun() { + // we only enable reading after the user-level connection handler has registered + val key = channel.register(selector, 0, connection) + updateKeyMap(connection, key) + } + } + + // TODO: evaluate whether we could run this on the TcpSelector actor itself rather than + // on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap + // rather than an unsynchronized one, but since switching interest ops is so frequent + // the change might be beneficial, provided the underlying implementation really is thread-safe + // and behaves consistently on all platforms. + def enableInterest(op: Int, connection: ActorRef) = + new Task { + def tryRun() { + val key = childrenKeys(connection.path.name) + key.interestOps(key.interestOps | op) + } + } + + def unregister(child: ActorRef) = + new Task { + def tryRun() { + childrenKeys = childrenKeys - child.path.name + } + } + + val select = new Task { + def tryRun() { + if (doSelect() > 0) { + val keys = selector.selectedKeys + val iterator = keys.iterator() + while (iterator.hasNext) { + val key = iterator.next + val connection = key.attachment.asInstanceOf[ActorRef] + if (key.isValid) { + if (key.isReadable) connection ! ChannelReadable + if (key.isWritable) connection ! ChannelWritable + else if (key.isAcceptable) connection ! ChannelAcceptable + else if (key.isConnectable) connection ! ChannelConnectable + key.interestOps(0) // prevent immediate reselection by always clearing + } else log.warning("Invalid selection key: {}", key) + } + keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected + } + selectorManagementDispatcher.execute(this) // re-schedules select behind all currently queued tasks + } + } + + abstract class Task extends Runnable { + def tryRun() + def run() { + try tryRun() + catch { + case NonFatal(e) ⇒ log.error(e, "Error during selector management task: {}", e) + } + } + } +} \ No newline at end of file diff --git a/akka-io/src/test/scala/akka/io/IOSpec.scala b/akka-io/src/test/scala/akka/io/IOSpec.scala deleted file mode 100644 index 553aef8f4a..0000000000 --- a/akka-io/src/test/scala/akka/io/IOSpec.scala +++ /dev/null @@ -1,11 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.io - -import akka.testkit.AkkaSpec - -class IOSpec extends AkkaSpec { - -} \ No newline at end of file diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 2a5d3690f7..31da412b33 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -233,7 +233,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val selector = TestProbe() val connectionActor = createConnectionActor(selector.ref, userHandler.ref) val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterClientChannel(clientSideChannel)) + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) // close instead of accept localServer.close() @@ -250,7 +250,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val selector = TestProbe() val connectionActor = createConnectionActor(selector.ref, userHandler.ref, serverAddress = new InetSocketAddress("127.0.0.1", 63186)) val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterClientChannel(clientSideChannel)) + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) val sel = SelectorProvider.provider().openSelector() val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ) sel.select(200) @@ -269,7 +269,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val selector = TestProbe() val connectionActor = createConnectionActor(selector.ref, userHandler.ref) val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterClientChannel(clientSideChannel)) + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) localServer.accept() selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) @@ -284,7 +284,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val selector = TestProbe() val connectionActor = createConnectionActor(selector.ref, userHandler) val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterClientChannel(clientSideChannel)) + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) system.stop(userHandler) assertActorTerminated(connectionActor) } @@ -346,7 +346,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val connectionActor = createConnectionActor(selector.ref, userHandler.ref) val clientSideChannel = connectionActor.underlyingActor.channel - selector.expectMsg(RegisterClientChannel(clientSideChannel)) + selector.expectMsg(RegisterOutgoingConnection(clientSideChannel)) localServer.configureBlocking(true) val serverSideChannel = localServer.accept() diff --git a/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala new file mode 100644 index 0000000000..addf9f7f16 --- /dev/null +++ b/akka-io/src/test/scala/akka/io/TcpListenerSpec.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import java.net.{ Socket, InetSocketAddress } +import java.nio.channels.ServerSocketChannel +import scala.concurrent.duration._ +import scala.util.Success +import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec } +import akka.util.Timeout +import akka.pattern.ask +import Tcp._ + +class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") { + val port = 47323 + + "A TcpListener" must { + val manager = TestProbe() + val selector = TestProbe() + val handler = TestProbe() + val handlerRef = handler.ref + val bindCommander = TestProbe() + val endpoint = new InetSocketAddress("localhost", port) + val listener = TestActorRef(new TcpListener(manager.ref, selector.ref, handler.ref, endpoint, 100, + bindCommander.ref, Nil)) + var serverSocketChannel: Option[ServerSocketChannel] = None + + "register its ServerSocketChannel with its selector" in { + val RegisterServerSocketChannel(channel) = selector.receiveOne(Duration.Zero) + serverSocketChannel = Some(channel) + } + + "let the Bind commander know when binding is completed" in { + listener ! Bound + bindCommander.expectMsg(Bound) + } + + "accept two acceptable connections at once and register them with the manager" in { + new Socket("localhost", port) + new Socket("localhost", port) + new Socket("localhost", port) + listener ! ChannelAcceptable + val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero) + val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero) + } + + "accept one more connection and register it with the manager" in { + listener ! ChannelAcceptable + val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero) + } + + "react to Unbind commands by closing the ServerSocketChannel, replying with Unbound and stopping itself" in { + implicit val timeout: Timeout = 1 second span + listener.ask(Unbind).value must equal(Some(Success(Unbound))) + serverSocketChannel.get.isOpen must equal(false) + listener.isTerminated must equal(true) + } + } + +} \ No newline at end of file