diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 07d8d6a116..6cd8dcd6cd 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -22,6 +22,7 @@ import java.nio.channels.{ CancelledKeyException } +import scala.collection.mutable import scala.collection.immutable.Queue import scala.annotation.tailrec import scala.util.continuations._ @@ -84,38 +85,46 @@ trait IO { } object IOActor { - case class TokenState(messages: Queue[MessageInvocation], readBytes: Queue[ByteString], readBytesLength: Int) - val emptyTokenState = TokenState(Queue.empty, Queue.empty, 0) + class TokenState(val messages: mutable.Queue[MessageInvocation], val readBytes: mutable.Queue[ByteString], var readBytesLength: Int) { + def this() = this(mutable.Queue.empty, mutable.Queue.empty, 0) + } } trait IOActor extends Actor with IO { import IOActor._ - var sequentialIO = true + protected var sequentialIO = true - private var _messages: Queue[MessageInvocation] = Queue.empty + private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty - private var _state: Map[IO.Token, TokenState] = Map.empty.withDefaultValue(emptyTokenState) + private var _state: Map[IO.Token, TokenState] = Map.empty private var _continuations: Map[MessageInvocation, (Int, ByteString ⇒ Unit)] = Map.empty - def read(token: IO.Token, len: Int): ByteString @suspendable = shift { cont: (ByteString ⇒ Unit) ⇒ - val state = _state(token) - _state += (token -> state.copy(messages = state.messages :+ self.currentMessage)) + private def state(token: IO.Token): TokenState = _state.get(token) match { + case Some(s) ⇒ s + case _ ⇒ + val s = new TokenState() + _state += (token -> s) + s + } + + protected def read(token: IO.Token, len: Int): ByteString @suspendable = shift { cont: (ByteString ⇒ Unit) ⇒ + state(token).messages enqueue self.currentMessage _continuations += (self.currentMessage -> (len, cont)) run(token) } final def receive = { case IO.Read(token, newBytes) ⇒ - val state = _state(token) - _state += (token -> state.copy(readBytes = state.readBytes :+ newBytes, - readBytesLength = state.readBytesLength + newBytes.length)) + val st = state(token) + st.readBytes enqueue newBytes + st.readBytesLength += newBytes.length run(token) case IO.Connected(token) ⇒ () case IO.Closed(token) ⇒ _state -= token // TODO: clean up better case msg if sequentialIO && _continuations.nonEmpty ⇒ - _messages :+= self.currentMessage + _messages enqueue self.currentMessage case msg if _receiveIO.isDefinedAt(msg) ⇒ reset { _receiveIO(msg) } () @@ -127,36 +136,35 @@ trait IOActor extends Actor with IO { @tailrec private def run(token: IO.Token): Unit = { - val TokenState(messages, readBytes, readBytesLength) = _state(token) - if (messages.nonEmpty) { - val msg = messages.head + val st = state(token) + if (st.messages.nonEmpty) { + val msg = st.messages.head self.currentMessage = msg val Some((waitingFor, continuation)) = _continuations.get(msg) - if (readBytesLength >= waitingFor) { + if (st.readBytesLength >= waitingFor) { + st.messages.dequeue var left = waitingFor var take: List[ByteString] = Nil - var rest = readBytes - while (left > 0 && left >= rest.head.length) { - left -= rest.head.length - take ::= rest.head - rest = rest.tail + while (left > 0 && left >= st.readBytes.head.length) { + val bytes = st.readBytes.dequeue + st.readBytesLength -= bytes.length + left -= bytes.length + take ::= bytes } if (left > 0) { - val last = rest.head - take ::= last.take(left) - rest = last.drop(left) +: rest.tail + val bytes = st.readBytes.dequeue + take ::= bytes take left + (bytes drop left) +=: st.readBytes + st.readBytesLength -= left } val bytes = ByteString.concat(take.reverse: _*) - _state += (token -> TokenState(messages.tail, rest, readBytesLength - waitingFor)) _continuations -= msg continuation(bytes) run(token) } } else { while ((_continuations.isEmpty || !sequentialIO) && _messages.nonEmpty) { - val (msg, rest) = _messages.dequeue - _messages = rest - self invoke msg + self invoke _messages.dequeue } } }