diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index fb3beff1ff..10ca9a1fe9 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -4,7 +4,7 @@ package akka.actor import akka.dispatch.{ Future, ExecutionContext } -import akka.util.ByteString +import akka.util.{ ByteString, Duration } import java.net.{ SocketAddress, InetSocketAddress } import java.io.IOException import java.nio.ByteBuffer @@ -108,19 +108,106 @@ object IO { * This can also be performed by creating a new [[akka.actor.IO.SocketHandle]] * and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]]. * + * @param options Seq of [[akka.actor.IO.SocketOptions]] to set on accepted socket * @param socketOwner the [[akka.actor.ActorRef]] that should receive events * associated with the SocketChannel. The ActorRef for the * current Actor will be used implicitly. * @return a new SocketHandle that can be used to perform actions on the * new connection's SocketChannel. */ - def accept()(implicit socketOwner: ActorRef): SocketHandle = { + def accept(options: Seq[SocketOption])(implicit socketOwner: ActorRef): SocketHandle = { val socket = SocketHandle(socketOwner, ioManager) - ioManager ! Accept(socket, this) + ioManager ! Accept(socket, this, options) socket } + + /** + * Sends a request to the [[akka.actor.IOManager]] to accept an incoming + * connection to the ServerSocketChannel associated with this + * [[akka.actor.IO.Handle]]. + * + * This can also be performed by creating a new [[akka.actor.IO.SocketHandle]] + * and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]]. + * + * @param socketOwner the [[akka.actor.ActorRef]] that should receive events + * associated with the SocketChannel. The ActorRef for the + * current Actor will be used implicitly. + * @return a new SocketHandle that can be used to perform actions on the + * new connection's SocketChannel. + */ + def accept()(implicit socketOwner: ActorRef): SocketHandle = accept(Seq.empty) } + /** + * Options to be set when setting up a [[akka.actor.IO.SocketHandle]] + */ + sealed trait SocketOption + + /** + * Options to be set when setting up a [[akka.actor.IO.ServerHandle]] + */ + sealed trait ServerSocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable SO_KEEPALIVE + */ + case class KeepAlive(on: Boolean) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable OOBINLINE (receipt + * of TCP urgent data) By default, this option is disabled and TCP urgent + * data received on a [[akka.actor.IO.SocketHandle]] is silently discarded. + */ + case class OOBInline(on: Boolean) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to set performance preferences for this + * [[akka.actor.IO.SocketHandle]]. + */ + case class PerformancePreferences(connectionTime: Int, latency: Int, bandwidth: Int) extends SocketOption with ServerSocketOption + + /** + * [[akka.actor.IO.SocketOption]] to set the SO_RCVBUF option for this + * [[akka.actor.IO.SocketHandle]]. + */ + case class ReceiveBufferSize(size: Int) extends SocketOption with ServerSocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable SO_REUSEADDR + */ + case class ReuseAddress(on: Boolean) extends SocketOption with ServerSocketOption + + /** + * [[akka.actor.IO.SocketOption]] to set the SO_SNDBUF option for this + * [[akka.actor.IO.SocketHandle]]. + */ + case class SendBufferSize(size: Int) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable SO_LINGER with the + * specified linger time rounded down to the nearest second. + */ + case class SoLinger(on: Boolean, linger: Duration) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable SO_TIMEOUT with the + * specified timeout rounded down to the nearest millisecond. + */ + case class SoTimeout(timeout: Duration) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to enable or disable TCP_NODELAY + * (disable or enable Nagle's algorithm) + */ + case class TcpNoDelay(on: Boolean) extends SocketOption + + /** + * [[akka.actor.IO.SocketOption]] to set the traffic class or + * type-of-service octet in the IP header for packets sent from this + * [[akka.actor.IO.SocketHandle]]. + */ + case class TrafficClass(tc: Int) extends SocketOption + /** * Messages used to communicate with an [[akka.actor.IOManager]]. */ @@ -128,11 +215,12 @@ object IO { /** * Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel - * listening on the provided address. + * listening on the provided address with the given + * [[akka.actor.IO.ServerSocketOption]]s. * * Normally sent using IOManager.listen() */ - case class Listen(server: ServerHandle, address: SocketAddress) extends IOMessage + case class Listen(server: ServerHandle, address: SocketAddress, options: Seq[ServerSocketOption] = Seq.empty) extends IOMessage /** * Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is @@ -149,19 +237,20 @@ object IO { case class NewClient(server: ServerHandle) extends IOMessage /** - * Message to an [[akka.actor.IOManager]] to accept a new connection. + * Message to an [[akka.actor.IOManager]] to accept a new connection with the + * given [[akka.actor.IO.SocketOption]]s. * * Normally sent using [[akka.actor.IO.ServerHandle]].accept() */ - case class Accept(socket: SocketHandle, server: ServerHandle) extends IOMessage + case class Accept(socket: SocketHandle, server: ServerHandle, options: Seq[SocketOption] = Seq.empty) extends IOMessage /** * Message to an [[akka.actor.IOManager]] to create a SocketChannel connected - * to the provided address. + * to the provided address with the given [[akka.actor.IO.SocketOption]]s. * * Normally sent using IOManager.connect() */ - case class Connect(socket: SocketHandle, address: SocketAddress) extends IOMessage + case class Connect(socket: SocketHandle, address: SocketAddress, options: Seq[SocketOption] = Seq.empty) extends IOMessage /** * Message from an [[akka.actor.IOManager]] that the SocketChannel has @@ -691,14 +780,26 @@ final class IOManager private (system: ActorSystem) extends Extension { * * @param address the address to listen on * @param owner the ActorRef that will receive messages from the IOManagerActor + * @param option Seq of [[akka.actor.IO.ServerSocketOptions]] to setup on socket * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket */ - def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = { + def listen(address: SocketAddress, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = { val server = IO.ServerHandle(owner, actor) - actor ! IO.Listen(server, address) + actor ! IO.Listen(server, address, options) server } + /** + * Create a ServerSocketChannel listening on an address. Messages will be + * sent from the [[akka.actor.IOManagerActor]] to the owner + * [[akka.actor.ActorRef]]. + * + * @param address the address to listen on + * @param owner the ActorRef that will receive messages from the IOManagerActor + * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket + */ + def listen(address: SocketAddress)(implicit owner: ActorRef): IO.ServerHandle = listen(address, Seq.empty) + /** * Create a ServerSocketChannel listening on a host and port. Messages will * be sent from the [[akka.actor.IOManagerActor]] to the owner @@ -706,11 +807,41 @@ final class IOManager private (system: ActorSystem) extends Extension { * * @param host the hostname or IP to listen on * @param port the port to listen on + * @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket */ - def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle = - listen(new InetSocketAddress(host, port))(owner) + def listen(host: String, port: Int, options: Seq[IO.ServerSocketOption])(implicit owner: ActorRef): IO.ServerHandle = + listen(new InetSocketAddress(host, port), options)(owner) + + /** + * Create a ServerSocketChannel listening on a host and port. Messages will + * be sent from the [[akka.actor.IOManagerActor]] to the owner + * [[akka.actor.ActorRef]]. + * + * @param host the hostname or IP to listen on + * @param port the port to listen on + * @param options Seq of [[akka.actor.IO.ServerSocketOption]] to setup on socket + * @param owner the ActorRef that will receive messages from the IOManagerActor + * @return a [[akka.actor.IO.ServerHandle]] to uniquely identify the created socket + */ + def listen(host: String, port: Int)(implicit owner: ActorRef): IO.ServerHandle = listen(new InetSocketAddress(host, port), Seq.empty)(owner) + + /** + * Create a SocketChannel connecting to an address. Messages will be + * sent from the [[akka.actor.IOManagerActor]] to the owner + * [[akka.actor.ActorRef]]. + * + * @param address the address to connect to + * @param options Seq of [[akka.actor.IO.SocketOption]] to setup on established socket + * @param owner the ActorRef that will receive messages from the IOManagerActor + * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket + */ + def connect(address: SocketAddress, options: Seq[IO.SocketOption])(implicit owner: ActorRef): IO.SocketHandle = { + val socket = IO.SocketHandle(owner, actor) + actor ! IO.Connect(socket, address, options) + socket + } /** * Create a SocketChannel connecting to an address. Messages will be @@ -721,11 +852,7 @@ final class IOManager private (system: ActorSystem) extends Extension { * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket */ - def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = { - val socket = IO.SocketHandle(owner, actor) - actor ! IO.Connect(socket, address) - socket - } + def connect(address: SocketAddress)(implicit owner: ActorRef): IO.SocketHandle = connect(address, Seq.empty) /** * Create a SocketChannel connecting to a host and port. Messages will @@ -734,6 +861,7 @@ final class IOManager private (system: ActorSystem) extends Extension { * * @param host the hostname or IP to connect to * @param port the port to connect to + * @param options Seq of [[akka.actor.IO.SocketOption]] to setup on established socket * @param owner the ActorRef that will receive messages from the IOManagerActor * @return a [[akka.actor.IO.SocketHandle]] to uniquely identify the created socket */ @@ -752,7 +880,7 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { * * Use [[akka.actor.IOManager]] to retrieve an instance of this Actor. */ -final class IOManagerActor extends Actor { +final class IOManagerActor extends Actor with ActorLogging { import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT } private val bufferSize = 8192 // TODO: make buffer size configurable @@ -828,32 +956,61 @@ final class IOManagerActor extends Actor { lastSelect = 0 } + private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) { + options foreach { + case IO.KeepAlive(on) ⇒ socket.setKeepAlive(on) + case IO.OOBInline(on) ⇒ socket.setOOBInline(on) + case IO.ReceiveBufferSize(size) ⇒ socket.setReceiveBufferSize(size) + case IO.ReuseAddress(on) ⇒ socket.setReuseAddress(on) + case IO.SendBufferSize(size) ⇒ socket.setSendBufferSize(size) + case IO.SoLinger(on, linger) ⇒ socket.setSoLinger(on, linger.toSeconds.toInt) + case IO.SoTimeout(timeout) ⇒ socket.setSoTimeout(timeout.toMillis.toInt) + case IO.TcpNoDelay(on) ⇒ socket.setTcpNoDelay(on) + case IO.TrafficClass(tc) ⇒ socket.setTrafficClass(tc) + case IO.PerformancePreferences(connTime, latency, bandwidth) ⇒ + socket.setPerformancePreferences(connTime, latency, bandwidth) + } + } + protected def receive = { case Select ⇒ select() if (running) self ! Select selectSent = running - case IO.Listen(server, address) ⇒ + case IO.Listen(server, address, options) ⇒ val channel = ServerSocketChannel open () channel configureBlocking false + + val sock = channel.socket + options foreach { + case IO.ReceiveBufferSize(size) ⇒ sock.setReceiveBufferSize(size) + case IO.ReuseAddress(on) ⇒ sock.setReuseAddress(on) + case IO.PerformancePreferences(connTime, latency, bandwidth) ⇒ + sock.setPerformancePreferences(connTime, latency, bandwidth) + } + channel.socket bind (address, 1000) // TODO: make backlog configurable channels update (server, channel) channel register (selector, OP_ACCEPT, server) server.owner ! IO.Listening(server, channel.socket.getLocalSocketAddress()) run() - case IO.Connect(socket, address) ⇒ + case IO.Connect(socket, address, options) ⇒ val channel = SocketChannel open () channel configureBlocking false channel connect address + setSocketOptions(channel.socket, options) channels update (socket, channel) channel register (selector, OP_CONNECT | OP_READ, socket) run() - case IO.Accept(socket, server) ⇒ + case IO.Accept(socket, server, options) ⇒ val queue = accepted(server) val channel = queue.dequeue() + + channel match { case socketChannel: SocketChannel ⇒ setSocketOptions(socketChannel.socket, options) } + channels update (socket, channel) channel register (selector, OP_READ, socket) run()