ByteString optimisations of methods in HTTP parsing hot-path (#20994)
* =act #20992 prepare benchmarks for ByteString optimisations * =act #20992 optimise common ByteString operations: drop,take,slice... * =act,htc #15965 add ByteString.decodeString(java.nio.charsets.Charset)
This commit is contained in:
parent
d3ea9e49db
commit
fde9d86879
12 changed files with 552 additions and 60 deletions
|
|
@ -12,10 +12,10 @@ import scala.annotation.{ tailrec, varargs }
|
|||
import scala.collection.IndexedSeqOptimized
|
||||
import scala.collection.mutable.{ Builder, WrappedArray }
|
||||
import scala.collection.immutable
|
||||
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
|
||||
import scala.collection.immutable.{ IndexedSeq, VectorBuilder, VectorIterator }
|
||||
import scala.collection.generic.CanBuildFrom
|
||||
import scala.reflect.ClassTag
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.charset.{ Charset, StandardCharsets }
|
||||
|
||||
object ByteString {
|
||||
|
||||
|
|
@ -104,6 +104,7 @@ object ByteString {
|
|||
}
|
||||
|
||||
private[akka] object ByteString1C extends Companion {
|
||||
def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes)
|
||||
def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes)
|
||||
val SerializationIdentity = 1.toByte
|
||||
|
||||
|
|
@ -124,29 +125,49 @@ object ByteString {
|
|||
|
||||
override def length: Int = bytes.length
|
||||
|
||||
// Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
|
||||
override def iterator: ByteIterator.ByteArrayIterator = ByteIterator.ByteArrayIterator(bytes, 0, bytes.length)
|
||||
|
||||
private[akka] def toByteString1: ByteString1 = ByteString1(bytes)
|
||||
/** INTERNAL API */
|
||||
private[akka] def toByteString1: ByteString1 = ByteString1(bytes, 0, bytes.length)
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] def byteStringCompanion = ByteString1C
|
||||
|
||||
def asByteBuffer: ByteBuffer = toByteString1.asByteBuffer
|
||||
override def asByteBuffer: ByteBuffer = toByteString1.asByteBuffer
|
||||
|
||||
def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = List(asByteBuffer)
|
||||
override def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = List(asByteBuffer)
|
||||
|
||||
def decodeString(charset: String): String =
|
||||
override def decodeString(charset: String): String =
|
||||
if (isEmpty) "" else new String(bytes, charset)
|
||||
|
||||
def ++(that: ByteString): ByteString =
|
||||
override def decodeString(charset: Charset): String =
|
||||
if (isEmpty) "" else new String(bytes, charset)
|
||||
|
||||
override def ++(that: ByteString): ByteString = {
|
||||
if (that.isEmpty) this
|
||||
else if (this.isEmpty) that
|
||||
else toByteString1 ++ that
|
||||
}
|
||||
|
||||
override def take(n: Int): ByteString =
|
||||
if (n <= 0) ByteString.empty
|
||||
else toByteString1.take(n)
|
||||
|
||||
override def dropRight(n: Int): ByteString =
|
||||
if (n <= 0) this
|
||||
else toByteString1.dropRight(n)
|
||||
|
||||
override def drop(n: Int): ByteString =
|
||||
if (n <= 0) this
|
||||
else toByteString1.drop(n)
|
||||
|
||||
override def slice(from: Int, until: Int): ByteString =
|
||||
if ((from != 0) || (until != length)) toByteString1.slice(from, until)
|
||||
else this
|
||||
if ((from == 0) && (until == length)) this
|
||||
else if (from > length) ByteString.empty
|
||||
else toByteString1.slice(from, until)
|
||||
|
||||
private[akka] def writeToOutputStream(os: ObjectOutputStream): Unit =
|
||||
private[akka] override def writeToOutputStream(os: ObjectOutputStream): Unit =
|
||||
toByteString1.writeToOutputStream(os)
|
||||
|
||||
override def copyToBuffer(buffer: ByteBuffer): Int =
|
||||
|
|
@ -154,7 +175,7 @@ object ByteString {
|
|||
|
||||
/** INTERNAL API: Specialized for internal use, writing multiple ByteString1C into the same ByteBuffer. */
|
||||
private[akka] def writeToBuffer(buffer: ByteBuffer, offset: Int): Int = {
|
||||
val copyLength = math.min(buffer.remaining, offset + length)
|
||||
val copyLength = Math.min(buffer.remaining, offset + length)
|
||||
if (copyLength > 0) {
|
||||
buffer.put(bytes, offset, copyLength)
|
||||
drop(copyLength)
|
||||
|
|
@ -164,11 +185,14 @@ object ByteString {
|
|||
|
||||
}
|
||||
|
||||
/** INTERNAL API: ByteString backed by exactly one array, with start / end markers */
|
||||
private[akka] object ByteString1 extends Companion {
|
||||
val empty: ByteString1 = new ByteString1(Array.empty[Byte])
|
||||
def apply(bytes: Array[Byte]): ByteString1 = ByteString1(bytes, 0, bytes.length)
|
||||
def fromString(s: String): ByteString1 = apply(s.getBytes)
|
||||
def apply(bytes: Array[Byte]): ByteString1 = apply(bytes, 0, bytes.length)
|
||||
def apply(bytes: Array[Byte], startIndex: Int, length: Int): ByteString1 =
|
||||
if (length == 0) empty else new ByteString1(bytes, startIndex, length)
|
||||
if (length == 0) empty
|
||||
else new ByteString1(bytes, Math.max(0, startIndex), Math.max(0, length))
|
||||
|
||||
val SerializationIdentity = 0.toByte
|
||||
|
||||
|
|
@ -185,6 +209,7 @@ object ByteString {
|
|||
|
||||
def apply(idx: Int): Byte = bytes(checkRangeConvert(idx))
|
||||
|
||||
// Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
|
||||
override def iterator: ByteIterator.ByteArrayIterator =
|
||||
ByteIterator.ByteArrayIterator(bytes, startIndex, startIndex + length)
|
||||
|
||||
|
|
@ -204,12 +229,41 @@ object ByteString {
|
|||
|
||||
private[akka] def byteStringCompanion = ByteString1
|
||||
|
||||
override def dropRight(n: Int): ByteString =
|
||||
dropRight1(n)
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] def dropRight1(n: Int): ByteString1 =
|
||||
if (n <= 0) this
|
||||
else if (length - n <= 0) ByteString1.empty
|
||||
else new ByteString1(bytes, startIndex, length - n)
|
||||
|
||||
override def drop(n: Int): ByteString =
|
||||
if (n <= 0) this else drop1(n)
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] def drop1(n: Int): ByteString1 = {
|
||||
val nextStartIndex = startIndex + n
|
||||
if (nextStartIndex >= bytes.length) ByteString1.empty
|
||||
else ByteString1(bytes, nextStartIndex, length - n)
|
||||
}
|
||||
|
||||
override def take(n: Int): ByteString =
|
||||
if (n <= 0) ByteString.empty
|
||||
else ByteString1(bytes, startIndex, Math.min(n, length))
|
||||
|
||||
/**
 * Returns the bytes in the index range `[from, until)` of this view.
 *
 * Both bounds are clamped to `[0, length]` before use, so out-of-range
 * arguments can never expose bytes of the backing array that lie outside
 * this ByteString's `startIndex` / `length` window.
 *
 * @param from  index of the first byte to keep (values below 0 are treated as 0)
 * @param until index one past the last byte to keep (values above `length` are treated as `length`)
 * @return `this` when the clamped range covers the whole view, the empty
 *         ByteString1 when the clamped range is empty, otherwise a new view
 *         sharing the same backing array
 */
override def slice(from: Int, until: Int): ByteString = {
  // Clamp first: this instance is only a window onto `bytes`, so unclamped
  // bounds would leak data outside of [startIndex, startIndex + length).
  val lo = Math.max(0, from)
  val hi = Math.min(length, until)
  if (lo == 0 && hi == length) this
  else if (hi <= lo) ByteString1.empty
  else ByteString1(bytes, startIndex + lo, hi - lo)
}
|
||||
|
||||
override def copyToBuffer(buffer: ByteBuffer): Int =
|
||||
writeToBuffer(buffer)
|
||||
|
||||
/** INTERNAL API: Specialized for internal use, writing multiple ByteString1C into the same ByteBuffer. */
|
||||
private[akka] def writeToBuffer(buffer: ByteBuffer): Int = {
|
||||
val copyLength = math.min(buffer.remaining, length)
|
||||
val copyLength = Math.min(buffer.remaining, length)
|
||||
if (copyLength > 0) {
|
||||
buffer.put(bytes, startIndex, copyLength)
|
||||
drop(copyLength)
|
||||
|
|
@ -228,7 +282,10 @@ object ByteString {
|
|||
|
||||
def asByteBuffers: scala.collection.immutable.Iterable[ByteBuffer] = List(asByteBuffer)
|
||||
|
||||
def decodeString(charset: String): String =
|
||||
override def decodeString(charset: String): String =
|
||||
new String(if (length == bytes.length) bytes else toArray, charset)
|
||||
|
||||
override def decodeString(charset: Charset): String = // avoids Charset.forName lookup in String internals
|
||||
new String(if (length == bytes.length) bytes else toArray, charset)
|
||||
|
||||
def ++(that: ByteString): ByteString = {
|
||||
|
|
@ -311,8 +368,9 @@ object ByteString {
|
|||
*/
|
||||
final class ByteStrings private (private[akka] val bytestrings: Vector[ByteString1], val length: Int) extends ByteString with Serializable {
|
||||
if (bytestrings.isEmpty) throw new IllegalArgumentException("bytestrings must not be empty")
|
||||
if (bytestrings.head.isEmpty) throw new IllegalArgumentException("bytestrings.head must not be empty")
|
||||
|
||||
def apply(idx: Int): Byte =
|
||||
def apply(idx: Int): Byte = {
|
||||
if (0 <= idx && idx < length) {
|
||||
var pos = 0
|
||||
var seen = 0
|
||||
|
|
@ -322,7 +380,9 @@ object ByteString {
|
|||
}
|
||||
bytestrings(pos)(idx - seen)
|
||||
} else throw new IndexOutOfBoundsException(idx.toString)
|
||||
}
|
||||
|
||||
// Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
|
||||
override def iterator: ByteIterator.MultiByteArrayIterator =
|
||||
ByteIterator.MultiByteArrayIterator(bytestrings.toStream map { _.iterator })
|
||||
|
||||
|
|
@ -367,11 +427,83 @@ object ByteString {
|
|||
|
||||
def decodeString(charset: String): String = compact.decodeString(charset)
|
||||
|
||||
def decodeString(charset: Charset): String =
|
||||
compact.decodeString(charset)
|
||||
|
||||
private[akka] def writeToOutputStream(os: ObjectOutputStream): Unit = {
|
||||
os.writeInt(bytestrings.length)
|
||||
bytestrings.foreach(_.writeToOutputStream(os))
|
||||
}
|
||||
|
||||
override def take(n: Int): ByteString = {
|
||||
@tailrec def take0(n: Int, b: ByteStringBuilder, bs: Vector[ByteString1]): ByteString =
|
||||
if (bs.isEmpty || n <= 0) b.result
|
||||
else {
|
||||
val head = bs.head
|
||||
if (n <= head.length) b.append(head.take(n)).result
|
||||
else take0(n - head.length, b.append(head), bs.tail)
|
||||
}
|
||||
|
||||
if (n <= 0) ByteString.empty
|
||||
else if (n >= length) this
|
||||
else take0(n, ByteString.newBuilder, bytestrings)
|
||||
}
|
||||
|
||||
override def dropRight(n: Int): ByteString =
|
||||
if (n <= 0) this
|
||||
else {
|
||||
val last = bytestrings.last
|
||||
if (n < last.length) new ByteStrings(bytestrings.init :+ last.dropRight1(n), length - n)
|
||||
else {
|
||||
val remaining = bytestrings.init
|
||||
if (remaining.isEmpty) ByteString.empty
|
||||
else {
|
||||
val s = new ByteStrings(remaining, length - last.length)
|
||||
val remainingToBeDropped = n - last.length
|
||||
s.dropRight(remainingToBeDropped)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def slice(from: Int, until: Int): ByteString =
|
||||
if ((from == 0) && (until == length)) this
|
||||
else if (from > length || until <= from) ByteString.empty
|
||||
else drop(from).dropRight(length - until)
|
||||
|
||||
override def drop(n: Int): ByteString =
|
||||
if (n <= 0) this
|
||||
else if (n > length) ByteString.empty
|
||||
else drop0(n)
|
||||
|
||||
private def drop0(n: Int): ByteString = {
|
||||
var continue = true
|
||||
var fullDrops = 0
|
||||
var remainingToDrop = n
|
||||
do {
|
||||
// impl note: could be optimised a bit by using VectorIterator instead,
|
||||
// however then we're forced to call .toVector which halves performance
|
||||
// We can work around that, as there's a Scala private method "remainingVector" which is fast,
|
||||
// but let's not go into calling private APIs here just yet.
|
||||
val currentLength = bytestrings(fullDrops).length
|
||||
if (remainingToDrop >= currentLength) {
|
||||
fullDrops += 1
|
||||
remainingToDrop -= currentLength
|
||||
} else continue = false
|
||||
} while (remainingToDrop > 0 && continue)
|
||||
|
||||
val remainingByteStrings = bytestrings.drop(fullDrops)
|
||||
if (remainingByteStrings.isEmpty) ByteString.empty
|
||||
else if (remainingToDrop > 0) {
|
||||
val h: ByteString1 = remainingByteStrings.head.drop1(remainingToDrop)
|
||||
val bs = remainingByteStrings.tail
|
||||
|
||||
if (h.isEmpty)
|
||||
if (bs.isEmpty) ByteString.empty
|
||||
else new ByteStrings(bs, length - n)
|
||||
else new ByteStrings(h +: bs, length - n)
|
||||
} else ByteStrings(remainingByteStrings, length - n)
|
||||
}
|
||||
|
||||
protected def writeReplace(): AnyRef = new SerializationProxy(this)
|
||||
}
|
||||
|
||||
|
|
@ -422,6 +554,8 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
// *must* be overridden by derived classes. This construction is necessary
|
||||
// to specialize the return type, as the method is already implemented in
|
||||
// a parent trait.
|
||||
//
|
||||
// Avoid `iterator` in performance sensitive code, call ops directly on ByteString instead
|
||||
override def iterator: ByteIterator = throw new UnsupportedOperationException("Method iterator is not implemented in ByteString")
|
||||
|
||||
override def head: Byte = apply(0)
|
||||
|
|
@ -429,14 +563,19 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
override def last: Byte = apply(length - 1)
|
||||
override def init: ByteString = dropRight(1)
|
||||
|
||||
override def slice(from: Int, until: Int): ByteString =
|
||||
if ((from == 0) && (until == length)) this
|
||||
else iterator.slice(from, until).toByteString
|
||||
|
||||
override def take(n: Int): ByteString = slice(0, n)
|
||||
// *must* be overridden by derived classes.
|
||||
// Placeholder only; the message names this method so that a missing override is easy to trace.
override def take(n: Int): ByteString = throw new UnsupportedOperationException("Method take is not implemented in ByteString")
|
||||
override def takeRight(n: Int): ByteString = slice(length - n, length)
|
||||
override def drop(n: Int): ByteString = slice(n, length)
|
||||
override def dropRight(n: Int): ByteString = slice(0, length - n)
|
||||
|
||||
// these methods are optimized in derived classes utilising the maximum knowledge about data layout available to them:
|
||||
// *must* be overridden by derived classes.
|
||||
override def slice(from: Int, until: Int): ByteString = throw new UnsupportedOperationException("Method slice is not implemented in ByteString")
|
||||
|
||||
// *must* be overridden by derived classes.
|
||||
override def drop(n: Int): ByteString = throw new UnsupportedOperationException("Method drop is not implemented in ByteString")
|
||||
|
||||
// *must* be overridden by derived classes.
|
||||
override def dropRight(n: Int): ByteString = throw new UnsupportedOperationException("Method dropRight is not implemented in ByteString")
|
||||
|
||||
override def takeWhile(p: Byte ⇒ Boolean): ByteString = iterator.takeWhile(p).toByteString
|
||||
override def dropWhile(p: Byte ⇒ Boolean): ByteString = iterator.dropWhile(p).toByteString
|
||||
|
|
@ -461,7 +600,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
*
|
||||
* @return this ByteString copied into a byte array
|
||||
*/
|
||||
protected[ByteString] def toArray: Array[Byte] = toArray[Byte] // protected[ByteString] == public to Java but hidden to Scala * fnizz *
|
||||
protected[ByteString] def toArray: Array[Byte] = toArray[Byte]
|
||||
|
||||
override def toArray[B >: Byte](implicit arg0: ClassTag[B]): Array[B] = iterator.toArray
|
||||
override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit =
|
||||
|
|
@ -488,11 +627,8 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
* @param buffer a ByteBuffer to copy bytes to
|
||||
* @return the number of bytes actually copied
|
||||
*/
|
||||
def copyToBuffer(buffer: ByteBuffer): Int = {
|
||||
// TODO: remove this impl, make it an abstract method when possible
|
||||
// specialized versions of this method exist in sub-classes, we keep this impl for binary compatibility, it never is actually invoked
|
||||
iterator.copyToBuffer(buffer)
|
||||
}
|
||||
// *must* be overridden by derived classes.
|
||||
def copyToBuffer(buffer: ByteBuffer): Int = throw new UnsupportedOperationException("Method copyToBuffer is not implemented in ByteString")
|
||||
|
||||
/**
|
||||
* Create a new ByteString with all contents compacted into a single,
|
||||
|
|
@ -544,9 +680,16 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
|
||||
/**
|
||||
* Decodes this ByteString using a charset to produce a String.
|
||||
* If you have a [[Charset]] instance available, use `decodeString(charset: java.nio.charset.Charset)` instead.
|
||||
*/
|
||||
def decodeString(charset: String): String
|
||||
|
||||
/**
|
||||
* Decodes this ByteString using a charset to produce a String.
|
||||
* Avoids Charset.forName lookup in String internals, thus is preferable to `decodeString(charset: String)`.
|
||||
*/
|
||||
def decodeString(charset: Charset): String
|
||||
|
||||
/**
|
||||
* map method that will automatically cast Int back into Byte.
|
||||
*/
|
||||
|
|
@ -608,8 +751,8 @@ object CompactByteString {
|
|||
* an Array.
|
||||
*/
|
||||
def fromArray(array: Array[Byte], offset: Int, length: Int): CompactByteString = {
|
||||
val copyOffset = math.max(offset, 0)
|
||||
val copyLength = math.max(math.min(array.length - copyOffset, length), 0)
|
||||
val copyOffset = Math.max(offset, 0)
|
||||
val copyLength = Math.max(Math.min(array.length - copyOffset, length), 0)
|
||||
if (copyLength == 0) empty
|
||||
else {
|
||||
val copyArray = new Array[Byte](copyLength)
|
||||
|
|
@ -706,6 +849,8 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
|
||||
override def ++=(xs: TraversableOnce[Byte]): this.type = {
|
||||
xs match {
|
||||
case b: ByteString if b.isEmpty ⇒
|
||||
// do nothing
|
||||
case b: ByteString1C ⇒
|
||||
clearTemp()
|
||||
_builder += b.toByteString1
|
||||
|
|
@ -748,7 +893,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
/**
|
||||
* Java API: append a ByteString to this builder.
|
||||
*/
|
||||
def append(bs: ByteString): this.type = this ++= bs
|
||||
def append(bs: ByteString): this.type = if (bs.isEmpty) this else this ++= bs
|
||||
|
||||
/**
|
||||
* Add a single Byte to this builder.
|
||||
|
|
@ -915,7 +1060,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
fillByteBuffer(len * 8, byteOrder) { _.asDoubleBuffer.put(array, start, len) }
|
||||
|
||||
def clear(): Unit = {
|
||||
_builder.clear
|
||||
_builder.clear()
|
||||
_length = 0
|
||||
_tempLength = 0
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue