Factored out common manager code and SocketOptions

This commit is contained in:
Endre Sándor Varga 2013-02-05 11:48:47 +01:00
parent 946fb0eec4
commit 1ec065b0cd
16 changed files with 188 additions and 173 deletions

View file

@ -6,6 +6,7 @@ package akka.io
import akka.testkit.AkkaSpec
import akka.util.ByteString
import akka.io.Inet
import Tcp._
import TestUtils._
import akka.testkit.EventFilter
@ -64,8 +65,8 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationS
expectReceivedData(clientHandler, 100000)
override def bindOptions = List(SO.SendBufferSize(1024))
override def connectOptions = List(SO.ReceiveBufferSize(1024))
override def bindOptions = List(Inet.SO.SendBufferSize(1024))
override def connectOptions = List(Inet.SO.ReceiveBufferSize(1024))
}
}

View file

@ -8,6 +8,7 @@ import scala.annotation.tailrec
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.actor.ActorRef
import scala.collection.immutable
import akka.io.Inet.SocketOption
import Tcp._
import TestUtils._

View file

@ -22,6 +22,7 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
import akka.util.ByteString
import akka.actor.DeathPactException
import java.nio.channels.SelectionKey._
import akka.io.Inet.SocketOption
class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") {
val serverAddress = temporaryServerAddress()
@ -33,7 +34,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
val userHandler = TestProbe()
val selector = TestProbe()
val connectionActor =
createConnectionActor(options = Vector(SO.ReuseAddress(true)))(selector.ref, userHandler.ref)
createConnectionActor(options = Vector(Inet.SO.ReuseAddress(true)))(selector.ref, userHandler.ref)
val clientChannel = connectionActor.underlyingActor.channel
clientChannel.socket.getReuseAddress must be(true)
}
@ -65,7 +66,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
}
"bundle incoming Received messages as long as more data is available" in withEstablishedConnection(
clientSocketOptions = List(SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through
) { setup
import setup._
@ -567,7 +568,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
def createConnectionActor(
serverAddress: InetSocketAddress = serverAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Seq[Tcp.SocketOption] = Nil)(
options: immutable.Seq[SocketOption] = Nil)(
_selector: ActorRef,
commander: ActorRef): TestActorRef[TcpOutgoingConnection] = {

View file

@ -4,7 +4,9 @@
package akka.io
import akka.actor.{ ActorRef, ActorSystem, ExtensionKey }
import akka.actor._
import akka.routing.RandomRouter
import akka.io.SelectionHandler.KickStartCommand
object IO {
@ -14,4 +16,23 @@ object IO {
def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager
abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {
val selectorPool = context.actorOf(
props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)),
name = "selectors")
def createKickStart(pf: PartialFunction[Any, Props], cmd: Any): PartialFunction[Any, KickStartCommand] = {
pf.andThen { props
val commander = sender
KickStartCommand(cmd, commander, props)
}
}
def kickStartReceive(pf: PartialFunction[Any, Props]): Receive = {
//case KickStartFailed =
case cmd if pf.isDefinedAt(cmd) selectorPool ! createKickStart(pf, cmd)(cmd)
}
}
}

View file

@ -0,0 +1,82 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import java.net.{ DatagramSocket, Socket, ServerSocket }
object Inet {
/**
* SocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a socket).
*/
trait SocketOption {
def beforeDatagramBind(ds: DatagramSocket): Unit = ()
def beforeServerSocketBind(ss: ServerSocket): Unit = ()
/**
* Action to be taken for this option before calling connect()
*/
def beforeConnect(s: Socket): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
def afterConnect(s: Socket): Unit = ()
}
object SO {
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
*
* For more information see [[java.net.Socket.setReceiveBufferSize]]
*/
case class ReceiveBufferSize(size: Int) extends SocketOption {
require(size > 0, "ReceiveBufferSize must be > 0")
override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size)
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size)
override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size)
}
// server socket options
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
*
* For more information see [[java.net.Socket.setReuseAddress]]
*/
case class ReuseAddress(on: Boolean) extends SocketOption {
override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReuseAddress(on)
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReuseAddress(on)
override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on)
}
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
*
* For more information see [[java.net.Socket.setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "SendBufferSize must be > 0")
override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size)
}
/**
* [[akka.io.Tcp.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* socket.
*
* For more information see [[java.net.Socket.setTrafficClass]]
*/
case class TrafficClass(tc: Int) extends SocketOption {
require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]")
override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc)
}
}
}

