Hold state in mutable collections to reduce allocations

This commit is contained in:
Derek Williams 2011-05-26 22:36:40 -06:00
parent 23867283eb
commit f5a1dc17b3

View file

@ -22,6 +22,7 @@ import java.nio.channels.{
CancelledKeyException
}
import scala.collection.mutable
import scala.collection.immutable.Queue
import scala.annotation.tailrec
import scala.util.continuations._
@ -84,38 +85,46 @@ trait IO {
}
object IOActor {
case class TokenState(messages: Queue[MessageInvocation], readBytes: Queue[ByteString], readBytesLength: Int)
val emptyTokenState = TokenState(Queue.empty, Queue.empty, 0)
class TokenState(val messages: mutable.Queue[MessageInvocation], val readBytes: mutable.Queue[ByteString], var readBytesLength: Int) {
def this() = this(mutable.Queue.empty, mutable.Queue.empty, 0)
}
}
trait IOActor extends Actor with IO {
import IOActor._
var sequentialIO = true
protected var sequentialIO = true
private var _messages: Queue[MessageInvocation] = Queue.empty
private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty
private var _state: Map[IO.Token, TokenState] = Map.empty.withDefaultValue(emptyTokenState)
private var _state: Map[IO.Token, TokenState] = Map.empty
private var _continuations: Map[MessageInvocation, (Int, ByteString Unit)] = Map.empty
def read(token: IO.Token, len: Int): ByteString @suspendable = shift { cont: (ByteString Unit)
val state = _state(token)
_state += (token -> state.copy(messages = state.messages :+ self.currentMessage))
private def state(token: IO.Token): TokenState = _state.get(token) match {
case Some(s) s
case _
val s = new TokenState()
_state += (token -> s)
s
}
protected def read(token: IO.Token, len: Int): ByteString @suspendable = shift { cont: (ByteString Unit)
state(token).messages enqueue self.currentMessage
_continuations += (self.currentMessage -> (len, cont))
run(token)
}
final def receive = {
case IO.Read(token, newBytes)
val state = _state(token)
_state += (token -> state.copy(readBytes = state.readBytes :+ newBytes,
readBytesLength = state.readBytesLength + newBytes.length))
val st = state(token)
st.readBytes enqueue newBytes
st.readBytesLength += newBytes.length
run(token)
case IO.Connected(token) ()
case IO.Closed(token) _state -= token // TODO: clean up better
case msg if sequentialIO && _continuations.nonEmpty
_messages :+= self.currentMessage
_messages enqueue self.currentMessage
case msg if _receiveIO.isDefinedAt(msg)
reset { _receiveIO(msg) }
()
@ -127,36 +136,35 @@ trait IOActor extends Actor with IO {
@tailrec
private def run(token: IO.Token): Unit = {
val TokenState(messages, readBytes, readBytesLength) = _state(token)
if (messages.nonEmpty) {
val msg = messages.head
val st = state(token)
if (st.messages.nonEmpty) {
val msg = st.messages.head
self.currentMessage = msg
val Some((waitingFor, continuation)) = _continuations.get(msg)
if (readBytesLength >= waitingFor) {
if (st.readBytesLength >= waitingFor) {
st.messages.dequeue
var left = waitingFor
var take: List[ByteString] = Nil
var rest = readBytes
while (left > 0 && left >= rest.head.length) {
left -= rest.head.length
take ::= rest.head
rest = rest.tail
while (left > 0 && left >= st.readBytes.head.length) {
val bytes = st.readBytes.dequeue
st.readBytesLength -= bytes.length
left -= bytes.length
take ::= bytes
}
if (left > 0) {
val last = rest.head
take ::= last.take(left)
rest = last.drop(left) +: rest.tail
val bytes = st.readBytes.dequeue
take ::= bytes take left
(bytes drop left) +=: st.readBytes
st.readBytesLength -= left
}
val bytes = ByteString.concat(take.reverse: _*)
_state += (token -> TokenState(messages.tail, rest, readBytesLength - waitingFor))
_continuations -= msg
continuation(bytes)
run(token)
}
} else {
while ((_continuations.isEmpty || !sequentialIO) && _messages.nonEmpty) {
val (msg, rest) = _messages.dequeue
_messages = rest
self invoke msg
self invoke _messages.dequeue
}
}
}