IOManager: support setting socket options

Note: This is not backwards compatible and should only be included in 2.1
This commit is contained in:
Scott R. Parish 2012-05-07 22:16:05 -05:00
parent 3fdc3c757b
commit 16ba2d990b

View file

@ -4,7 +4,7 @@
package akka.actor
import akka.dispatch.{ Future, ExecutionContext }
import akka.util.ByteString
import akka.util.{ ByteString, Duration }
import java.net.{ SocketAddress, InetSocketAddress }
import java.io.IOException
import java.nio.ByteBuffer
@ -108,19 +108,106 @@ object IO {
* This can also be performed by creating a new [[akka.actor.IO.SocketHandle]]
* and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]].
*
* @param options Seq of [[akka.actor.IO.SocketOptions]] to set on accepted socket
* @param socketOwner the [[akka.actor.ActorRef]] that should receive events
* associated with the SocketChannel. The ActorRef for the
* current Actor will be used implicitly.
* @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel.
*/
def accept()(implicit socketOwner: ActorRef): SocketHandle = {
def accept(options: Seq[SocketOption])(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this)
ioManager ! Accept(socket, this, options)
socket
}
/**
* Sends a request to the [[akka.actor.IOManager]] to accept an incoming
* connection to the ServerSocketChannel associated with this
* [[akka.actor.IO.Handle]].
*
* This can also be performed by creating a new [[akka.actor.IO.SocketHandle]]
* and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]].
*
* @param socketOwner the [[akka.actor.ActorRef]] that should receive events
* associated with the SocketChannel. The ActorRef for the
* current Actor will be used implicitly.
* @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel.
*/
def accept()(implicit socketOwner: ActorRef): SocketHandle = accept(Seq.empty)
}
/**
* Options to be set when setting up a [[akka.actor.IO.SocketHandle]]
*/
sealed trait SocketOption
/**
* Options to be set when setting up a [[akka.actor.IO.ServerHandle]]
*/
sealed trait ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_KEEPALIVE
*/
case class KeepAlive(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable OOBINLINE (receipt
* of TCP urgent data) By default, this option is disabled and TCP urgent
* data received on a [[akka.actor.IO.SocketHandle]] is silently discarded.
*/
case class OOBInline(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set performance preferences for this
* [[akka.actor.IO.SocketHandle]].
*/
case class PerformancePreferences(connectionTime: Int, latency: Int, bandwidth: Int) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_RCVBUF option for this
* [[akka.actor.IO.SocketHandle]].
*/
case class ReceiveBufferSize(size: Int) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_REUSEADDR
*/
case class ReuseAddress(on: Boolean) extends SocketOption with ServerSocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the SO_SNDBUF option for this
* [[akka.actor.IO.SocketHandle]].
*/
case class SendBufferSize(size: Int) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_LINGER with the
* specified linger time rounded down to the nearest second.
*/
case class SoLinger(on: Boolean, linger: Duration) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable SO_TIMEOUT with the
* specified timeout rounded down to the nearest millisecond.
*/
case class SoTimeout(timeout: Duration) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to enable or disable TCP_NODELAY
* (disable or enable Nagle's algorithm)
*/
case class TcpNoDelay(on: Boolean) extends SocketOption
/**
* [[akka.actor.IO.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* [[akka.actor.IO.SocketHandle]].
*/
case class TrafficClass(tc: Int) extends SocketOption
/**
* Messages used to communicate with an [[akka.actor.IOManager]].
*/
@ -128,11 +215,12 @@ object IO {
/**
* Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel
* listening on the provided address.
* listening on the provided address with the given
* [[akka.actor.IO.ServerSocketOption]]s.
*
* Normally sent using IOManager.listen()
*/
case class Listen(server: ServerHandle, address: SocketAddress) extends IOMessage
case class Listen(server: ServerHandle, address: SocketAddress, options: Seq[ServerSocketOption] = Seq.empty) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is
@ -149,19 +237,20 @@ object IO {
case class NewClient(server: ServerHandle) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to accept a new connection.
* Message to an [[akka.actor.IOManager]] to accept a new connection with the
* given [[akka.actor.IO.SocketOption]]s.
*
* Normally sent using [[akka.actor.IO.ServerHandle]].accept()
*/
case class Accept(socket: SocketHandle, server: ServerHandle) extends IOMessage
case class Accept(socket: SocketHandle, server: ServerHandle, options: Seq[SocketOption] = Seq.empty) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to create a SocketChannel connected
* to the provided address.
* to the provided address with the given [[akka.actor.IO.SocketOption]]s.
*
* Normally sent using IOManager.connect()
*/
case class Connect(socket: SocketHandle, address: SocketAddress) extends IOMessage
case class Connect(socket: SocketHandle, address: SocketAddress, options: Seq[SocketOption] = Seq.empty) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that the SocketChannel has
@ -691,14 +780,26 @@ final class IOManager private (system: ActorSystem) extends Extension {
*
* @param address the address to listen on
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @param option Seq of [[akka.actor.IO.ServerSocketOptions]] to setup on socket
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = {
def listen(address: SocketAddress, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = {
val server = IO.ServerHandle(owner, actor)
actor ! IO.Listen(server, address)
actor ! IO.Listen(server, address, options)
server
}
/**
* Create a ServerSocketChannel listening on an address. Messages will be
* sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
*
* @param address the address to listen on
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Seq.empty)
/**
* Create a ServerSocketChannel listening on a host and port. Messages will
* be sent from the [[akka.actor.IOManagerActor]] to the owner
@ -706,11 +807,41 @@ final class IOManager private (system: ActorSystem) extends Extension {
*
* @param host the hostname or IP to listen on
* @param port the port to listen on
* @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle =
listen(new InetSocketAddress(host, port))(owner)
def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle =
listen(new InetSocketAddress(host, port), options)(owner)
/**
* Create a ServerSocketChannel listening on a host and port. Messages will
* be sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
*
* @param host the hostname or IP to listen on
* @param port the port to listen on
* @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket
*/
def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle = listen(new InetSocketAddress(host, port), Seq.empty)(owner)
/**
* Create a SocketChannel connecting to an address. Messages will be
* sent from the [[akka.actor.IOManagerActor]] to the owner
* [[akka.actor.ActorRef]].
*
* @param address the address to connect to
* @param options Seq of [[akka.actor.IO.SocketOption]] to setup on established socket
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/
def connect(address: SocketAddress, options: Seq[IO.SocketOption])(implicit owner: ActorRef): IO.SocketHandle = {
val socket = IO.SocketHandle(owner, actor)
actor ! IO.Connect(socket, address, options)
socket
}
/**
* Create a SocketChannel connecting to an address. Messages will be
@ -721,11 +852,7 @@ final class IOManager private (system: ActorSystem) extends Extension {
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/
def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = {
val socket = IO.SocketHandle(owner, actor)
actor ! IO.Connect(socket, address)
socket
}
def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = connect(address, Seq.empty)
/**
* Create a SocketChannel connecting to a host and port. Messages will
@ -734,6 +861,7 @@ final class IOManager private (system: ActorSystem) extends Extension {
*
* @param host the hostname or IP to connect to
* @param port the port to connect to
* @param options Seq of [[akka.actor.IO.SocketOption]] to setup on established socket
* @param owner the ActorRef that will receive messages from the IOManagerActor
* @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket
*/
@ -752,7 +880,7 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider {
*
* Use [[akka.actor.IOManager]] to retrieve an instance of this Actor.
*/
final class IOManagerActor extends Actor {
final class IOManagerActor extends Actor with ActorLogging {
import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT }
private val bufferSize = 8192 // TODO: make buffer size configurable
@ -828,32 +956,61 @@ final class IOManagerActor extends Actor {
lastSelect = 0
}
private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) {
options foreach {
case IO.KeepAlive(on) socket.setKeepAlive(on)
case IO.OOBInline(on) socket.setOOBInline(on)
case IO.ReceiveBufferSize(size) socket.setReceiveBufferSize(size)
case IO.ReuseAddress(on) socket.setReuseAddress(on)
case IO.SendBufferSize(size) socket.setSendBufferSize(size)
case IO.SoLinger(on, linger) socket.setSoLinger(on, linger.toSeconds.toInt)
case IO.SoTimeout(timeout) socket.setSoTimeout(timeout.toMillis.toInt)
case IO.TcpNoDelay(on) socket.setTcpNoDelay(on)
case IO.TrafficClass(tc) socket.setTrafficClass(tc)
case IO.PerformancePreferences(connTime, latency, bandwidth)
socket.setPerformancePreferences(connTime, latency, bandwidth)
}
}
protected def receive = {
case Select
select()
if (running) self ! Select
selectSent = running
case IO.Listen(server, address)
case IO.Listen(server, address, options)
val channel = ServerSocketChannel open ()
channel configureBlocking false
val sock = channel.socket
options foreach {
case IO.ReceiveBufferSize(size) sock.setReceiveBufferSize(size)
case IO.ReuseAddress(on) sock.setReuseAddress(on)
case IO.PerformancePreferences(connTime, latency, bandwidth)
sock.setPerformancePreferences(connTime, latency, bandwidth)
}
channel.socket bind (address, 1000) // TODO: make backlog configurable
channels update (server, channel)
channel register (selector, OP_ACCEPT, server)
server.owner ! IO.Listening(server, channel.socket.getLocalSocketAddress())
run()
case IO.Connect(socket, address)
case IO.Connect(socket, address, options)
val channel = SocketChannel open ()
channel configureBlocking false
channel connect address
setSocketOptions(channel.socket, options)
channels update (socket, channel)
channel register (selector, OP_CONNECT | OP_READ, socket)
run()
case IO.Accept(socket, server)
case IO.Accept(socket, server, options)
val queue = accepted(server)
val channel = queue.dequeue()
channel match { case socketChannel: SocketChannel setSocketOptions(socketChannel.socket, options) }
channels update (socket, channel)
channel register (selector, OP_READ, socket)
run()