diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index 6087caa905..f0d30be6d7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -89,7 +89,7 @@ object IOActorSpec { (ByteString("*" + (all.size * 2) + "\r\n") /: all) { case (result, (k, v)) ⇒ val kBytes = ByteString(k) - ByteString.concat(result, ByteString("$" + kBytes.length + "\r\n"), kBytes, ByteString("$" + v.length + "\r\n"), v) + result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v } } } @@ -233,7 +233,6 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { server.stop ioManager.stop } - } } diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index cf1d465e63..f177240d08 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -4,7 +4,7 @@ package akka.actor import akka.config.Supervision.Permanent -import akka.util.{ ByteString, ByteRope } +import akka.util.ByteString import akka.dispatch.MessageInvocation import java.net.InetSocketAddress @@ -113,8 +113,8 @@ object IO { socket } - private class HandleState(var readBytes: ByteRope, var connected: Boolean) { - def this() = this(ByteRope.empty, false) + private class HandleState(var readBytes: ByteString, var connected: Boolean) { + def this() = this(ByteString.empty, false) } sealed trait IOSuspendable[+A] @@ -150,7 +150,7 @@ trait IO { final def receive: Receive = { case Read(handle, newBytes) ⇒ val st = state(handle) - st.readBytes :+= newBytes + st.readBytes ++= newBytes run() case Connected(socket) ⇒ state(socket).connected = true @@ -169,6 +169,19 @@ trait IO { private lazy val _receiveIO = receiveIO + // only reinvoke messages from the original message to avoid stack overflow + private var reinvoked = false + private def reinvoke(): Unit = { + if (!reinvoked && (_next eq Idle) && _messages.nonEmpty) { + try { + reinvoked = true + while ((_next eq Idle) && _messages.nonEmpty) self invoke _messages.dequeue + } finally { + reinvoked = false + } + } + } + @tailrec private def run(): Unit = { _next match { @@ -176,7 +189,7 @@ trait IO { self.currentMessage = message val st = state(handle) if (st.readBytes.length >= waitingFor) { - val bytes = st.readBytes.take(waitingFor).toByteString + val bytes = st.readBytes.take(waitingFor) //.compact st.readBytes = st.readBytes.drop(waitingFor) _next = continuation(bytes) run() @@ -187,7 +200,7 @@ trait IO { val idx = st.readBytes.indexOfSlice(delimiter, scanned) if (idx >= 0) { val index = if (inclusive) idx + delimiter.length else idx - val bytes = st.readBytes.take(index).toByteString + val bytes = st.readBytes.take(index) //.compact st.readBytes = st.readBytes.drop(idx + delimiter.length) _next = continuation(bytes) run() @@ -198,12 +211,12 @@ trait IO { self.currentMessage = message val st = state(handle) if (st.readBytes.length > 0) { - val bytes = st.readBytes.toByteString - st.readBytes = ByteRope.empty + val bytes = st.readBytes //.compact + st.readBytes = ByteString.empty _next = continuation(bytes) run() } - case Idle ⇒ if (_messages.nonEmpty) self invoke _messages.dequeue + case Idle ⇒ reinvoke() } } } diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index e20191b7c5..948489c335 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -9,46 +9,28 @@ import scala.collection.generic.{ CanBuildFrom, GenericCompanion } object ByteString { - def apply(bytes: Array[Byte]): ByteString = new ByteString(bytes.clone) + def apply(bytes: Array[Byte]): ByteString = ByteString1(bytes.clone) def apply(bytes: Byte*): ByteString = { val ar = new Array[Byte](bytes.size) bytes.copyToArray(ar) - new ByteString(ar) + ByteString1(ar) } def apply[T](bytes: T*)(implicit num: Integral[T]): ByteString = - new ByteString(bytes.map(x ⇒ num.toInt(x).toByte)(collection.breakOut)) + ByteString1(bytes.map(x ⇒ num.toInt(x).toByte)(collection.breakOut)) def apply(bytes: ByteBuffer): ByteString = { val ar = new Array[Byte](bytes.remaining) bytes.get(ar) - new ByteString(ar) + ByteString1(ar) } def apply(string: String): ByteString = apply(string, "UTF-8") - def apply(string: String, charset: String): ByteString = new ByteString(string.getBytes(charset)) + def apply(string: String, charset: String): ByteString = ByteString1(string.getBytes(charset)) - def concat(xss: Traversable[Byte]*): ByteString = { - var length = 0 - val li = xss.iterator - while (li.hasNext) { - length += li.next.size - } - val ar = new Array[Byte](length) - var pos = 0 - val i = xss.iterator - while (i.hasNext) { - val cur = i.next - val len = cur.size - cur.copyToArray(ar, pos, len) - pos += len - } - new ByteString(ar) - } - - val empty: ByteString = new ByteString(Array.empty[Byte]) + val empty: ByteString = ByteString1(Array.empty[Byte]) def newBuilder: Builder[Byte, ByteString] = new ArrayBuilder.ofByte mapResult apply @@ -57,128 +39,178 @@ object ByteString { def apply() = newBuilder } -} - -final class ByteString private (bytes: Array[Byte], startIndex: Int, endIndex: Int) extends IndexedSeq[Byte] with IndexedSeqOptimized[Byte, ByteString] { - - private def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length) - - override protected[this] def newBuilder = ByteString.newBuilder - - def apply(idx: Int): Byte = bytes(checkRangeConvert(idx)) - - private def checkRangeConvert(index: Int) = { - val idx = index + startIndex - if (0 <= index && idx < endIndex) - idx - else - throw new IndexOutOfBoundsException(index.toString) + private object ByteString1 { + def apply(bytes: Array[Byte]) = new ByteString1(bytes) } - def length: Int = endIndex - startIndex + final class ByteString1 private (bytes: Array[Byte], startIndex: Int, endIndex: Int) extends ByteString { - override def clone: ByteString = ByteString(toArray) + private def this(bytes: Array[Byte]) = this(bytes, 0, bytes.length) - def toArray: Array[Byte] = { - val ar = new Array[Byte](length) - Array.copy(bytes, startIndex, ar, 0, length) - ar - } + def apply(idx: Int): Byte = bytes(checkRangeConvert(idx)) - def asByteBuffer: ByteBuffer = { - val buffer = ByteBuffer.wrap(bytes, startIndex, length).asReadOnlyBuffer - if (buffer.remaining < bytes.length) buffer.slice - else buffer - } - - def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray) - - def utf8String: String = new String(bytes, "UTF-8") - - def mapI(f: Byte ⇒ Int): ByteString = map(f andThen (_.toByte)) - - override def slice(from: Int, until: Int): ByteString = { - val newStartIndex = math.max(from, 0) + startIndex - val newEndIndex = math.min(until, length) + startIndex - if (newEndIndex - newStartIndex <= 0) ByteString.empty - else new ByteString(bytes, newStartIndex, newEndIndex) - } - - override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit = - Array.copy(bytes, startIndex, xs, start, math.min(math.min(length, len), xs.length - start)) - -} - -object ByteRope { - - def apply(bytes: Array[Byte]): ByteRope = new ByteRope(Vector(ByteString(bytes))) - - def apply(bytes: Byte*): ByteRope = new ByteRope(Vector(ByteString(bytes: _*))) - - def apply[T](bytes: T*)(implicit num: Integral[T]): ByteRope = new ByteRope(Vector(ByteString(bytes: _*)(num))) - - def apply(bytes: ByteBuffer): ByteRope = new ByteRope(Vector(ByteString(bytes))) - - def apply(string: String): ByteRope = new ByteRope(Vector(ByteString(string))) - - def apply(string: String, charset: String): ByteRope = new ByteRope(Vector(ByteString(string, charset))) - - def empty: ByteRope = new ByteRope(Vector.empty) - - def newBuilder: Builder[Byte, ByteRope] = new ArrayBuilder.ofByte mapResult apply - - implicit def canBuildFrom = new CanBuildFrom[TraversableOnce[Byte], Byte, ByteRope] { - def apply(from: TraversableOnce[Byte]) = newBuilder - def apply() = newBuilder - } - -} - -final class ByteRope private (bytestrings: Vector[ByteString]) extends IndexedSeq[Byte] with IndexedSeqOptimized[Byte, ByteRope] { - - override protected[this] def newBuilder = ByteRope.newBuilder - - def apply(idx: Int): Byte = - if (0 <= idx && idx < length) { - var pos = 0 - var seen = 0 - while (idx >= seen + bytestrings(pos).length) { - seen += bytestrings(pos).length - pos += 1 - } - bytestrings(pos)(idx - seen) - } else throw new IndexOutOfBoundsException(idx.toString) - - val length: Int = (0 /: bytestrings)(_ + _.length) - - override def slice(from: Int, until: Int): ByteRope = { - val start = math.max(from, 0) - val end = math.min(until, length) - if (end - start <= 0) - ByteRope.empty - else { - var pos = 0 - var seen = 0 - while (from >= seen + bytestrings(pos).length) { - seen += bytestrings(pos).length - pos += 1 - } - val startpos = pos - val startidx = start - seen - while (until > seen + bytestrings(pos).length) { - seen += bytestrings(pos).length - pos += 1 - } - val endpos = pos - val endidx = end - seen - if (startpos == endpos) - new ByteRope(Vector(bytestrings(startpos).slice(startidx, endidx))) + private def checkRangeConvert(index: Int) = { + val idx = index + startIndex + if (0 <= index && idx < endIndex) + idx else - new ByteRope(bytestrings(startpos).drop(startpos) +: bytestrings.slice(startpos + 1, endpos) :+ bytestrings(endpos).take(endidx)) + throw new IndexOutOfBoundsException(index.toString) } + + def length: Int = endIndex - startIndex + + def toArray: Array[Byte] = { + val ar = new Array[Byte](length) + Array.copy(bytes, startIndex, ar, 0, length) + ar + } + + override def clone: ByteString = new ByteString1(toArray) + + def compact: ByteString = + if (startIndex == 0 && endIndex == bytes.length) this + else clone + + def asByteBuffer: ByteBuffer = { + val buffer = ByteBuffer.wrap(bytes, startIndex, length).asReadOnlyBuffer + if (buffer.remaining < bytes.length) buffer.slice + else buffer + } + + def utf8String: String = + new String(if (startIndex == 0 && endIndex == bytes.length) bytes else toArray, "UTF-8") + + def ++(that: ByteString): ByteString = that match { + case b: ByteString1 ⇒ ByteStrings(this, b) + case bs: ByteStrings ⇒ ByteStrings(this, bs) + } + + override def slice(from: Int, until: Int): ByteString = { + val newStartIndex = math.max(from, 0) + startIndex + val newEndIndex = math.min(until, length) + startIndex + if (newEndIndex <= newStartIndex) ByteString.empty + else new ByteString1(bytes, newStartIndex, newEndIndex) + } + + override def copyToArray[A >: Byte](xs: Array[A], start: Int, len: Int): Unit = + Array.copy(bytes, startIndex, xs, start, math.min(math.min(length, len), xs.length - start)) + } - def :+(that: ByteString): ByteRope = new ByteRope(bytestrings :+ that) + private object ByteStrings { + def apply(bytestrings: Vector[ByteString1]): ByteString = new ByteStrings(bytestrings) + + def apply(b1: ByteString1, b2: ByteString1): ByteString = compare(b1, b2) match { + case 3 ⇒ new ByteStrings(Vector(b1, b2)) + case 2 ⇒ b2 + case 1 ⇒ b1 + case 0 ⇒ ByteString.empty + } + + def apply(b: ByteString1, bs: ByteStrings): ByteString = compare(b, bs) match { + case 3 ⇒ new ByteStrings(b +: bs.bytestrings) + case 2 ⇒ bs + case 1 ⇒ b + case 0 ⇒ ByteString.empty + } + + def apply(bs: ByteStrings, b: ByteString1): ByteString = compare(bs, b) match { + case 3 ⇒ new ByteStrings(bs.bytestrings :+ b) + case 2 ⇒ b + case 1 ⇒ bs + case 0 ⇒ ByteString.empty + } + + def apply(bs1: ByteStrings, bs2: ByteStrings): ByteString = compare(bs1, bs2) match { + case 3 ⇒ new ByteStrings(bs1.bytestrings ++ bs2.bytestrings) + case 2 ⇒ bs2 + case 1 ⇒ bs1 + case 0 ⇒ ByteString.empty + } + + // 0: both empty, 1: 2nd empty, 2: 1st empty, 3: neither empty + def compare(b1: ByteString, b2: ByteString): Int = + if (b1.length == 0) + if (b2.length == 0) 0 else 2 + else if (b2.length == 0) 1 else 3 + } + + final class ByteStrings private (private val bytestrings: Vector[ByteString1]) extends ByteString { + + def apply(idx: Int): Byte = + if (0 <= idx && idx < length) { + var pos = 0 + var seen = 0 + while (idx >= seen + bytestrings(pos).length) { + seen += bytestrings(pos).length + pos += 1 + } + bytestrings(pos)(idx - seen) + } else throw new IndexOutOfBoundsException(idx.toString) + + val length: Int = (0 /: bytestrings)(_ + _.length) + + override def slice(from: Int, until: Int): ByteString = { + val start = math.max(from, 0) + val end = math.min(until, length) + if (end <= start) + ByteString.empty + else { + var pos = 0 + var seen = 0 + while (from >= seen + bytestrings(pos).length) { + seen += bytestrings(pos).length + pos += 1 + } + val startpos = pos + val startidx = start - seen + while (until > seen + bytestrings(pos).length) { + seen += bytestrings(pos).length + pos += 1 + } + val endpos = pos + val endidx = end - seen + if (startpos == endpos) + bytestrings(startpos).slice(startidx, endidx) + else { + val first = bytestrings(startpos).drop(startidx).asInstanceOf[ByteString1] + val last = bytestrings(endpos).take(endidx).asInstanceOf[ByteString1] + if ((endpos - startpos) == 1) + new ByteStrings(Vector(first, last)) + else + new ByteStrings(first +: bytestrings.slice(startpos + 1, endpos) :+ last) + } + } + } + + def ++(that: ByteString): ByteString = that match { + case b: ByteString1 ⇒ ByteStrings(this, b) + case bs: ByteStrings ⇒ ByteStrings(this, bs) + } + + def compact: ByteString = { + val ar = new Array[Byte](length) + var pos = 0 + bytestrings foreach { b ⇒ + b.copyToArray(ar, pos, b.length) + pos += b.length + } + ByteString1(ar) + } + + def asByteBuffer: ByteBuffer = compact.asByteBuffer + + def utf8String: String = compact.utf8String + } - def toByteString: ByteString = ByteString.concat(bytestrings: _*) +} + +sealed trait ByteString extends IndexedSeq[Byte] with IndexedSeqOptimized[Byte, ByteString] { + override protected[this] def newBuilder = ByteString.newBuilder + def ++(that: ByteString): ByteString + def compact: ByteString + def asByteBuffer: ByteBuffer + def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray) + def utf8String: String + def mapI(f: Byte ⇒ Int): ByteString = map(f andThen (_.toByte)) }