Various minor changes:

- added forwarders to SO objects to Inet
 - added batch read support for UDP
 - changed write for UDP
 - SimpleSender is now registered as everone else
This commit is contained in:
Endre Sándor Varga 2013-02-06 11:38:42 +01:00
parent e2ce4644f1
commit 116dcc0e54
14 changed files with 85 additions and 40 deletions

View file

@ -6,7 +6,6 @@ package akka.io
import akka.testkit.AkkaSpec
import akka.util.ByteString
import akka.io.Inet
import Tcp._
import TestUtils._
import akka.testkit.EventFilter
@ -65,8 +64,8 @@ class IntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with IntegrationS
expectReceivedData(clientHandler, 100000)
override def bindOptions = List(Inet.SO.SendBufferSize(1024))
override def connectOptions = List(Inet.SO.ReceiveBufferSize(1024))
override def bindOptions = List(SO.SendBufferSize(1024))
override def connectOptions = List(SO.ReceiveBufferSize(1024))
}
}

View file

@ -19,9 +19,9 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
(address, commander.sender)
}
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress): ActorRef = {
def connectUdp(localAddress: Option[InetSocketAddress], remoteAddress: InetSocketAddress, handler: ActorRef): ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpConn), UdpConn.Connect(testActor, localAddress, remoteAddress, Nil))
commander.send(IO(UdpConn), UdpConn.Connect(handler, localAddress, remoteAddress, Nil))
commander.expectMsg(UdpConn.Connected)
commander.sender
}
@ -32,7 +32,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
val (serverAddress, server) = bindUdp(testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(localAddress = None, serverAddress) ! UdpConn.Send(data1)
connectUdp(localAddress = None, serverAddress, testActor) ! UdpConn.Send(data1)
val clientAddress = expectMsgPF() {
case UdpFF.Received(d, a)
@ -55,7 +55,7 @@ class UdpConnIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Impli
val (serverAddress, server) = bindUdp(testActor)
val data1 = ByteString("To infinity and beyond!")
val data2 = ByteString("All your datagram belong to us")
connectUdp(Some(clientAddress), serverAddress) ! UdpConn.Send(data1)
connectUdp(Some(clientAddress), serverAddress, testActor) ! UdpConn.Send(data1)
expectMsgPF() {
case UdpFF.Received(d, a)

View file

@ -22,7 +22,7 @@ class UdpFFIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with Implici
val simpleSender: ActorRef = {
val commander = TestProbe()
commander.send(IO(UdpFF), SimpleSender)
commander.send(IO(UdpFF), SimpleSender(Nil))
commander.expectMsg(SimpleSendReady)
commander.sender
}

View file

@ -470,6 +470,11 @@ akka {
# this many times before giving up
selector-association-retries = 10
# The maximum number of datagrams that are read in one go,
# higher numbers decrease latency, lower numbers increase fairness on
# the worker-dispatcher
batch-receive-limit = 3
# The number of bytes per direct buffer in the pool used to read or write
# network data from the kernel.
direct-buffer-size = 128 KiB
@ -526,6 +531,11 @@ akka {
# this many times before giving up
selector-association-retries = 10
# The maximum number of datagrams that are read in one go,
# higher numbers decrease latency, lower numbers increase fairness on
# the worker-dispatcher
batch-receive-limit = 3
# The number of bytes per direct buffer in the pool used to read or write
# network data from the kernel.
direct-buffer-size = 128 KiB

View file

@ -79,4 +79,11 @@ object Inet {
}
trait SoForwarders {
val ReceiveBufferSize = SO.ReceiveBufferSize
val ReuseAddress = SO.ReuseAddress
val SendBufferSize = SO.SendBufferSize
val TrafficClass = SO.TrafficClass
}
}

View file

@ -208,6 +208,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler
selectorManagementDispatcher.execute(select) // start selection "loop"
// FIXME: Add possibility to signal failure of task to someone
abstract class Task extends Runnable {
def tryRun()
def run() {

View file

@ -19,7 +19,7 @@ object Tcp extends ExtensionKey[TcpExt] {
override def get(system: ActorSystem): TcpExt = system.extension(this)
// shared socket options
object SO {
object SO extends Inet.SoForwarders {
// general socket options

View file

@ -10,7 +10,7 @@ import akka.actor.{ Props, ActorSystemImpl }
object Udp {
object SO {
object SO extends Inet.SoForwarders {
/**
* [[akka.io.Inet.SocketOption]] to set the SO_BROADCAST option
@ -29,9 +29,11 @@ object Udp {
val NrOfSelectors = getInt("nr-of-selectors")
val DirectBufferSize = getIntBytes("direct-buffer-size")
val MaxDirectBufferPoolSize = getInt("max-direct-buffer-pool-size")
val BatchReceiveLimit = getInt("batch-receive-limit")
val ManagementDispatcher = getString("management-dispatcher")
// FIXME: Use new requiring
require(NrOfSelectors > 0, "nr-of-selectors must be > 0")
override val MaxChannelsPerSelector = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1)

View file

@ -13,6 +13,8 @@ import java.nio.channels.DatagramChannel
import java.nio.channels.SelectionKey._
import scala.collection.immutable
import scala.util.control.NonFatal
import java.nio.ByteBuffer
import scala.annotation.tailrec
private[io] class UdpConnection(selectorRouter: ActorRef,
handler: ActorRef,
@ -36,6 +38,7 @@ private[io] class UdpConnection(selectorRouter: ActorRef,
datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket
options.foreach(_.beforeDatagramBind(socket))
// FIXME: All bind failures have to be reported to the commander in TCP as well
localAddress foreach { socket.bind } // will blow up the actor constructor if the bind fails
datagramChannel.connect(remoteAddress)
datagramChannel
@ -46,13 +49,14 @@ private[io] class UdpConnection(selectorRouter: ActorRef,
def receive = {
case ChannelRegistered
bindCommander ! Connected
selector ! ReadInterest
context.become(connected, discardOld = true)
}
def connected: Receive = {
case StopReading selector ! DisableReadInterest
case ResumeReading selector ! ReadInterest
case ChannelReadable doRead(handler)
case ChannelReadable println("read"); doRead(handler)
case Close
log.debug("Closing UDP connection to {}", remoteAddress)
@ -77,14 +81,17 @@ private[io] class UdpConnection(selectorRouter: ActorRef,
}
def doRead(handler: ActorRef): Unit = {
val buffer = bufferPool.acquire()
try {
@tailrec def innerRead(readsLeft: Int, buffer: ByteBuffer): Unit = {
buffer.clear()
buffer.limit(DirectBufferSize)
if (channel.read(buffer) > 0) handler ! Received(ByteString(buffer))
} finally {
if (channel.read(buffer) > 0) {
handler ! Received(ByteString(buffer))
innerRead(readsLeft - 1, buffer)
}
}
val buffer = bufferPool.acquire()
try innerRead(BatchReceiveLimit, buffer) finally {
selector ! ReadInterest
bufferPool.release(buffer)
}
@ -104,7 +111,6 @@ private[io] class UdpConnection(selectorRouter: ActorRef,
// Datagram channel either sends the whole message, or nothing
if (writtenBytes == 0) commander ! CommandFailed(send)
else if (send.wantsAck) commander ! send.ack
} finally {
udpConn.bufferPool.release(buffer)
pendingSend = null

View file

@ -34,7 +34,7 @@ object UdpFF extends ExtensionKey[UdpFFExt] {
options: immutable.Traversable[SocketOption] = Nil) extends Command
case object Unbind extends Command
case object SimpleSender extends Command
case class SimpleSender(options: immutable.Traversable[SocketOption] = Nil) extends Command
case object StopReading extends Command
case object ResumeReading extends Command

View file

@ -15,6 +15,8 @@ import scala.collection.immutable
import scala.util.control.NonFatal
import akka.io.UdpFF.Received
import akka.io.SelectionHandler.RegisterChannel
import scala.annotation.tailrec
import java.nio.ByteBuffer
private[io] class UdpFFListener(selectorRouter: ActorRef,
handler: ActorRef,
@ -34,6 +36,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef,
datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket
options.foreach(_.beforeDatagramBind(socket))
// FIXME: signal bind failures
socket.bind(endpoint) // will blow up the actor constructor if the bind fails
datagramChannel
}
@ -60,8 +63,7 @@ private[io] class UdpFFListener(selectorRouter: ActorRef,
}
def doReceive(handler: ActorRef): Unit = {
val buffer = bufferPool.acquire()
try {
@tailrec def innerReceive(readsLeft: Int, buffer: ByteBuffer) {
buffer.clear()
buffer.limit(DirectBufferSize)
@ -69,11 +71,16 @@ private[io] class UdpFFListener(selectorRouter: ActorRef,
case sender: InetSocketAddress
buffer.flip()
handler ! Received(ByteString(buffer), sender)
case _ // Ignore
if (readsLeft > 0) innerReceive(readsLeft - 1, buffer)
case null // null means no data was available
}
}
val buffer = bufferPool.acquire()
try innerReceive(BatchReceiveLimit, buffer) finally {
bufferPool.release(buffer)
selector ! ReadInterest
} finally bufferPool.release(buffer)
}
}
override def postStop() {

View file

@ -44,17 +44,13 @@ import akka.io.UdpFF._
*/
private[io] class UdpFFManager(udpFF: UdpFFExt) extends SelectorBasedManager(udpFF.settings, udpFF.settings.NrOfSelectors) {
// FIXME: fix close overs
lazy val anonymousSender: ActorRef = context.actorOf(
props = Props(new UdpFFSender(udpFF, selectorPool)),
name = "simplesend")
def receive = workerForCommand {
case Bind(handler, endpoint, options)
val commander = sender
Props(new UdpFFListener(selectorPool, handler, endpoint, commander, udpFF, options))
} orElse {
case SimpleSender anonymousSender forward SimpleSender
case SimpleSender(options)
val commander = sender
Props(new UdpFFSender(udpFF, options, commander))
}
}

View file

@ -7,27 +7,34 @@ import akka.actor._
import java.nio.channels.DatagramChannel
import akka.io.UdpFF._
import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel }
import scala.collection.immutable
import akka.io.Inet.SocketOption
/**
* Base class for TcpIncomingConnection and TcpOutgoingConnection.
*/
private[io] class UdpFFSender(val udpFF: UdpFFExt, val selector: ActorRef)
private[io] class UdpFFSender(val udpFF: UdpFFExt, options: immutable.Traversable[SocketOption], val commander: ActorRef)
extends Actor with ActorLogging with WithUdpFFSend {
def selector: ActorRef = context.parent
val channel = {
val datagramChannel = DatagramChannel.open
datagramChannel.configureBlocking(false)
val socket = datagramChannel.socket
options foreach { o
o.beforeDatagramBind(socket)
}
datagramChannel
}
selector ! RegisterChannel(channel, 0)
def receive: Receive = {
case ChannelRegistered context.become(simpleSendHandlers orElse sendHandlers, discardOld = true)
case _ sender ! SimpleSendReady // FIXME: queueing here?
}
def simpleSendHandlers: Receive = {
case SimpleSender sender ! SimpleSendReady
case ChannelRegistered
context.become(sendHandlers, discardOld = true)
commander ! SimpleSendReady
}
override def postStop(): Unit = if (channel.isOpen) channel.close()

View file

@ -12,6 +12,9 @@ private[io] trait WithUdpFFSend {
me: Actor with ActorLogging
var pendingSend: (Send, ActorRef) = null
// If send fails first, we allow a second go after selected writable, but no more. This flag signals that
// pending send was already tried once.
var retriedSend = false
def writePending = pendingSend ne null
def selector: ActorRef
@ -33,7 +36,7 @@ private[io] trait WithUdpFFSend {
case send: Send
pendingSend = (send, sender)
selector ! WriteInterest
doSend()
case ChannelWritable doSend()
@ -51,12 +54,19 @@ private[io] trait WithUdpFFSend {
if (TraceLogging) log.debug("Wrote {} bytes to channel", writtenBytes)
// Datagram channel either sends the whole message, or nothing
if (writtenBytes == 0) commander ! CommandFailed(send)
else if (send.wantsAck) commander ! send.ack
if (writtenBytes == 0) {
if (retriedSend) {
commander ! CommandFailed(send)
retriedSend = false
pendingSend = null
} else {
selector ! WriteInterest
retriedSend = true
}
} else if (send.wantsAck) commander ! send.ack
} finally {
udpFF.bufferPool.release(buffer)
pendingSend = null
}
}