View file

@ -6,7 +6,7 @@ package akka.io
import java.net.InetSocketAddress
import java.net.Socket
import java.net.ServerSocket
import akka.io.Inet.SocketOption
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
@ -18,56 +18,13 @@ object Tcp extends ExtensionKey[TcpExt] {
// Java API
override def get(system: ActorSystem): TcpExt = system.extension(this)
/**
* SocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a socket).
*/
sealed trait SocketOption {
/**
* Action to be taken for this option before calling bind()
*/
def beforeBind(s: ServerSocket): Unit = ()
/**
* Action to be taken for this option before calling connect()
*/
def beforeConnect(s: Socket): Unit = ()
/**
* Action to be taken for this option after connect returned (i.e. on
* the slave socket for servers).
*/
def afterConnect(s: Socket): Unit = ()
}
// shared socket options
object SO {
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option
*
* For more information see [[java.net.Socket.setReceiveBufferSize]]
*/
case class ReceiveBufferSize(size: Int) extends SocketOption {
require(size > 0, "ReceiveBufferSize must be > 0")
override def beforeBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size)
override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size)
}
// server socket options
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR
*
* For more information see [[java.net.Socket.setReuseAddress]]
*/
case class ReuseAddress(on: Boolean) extends SocketOption {
override def beforeBind(s: ServerSocket): Unit = s.setReuseAddress(on)
override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on)
}
// general socket options
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable SO_KEEPALIVE
* [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE
*
* For more information see [[java.net.Socket.setKeepAlive]]
*/
@ -76,7 +33,7 @@ object Tcp extends ExtensionKey[TcpExt] {
}
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable OOBINLINE (receipt
* [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt
* of TCP urgent data) By default, this option is disabled and TCP urgent
* data is silently discarded.
*
@ -86,20 +43,10 @@ object Tcp extends ExtensionKey[TcpExt] {
override def afterConnect(s: Socket): Unit = s.setOOBInline(on)
}
/**
* [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option.
*
* For more information see [[java.net.Socket.setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "SendBufferSize must be > 0")
override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size)
}
// SO_LINGER is handled by the Close code
/**
* [[akka.io.Tcp.SocketOption]] to enable or disable TCP_NODELAY
* [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY
* (disable or enable Nagle's algorithm)
*
* For more information see [[java.net.Socket.setTcpNoDelay]]
@ -108,17 +55,6 @@ object Tcp extends ExtensionKey[TcpExt] {
override def afterConnect(s: Socket): Unit = s.setTcpNoDelay(on)
}
/**
* [[akka.io.Tcp.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* socket.
*
* For more information see [[java.net.Socket.setTrafficClass]]
*/
case class TrafficClass(tc: Int) extends SocketOption {
require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]")
override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc)
}
}
/// COMMANDS

View file

