From 1ec065b0cd33ab147bfaf7417c27de5cd997cd9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 5 Feb 2013 11:48:47 +0100 Subject: [PATCH] Factored out common manager code and SocketOptions --- .../test/scala/akka/io/IntegrationSpec.scala | 5 +- .../akka/io/IntegrationSpecSupport.scala | 1 + .../scala/akka/io/TcpConnectionSpec.scala | 7 +- akka-actor/src/main/scala/akka/io/IO.scala | 23 +++++- akka-actor/src/main/scala/akka/io/Inet.scala | 82 +++++++++++++++++++ akka-actor/src/main/scala/akka/io/Tcp.scala | 72 +--------------- .../main/scala/akka/io/TcpConnection.scala | 3 +- .../scala/akka/io/TcpIncomingConnection.scala | 2 +- .../src/main/scala/akka/io/TcpListener.scala | 5 +- .../src/main/scala/akka/io/TcpManager.scala | 22 ++--- .../scala/akka/io/TcpOutgoingConnection.scala | 3 +- akka-actor/src/main/scala/akka/io/Udp.scala | 24 ++++++ .../src/main/scala/akka/io/UdpConn.scala | 21 +++++ akka-actor/src/main/scala/akka/io/UdpFF.scala | 70 +--------------- .../main/scala/akka/io/UdpFFListener.scala | 3 +- .../src/main/scala/akka/io/UdpFFManager.scala | 18 ++-- 16 files changed, 188 insertions(+), 173 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/io/Inet.scala create mode 100644 akka-actor/src/main/scala/akka/io/Udp.scala create mode 100644 akka-actor/src/main/scala/akka/io/UdpConn.scala diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala index e4d53f5f9b..1e7a40eb90 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala @@ -6,6 +6,7 @@ package akka.io import akka.testkit.AkkaSpec import akka.util.ByteString +import akka.io.Inet import Tcp._ import TestUtils._ import akka.testkit.EventFilter @@ -64,8 +65,8 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationS expectReceivedData(clientHandler, 100000) - override def bindOptions = List(SO.SendBufferSize(1024)) - override def connectOptions = List(SO.ReceiveBufferSize(1024)) + override def bindOptions = List(Inet.SO.SendBufferSize(1024)) + override def connectOptions = List(Inet.SO.ReceiveBufferSize(1024)) } } diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala index 0feeb3809f..692815b96a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpecSupport.scala @@ -8,6 +8,7 @@ import scala.annotation.tailrec import akka.testkit.{ AkkaSpec, TestProbe } import akka.actor.ActorRef import scala.collection.immutable +import akka.io.Inet.SocketOption import Tcp._ import TestUtils._ diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 08c7b745f2..a59f2ccad4 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -22,6 +22,7 @@ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.ByteString import akka.actor.DeathPactException import java.nio.channels.SelectionKey._ +import akka.io.Inet.SocketOption class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") { val serverAddress = temporaryServerAddress() @@ -33,7 +34,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") val userHandler = TestProbe() val selector = TestProbe() val connectionActor = - createConnectionActor(options = Vector(SO.ReuseAddress(true)))(selector.ref, userHandler.ref) + createConnectionActor(options = Vector(Inet.SO.ReuseAddress(true)))(selector.ref, userHandler.ref) val clientChannel = connectionActor.underlyingActor.channel clientChannel.socket.getReuseAddress must be(true) } @@ -65,7 +66,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") } "bundle incoming Received messages as long as more data is available" in withEstablishedConnection( - clientSocketOptions = List(SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through + clientSocketOptions = List(Inet.SO.ReceiveBufferSize(1000000)) // to make sure enough data gets through ) { setup ⇒ import setup._ @@ -567,7 +568,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") def createConnectionActor( serverAddress: InetSocketAddress = serverAddress, localAddress: Option[InetSocketAddress] = None, - options: immutable.Seq[Tcp.SocketOption] = Nil)( + options: immutable.Seq[SocketOption] = Nil)( _selector: ActorRef, commander: ActorRef): TestActorRef[TcpOutgoingConnection] = { diff --git a/akka-actor/src/main/scala/akka/io/IO.scala b/akka-actor/src/main/scala/akka/io/IO.scala index e787f1295b..2b6cbd9f57 100644 --- a/akka-actor/src/main/scala/akka/io/IO.scala +++ b/akka-actor/src/main/scala/akka/io/IO.scala @@ -4,7 +4,9 @@ package akka.io -import akka.actor.{ ActorRef, ActorSystem, ExtensionKey } +import akka.actor._ +import akka.routing.RandomRouter +import akka.io.SelectionHandler.KickStartCommand object IO { @@ -14,4 +16,23 @@ object IO { def apply[T <: Extension](key: ExtensionKey[T])(implicit system: ActorSystem): ActorRef = key(system).manager + abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor { + + val selectorPool = context.actorOf( + props = Props(new SelectionHandler(self, selectorSettings)).withRouter(RandomRouter(nrOfSelectors)), + name = "selectors") + + def createKickStart(pf: PartialFunction[Any, Props], cmd: Any): PartialFunction[Any, KickStartCommand] = { + pf.andThen { props ⇒ + val commander = sender + KickStartCommand(cmd, commander, props) + } + } + + def kickStartReceive(pf: PartialFunction[Any, Props]): Receive = { + //case KickStartFailed = + case cmd if pf.isDefinedAt(cmd) ⇒ selectorPool ! createKickStart(pf, cmd)(cmd) + } + } + } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala new file mode 100644 index 0000000000..9e53507284 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.io + +import java.net.{ DatagramSocket, Socket, ServerSocket } + +object Inet { + + /** + * SocketOption is a package of data (from the user) and associated + * behavior (how to apply that to a socket). + */ + trait SocketOption { + + def beforeDatagramBind(ds: DatagramSocket): Unit = () + + def beforeServerSocketBind(ss: ServerSocket): Unit = () + + /** + * Action to be taken for this option before calling connect() + */ + def beforeConnect(s: Socket): Unit = () + /** + * Action to be taken for this option after connect returned (i.e. on + * the slave socket for servers). + */ + def afterConnect(s: Socket): Unit = () + } + + object SO { + + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option + * + * For more information see [[java.net.Socket.setReceiveBufferSize]] + */ + case class ReceiveBufferSize(size: Int) extends SocketOption { + require(size > 0, "ReceiveBufferSize must be > 0") + override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size) + override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size) + override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size) + } + + // server socket options + + /** + * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR + * + * For more information see [[java.net.Socket.setReuseAddress]] + */ + case class ReuseAddress(on: Boolean) extends SocketOption { + override def beforeServerSocketBind(s: ServerSocket): Unit = s.setReuseAddress(on) + override def beforeDatagramBind(s: DatagramSocket): Unit = s.setReuseAddress(on) + override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on) + } + + /** + * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option. + * + * For more information see [[java.net.Socket.setSendBufferSize]] + */ + case class SendBufferSize(size: Int) extends SocketOption { + require(size > 0, "SendBufferSize must be > 0") + override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size) + } + + /** + * [[akka.io.Tcp.SocketOption]] to set the traffic class or + * type-of-service octet in the IP header for packets sent from this + * socket. + * + * For more information see [[java.net.Socket.setTrafficClass]] + */ + case class TrafficClass(tc: Int) extends SocketOption { + require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]") + override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc) + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 4b71c81676..f7a31a76fe 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -6,7 +6,7 @@ package akka.io import java.net.InetSocketAddress import java.net.Socket -import java.net.ServerSocket +import akka.io.Inet.SocketOption import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable @@ -18,56 +18,13 @@ object Tcp extends ExtensionKey[TcpExt] { // Java API override def get(system: ActorSystem): TcpExt = system.extension(this) - /** - * SocketOption is a package of data (from the user) and associated - * behavior (how to apply that to a socket). - */ - sealed trait SocketOption { - /** - * Action to be taken for this option before calling bind() - */ - def beforeBind(s: ServerSocket): Unit = () - /** - * Action to be taken for this option before calling connect() - */ - def beforeConnect(s: Socket): Unit = () - /** - * Action to be taken for this option after connect returned (i.e. on - * the slave socket for servers). - */ - def afterConnect(s: Socket): Unit = () - } - // shared socket options object SO { - /** - * [[akka.io.Tcp.SocketOption]] to set the SO_RCVBUF option - * - * For more information see [[java.net.Socket.setReceiveBufferSize]] - */ - case class ReceiveBufferSize(size: Int) extends SocketOption { - require(size > 0, "ReceiveBufferSize must be > 0") - override def beforeBind(s: ServerSocket): Unit = s.setReceiveBufferSize(size) - override def beforeConnect(s: Socket): Unit = s.setReceiveBufferSize(size) - } - - // server socket options - - /** - * [[akka.io.Tcp.SocketOption]] to enable or disable SO_REUSEADDR - * - * For more information see [[java.net.Socket.setReuseAddress]] - */ - case class ReuseAddress(on: Boolean) extends SocketOption { - override def beforeBind(s: ServerSocket): Unit = s.setReuseAddress(on) - override def beforeConnect(s: Socket): Unit = s.setReuseAddress(on) - } - // general socket options /** - * [[akka.io.Tcp.SocketOption]] to enable or disable SO_KEEPALIVE + * [[akka.io.Inet.SocketOption]] to enable or disable SO_KEEPALIVE * * For more information see [[java.net.Socket.setKeepAlive]] */ @@ -76,7 +33,7 @@ object Tcp extends ExtensionKey[TcpExt] { } /** - * [[akka.io.Tcp.SocketOption]] to enable or disable OOBINLINE (receipt + * [[akka.io.Inet.SocketOption]] to enable or disable OOBINLINE (receipt * of TCP urgent data) By default, this option is disabled and TCP urgent * data is silently discarded. * @@ -86,20 +43,10 @@ object Tcp extends ExtensionKey[TcpExt] { override def afterConnect(s: Socket): Unit = s.setOOBInline(on) } - /** - * [[akka.io.Tcp.SocketOption]] to set the SO_SNDBUF option. - * - * For more information see [[java.net.Socket.setSendBufferSize]] - */ - case class SendBufferSize(size: Int) extends SocketOption { - require(size > 0, "SendBufferSize must be > 0") - override def afterConnect(s: Socket): Unit = s.setSendBufferSize(size) - } - // SO_LINGER is handled by the Close code /** - * [[akka.io.Tcp.SocketOption]] to enable or disable TCP_NODELAY + * [[akka.io.Inet.SocketOption]] to enable or disable TCP_NODELAY * (disable or enable Nagle's algorithm) * * For more information see [[java.net.Socket.setTcpNoDelay]] @@ -108,17 +55,6 @@ object Tcp extends ExtensionKey[TcpExt] { override def afterConnect(s: Socket): Unit = s.setTcpNoDelay(on) } - /** - * [[akka.io.Tcp.SocketOption]] to set the traffic class or - * type-of-service octet in the IP header for packets sent from this - * socket. - * - * For more information see [[java.net.Socket.setTrafficClass]] - */ - case class TrafficClass(tc: Int) extends SocketOption { - require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]") - override def afterConnect(s: Socket): Unit = s.setTrafficClass(tc) - } } /// COMMANDS diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index b2dea0571c..b331ef0622 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -14,7 +14,8 @@ import scala.util.control.NonFatal import scala.concurrent.duration._ import akka.actor._ import akka.util.ByteString -import Tcp._ +import akka.io.Inet.SocketOption +import akka.io.Tcp._ import akka.io.SelectionHandler._ /** diff --git a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala index 5da609c617..ea2703c40e 100644 --- a/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpIncomingConnection.scala @@ -7,7 +7,7 @@ package akka.io import java.nio.channels.SocketChannel import scala.collection.immutable import akka.actor.ActorRef -import akka.io.Tcp.SocketOption +import akka.io.Inet.SocketOption import akka.io.SelectionHandler.RegisterChannel /** diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index f3cc55d4a5..541d101327 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -11,7 +11,8 @@ import scala.collection.immutable import scala.util.control.NonFatal import akka.actor.{ Props, ActorLogging, ActorRef, Actor } import akka.io.SelectionHandler._ -import Tcp._ +import akka.io.Inet.SocketOption +import akka.io.Tcp._ private[io] class TcpListener(selectorRouter: ActorRef, handler: ActorRef, @@ -29,7 +30,7 @@ private[io] class TcpListener(selectorRouter: ActorRef, val serverSocketChannel = ServerSocketChannel.open serverSocketChannel.configureBlocking(false) val socket = serverSocketChannel.socket - options.foreach(_.beforeBind(socket)) + options.foreach(_.beforeServerSocketBind(socket)) socket.bind(endpoint, backlog) // will blow up the actor constructor if the bind fails serverSocketChannel } diff --git a/akka-actor/src/main/scala/akka/io/TcpManager.scala b/akka-actor/src/main/scala/akka/io/TcpManager.scala index f8dc6373ad..a6d3cc95ab 100644 --- a/akka-actor/src/main/scala/akka/io/TcpManager.scala +++ b/akka-actor/src/main/scala/akka/io/TcpManager.scala @@ -8,6 +8,7 @@ import akka.actor.{ ActorLogging, Actor, Props } import akka.routing.RandomRouter import Tcp._ import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } +import akka.io.IO.SelectorBasedManager /** * TcpManager is a facade for accepting commands ([[akka.io.Tcp.Command]]) to open client or server TCP connections. @@ -44,22 +45,15 @@ import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } * with a [[akka.io.Tcp.CommandFailed]] message. This message contains the original command for reference. * */ -private[io] class TcpManager(tcp: TcpExt) extends Actor with ActorLogging { +private[io] class TcpManager(tcp: TcpExt) extends SelectorBasedManager(tcp.Settings, tcp.Settings.NrOfSelectors) with ActorLogging { - val selectorPool = context.actorOf( - props = Props(new SelectionHandler(self, tcp.Settings)).withRouter(RandomRouter(tcp.Settings.NrOfSelectors)), - name = "selectors") - - def receive = { - //case x @ (_: Connect | _: Bind) ⇒ selectorPool forward x - case c @ Connect(remoteAddress, localAddress, options) ⇒ + def receive = kickStartReceive { + case Connect(remoteAddress, localAddress, options) ⇒ val commander = sender - selectorPool ! KickStartCommand(c, commander, Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options))) - - case b @ Bind(handler, endpoint, backlog, options) ⇒ + Props(new TcpOutgoingConnection(tcp, commander, remoteAddress, localAddress, options)) + case Bind(handler, endpoint, backlog, options) ⇒ val commander = sender - selectorPool ! KickStartCommand(b, commander, Props(new TcpListener(selectorPool, handler, endpoint, backlog, commander, tcp, options))) - - case KickStartFailed(cmd: Command, commander) ⇒ commander ! CommandFailed(cmd) + Props(new TcpListener(selectorPool, handler, endpoint, backlog, commander, tcp, options)) } + } diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index 1a04f6bc5c..2c28bbada2 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -10,7 +10,8 @@ import java.nio.channels.{ SelectionKey, SocketChannel } import scala.collection.immutable import akka.actor.ActorRef import akka.io.SelectionHandler._ -import Tcp._ +import akka.io.Inet.SocketOption +import akka.io.Tcp._ /** * An actor handling the connection state machine for an outgoing connection diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala new file mode 100644 index 0000000000..c543cf4927 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.io + +import java.net.DatagramSocket +import akka.io.Inet.SocketOption + +object Udp { + + object SO { + + /** + * [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option + * + * For more information see [[java.net.DatagramSocket#setBroadcast]] + */ + case class Broadcast(on: Boolean) extends SocketOption { + override def beforeDatagramBind(s: DatagramSocket): Unit = s.setBroadcast(on) + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/io/UdpConn.scala b/akka-actor/src/main/scala/akka/io/UdpConn.scala new file mode 100644 index 0000000000..651953e65e --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/UdpConn.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.io + +import akka.actor.{ ExtendedActorSystem, Props, ActorSystemImpl, ExtensionKey } + +object UdpConn extends ExtensionKey[UdpConnExt] { + +} + +class UdpConnExt(system: ExtendedActorSystem) extends IO.Extension { + + val manager = { + system.asInstanceOf[ActorSystemImpl].systemActorOf( + props = Props.empty, + name = "IO-UDP-CONN") + } + +} + diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 6d9274e191..31913dce3d 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -5,82 +5,16 @@ package akka.io import akka.actor._ import akka.util.ByteString -import java.net.{ DatagramSocket, Socket, InetSocketAddress } +import java.net.{ DatagramSocket, InetSocketAddress } import scala.collection.immutable import com.typesafe.config.Config -import scala.concurrent.duration.Duration -import java.nio.ByteBuffer +import akka.io.Inet.SocketOption object UdpFF extends ExtensionKey[UdpFFExt] { // Java API override def get(system: ActorSystem): UdpFFExt = system.extension(this) - /** - * SocketOption is a package of data (from the user) and associated - * behavior (how to apply that to a socket). - */ - sealed trait SocketOption { - /** - * Action to be taken for this option before calling bind() - */ - def beforeBind(s: DatagramSocket): Unit = () - - } - - object SO { - - /** - * [[akka.io.UdpFF.SocketOption]] to set the SO_BROADCAST option - * - * For more information see [[java.net.DatagramSocket#setBroadcast]] - */ - case class Broadcast(on: Boolean) extends SocketOption { - override def beforeBind(s: DatagramSocket): Unit = s.setBroadcast(on) - } - - /** - * [[akka.io.UdpFF.SocketOption]] to set the SO_RCVBUF option - * - * For more information see [[java.net.Socket#setReceiveBufferSize]] - */ - case class ReceiveBufferSize(size: Int) extends SocketOption { - require(size > 0, "ReceiveBufferSize must be > 0") - override def beforeBind(s: DatagramSocket): Unit = s.setReceiveBufferSize(size) - } - - /** - * [[akka.io.UdpFF.SocketOption]] to enable or disable SO_REUSEADDR - * - * For more information see [[java.net.Socket#setReuseAddress]] - */ - case class ReuseAddress(on: Boolean) extends SocketOption { - override def beforeBind(s: DatagramSocket): Unit = s.setReuseAddress(on) - } - - /** - * [[akka.io.UdpFF.SocketOption]] to set the SO_SNDBUF option. - * - * For more information see [[java.net.Socket#setSendBufferSize]] - */ - case class SendBufferSize(size: Int) extends SocketOption { - require(size > 0, "SendBufferSize must be > 0") - override def beforeBind(s: DatagramSocket): Unit = s.setSendBufferSize(size) - } - - /** - * [[akka.io.UdpFF.SocketOption]] to set the traffic class or - * type-of-service octet in the IP header for packets sent from this - * socket. - * - * For more information see [[java.net.Socket#setTrafficClass]] - */ - case class TrafficClass(tc: Int) extends SocketOption { - require(0 <= tc && tc <= 255, "TrafficClass needs to be in the interval [0, 255]") - override def beforeBind(s: DatagramSocket): Unit = s.setTrafficClass(tc) - } - } - trait Command case object NoAck diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index a481559e07..c3600ff121 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -5,6 +5,7 @@ package akka.io import akka.actor.{ ActorLogging, Actor, ActorRef } import akka.io.UdpFF._ +import akka.io.Inet.SocketOption import akka.io.SelectionHandler._ import akka.util.ByteString import java.net.InetSocketAddress @@ -30,7 +31,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, val datagramChannel = DatagramChannel.open datagramChannel.configureBlocking(false) val socket = datagramChannel.socket - options.foreach(_.beforeBind(socket)) + options.foreach(_.beforeDatagramBind(socket)) socket.bind(endpoint) // will blow up the actor constructor if the bind fails datagramChannel } diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 1a9311a43b..be417166d0 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -7,6 +7,7 @@ import akka.actor.{ ActorRef, Props, Actor } import akka.io.UdpFF._ import akka.routing.RandomRouter import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } +import akka.io.IO.SelectorBasedManager /** * UdpFFManager is a facade for simple fire-and-forget style UDP operations @@ -43,24 +44,19 @@ import akka.io.SelectionHandler.{ KickStartFailed, KickStartCommand } * discarded. * */ -private[io] class UdpFFManager(udpFF: UdpFFExt) extends Actor { - - val selectorPool = context.actorOf( - props = Props(new SelectionHandler(self, udpFF.settings)).withRouter(RandomRouter(udpFF.settings.NrOfSelectors)), - name = "selectors") +private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) { // FIXME: fix close overs lazy val anonymousSender: ActorRef = context.actorOf( props = Props(new UdpFFSender(udpFF, selectorPool)), name = "simplesend") - def receive = { - case b @ Bind(handler, endpoint, options) ⇒ + def receive = kickStartReceive { + case Bind(handler, endpoint, options) ⇒ val commander = sender - selectorPool ! KickStartCommand(b, commander, Props( - new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options))) - case SimpleSender ⇒ anonymousSender forward SimpleSender - case KickStartFailed(cmd: Command, commander) ⇒ commander ! CommandFailed(cmd) + Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options)) + } orElse { + case SimpleSender ⇒ anonymousSender forward SimpleSender } }