This commit is contained in:
parent
be9abae1e3
commit
27d111b1f5
9 changed files with 460 additions and 79 deletions
|
|
@ -38,11 +38,25 @@ akka {
|
|||
# - setting it to zero means polling, i.e. calling selectNow()
|
||||
select-timeout = infinite
|
||||
|
||||
# When trying to create a new connection but the chosen selector is at
|
||||
# full capacity, retry this many times with different selectors before
|
||||
# giving up
|
||||
# When trying to assign a new connection to a selector and the chosen
|
||||
# selector is at full capacity, retry selector choosing and assignment
|
||||
# this many times before giving up
|
||||
selector-association-retries = 10
|
||||
|
||||
|
||||
# The maximum number of connection that are accepted in one go,
|
||||
# higher numbers decrease latency, lower numbers increase fairness on
|
||||
# the worker-dispatcher
|
||||
batch-accept-limit = 10
|
||||
|
||||
# The size of the thread-local direct buffers used to read or write
|
||||
# network data from the kernel. Those buffer directly add to the footprint
|
||||
# of the threads from the dispatcher tcp connection actors are using.
|
||||
direct-buffer-size = 524288
|
||||
|
||||
# The duration a connection actor waits for a `Register` message from
|
||||
# its commander before aborting the connection.
|
||||
register-timeout = 5s
|
||||
|
||||
# Fully qualified config path which holds the dispatcher configuration
|
||||
# to be used for running the select() calls in the selectors
|
||||
selector-dispatcher = "akka.io.pinned-dispatcher"
|
||||
|
|
@ -54,15 +68,6 @@ akka {
|
|||
# Fully qualified config path which holds the dispatcher configuration
|
||||
# for the selector management actors
|
||||
management-dispatcher = "akka.actor.default-dispatcher"
|
||||
|
||||
# The size of the thread-local direct buffers used to read or write
|
||||
# network data from the kernel. Those buffer directly add to the footprint
|
||||
# of the threads from the dispatcher tcp connection actors are using.
|
||||
direct-buffer-size = 524288
|
||||
|
||||
# The duration a connection actor waits for a `Register` message from
|
||||
# its commander before aborting the connection.
|
||||
register-timeout = 5s
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,18 +23,6 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
// Java API
|
||||
override def get(system: ActorSystem): TcpExt = system.extension(this)
|
||||
|
||||
/// COMMANDS
|
||||
sealed trait Command
|
||||
|
||||
case class Connect(remoteAddress: InetSocketAddress,
|
||||
localAddress: Option[InetSocketAddress] = None) extends Command
|
||||
case class Bind(handler: ActorRef,
|
||||
address: InetSocketAddress,
|
||||
backlog: Int = 100,
|
||||
options: immutable.Seq[SocketOption] = Nil) extends Command
|
||||
case object Unbind extends Command
|
||||
case class Register(handler: ActorRef) extends Command
|
||||
|
||||
/**
|
||||
* SocketOption is a package of data (from the user) and associated
|
||||
* behavior (how to apply that to a socket).
|
||||
|
|
@ -54,11 +42,12 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
*/
|
||||
def afterConnect(s: Socket): Unit = ()
|
||||
}
|
||||
|
||||
// shared socket options
|
||||
object SO {
|
||||
// shared socket options
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to set the SO_RCVBUF option
|
||||
* [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
|
||||
*
|
||||
* For more information see [[java.net.Socket.setReceiveBufferSize]]
|
||||
*/
|
||||
|
|
@ -71,7 +60,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
// server socket options
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to enable or disable SO_REUSEADDR
|
||||
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
|
||||
*
|
||||
* For more information see [[java.net.Socket.setReuseAddress]]
|
||||
*/
|
||||
|
|
@ -83,7 +72,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
// general socket options
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to enable or disable SO_KEEPALIVE
|
||||
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_KEEPALIVE
|
||||
*
|
||||
* For more information see [[java.net.Socket.setKeepAlive]]
|
||||
*/
|
||||
|
|
@ -92,7 +81,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
}
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to enable or disable OOBINLINE (receipt
|
||||
* [[akka.io.Tcp.SocketOption]] to enable or disable OOBINLINE (receipt
|
||||
* of TCP urgent data) By default, this option is disabled and TCP urgent
|
||||
* data is silently discarded.
|
||||
*
|
||||
|
|
@ -103,19 +92,19 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
}
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to set the SO_SNDBUF option.
|
||||
* [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
|
||||
*
|
||||
* For more information see [[java.net.Socket.setSendBufferSize]]
|
||||
*/
|
||||
case class SendBufferSize(size: Int) extends SocketOption {
|
||||
require(size > 0, "SendBufferSize must be > 0")
|
||||
require(size > 0, "ReceiveBufferSize must be > 0")
|
||||
override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size)
|
||||
}
|
||||
|
||||
// SO_LINGER is handled by the Close code
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to enable or disable TCP_NODELAY
|
||||
* [[akka.io.Tcp.SocketOption]] to enable or disable TCP_NODELAY
|
||||
* (disable or enable Nagle's algorithm)
|
||||
*
|
||||
* For more information see [[java.net.Socket.setTcpNoDelay]]
|
||||
|
|
@ -125,7 +114,7 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
}
|
||||
|
||||
/**
|
||||
* [[akka.io.Tcp.SO.SocketOption]] to set the traffic class or
|
||||
* [[akka.io.Tcp.SocketOption]] to set the traffic class or
|
||||
* type-of-service octet in the IP header for packets sent from this
|
||||
* socket.
|
||||
*
|
||||
|
|
@ -137,9 +126,28 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: what about close reasons?
|
||||
sealed trait CloseCommand extends Command
|
||||
case class Stats(channelsOpened: Long, channelsClosed: Long, selectorStats: Seq[SelectorStats]) {
|
||||
def channelsOpen = channelsOpened - channelsClosed
|
||||
}
|
||||
|
||||
case class SelectorStats(channelsOpened: Long, channelsClosed: Long) {
|
||||
def channelsOpen = channelsOpened - channelsClosed
|
||||
}
|
||||
|
||||
/// COMMANDS
|
||||
sealed trait Command
|
||||
|
||||
case class Connect(remoteAddress: InetSocketAddress,
|
||||
localAddress: Option[InetSocketAddress] = None,
|
||||
options: immutable.Seq[SocketOption] = Nil) extends Command
|
||||
case class Bind(handler: ActorRef,
|
||||
endpoint: InetSocketAddress,
|
||||
backlog: Int = 100,
|
||||
options: immutable.Seq[SocketOption] = Nil) extends Command
|
||||
case class Register(handler: ActorRef) extends Command
|
||||
case object Unbind extends Command
|
||||
|
||||
sealed trait CloseCommand extends Command
|
||||
case object Close extends CloseCommand
|
||||
case object ConfirmedClose extends CloseCommand
|
||||
case object Abort extends CloseCommand
|
||||
|
|
@ -154,6 +162,8 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
case object StopReading extends Command
|
||||
case object ResumeReading extends Command
|
||||
|
||||
case object GetStats extends Command
|
||||
|
||||
/// EVENTS
|
||||
sealed trait Event
|
||||
|
||||
|
|
@ -171,15 +181,12 @@ object Tcp extends ExtensionKey[TcpExt] {
|
|||
case class ErrorClose(cause: Throwable) extends ConnectionClosed
|
||||
|
||||
/// INTERNAL
|
||||
case class RegisterClientChannel(channel: SocketChannel)
|
||||
case class RegisterServerChannel(channel: ServerSocketChannel)
|
||||
case class CreateConnection(channel: SocketChannel)
|
||||
case class Reject(command: Command, commander: ActorRef)
|
||||
// Retry should be sent by Selector actors to their parent router with retriesLeft decremented. If retries are
|
||||
// depleted, the selector actor must reply directly to the manager with a Reject (above).
|
||||
case class Retry(command: Command, retriesLeft: Int, commander: ActorRef) {
|
||||
require(retriesLeft >= 0, "The upper limit for retries must be nonnegative.")
|
||||
}
|
||||
case class RegisterOutgoingConnection(channel: SocketChannel)
|
||||
case class RegisterServerSocketChannel(channel: ServerSocketChannel)
|
||||
case class RegisterIncomingConnection(channel: SocketChannel, handler: ActorRef, options: immutable.Seq[SocketOption])
|
||||
case class CreateConnection(channel: SocketChannel, handler: ActorRef, options: immutable.Seq[SocketOption])
|
||||
case class Reject(command: Command, retriesLeft: Int, commander: ActorRef)
|
||||
case class Retry(command: Command, retriesLeft: Int, commander: ActorRef)
|
||||
case object ChannelConnectable
|
||||
case object ChannelAcceptable
|
||||
case object ChannelReadable
|
||||
|
|
@ -197,21 +204,29 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
|
||||
val NrOfSelectors = getInt("nr-of-selectors")
|
||||
val MaxChannels = getInt("max-channels")
|
||||
val MaxChannelsPerSelector = MaxChannels / NrOfSelectors
|
||||
val SelectTimeout =
|
||||
if (getString("select-timeout") == "infinite") Duration.Inf
|
||||
else Duration(getMilliseconds("select-timeout"), MILLISECONDS)
|
||||
val SelectorAssociationRetries = getInt("selector-association-retries")
|
||||
val SelectorDispatcher = getString("selector-dispatcher")
|
||||
val WorkerDispatcher = getString("worker-dispatcher")
|
||||
val ManagementDispatcher = getString("management-dispatcher")
|
||||
val BatchAcceptLimit = getInt("batch-accept-limit")
|
||||
val DirectBufferSize = getInt("direct-buffer-size")
|
||||
val RegisterTimeout =
|
||||
if (getString("register-timeout") == "infinite") Duration.Undefined
|
||||
else Duration(getMilliseconds("register-timeout"), MILLISECONDS)
|
||||
val SelectorDispatcher = getString("selector-dispatcher")
|
||||
val WorkerDispatcher = getString("worker-dispatcher")
|
||||
val ManagementDispatcher = getString("management-dispatcher")
|
||||
|
||||
require(NrOfSelectors > 0, "nr-of-selectors must be > 0")
|
||||
require(MaxChannels >= 0, "max-channels must be >= 0")
|
||||
require(SelectTimeout >= Duration.Zero, "select-timeout must not be negative")
|
||||
require(SelectorAssociationRetries >= 0, "selector-association-retries must be >= 0")
|
||||
require(BatchAcceptLimit > 0, "batch-accept-limit must be > 0")
|
||||
|
||||
val MaxChannelsPerSelector = MaxChannels / NrOfSelectors
|
||||
}
|
||||
|
||||
val manager = system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
Props[TcpManager].withDispatcher(Settings.ManagementDispatcher), "IO-TCP")
|
||||
Props.empty.withDispatcher(Settings.ManagementDispatcher), "IO-TCP")
|
||||
|
||||
}
|
||||
|
|
|
|||
76
akka-io/src/main/scala/akka/io/TcpListener.scala
Normal file
76
akka-io/src/main/scala/akka/io/TcpListener.scala
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
import java.nio.channels.ServerSocketChannel
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ ActorLogging, ActorRef, Actor }
|
||||
import Tcp._
|
||||
|
||||
class TcpListener(manager: ActorRef,
|
||||
selector: ActorRef,
|
||||
handler: ActorRef,
|
||||
endpoint: InetSocketAddress,
|
||||
backlog: Int,
|
||||
bindCommander: ActorRef,
|
||||
options: immutable.Seq[SocketOption]) extends Actor with ActorLogging {
|
||||
|
||||
val batchAcceptLimit = Tcp(context.system).Settings.BatchAcceptLimit
|
||||
val channel = {
|
||||
val serverSocketChannel = ServerSocketChannel.open
|
||||
serverSocketChannel.configureBlocking(false)
|
||||
val socket = serverSocketChannel.socket
|
||||
options.foreach(_.beforeBind(socket))
|
||||
socket.bind(endpoint, backlog) // will blow up the actor constructor if the bind fails
|
||||
serverSocketChannel
|
||||
}
|
||||
selector ! RegisterServerSocketChannel(channel)
|
||||
context.watch(bindCommander) // sign death pact
|
||||
log.debug("Successfully bound to {}", endpoint)
|
||||
|
||||
def receive: Receive = {
|
||||
case Bound ⇒
|
||||
bindCommander ! Bound
|
||||
context.become(bound)
|
||||
}
|
||||
|
||||
def bound: Receive = {
|
||||
case ChannelAcceptable ⇒
|
||||
acceptAllPending(batchAcceptLimit)
|
||||
|
||||
case Unbind ⇒
|
||||
log.debug("Unbinding endpoint {}", endpoint)
|
||||
channel.close()
|
||||
sender ! Unbound
|
||||
log.debug("Unbound endpoint {}, stopping listener", endpoint)
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
@tailrec final def acceptAllPending(limit: Int): Unit =
|
||||
if (limit > 0) {
|
||||
val socketChannel =
|
||||
try channel.accept()
|
||||
catch {
|
||||
case NonFatal(e) ⇒ log.error(e, "Accept error: could not accept new connection due to {}", e); null
|
||||
}
|
||||
if (socketChannel != null) {
|
||||
log.debug("New connection accepted")
|
||||
manager ! RegisterIncomingConnection(socketChannel, handler, options)
|
||||
selector ! AcceptInterest
|
||||
acceptAllPending(limit - 1)
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
try channel.close()
|
||||
catch {
|
||||
case NonFatal(e) ⇒ log.error(e, "Error closing ServerSocketChannel")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,14 +1,21 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
|
||||
import akka.actor.{ OneForOneStrategy, Actor, Props }
|
||||
import akka.io.Tcp._
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.{ ActorLogging, Actor, Props }
|
||||
import akka.routing.RandomRouter
|
||||
import akka.actor.SupervisorStrategy.Restart
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.{ ask, pipe }
|
||||
import Tcp._
|
||||
|
||||
/**
|
||||
* TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections.
|
||||
*
|
||||
* TcpManager is obtainable by calling {{{ IO(TCP) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]])
|
||||
* TcpManager is obtainable by calling {{{ IO(Tcp) }}} (see [[akka.io.IO]] and [[akka.io.Tcp]])
|
||||
*
|
||||
* == Bind ==
|
||||
*
|
||||
|
|
@ -30,9 +37,9 @@ import akka.actor.SupervisorStrategy.Restart
|
|||
* To initiate a connection to a remote server, a [[akka.io.Tcp.Connect]] message must be sent to this actor. If the
|
||||
* connection succeeds, the sender will be notified with a [[akka.io.Tcp.Connected]] message. The sender of the
|
||||
* [[akka.io.Tcp.Connected]] message is the Connection actor (an internal actor representing the TCP connection). Before
|
||||
* starting to use the connection, a handler should be registered to the Connection actor by sending a [[akka.io.Tcp.Register]]
|
||||
* message. After a handler has been registered, all incoming data will be sent to the handler in the form of
|
||||
* [[akka.io.Tcp.Received]] messages. To write data to the connection, a [[akka.io.Tcp.Write]] message should be sent
|
||||
* starting to use the connection, a handler must be registered to the Connection actor by sending a [[akka.io.Tcp.Register]]
|
||||
* command message. After a handler has been registered, all incoming data will be sent to the handler in the form of
|
||||
* [[akka.io.Tcp.Received]] messages. To write data to the connection, a [[akka.io.Tcp.Write]] message must be sent
|
||||
* to the Connection actor.
|
||||
*
|
||||
* If the connect request is rejected because the Tcp system is not able to register more channels (see the nr-of-selectors
|
||||
|
|
@ -40,14 +47,36 @@ import akka.actor.SupervisorStrategy.Restart
|
|||
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
|
||||
*
|
||||
*/
|
||||
class TcpManager extends Actor {
|
||||
class TcpManager extends Actor with ActorLogging {
|
||||
val settings = Tcp(context.system).Settings
|
||||
val selectorNr = Iterator.from(0)
|
||||
|
||||
val selectorPool = context.actorOf(Props.empty.withRouter(RandomRouter(settings.NrOfSelectors)))
|
||||
val selectorPool = context.actorOf(
|
||||
props = Props(new TcpSelector(self)).withRouter(RandomRouter(settings.NrOfSelectors)),
|
||||
name = selectorNr.next().toString)
|
||||
|
||||
def receive = {
|
||||
case c: Connect ⇒ selectorPool forward c
|
||||
case b: Bind ⇒ selectorPool forward b
|
||||
case Reject(command, commander) ⇒ commander ! CommandFailed(command)
|
||||
case RegisterIncomingConnection(channel, handler, options) ⇒
|
||||
selectorPool ! CreateConnection(channel, handler, options)
|
||||
|
||||
case c: Connect ⇒
|
||||
selectorPool forward c
|
||||
|
||||
case b: Bind ⇒
|
||||
selectorPool forward b
|
||||
|
||||
case Reject(command, 0, commander) ⇒
|
||||
log.warning("Command '{}' failed since all {} selectors are at capacity", command, context.children.size)
|
||||
commander ! CommandFailed(command)
|
||||
|
||||
case Reject(command, retriesLeft, commander) ⇒
|
||||
log.warning("Command '{}' rejected by {} with {} retries left, retrying...", command, sender, retriesLeft)
|
||||
selectorPool ! Retry(command, retriesLeft - 1, commander)
|
||||
|
||||
case GetStats ⇒
|
||||
import context.dispatcher
|
||||
implicit val timeout: Timeout = 1 second span
|
||||
val seqFuture = Future.traverse(context.children)(_.ask(GetStats).mapTo[SelectorStats])
|
||||
seqFuture.map(s ⇒ Stats(s.map(_.channelsOpen).sum, s.map(_.channelsClosed).sum, s.toSeq)) pipeTo sender
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class TcpOutgoingConnection(_selector: ActorRef,
|
|||
if (channel.connect(remoteAddress))
|
||||
completeConnect(commander, options)
|
||||
else {
|
||||
selector ! RegisterClientChannel(channel)
|
||||
selector ! RegisterOutgoingConnection(channel)
|
||||
context.become(connecting(commander, options))
|
||||
}
|
||||
|
||||
|
|
|
|||
205
akka-io/src/main/scala/akka/io/TcpSelector.scala
Normal file
205
akka-io/src/main/scala/akka/io/TcpSelector.scala
Normal file
|
|
@ -0,0 +1,205 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
|
||||
import java.lang.Runnable
|
||||
import java.nio.channels.spi.SelectorProvider
|
||||
import java.nio.channels.{ ServerSocketChannel, SelectionKey, SocketChannel }
|
||||
import java.nio.channels.SelectionKey._
|
||||
import scala.util.control.NonFatal
|
||||
import scala.collection.immutable.HashMap
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import Tcp._
|
||||
|
||||
class TcpSelector(manager: ActorRef) extends Actor with ActorLogging {
|
||||
@volatile var childrenKeys = HashMap.empty[String, SelectionKey]
|
||||
var channelsOpened = 0L
|
||||
var channelsClosed = 0L
|
||||
val sequenceNumber = Iterator.from(0)
|
||||
val settings = Tcp(context.system).Settings
|
||||
val selectorManagementDispatcher = context.system.dispatchers.lookup(settings.SelectorDispatcher)
|
||||
val selector = SelectorProvider.provider.openSelector
|
||||
val doSelect: () ⇒ Int =
|
||||
settings.SelectTimeout match {
|
||||
case Duration.Zero ⇒ () ⇒ selector.selectNow()
|
||||
case Duration.Inf ⇒ () ⇒ selector.select()
|
||||
case x ⇒ val millis = x.toMillis; () ⇒ selector.select(millis)
|
||||
}
|
||||
|
||||
selectorManagementDispatcher.execute(select) // start selection "loop"
|
||||
|
||||
def receive: Receive = {
|
||||
case WriteInterest ⇒ execute(enableInterest(OP_WRITE, sender))
|
||||
case ReadInterest ⇒ execute(enableInterest(OP_READ, sender))
|
||||
case AcceptInterest ⇒ execute(enableInterest(OP_ACCEPT, sender))
|
||||
|
||||
case CreateConnection(channel, handler, options) ⇒
|
||||
val connection = context.actorOf(
|
||||
props = Props(
|
||||
creator = () ⇒ new TcpIncomingConnection(self, channel, handler, options),
|
||||
dispatcher = settings.WorkerDispatcher),
|
||||
name = nextName)
|
||||
execute(registerIncomingConnection(channel, handler))
|
||||
context.watch(connection)
|
||||
channelsOpened += 1
|
||||
|
||||
case cmd: Connect ⇒
|
||||
handleConnect(cmd, settings.SelectorAssociationRetries, sender)
|
||||
|
||||
case Retry(cmd: Connect, retriesLeft, commander) ⇒
|
||||
handleConnect(cmd, retriesLeft, commander)
|
||||
|
||||
case RegisterOutgoingConnection(channel) ⇒
|
||||
execute(registerOutgoingConnection(channel, sender))
|
||||
|
||||
case cmd: Bind ⇒
|
||||
handleBind(cmd, settings.SelectorAssociationRetries, sender)
|
||||
|
||||
case Retry(cmd: Bind, retriesLeft, commander) ⇒
|
||||
handleBind(cmd, retriesLeft, commander)
|
||||
|
||||
case RegisterServerSocketChannel(channel) ⇒
|
||||
execute(registerListener(channel, sender))
|
||||
|
||||
case Terminated(child) ⇒
|
||||
execute(unregister(child))
|
||||
channelsClosed += 1
|
||||
|
||||
case GetStats ⇒
|
||||
sender ! SelectorStats(channelsOpened, channelsClosed)
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
try {
|
||||
import scala.collection.JavaConverters._
|
||||
selector.keys.asScala.foreach(_.channel.close())
|
||||
selector.close()
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ log.error(e, "Error closing selector or key")
|
||||
}
|
||||
}
|
||||
|
||||
// we can never recover from failures of a connection or listener child
|
||||
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||
|
||||
def handleConnect(cmd: Connect, retriesLeft: Int, commander: ActorRef): Unit = {
|
||||
log.debug("Executing {}", cmd)
|
||||
if (canHandleMoreChannels) {
|
||||
val connection = context.actorOf(
|
||||
props = Props(
|
||||
creator = () ⇒ new TcpOutgoingConnection(self, commander, cmd.remoteAddress, cmd.localAddress, cmd.options),
|
||||
dispatcher = settings.WorkerDispatcher),
|
||||
name = nextName)
|
||||
context.watch(connection)
|
||||
channelsOpened += 1
|
||||
} else sender ! Reject(cmd, retriesLeft, commander)
|
||||
}
|
||||
|
||||
def handleBind(cmd: Bind, retriesLeft: Int, commander: ActorRef): Unit = {
|
||||
log.debug("Executing {}", cmd)
|
||||
if (canHandleMoreChannels) {
|
||||
val listener = context.actorOf(
|
||||
props = Props(
|
||||
creator = () ⇒ new TcpListener(manager, self, cmd.handler, cmd.endpoint, cmd.backlog, commander, cmd.options),
|
||||
dispatcher = settings.WorkerDispatcher),
|
||||
name = nextName)
|
||||
context.watch(listener)
|
||||
channelsOpened += 1
|
||||
} else sender ! Reject(cmd, retriesLeft, commander)
|
||||
}
|
||||
|
||||
def nextName = sequenceNumber.next().toString
|
||||
|
||||
def canHandleMoreChannels = childrenKeys.size < settings.MaxChannelsPerSelector
|
||||
|
||||
//////////////// Management Tasks scheduled via the selectorManagementDispatcher /////////////
|
||||
|
||||
def execute(task: Task): Unit = {
|
||||
selectorManagementDispatcher.execute(task)
|
||||
selector.wakeup()
|
||||
}
|
||||
|
||||
def updateKeyMap(child: ActorRef, key: SelectionKey): Unit =
|
||||
childrenKeys = childrenKeys.updated(child.path.name, key)
|
||||
|
||||
def registerOutgoingConnection(channel: SocketChannel, connection: ActorRef) =
|
||||
new Task {
|
||||
def tryRun() {
|
||||
val key = channel.register(selector, OP_CONNECT, connection)
|
||||
updateKeyMap(connection, key)
|
||||
}
|
||||
}
|
||||
|
||||
def registerListener(channel: ServerSocketChannel, listener: ActorRef) =
|
||||
new Task {
|
||||
def tryRun() {
|
||||
val key = channel.register(selector, OP_ACCEPT, listener)
|
||||
updateKeyMap(listener, key)
|
||||
listener ! Bound
|
||||
}
|
||||
}
|
||||
|
||||
def registerIncomingConnection(channel: SocketChannel, connection: ActorRef) =
|
||||
new Task {
|
||||
def tryRun() {
|
||||
// we only enable reading after the user-level connection handler has registered
|
||||
val key = channel.register(selector, 0, connection)
|
||||
updateKeyMap(connection, key)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: evaluate whether we could run this on the TcpSelector actor itself rather than
|
||||
// on the selector-management-dispatcher. The trade-off would be using a ConcurrentHashMap
|
||||
// rather than an unsynchronized one, but since switching interest ops is so frequent
|
||||
// the change might be beneficial, provided the underlying implementation really is thread-safe
|
||||
// and behaves consistently on all platforms.
|
||||
def enableInterest(op: Int, connection: ActorRef) =
|
||||
new Task {
|
||||
def tryRun() {
|
||||
val key = childrenKeys(connection.path.name)
|
||||
key.interestOps(key.interestOps | op)
|
||||
}
|
||||
}
|
||||
|
||||
def unregister(child: ActorRef) =
|
||||
new Task {
|
||||
def tryRun() {
|
||||
childrenKeys = childrenKeys - child.path.name
|
||||
}
|
||||
}
|
||||
|
||||
val select = new Task {
|
||||
def tryRun() {
|
||||
if (doSelect() > 0) {
|
||||
val keys = selector.selectedKeys
|
||||
val iterator = keys.iterator()
|
||||
while (iterator.hasNext) {
|
||||
val key = iterator.next
|
||||
val connection = key.attachment.asInstanceOf[ActorRef]
|
||||
if (key.isValid) {
|
||||
if (key.isReadable) connection ! ChannelReadable
|
||||
if (key.isWritable) connection ! ChannelWritable
|
||||
else if (key.isAcceptable) connection ! ChannelAcceptable
|
||||
else if (key.isConnectable) connection ! ChannelConnectable
|
||||
key.interestOps(0) // prevent immediate reselection by always clearing
|
||||
} else log.warning("Invalid selection key: {}", key)
|
||||
}
|
||||
keys.clear() // we need to remove the selected keys from the set, otherwise they remain selected
|
||||
}
|
||||
selectorManagementDispatcher.execute(this) // re-schedules select behind all currently queued tasks
|
||||
}
|
||||
}
|
||||
|
||||
abstract class Task extends Runnable {
|
||||
def tryRun()
|
||||
def run() {
|
||||
try tryRun()
|
||||
catch {
|
||||
case NonFatal(e) ⇒ log.error(e, "Error during selector management task: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class IOSpec extends AkkaSpec {
|
||||
|
||||
}
|
||||
|
|
@ -233,7 +233,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
val selector = TestProbe()
|
||||
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
|
||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||
selector.expectMsg(RegisterClientChannel(clientSideChannel))
|
||||
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
|
||||
|
||||
// close instead of accept
|
||||
localServer.close()
|
||||
|
|
@ -250,7 +250,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
val selector = TestProbe()
|
||||
val connectionActor = createConnectionActor(selector.ref, userHandler.ref, serverAddress = new InetSocketAddress("127.0.0.1", 63186))
|
||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||
selector.expectMsg(RegisterClientChannel(clientSideChannel))
|
||||
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
|
||||
val sel = SelectorProvider.provider().openSelector()
|
||||
val key = clientSideChannel.register(sel, SelectionKey.OP_CONNECT | SelectionKey.OP_READ)
|
||||
sel.select(200)
|
||||
|
|
@ -269,7 +269,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
val selector = TestProbe()
|
||||
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
|
||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||
selector.expectMsg(RegisterClientChannel(clientSideChannel))
|
||||
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
|
||||
localServer.accept()
|
||||
selector.send(connectionActor, ChannelConnectable)
|
||||
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
|
||||
|
|
@ -284,7 +284,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
val selector = TestProbe()
|
||||
val connectionActor = createConnectionActor(selector.ref, userHandler)
|
||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||
selector.expectMsg(RegisterClientChannel(clientSideChannel))
|
||||
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
|
||||
system.stop(userHandler)
|
||||
assertActorTerminated(connectionActor)
|
||||
}
|
||||
|
|
@ -346,7 +346,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
|||
val connectionActor = createConnectionActor(selector.ref, userHandler.ref)
|
||||
val clientSideChannel = connectionActor.underlyingActor.channel
|
||||
|
||||
selector.expectMsg(RegisterClientChannel(clientSideChannel))
|
||||
selector.expectMsg(RegisterOutgoingConnection(clientSideChannel))
|
||||
|
||||
localServer.configureBlocking(true)
|
||||
val serverSideChannel = localServer.accept()
|
||||
|
|
|
|||
62
akka-io/src/test/scala/akka/io/TcpListenerSpec.scala
Normal file
62
akka-io/src/test/scala/akka/io/TcpListenerSpec.scala
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.io
|
||||
|
||||
import java.net.{ Socket, InetSocketAddress }
|
||||
import java.nio.channels.ServerSocketChannel
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Success
|
||||
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec }
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import Tcp._
|
||||
|
||||
class TcpListenerSpec extends AkkaSpec("akka.io.tcp.batch-accept-limit = 2") {
|
||||
val port = 47323
|
||||
|
||||
"A TcpListener" must {
|
||||
val manager = TestProbe()
|
||||
val selector = TestProbe()
|
||||
val handler = TestProbe()
|
||||
val handlerRef = handler.ref
|
||||
val bindCommander = TestProbe()
|
||||
val endpoint = new InetSocketAddress("localhost", port)
|
||||
val listener = TestActorRef(new TcpListener(manager.ref, selector.ref, handler.ref, endpoint, 100,
|
||||
bindCommander.ref, Nil))
|
||||
var serverSocketChannel: Option[ServerSocketChannel] = None
|
||||
|
||||
"register its ServerSocketChannel with its selector" in {
|
||||
val RegisterServerSocketChannel(channel) = selector.receiveOne(Duration.Zero)
|
||||
serverSocketChannel = Some(channel)
|
||||
}
|
||||
|
||||
"let the Bind commander know when binding is completed" in {
|
||||
listener ! Bound
|
||||
bindCommander.expectMsg(Bound)
|
||||
}
|
||||
|
||||
"accept two acceptable connections at once and register them with the manager" in {
|
||||
new Socket("localhost", port)
|
||||
new Socket("localhost", port)
|
||||
new Socket("localhost", port)
|
||||
listener ! ChannelAcceptable
|
||||
val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero)
|
||||
val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero)
|
||||
}
|
||||
|
||||
"accept one more connection and register it with the manager" in {
|
||||
listener ! ChannelAcceptable
|
||||
val RegisterIncomingConnection(_, `handlerRef`, Nil) = manager.receiveOne(Duration.Zero)
|
||||
}
|
||||
|
||||
"react to Unbind commands by closing the ServerSocketChannel, replying with Unbound and stopping itself" in {
|
||||
implicit val timeout: Timeout = 1 second span
|
||||
listener.ask(Unbind).value must equal(Some(Success(Unbound)))
|
||||
serverSocketChannel.get.isOpen must equal(false)
|
||||
listener.isTerminated must equal(true)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue