add NIO trait for Actor to handle nonblocking IO

This commit is contained in:
Derek Williams 2011-05-20 22:14:59 -06:00
parent cd18e72342
commit 170eb47ab0
2 changed files with 207 additions and 0 deletions

View file

@ -0,0 +1,104 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterEach
import java.nio.ByteBuffer
import java.nio.channels.{ SelectableChannel, SocketChannel, ServerSocketChannel, SelectionKey }
import java.io.IOException
import java.net.InetSocketAddress
import scala.collection.immutable.Queue
object NIOActorSpec {
class NIOTestActor(val nioHandler: NIOHandler) extends Actor with NIO {
val readBuffer = ByteBuffer.allocate(8192)
var writeBuffers = Map.empty[SocketChannel, Queue[ByteBuffer]].withDefaultValue(Queue.empty)
override def preStart = {
val inetAddress = new InetSocketAddress("localhost", 8064)
val channel = ServerSocketChannel.open()
channel.configureBlocking(false)
channel.socket.bind(inetAddress)
nioHandler.register(channel, SelectionKey.OP_ACCEPT)
}
def accept(channel: SelectableChannel): Unit = channel match {
case serverChannel: ServerSocketChannel
val ch = serverChannel.accept()
ch.configureBlocking(false)
nioHandler.register(ch, SelectionKey.OP_READ)
}
def read(channel: SelectableChannel): Unit = channel match {
case ch: SocketChannel
readBuffer.clear
try {
val readLen = ch.read(readBuffer)
if (readLen == -1) {
nioHandler.unregister(ch)
ch.close
} else {
val ar = new Array[Byte](readLen)
readBuffer.flip
readBuffer.get(ar)
writeBuffers += (ch -> writeBuffers(ch).enqueue(ByteBuffer.wrap(ar)))
nioHandler.interestOps(ch, SelectionKey.OP_WRITE)
}
} catch {
case e: IOException
nioHandler.unregister(ch)
ch.close
}
}
def write(channel: SelectableChannel): Unit = {
def writeChannel(ch: SocketChannel): Unit = {
val queue = writeBuffers(ch)
if (queue.nonEmpty) {
val (buf, bufs) = writeBuffers(ch).dequeue
ch.write(buf)
if (buf.remaining == 0) {
if (bufs.isEmpty) {
writeBuffers -= ch
nioHandler.interestOps(ch, SelectionKey.OP_READ)
} else {
writeBuffers += (ch -> bufs)
writeChannel(ch)
}
}
}
}
channel match {
case ch: SocketChannel
writeChannel(channel.asInstanceOf[SocketChannel])
}
}
def receive = {
case msg println(msg)
}
}
}
class NIOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
import NIOActorSpec._
"an NIO Actor" must {
"run" in {
val nioHandler = new NIOHandler
val actor = Actor.actorOf(new NIOTestActor(nioHandler)).start
Thread.sleep(600000)
}
}
}

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import java.nio.channels.{ SelectableChannel, Selector, SelectionKey }
import java.io.IOException
import java.util.concurrent.atomic.AtomicReference
object NIO {
private[akka] case class Accept(key: SelectionKey)
private[akka] case class Read(key: SelectionKey)
private[akka] case class Write(key: SelectionKey)
}
class NIOHandler {
private[akka] val activeKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[SelectionKey]())
private val pendingInterestOps = new AtomicReference(Map.empty[SelectableChannel, Int])
private val pendingRegister = new AtomicReference(Map.empty[SelectableChannel, (Int, ActorRef)])
private val selector: Selector = Selector.open()
private val select = new Runnable {
def run(): Unit = {
while (true) {
selector.select
selector.selectedKeys.removeAll(activeKeys)
val selectedKeys = selector.selectedKeys.iterator
while (selectedKeys.hasNext) {
val key = selectedKeys.next
selectedKeys.remove
activeKeys.add(key)
if (key.isValid) {
val owner = key.attachment.asInstanceOf[ActorRef]
if (key.isAcceptable) owner ! NIO.Accept(key)
else if (key.isReadable) owner ! NIO.Read(key)
else if (key.isWritable) owner ! NIO.Write(key)
}
}
pendingRegister.getAndSet(Map.empty[SelectableChannel, (Int, ActorRef)]) foreach {
case (channel, (interestOps, actor)) channel.register(selector, interestOps, actor)
}
pendingInterestOps.getAndSet(Map.empty[SelectableChannel, Int]) foreach {
case (channel, interestOps) channel.keyFor(selector).interestOps(interestOps)
}
}
}
}
private val thread = new Thread(select)
thread.start
@scala.annotation.tailrec
final def register(channel: SelectableChannel, interestOps: Int)(implicit owner: Some[ActorRef]): Unit = {
val orig = pendingRegister.get
if (pendingRegister.compareAndSet(orig, orig + (channel -> (interestOps, owner.get))))
selector.wakeup
else
register(channel, interestOps)
}
final def unregister(channel: SelectableChannel): Unit =
channel.keyFor(selector).cancel
@scala.annotation.tailrec
final def interestOps(channel: SelectableChannel, interestOps: Int): Unit = {
val orig = pendingInterestOps.get
if (pendingInterestOps.compareAndSet(orig, orig + (channel -> interestOps)))
selector.wakeup
else
this.interestOps(channel, interestOps)
}
}
trait NIO {
this: Actor
import NIO._
def nioHandler: NIOHandler
val originalReceive = receive
become {
case Accept(key)
accept(key.channel())
nioHandler.activeKeys.remove(key)
case Read(key)
read(key.channel())
nioHandler.activeKeys.remove(key)
case Write(key)
write(key.channel())
nioHandler.activeKeys.remove(key)
case other if originalReceive.isDefinedAt(other) originalReceive(other)
}
def accept(channel: SelectableChannel): Unit
def read(channel: SelectableChannel): Unit
def write(channel: SelectableChannel): Unit
}