diff --git a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala index 1e7a40eb90..e4d53f5f9b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/IntegrationSpec.scala @@ -6,7 +6,6 @@ package akka.io import akka.testkit.AkkaSpec import akka.util.ByteString -import akka.io.Inet import Tcp._ import TestUtils._ import akka.testkit.EventFilter @@ -65,8 +64,8 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationS expectReceivedData(clientHandler, 100000) - override def bindOptions = List(Inet.SO.SendBufferSize(1024)) - override def connectOptions = List(Inet.SO.ReceiveBufferSize(1024)) + override def bindOptions = List(SO.SendBufferSize(1024)) + override def connectOptions = List(SO.ReceiveBufferSize(1024)) } } diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala index 90bfdb222e..ece503de33 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnIntegrationSpec.scala @@ -19,9 +19,9 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli (address, commander.sender) } - def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress): ActorRef = { + def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = { val commander = TestProbe() - commander.send(IO(UdpConn), UdpConn.Connect(testActor, localAddress, remoteAddress, Nil)) + commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil)) commander.expectMsg(UdpConn.Connected) commander.sender } @@ -32,7 +32,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli val (serverAddress, server) = bindUdp(testActor) val data1 = ByteString("To infinity and beyond!") val data2 = ByteString("All your datagram belong to us") - connectUdp(localAddress = None, serverAddress) ! UdpConn.Send(data1) + connectUdp(localAddress = None, serverAddress, testActor) ! UdpConn.Send(data1) val clientAddress = expectMsgPF() { case UdpFF.Received(d, a) ⇒ @@ -55,7 +55,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli val (serverAddress, server) = bindUdp(testActor) val data1 = ByteString("To infinity and beyond!") val data2 = ByteString("All your datagram belong to us") - connectUdp(Some(clientAddress), serverAddress) ! UdpConn.Send(data1) + connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConn.Send(data1) expectMsgPF() { case UdpFF.Received(d, a) ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala index 21d61c3e1d..a0f138f041 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpFFIntegrationSpec.scala @@ -22,7 +22,7 @@ class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Implici val simpleSender: ActorRef = { val commander = TestProbe() - commander.send(IO(UdpFF), SimpleSender) + commander.send(IO(UdpFF), SimpleSender(Nil)) commander.expectMsg(SimpleSendReady) commander.sender } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index b755af0e09..c17594a307 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -470,6 +470,11 @@ akka { # this many times before giving up selector-association-retries = 10 + # The maximum number of datagrams that are read in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + batch-receive-limit = 3 + # The number of bytes per direct buffer in the pool used to read or write # network data from the kernel. direct-buffer-size = 128 KiB @@ -526,6 +531,11 @@ akka { # this many times before giving up selector-association-retries = 10 + # The maximum number of datagrams that are read in one go, + # higher numbers decrease latency, lower numbers increase fairness on + # the worker-dispatcher + batch-receive-limit = 3 + # The number of bytes per direct buffer in the pool used to read or write # network data from the kernel. direct-buffer-size = 128 KiB diff --git a/akka-actor/src/main/scala/akka/io/Inet.scala b/akka-actor/src/main/scala/akka/io/Inet.scala index 9e53507284..0b9fb4ca0c 100644 --- a/akka-actor/src/main/scala/akka/io/Inet.scala +++ b/akka-actor/src/main/scala/akka/io/Inet.scala @@ -79,4 +79,11 @@ object Inet { } + trait SoForwarders { + val ReceiveBufferSize = SO.ReceiveBufferSize + val ReuseAddress = SO.ReuseAddress + val SendBufferSize = SO.SendBufferSize + val TrafficClass = SO.TrafficClass + } + } diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 0f0c068017..e7046d8b34 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -208,6 +208,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler selectorManagementDispatcher.execute(select) // start selection "loop" + // FIXME: Add possibility to signal failure of task to someone abstract class Task extends Runnable { def tryRun() def run() { diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 38c8051714..95218f16cf 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -19,7 +19,7 @@ object Tcp extends ExtensionKey[TcpExt] { override def get(system: ActorSystem): TcpExt = system.extension(this) // shared socket options - object SO { + object SO extends Inet.SoForwarders { // general socket options diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index 18d874baa4..83e8b0d5f1 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -10,7 +10,7 @@ import akka.actor.{ Props, ActorSystemImpl } object Udp { - object SO { + object SO extends Inet.SoForwarders { /** * [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option @@ -29,9 +29,11 @@ object Udp { val NrOfSelectors = getInt("nr-of-selectors") val DirectBufferSize = getIntBytes("direct-buffer-size") val MaxDirectBufferPoolSize = getInt("max-direct-buffer-pool-size") + val BatchReceiveLimit = getInt("batch-receive-limit") val ManagementDispatcher = getString("management-dispatcher") + // FIXME: Use new requiring require(NrOfSelectors > 0, "nr-of-selectors must be > 0") override val MaxChannelsPerSelector = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 8f8fd2fb35..11a5a17f71 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -13,6 +13,8 @@ import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ import scala.collection.immutable import scala.util.control.NonFatal +import java.nio.ByteBuffer +import scala.annotation.tailrec private[io] class UdpConnection(selectorRouter: ActorRef, handler: ActorRef, @@ -36,6 +38,7 @@ private[io] class UdpConnection(selectorRouter: ActorRef, datagramChannel.configureBlocking(false) val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) + // FIXME: All bind failures have to be reported to the commander in TCP as well localAddress foreach { socket.bind } // will blow up the actor constructor if the bind fails datagramChannel.connect(remoteAddress) datagramChannel @@ -46,13 +49,14 @@ private[io] class UdpConnection(selectorRouter: ActorRef, def receive = { case ChannelRegistered ⇒ bindCommander ! Connected + selector ! ReadInterest context.become(connected, discardOld = true) } def connected: Receive = { case StopReading ⇒ selector ! DisableReadInterest case ResumeReading ⇒ selector ! ReadInterest - case ChannelReadable ⇒ doRead(handler) + case ChannelReadable ⇒ println("read"); doRead(handler) case Close ⇒ log.debug("Closing UDP connection to {}", remoteAddress) @@ -77,14 +81,17 @@ private[io] class UdpConnection(selectorRouter: ActorRef, } def doRead(handler: ActorRef): Unit = { - val buffer = bufferPool.acquire() - try { + @tailrec def innerRead(readsLeft: Int, buffer: ByteBuffer): Unit = { buffer.clear() buffer.limit(DirectBufferSize) - if (channel.read(buffer) > 0) handler ! Received(ByteString(buffer)) - - } finally { + if (channel.read(buffer) > 0) { + handler ! Received(ByteString(buffer)) + innerRead(readsLeft - 1, buffer) + } + } + val buffer = bufferPool.acquire() + try innerRead(BatchReceiveLimit, buffer) finally { selector ! ReadInterest bufferPool.release(buffer) } @@ -104,7 +111,6 @@ private[io] class UdpConnection(selectorRouter: ActorRef, // Datagram channel either sends the whole message, or nothing if (writtenBytes == 0) commander ! CommandFailed(send) else if (send.wantsAck) commander ! send.ack - } finally { udpConn.bufferPool.release(buffer) pendingSend = null diff --git a/akka-actor/src/main/scala/akka/io/UdpFF.scala b/akka-actor/src/main/scala/akka/io/UdpFF.scala index 51b1b7429c..f6d86f7053 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFF.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFF.scala @@ -34,7 +34,7 @@ object UdpFF extends ExtensionKey[UdpFFExt] { options: immutable.Traversable[SocketOption] = Nil) extends Command case object Unbind extends Command - case object SimpleSender extends Command + case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command case object StopReading extends Command case object ResumeReading extends Command diff --git a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala index 3f464bccc5..aa8000eb0b 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFListener.scala @@ -15,6 +15,8 @@ import scala.collection.immutable import scala.util.control.NonFatal import akka.io.UdpFF.Received import akka.io.SelectionHandler.RegisterChannel +import scala.annotation.tailrec +import java.nio.ByteBuffer private[io] class UdpFFListener(selectorRouter: ActorRef, handler: ActorRef, @@ -34,6 +36,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, datagramChannel.configureBlocking(false) val socket = datagramChannel.socket options.foreach(_.beforeDatagramBind(socket)) + // FIXME: signal bind failures socket.bind(endpoint) // will blow up the actor constructor if the bind fails datagramChannel } @@ -60,8 +63,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, } def doReceive(handler: ActorRef): Unit = { - val buffer = bufferPool.acquire() - try { + @tailrec def innerReceive(readsLeft: Int, buffer: ByteBuffer) { buffer.clear() buffer.limit(DirectBufferSize) @@ -69,11 +71,16 @@ private[io] class UdpFFListener(selectorRouter: ActorRef, case sender: InetSocketAddress ⇒ buffer.flip() handler ! Received(ByteString(buffer), sender) - case _ ⇒ // Ignore + if (readsLeft > 0) innerReceive(readsLeft - 1, buffer) + case null ⇒ // null means no data was available } + } + val buffer = bufferPool.acquire() + try innerReceive(BatchReceiveLimit, buffer) finally { + bufferPool.release(buffer) selector ! ReadInterest - } finally bufferPool.release(buffer) + } } override def postStop() { diff --git a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala index 3f5c677991..28c47ef995 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFManager.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFManager.scala @@ -44,17 +44,13 @@ import akka.io.UdpFF._ */ private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) { - // FIXME: fix close overs - lazy val anonymousSender: ActorRef = context.actorOf( - props = Props(new UdpFFSender(udpFF, selectorPool)), - name = "simplesend") - def receive = workerForCommand { case Bind(handler, endpoint, options) ⇒ val commander = sender Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options)) - } orElse { - case SimpleSender ⇒ anonymousSender forward SimpleSender + case SimpleSender(options) ⇒ + val commander = sender + Props(new UdpFFSender(udpFF, options, commander)) } } diff --git a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala index 24e222e71c..5303aa79d9 100644 --- a/akka-actor/src/main/scala/akka/io/UdpFFSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpFFSender.scala @@ -7,27 +7,34 @@ import akka.actor._ import java.nio.channels.DatagramChannel import akka.io.UdpFF._ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } +import scala.collection.immutable +import akka.io.Inet.SocketOption /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. */ -private[io] class UdpFFSender(val udpFF: UdpFFExt, val selector: ActorRef) +private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) extends Actor with ActorLogging with WithUdpFFSend { + def selector: ActorRef = context.parent + val channel = { val datagramChannel = DatagramChannel.open datagramChannel.configureBlocking(false) + val socket = datagramChannel.socket + + options foreach { o ⇒ + o.beforeDatagramBind(socket) + } + datagramChannel } selector ! RegisterChannel(channel, 0) def receive: Receive = { - case ChannelRegistered ⇒ context.become(simpleSendHandlers orElse sendHandlers, discardOld = true) - case _ ⇒ sender ! SimpleSendReady // FIXME: queueing here? - } - - def simpleSendHandlers: Receive = { - case SimpleSender ⇒ sender ! SimpleSendReady + case ChannelRegistered ⇒ + context.become(sendHandlers, discardOld = true) + commander ! SimpleSendReady } override def postStop(): Unit = if (channel.isOpen) channel.close() diff --git a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala index e04c4ca6f5..09dd37666c 100644 --- a/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala +++ b/akka-actor/src/main/scala/akka/io/WithUdpFFSend.scala @@ -12,6 +12,9 @@ private[io] trait WithUdpFFSend { me: Actor with ActorLogging ⇒ var pendingSend: (Send, ActorRef) = null + // If send fails first, we allow a second go after selected writable, but no more. This flag signals that + // pending send was already tried once. + var retriedSend = false def writePending = pendingSend ne null def selector: ActorRef @@ -33,7 +36,7 @@ private[io] trait WithUdpFFSend { case send: Send ⇒ pendingSend = (send, sender) - selector ! WriteInterest + doSend() case ChannelWritable ⇒ doSend() @@ -51,12 +54,19 @@ private[io] trait WithUdpFFSend { if (TraceLogging) log.debug("Wrote {} bytes to channel", writtenBytes) // Datagram channel either sends the whole message, or nothing - if (writtenBytes == 0) commander ! CommandFailed(send) - else if (send.wantsAck) commander ! send.ack + if (writtenBytes == 0) { + if (retriedSend) { + commander ! CommandFailed(send) + retriedSend = false + pendingSend = null + } else { + selector ! WriteInterest + retriedSend = true + } + } else if (send.wantsAck) commander ! send.ack } finally { udpFF.bufferPool.release(buffer) - pendingSend = null } }