diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index 34dd236594..e68199bea6 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -38,14 +38,18 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends IOActor { + sequentialIO = false + override def preStart: Unit = { token = connect(ioManager, host, port) } def receiveIO = { case bytes: ByteString ⇒ + println("C: Sending Request") write(bytes) self reply read(bytes.length) + println("C: Got Response") } } } @@ -55,7 +59,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { "an IO Actor" must { "run" in { - val ioManager = Actor.actorOf(new IOManager(2)).start + val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start val promise1 = client !!! ByteString("Hello World!1") diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index cffcbeb655..23a431fe22 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -85,8 +85,12 @@ trait IO { trait IOActor extends Actor with IO { + var sequentialIO = true + var messages: Queue[MessageInvocation] = Queue.empty + var continuations: Map[MessageInvocation, (Int, ByteString ⇒ Unit)] = Map.empty + var readBytes: Queue[ByteString] = Queue.empty var readBytesLength: Int = 0 @@ -100,31 +104,39 @@ trait IOActor extends Actor with IO { var currentMessage: MessageInvocation = _ def read(len: Int): ByteString @suspendable = shift { cont: (ByteString ⇒ Unit) ⇒ - waitingFor = len - next = Some(cont) + if (next.isEmpty) { + waitingFor = len + next = Some(cont) + } else { + messages :+= self.currentMessage + continuations += (self.currentMessage -> (len, cont)) + } run() } def write(bytes: ByteString): Unit = write(token, bytes) - def receive = { + final def receive = { case IO.Read(t, newBytes) if token == t ⇒ readBytes :+= newBytes readBytesLength += newBytes.length run() case IO.Connected(t) if token == t ⇒ () case IO.Closed(t) if token == t ⇒ self.stop - case msg if next.isDefined ⇒ - messages = messages.enqueue(self.currentMessage) - case msg if receiveIO.isDefinedAt(msg) ⇒ - currentMessage = self.currentMessage - reset { receiveIO(msg) } + case msg if next.isDefined && sequentialIO ⇒ + messages :+= self.currentMessage + case msg if _receiveIO.isDefinedAt(msg) ⇒ + if (next.isEmpty) currentMessage = self.currentMessage + reset { _receiveIO(msg) } () } def receiveIO: PartialFunction[Any, Unit @suspendable] - def run(): Unit = { + private lazy val _receiveIO = receiveIO + + @tailrec + private def run(): Unit = { self.currentMessage = currentMessage while (next.isDefined && readBytesLength >= waitingFor) { var left = waitingFor @@ -149,10 +161,17 @@ trait IOActor extends Actor with IO { if (next.isEmpty && messages.nonEmpty) { val (msg, rest) = messages.dequeue messages = rest - self.invoke(msg) + continuations.get(msg) match { + case Some((len, cont)) ⇒ + continuations -= msg + currentMessage = msg + next = Some(cont) + waitingFor = len + case _ ⇒ self.invoke(msg) + } + run() } } - } class IOManager(bufferSize: Int = 8192) extends Actor {