Option to process each message sequentially (safer), or only each read sequentially (better performing)

This commit is contained in:
Derek Williams 2011-05-26 19:24:38 -06:00
parent a2cc661f22
commit 4acce04e77
2 changed files with 35 additions and 12 deletions

View file

@ -38,14 +38,18 @@ object IOActorSpec {
class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends IOActor {
sequentialIO = false
override def preStart: Unit = {
token = connect(ioManager, host, port)
}
def receiveIO = {
case bytes: ByteString
println("C: Sending Request")
write(bytes)
self reply read(bytes.length)
println("C: Got Response")
}
}
}
@ -55,7 +59,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
"an IO Actor" must {
"run" in {
val ioManager = Actor.actorOf(new IOManager(2)).start
val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
val promise1 = client !!! ByteString("Hello World!1")

View file

@ -85,8 +85,12 @@ trait IO {
trait IOActor extends Actor with IO {
var sequentialIO = true
var messages: Queue[MessageInvocation] = Queue.empty
var continuations: Map[MessageInvocation, (Int, ByteString Unit)] = Map.empty
var readBytes: Queue[ByteString] = Queue.empty
var readBytesLength: Int = 0
@ -100,31 +104,39 @@ trait IOActor extends Actor with IO {
var currentMessage: MessageInvocation = _
def read(len: Int): ByteString @suspendable = shift { cont: (ByteString Unit)
waitingFor = len
next = Some(cont)
if (next.isEmpty) {
waitingFor = len
next = Some(cont)
} else {
messages :+= self.currentMessage
continuations += (self.currentMessage -> (len, cont))
}
run()
}
def write(bytes: ByteString): Unit = write(token, bytes)
def receive = {
final def receive = {
case IO.Read(t, newBytes) if token == t
readBytes :+= newBytes
readBytesLength += newBytes.length
run()
case IO.Connected(t) if token == t ()
case IO.Closed(t) if token == t self.stop
case msg if next.isDefined
messages = messages.enqueue(self.currentMessage)
case msg if receiveIO.isDefinedAt(msg)
currentMessage = self.currentMessage
reset { receiveIO(msg) }
case msg if next.isDefined && sequentialIO
messages :+= self.currentMessage
case msg if _receiveIO.isDefinedAt(msg)
if (next.isEmpty) currentMessage = self.currentMessage
reset { _receiveIO(msg) }
()
}
def receiveIO: PartialFunction[Any, Unit @suspendable]
def run(): Unit = {
private lazy val _receiveIO = receiveIO
@tailrec
private def run(): Unit = {
self.currentMessage = currentMessage
while (next.isDefined && readBytesLength >= waitingFor) {
var left = waitingFor
@ -149,10 +161,17 @@ trait IOActor extends Actor with IO {
if (next.isEmpty && messages.nonEmpty) {
val (msg, rest) = messages.dequeue
messages = rest
self.invoke(msg)
continuations.get(msg) match {
case Some((len, cont))
continuations -= msg
currentMessage = msg
next = Some(cont)
waitingFor = len
case _ self.invoke(msg)
}
run()
}
}
}
class IOManager(bufferSize: Int = 8192) extends Actor {