From 76dab6354ef191a590681365d319d1d85177741f Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Tue, 1 May 2012 18:41:04 +0200 Subject: [PATCH] Added ByteIterator --- .../test/scala/akka/util/ByteStringSpec.scala | 71 +++- .../main/scala/akka/util/ByteIterator.scala | 354 ++++++++++++++++++ .../src/main/scala/akka/util/ByteString.scala | 9 + 3 files changed, 433 insertions(+), 1 deletion(-) create mode 100644 akka-actor/src/main/scala/akka/util/ByteIterator.scala diff --git a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala index 94997a49de..1a4dd40ffe 100644 --- a/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/ByteStringSpec.scala @@ -14,7 +14,9 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers { def genSimpleByteString(min: Int, max: Int) = for { n ← choose(min, max) b ← Gen.containerOfN[Array, Byte](n, arbitrary[Byte]) - } yield ByteString(b) + from ← choose(0, b.length) + until ← choose(from, b.length) + } yield ByteString(b).slice(from, until) implicit val arbitraryByteString: Arbitrary[ByteString] = Arbitrary { Gen.sized { s ⇒ @@ -25,6 +27,30 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers { } } + type ByteStringSlice = (ByteString, Int, Int) + + implicit val arbitraryByteStringSlice: Arbitrary[ByteStringSlice] = Arbitrary { + for { + xs ← arbitraryByteString.arbitrary + from ← choose(0, xs.length) + until ← choose(from, xs.length) + } yield (xs, from, until) + } + + def likeVecIt(bs: ByteString)(body: BufferedIterator[Byte] ⇒ Any, strict: Boolean = true): Boolean = { + val bsIterator = bs.iterator + val vecIterator = Vector(bs: _*).iterator.buffered + (body(bsIterator) == body(vecIterator)) && + (!strict || (bsIterator.toSeq == vecIterator.toSeq)) + } + + def likeVecIts(a: ByteString, b: ByteString)(body: (BufferedIterator[Byte], BufferedIterator[Byte]) ⇒ Any, strict: Boolean = true): Boolean = { + val (bsAIt, bsBIt) = (a.iterator, b.iterator) + val (vecAIt, vecBIt) = (Vector(a: _*).iterator.buffered, Vector(b: _*).iterator.buffered) + (body(bsAIt, bsBIt) == body(vecAIt, vecBIt)) && + (!strict || (bsAIt.toSeq, bsBIt.toSeq) == (vecAIt.toSeq, vecBIt.toSeq)) + } + "A ByteString" must { "have correct size" when { "concatenating" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).size == a.size + b.size) } @@ -35,4 +61,47 @@ class ByteStringSpec extends WordSpec with MustMatchers with Checkers { "dropping" in { check((a: ByteString, b: ByteString) ⇒ (a ++ b).drop(a.size) == b) } } } + + "A ByteStringIterator" must { + "behave like a buffered Vector Iterator" when { + "concatenating" in { check { (a: ByteString, b: ByteString) ⇒ likeVecIts(a, b) { (a, b) ⇒ (a ++ b).toSeq } } } + + "calling head" in { check { a: ByteString ⇒ a.isEmpty || likeVecIt(a) { _.head } } } + "calling next" in { check { a: ByteString ⇒ a.isEmpty || likeVecIt(a) { _.next() } } } + "calling hasNext" in { check { a: ByteString ⇒ likeVecIt(a) { _.hasNext } } } + "calling length" in { check { a: ByteString ⇒ likeVecIt(a) { _.length } } } + "calling duplicate" in { check { a: ByteString ⇒ likeVecIt(a)({ _.duplicate match { case (a, b) ⇒ (a.toSeq, b.toSeq) } }, strict = false) } } + "calling span" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a)({ _.span(_ == b) match { case (a, b) ⇒ (a.toSeq, b.toSeq) } }, strict = false) } } + "calling takeWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a)({ _.takeWhile(_ == b).toSeq }, strict = false) } } + "calling dropWhile" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.dropWhile(_ == b).toSeq } } } + "calling indexWhere" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.indexWhere(_ == b) } } } + "calling indexOf" in { check { (a: ByteString, b: Byte) ⇒ likeVecIt(a) { _.indexOf(b) } } } + "calling toSeq" in { check { a: ByteString ⇒ likeVecIt(a) { _.toSeq } } } + "calling foreach" in { check { a: ByteString ⇒ likeVecIt(a) { it ⇒ var acc = 0; it foreach { acc += _ }; acc } } } + "calling foldLeft" in { check { a: ByteString ⇒ likeVecIt(a) { _.foldLeft(0) { _ + _ } } } } + "calling toArray" in { check { a: ByteString ⇒ likeVecIt(a) { _.toArray.toSeq } } } + + "calling slice" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVecIt(xs)({ + _.slice(from, until).toSeq + }, strict = false) + } + } + } + + "calling copyToArray" in { + check { slice: ByteStringSlice ⇒ + slice match { + case (xs, from, until) ⇒ likeVecIt(xs)({ it ⇒ + val array = Array.ofDim[Byte](xs.length) + it.slice(from, until).copyToArray(array, from, until) + array.toSeq + }, strict = false) + } + } + } + } + } } diff --git a/akka-actor/src/main/scala/akka/util/ByteIterator.scala b/akka-actor/src/main/scala/akka/util/ByteIterator.scala new file mode 100644 index 0000000000..0c6146e93d --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/ByteIterator.scala @@ -0,0 +1,354 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.util + +import java.nio.ByteBuffer + +import scala.collection.IndexedSeqOptimized +import scala.collection.mutable.{ Builder, WrappedArray } +import scala.collection.immutable.{ IndexedSeq, VectorBuilder } +import scala.collection.generic.CanBuildFrom +import scala.collection.mutable.{ ListBuffer } +import scala.annotation.tailrec + +import java.nio.ByteBuffer + +abstract class ByteIterator extends BufferedIterator[Byte] { + def isIdenticalTo(that: Iterator[Byte]): Boolean + + def len: Int + + def head: Byte + + def next(): Byte + + protected def clear(): Unit + + def ++(that: TraversableOnce[Byte]): ByteIterator = + ByteArrayIterator(that.toArray) + + // *must* be overridden by derived classes + override def clone: ByteIterator = null + + final override def duplicate = (this, clone) + + // *must* be overridden by derived classes + override def take(n: Int): this.type = null + + // *must* be overridden by derived classes + override def drop(n: Int): this.type = null + + final override def slice(from: Int, until: Int): this.type = + drop(from).take(until - from) + + // *must* be overridden by derived classes + override def takeWhile(p: Byte ⇒ Boolean): this.type = null + + // *must* be overridden by derived classes + override def dropWhile(p: Byte ⇒ Boolean): this.type = null + + final override def span(p: Byte ⇒ Boolean): (ByteIterator, ByteIterator) = { + val that = clone + that.takeWhile(p) + drop(that.len) + (that, this) + } + + final override def indexWhere(p: Byte ⇒ Boolean): Int = { + var index = 0 + var found = false + while (!found && hasNext) if (p(next())) { found = true } else { index += 1 } + if (found) index else -1 + } + + final def indexOf(elem: Byte): Int = { + var index = 0 + var found = false + while (!found && hasNext) if (elem == next()) { found = true } else { index += 1 } + if (found) index else -1 + } + + final override def indexOf[B >: Byte](elem: B): Int = { + var index = 0 + var found = false + while (!found && hasNext) if (elem == next()) { found = true } else { index += 1 } + if (found) index else -1 + } + + def toByteString: ByteString + + override def toSeq: ByteString = toByteString + + @inline final override def foreach[@specialized U](f: Byte ⇒ U): Unit = + while (hasNext) f(next()) + + final override def foldLeft[@specialized B](z: B)(op: (B, Byte) ⇒ B): B = { + var acc = z + while (hasNext) acc = op(acc, next()) + acc + } + + final override def toArray[B >: Byte](implicit arg0: ClassManifest[B]): Array[B] = { + val target = Array.ofDim[B](len) + copyToArray(target) + target + } + + /** + * Copy as many bytes as possible to a ByteBuffer, starting from it's + * current position. This method will not overflow the buffer. + * + * @param buffer a ByteBuffer to copy bytes to + * @return the number of bytes actually copied + */ + def copyToBuffer(buffer: ByteBuffer): Int +} + +object ByteArrayIterator { + private val emptyArray = Array.ofDim[Byte](0) + + protected[akka] def apply(array: Array[Byte]): ByteArrayIterator = + new ByteArrayIterator(array, 0, array.length) + + protected[akka] def apply(array: Array[Byte], from: Int, until: Int): ByteArrayIterator = + new ByteArrayIterator(array, from, until) + + val empty: ByteArrayIterator = apply(Array.empty[Byte]) +} + +class ByteArrayIterator private (private var array: Array[Byte], private var from: Int, private var until: Int) extends ByteIterator { + protected[util] final def internalArray = array + protected[util] final def internalFrom = from + protected[util] final def internalUntil = until + + final def isIdenticalTo(that: Iterator[Byte]) = that match { + case that: ByteArrayIterator ⇒ + ((this.array) eq (that.internalArray)) && + ((this.from) == (that.from)) && ((this.until) == (that.until)) + case _ ⇒ false + } + + @inline final def len = until - from + + @inline final def hasNext = from < until + + @inline final def head = array(from) + + final def next() = { + if (!hasNext) Iterator.empty.next + else { val i = from; from = from + 1; array(i) } + } + + def clear() { this.array = ByteArrayIterator.emptyArray; from = 0; until = from } + + final override def length = { val l = len; drop(len); l } + + final override def ++(that: TraversableOnce[Byte]) = that match { + case that: ByteArrayIterator ⇒ { + if (this.isEmpty) that + else if ((this.array eq that.array) && (this.until == that.from)) { + this.until = that.until + that.clear() + this + } else { + val result = MultiByteArrayIterator(List(this, that)) + this.clear() + result + } + } + case that: MultiByteArrayIterator ⇒ if (this.isEmpty) that else (this +: that) + case _ ⇒ super.++(that) + } + + final override def clone = new ByteArrayIterator(array, from, until) + + final override def take(n: Int) = { + until = until min (from + (0 max n)) + this + } + + final override def drop(n: Int) = { + from = until min (from + (0 max n)) + this + } + + final override def takeWhile(p: Byte ⇒ Boolean) = { + val prev = from + dropWhile(p) + until = from; from = prev + this + } + + final override def dropWhile(p: Byte ⇒ Boolean) = { + var stop = false + while (!stop && hasNext) { + if (p(array(from))) { from = from + 1 } else { stop = true } + } + this + } + + final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { + val n = 0 max ((xs.length - start) min this.len min len) + Array.copy(this.array, from, xs, start, n) + this.drop(n) + } + + final override def toByteString = { + val result = + if ((from == 0) && (until == array.length)) ByteString.ByteString1C(array) + else ByteString.ByteString1(array, from, len) + clear() + result + } + + def copyToBuffer(buffer: ByteBuffer): Int = { + val copyLength = math.min(buffer.remaining, len) + if (copyLength > 0) { + buffer.put(array, from, copyLength) + drop(copyLength) + } + copyLength + } +} + +object MultiByteArrayIterator { + protected val clearedList = List(ByteArrayIterator.empty) + + val empty = new MultiByteArrayIterator(Nil) + + protected[akka] def apply(iterators: List[ByteArrayIterator]): MultiByteArrayIterator = + new MultiByteArrayIterator(iterators) +} + +class MultiByteArrayIterator private (private var iterators: List[ByteArrayIterator]) extends ByteIterator { + // After normalization: + // * iterators.isEmpty == false + // * (!iterator.head.isEmpty || iterators.tail.isEmpty) == true + private def normalize(): this.type = { + @tailrec def norm(xs: List[ByteArrayIterator]): List[ByteArrayIterator] = { + if (xs.isEmpty) MultiByteArrayIterator.clearedList + else if (xs.head.isEmpty) norm(xs.tail) + else xs + } + iterators = norm(iterators) + this + } + normalize() + + @inline private def current = iterators.head + @inline private def dropCurrent() { iterators = iterators.tail } + @inline def clear() { iterators = MultiByteArrayIterator.empty.iterators } + + final def isIdenticalTo(that: Iterator[Byte]) = false + + @inline final def hasNext = current.hasNext + + @inline final def head = current.head + + final def next() = { + val result = current.next() + normalize() + result + } + + final override def len = iterators.foldLeft(0) { _ + _.len } + + final override def length = { + var result = len + clear() + result + } + + def +:(that: ByteArrayIterator): this.type = { + iterators = that +: iterators + this + } + + final override def ++(that: TraversableOnce[Byte]) = that match { + case that: ByteArrayIterator ⇒ if (this.isEmpty) that else { + iterators = iterators :+ that + that.clear() + this + } + case that: MultiByteArrayIterator ⇒ if (this.isEmpty) that else { + iterators = this.iterators ++ that.iterators + that.clear() + this + } + case _ ⇒ super.++(that) + } + + final override def clone = new MultiByteArrayIterator(iterators map { _.clone }) + + final override def take(n: Int) = { + var rest = n + val builder = new ListBuffer[ByteArrayIterator] + while ((rest > 0) && !iterators.isEmpty) { + current.take(rest) + if (current.hasNext) { + rest -= current.len + builder += current + } + iterators = iterators.tail + } + iterators = builder.result + normalize() + } + + @tailrec final override def drop(n: Int) = if ((n > 0) && !isEmpty) { + val nCurrent = math.min(n, current.len) + current.drop(n) + val rest = n - nCurrent + assert(current.isEmpty || (rest == 0)) + normalize() + drop(rest) + } else this + + final override def takeWhile(p: Byte ⇒ Boolean) = { + var stop = false + var builder = new ListBuffer[ByteArrayIterator] + while (!stop && !iterators.isEmpty) { + val lastLen = current.len + current.takeWhile(p) + if (current.hasNext) builder += current + if (current.len < lastLen) stop = true + dropCurrent() + } + iterators = builder.result + normalize() + } + + @tailrec final override def dropWhile(p: Byte ⇒ Boolean) = if (!isEmpty) { + current.dropWhile(p) + val dropMore = current.isEmpty + normalize() + if (dropMore) dropWhile(p) else this + } else this + + final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { + var pos = start + var rest = len + while ((rest > 0) && !iterators.isEmpty) { + val n = 0 max ((xs.length - pos) min current.len min rest) + current.copyToArray(xs, pos, n) + pos += n + rest -= n + dropCurrent() + } + normalize() + } + + final override def toByteString = { + val result = iterators.foldLeft(ByteString.empty) { _ ++ _.toByteString } + clear() + result + } + + def copyToBuffer(buffer: ByteBuffer): Int = { + val n = iterators.foldLeft(0) { _ + _.copyToBuffer(buffer) } + normalize() + n + } +} diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 4efc71a325..ba3cca51d6 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -73,6 +73,8 @@ object ByteString { override def length = bytes.length + override def iterator = ByteArrayIterator(bytes, 0, bytes.length) + def toArray: Array[Byte] = bytes.clone def toByteString1: ByteString1 = ByteString1(bytes) @@ -112,6 +114,8 @@ object ByteString { def apply(idx: Int): Byte = bytes(checkRangeConvert(idx)) + override def iterator = ByteArrayIterator(bytes, startIndex, startIndex + length) + private def checkRangeConvert(index: Int) = { if (0 <= index && length > index) index + startIndex @@ -224,6 +228,8 @@ object ByteString { bytestrings(pos)(idx - seen) } else throw new IndexOutOfBoundsException(idx.toString) + override def iterator = MultiByteArrayIterator(bytestrings.toList.map { _.iterator }) + override def slice(from: Int, until: Int): ByteString = { val start = math.max(from, 0) val end = math.min(until, length) @@ -305,6 +311,9 @@ object ByteString { sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimized[Byte, ByteString] { override protected[this] def newBuilder = ByteString.newBuilder + // *must* be overridden by derived classes + override def iterator: ByteIterator = null + /** * Efficiently concatenate another ByteString. */