@ -14,7 +14,8 @@ import scala.util.control.NonFatal
import scala.concurrent.duration._
import akka.actor._
import akka.util.ByteString
import Tcp._
import akka.io.Inet.SocketOption
import akka.io.Tcp._
import akka.io.SelectionHandler._
/**

View file

@ -7,7 +7,7 @@ package akka.io
import java.nio.channels.SocketChannel
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.Tcp.SocketOption
import akka.io.Inet.SocketOption
import akka.io.SelectionHandler.RegisterChannel
/**

View file

@ -11,7 +11,8 @@ import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.{ Props, ActorLogging, ActorRef, Actor }
import akka.io.SelectionHandler._
import Tcp._
import akka.io.Inet.SocketOption
import akka.io.Tcp._
private[io] class TcpListener(selectorRouter: ActorRef,
handler: ActorRef,
@ -29,7 +30,7 @@ private[io] class TcpListener(selectorRouter: ActorRef,
val serverSocketChannel = ServerSocketChannel.open
serverSocketChannel.configureBlocking(false)
val socket = serverSocketChannel.socket
options.foreach(_.beforeBind(socket))
options.foreach(_.beforeServerSocketBind(socket))
socket.bind(endpoint, backlog) // will blow up the actor constructor if the bind fails
serverSocketChannel
}

View file

@ -8,6 +8,7 @@ import akka.actor.{ ActorLogging, Actor, Props }
import akka.routing.RandomRouter
import Tcp._
import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
import akka.io.IO.SelectorBasedManager
/**
* TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections.
@ -44,22 +45,15 @@ import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
* with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference.
*
*/
private[io] class TcpManager(tcp: TcpExt) extends Actor with ActorLogging {
private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging {
val selectorPool = context.actorOf(
props = Props(new SelectionHandler(self, tcp.Settings)).withRouter(RandomRouter(tcp.Settings.NrOfSelectors)),
name = "selectors")
def receive = {
//case x @ (_: Connect | _: Bind) selectorPool forward x
case c @ Connect(remoteAddress, localAddress, options)
def receive = kickStartReceive {
case Connect(remoteAddress, localAddress, options)
val commander = sender
selectorPool ! KickStartCommand(c, commander, Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options)))
case b @ Bind(handler, endpoint, backlog, options)
Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options))
case Bind(handler, endpoint, backlog, options)
val commander = sender
selectorPool ! KickStartCommand(b, commander, Props(new TcpListener(selectorPool, handler, endpoint, backlog, commander, tcp, options)))
case KickStartFailed(cmd: Command, commander) commander ! CommandFailed(cmd)
Props(new TcpListener(selectorPool, handler, endpoint, backlog, commander, tcp, options))
}
}

View file

@ -10,7 +10,8 @@ import java.nio.channels.{ SelectionKey, SocketChannel }
import scala.collection.immutable
import akka.actor.ActorRef
import akka.io.SelectionHandler._
import Tcp._
import akka.io.Inet.SocketOption
import akka.io.Tcp._
/**
* An actor handling the connection state machine for an outgoing connection

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import java.net.DatagramSocket
import akka.io.Inet.SocketOption
object Udp {
object SO {
/**
* [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option
*
* For more information see [[java.net.DatagramSocket#setBroadcast]]
*/
case class Broadcast(on: Boolean) extends SocketOption {
override def beforeDatagramBind(s: DatagramSocket): Unit = s.setBroadcast(on)
}
}
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.actor.{ ExtendedActorSystem, Props, ActorSystemImpl, ExtensionKey }
object UdpConn extends ExtensionKey[UdpConnExt] {
}
class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension {
val manager = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(
props = Props.empty,
name = "IO-UDP-CONN")
}
}

View file

