diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/NIOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/NIOActor.scala new file mode 100644 index 0000000000..1694c4d8e7 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/NIOActor.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterEach + +import java.nio.ByteBuffer +import java.nio.channels.{ SelectableChannel, SocketChannel, ServerSocketChannel, SelectionKey } +import java.io.IOException +import java.net.InetSocketAddress + +import scala.collection.immutable.Queue + +object NIOActorSpec { + + class NIOTestActor(val nioHandler: NIOHandler) extends Actor with NIO { + + val readBuffer = ByteBuffer.allocate(8192) + + var writeBuffers = Map.empty[SocketChannel, Queue[ByteBuffer]].withDefaultValue(Queue.empty) + + override def preStart = { + val inetAddress = new InetSocketAddress("localhost", 8064) + val channel = ServerSocketChannel.open() + channel.configureBlocking(false) + channel.socket.bind(inetAddress) + + nioHandler.register(channel, SelectionKey.OP_ACCEPT) + } + + def accept(channel: SelectableChannel): Unit = channel match { + case serverChannel: ServerSocketChannel ⇒ + val ch = serverChannel.accept() + ch.configureBlocking(false) + nioHandler.register(ch, SelectionKey.OP_READ) + } + + def read(channel: SelectableChannel): Unit = channel match { + case ch: SocketChannel ⇒ + readBuffer.clear + try { + val readLen = ch.read(readBuffer) + if (readLen == -1) { + nioHandler.unregister(ch) + ch.close + } else { + val ar = new Array[Byte](readLen) + readBuffer.flip + readBuffer.get(ar) + writeBuffers += (ch -> writeBuffers(ch).enqueue(ByteBuffer.wrap(ar))) + nioHandler.interestOps(ch, SelectionKey.OP_WRITE) + } + } catch { + case e: IOException ⇒ + nioHandler.unregister(ch) + ch.close + } + } + + def write(channel: SelectableChannel): Unit = { + def writeChannel(ch: SocketChannel): Unit = { + val queue = writeBuffers(ch) + if (queue.nonEmpty) { + val (buf, bufs) = writeBuffers(ch).dequeue + ch.write(buf) + if (buf.remaining == 0) { + if (bufs.isEmpty) { + writeBuffers -= ch + nioHandler.interestOps(ch, SelectionKey.OP_READ) + } else { + writeBuffers += (ch -> bufs) + writeChannel(ch) + } + } + } + } + channel match { + case ch: SocketChannel ⇒ + writeChannel(channel.asInstanceOf[SocketChannel]) + } + } + + def receive = { + case msg ⇒ println(msg) + } + } +} + +class NIOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { + import NIOActorSpec._ + + "an NIO Actor" must { + "run" in { + val nioHandler = new NIOHandler + val actor = Actor.actorOf(new NIOTestActor(nioHandler)).start + Thread.sleep(600000) + } + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/NIO.scala b/akka-actor/src/main/scala/akka/actor/NIO.scala new file mode 100644 index 0000000000..2f5d8336e3 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/NIO.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.actor + +import java.nio.channels.{ SelectableChannel, Selector, SelectionKey } +import java.io.IOException +import java.util.concurrent.atomic.AtomicReference + +object NIO { + private[akka] case class Accept(key: SelectionKey) + private[akka] case class Read(key: SelectionKey) + private[akka] case class Write(key: SelectionKey) +} + +class NIOHandler { + private[akka] val activeKeys = java.util.Collections.synchronizedSet(new java.util.HashSet[SelectionKey]()) + + private val pendingInterestOps = new AtomicReference(Map.empty[SelectableChannel, Int]) + private val pendingRegister = new AtomicReference(Map.empty[SelectableChannel, (Int, ActorRef)]) + + private val selector: Selector = Selector.open() + + private val select = new Runnable { + def run(): Unit = { + while (true) { + selector.select + selector.selectedKeys.removeAll(activeKeys) + val selectedKeys = selector.selectedKeys.iterator + while (selectedKeys.hasNext) { + val key = selectedKeys.next + selectedKeys.remove + activeKeys.add(key) + if (key.isValid) { + val owner = key.attachment.asInstanceOf[ActorRef] + if (key.isAcceptable) owner ! NIO.Accept(key) + else if (key.isReadable) owner ! NIO.Read(key) + else if (key.isWritable) owner ! NIO.Write(key) + } + } + pendingRegister.getAndSet(Map.empty[SelectableChannel, (Int, ActorRef)]) foreach { + case (channel, (interestOps, actor)) ⇒ channel.register(selector, interestOps, actor) + } + pendingInterestOps.getAndSet(Map.empty[SelectableChannel, Int]) foreach { + case (channel, interestOps) ⇒ channel.keyFor(selector).interestOps(interestOps) + } + } + } + } + + private val thread = new Thread(select) + thread.start + + @scala.annotation.tailrec + final def register(channel: SelectableChannel, interestOps: Int)(implicit owner: Some[ActorRef]): Unit = { + val orig = pendingRegister.get + if (pendingRegister.compareAndSet(orig, orig + (channel -> (interestOps, owner.get)))) + selector.wakeup + else + register(channel, interestOps) + } + + final def unregister(channel: SelectableChannel): Unit = + channel.keyFor(selector).cancel + + @scala.annotation.tailrec + final def interestOps(channel: SelectableChannel, interestOps: Int): Unit = { + val orig = pendingInterestOps.get + if (pendingInterestOps.compareAndSet(orig, orig + (channel -> interestOps))) + selector.wakeup + else + this.interestOps(channel, interestOps) + } +} + +trait NIO { + this: Actor ⇒ + + import NIO._ + + def nioHandler: NIOHandler + + val originalReceive = receive + + become { + case Accept(key) ⇒ + accept(key.channel()) + nioHandler.activeKeys.remove(key) + case Read(key) ⇒ + read(key.channel()) + nioHandler.activeKeys.remove(key) + case Write(key) ⇒ + write(key.channel()) + nioHandler.activeKeys.remove(key) + case other if originalReceive.isDefinedAt(other) ⇒ originalReceive(other) + } + + def accept(channel: SelectableChannel): Unit + + def read(channel: SelectableChannel): Unit + + def write(channel: SelectableChannel): Unit +}