@ -5,82 +5,16 @@ package akka.io
import akka.actor._
import akka.util.ByteString
import java.net.{ DatagramSocket, Socket, InetSocketAddress }
import java.net.{ DatagramSocket, InetSocketAddress }
import scala.collection.immutable
import com.typesafe.config.Config
import scala.concurrent.duration.Duration
import java.nio.ByteBuffer
import akka.io.Inet.SocketOption
object UdpFF extends ExtensionKey[UdpFFExt] {
// Java API
override def get(system: ActorSystem): UdpFFExt = system.extension(this)
/**
* SocketOption is a package of data (from the user) and associated
* behavior (how to apply that to a socket).
*/
sealed trait SocketOption {
/**
* Action to be taken for this option before calling bind()
*/
def beforeBind(s: DatagramSocket): Unit = ()
}
object SO {
/**
* [[akka.io.UdpFF.SocketOption]] to set the SO_BROADCAST option
*
* For more information see [[java.net.DatagramSocket#setBroadcast]]
*/
case class Broadcast(on: Boolean) extends SocketOption {
override def beforeBind(s: DatagramSocket): Unit = s.setBroadcast(on)
}
/**
* [[akka.io.UdpFF.SocketOption]] to set the SO_RCVBUF option
*
* For more information see [[java.net.Socket#setReceiveBufferSize]]
*/
case class ReceiveBufferSize(size: Int) extends SocketOption {
require(size > 0, "ReceiveBufferSize must be > 0")
override def beforeBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size)
}
/**
* [[akka.io.UdpFF.SocketOption]] to enable or disable SO_REUSEADDR
*
* For more information see [[java.net.Socket#setReuseAddress]]
*/
case class ReuseAddress(on: Boolean) extends SocketOption {
override def beforeBind(s: DatagramSocket): Unit = s.setReuseAddress(on)
}
/**
* [[akka.io.UdpFF.SocketOption]] to set the SO_SNDBUF option.
*
* For more information see [[java.net.Socket#setSendBufferSize]]
*/
case class SendBufferSize(size: Int) extends SocketOption {
require(size > 0, "SendBufferSize must be > 0")
override def beforeBind(s: DatagramSocket): Unit = s.setSendBufferSize(size)
}
/**
* [[akka.io.UdpFF.SocketOption]] to set the traffic class or
* type-of-service octet in the IP header for packets sent from this
* socket.
*
* For more information see [[java.net.Socket#setTrafficClass]]
*/
case class TrafficClass(tc: Int) extends SocketOption {
require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]")
override def beforeBind(s: DatagramSocket): Unit = s.setTrafficClass(tc)
}
}
trait Command
case object NoAck

View file

@ -5,6 +5,7 @@ package akka.io
import akka.actor.{ ActorLogging, Actor, ActorRef }
import akka.io.UdpFF._
import akka.io.Inet.SocketOption
import akka.io.SelectionHandler._
import akka.util.ByteString
import java.net.InetSocketAddress
@ -30,7 +31,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef,
val datagramChannel = DatagramChannel.open
datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket
options.foreach(_.beforeBind(socket))
options.foreach(_.beforeDatagramBind(socket))
socket.bind(endpoint) // will blow up the actor constructor if the bind fails
datagramChannel
}

View file

@ -7,6 +7,7 @@ import akka.actor.{ ActorRef, Props, Actor }
import akka.io.UdpFF._
import akka.routing.RandomRouter
import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
import akka.io.IO.SelectorBasedManager
/**
* UdpFFManager is a facade for simple fire-and-forget style UDP operations
@ -43,24 +44,19 @@ import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand }
* discarded.
*
*/
private[io] class UdpFFManager(udpFF: UdpFFExt) extends Actor {
val selectorPool = context.actorOf(
props = Props(new SelectionHandler(self, udpFF.settings)).withRouter(RandomRouter(udpFF.settings.NrOfSelectors)),
name = "selectors")
private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) {
// FIXME: fix close overs
lazy val anonymousSender: ActorRef = context.actorOf(
props = Props(new UdpFFSender(udpFF, selectorPool)),
name = "simplesend")
def receive = {
case b @ Bind(handler, endpoint, options)
def receive = kickStartReceive {
case Bind(handler, endpoint, options)
val commander = sender
selectorPool ! KickStartCommand(b, commander, Props(
new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options)))
case SimpleSender anonymousSender forward SimpleSender
case KickStartFailed(cmd: Command, commander) commander ! CommandFailed(cmd)
Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options))
} orElse {
case SimpleSender anonymousSender forward SimpleSender
}
}