diff --git a/akka-actor/src/main/scala/akka/util/ByteIterator.scala b/akka-actor/src/main/scala/akka/util/ByteIterator.scala index b763ef93fe..3ca0092d49 100644 --- a/akka-actor/src/main/scala/akka/util/ByteIterator.scala +++ b/akka-actor/src/main/scala/akka/util/ByteIterator.scala @@ -341,7 +341,8 @@ object ByteIterator { def getDoubles(xs: Array[Double], offset: Int, n: Int)(implicit byteOrder: ByteOrder): this.type = getToArray(xs, offset, n, 8) { getDouble(byteOrder) } { current.getDoubles(_, _, _)(byteOrder) } - def copyToBuffer(buffer: ByteBuffer): Int = { + override def copyToBuffer(buffer: ByteBuffer): Int = { + // the fold here is better than indexing into the LinearSeq val n = iterators.foldLeft(0) { _ + _.copyToBuffer(buffer) } normalize() n diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 8ccfa2518b..89204c7169 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -7,7 +7,8 @@ package akka.util import java.io.{ ObjectInputStream, ObjectOutputStream } import java.nio.{ ByteBuffer, ByteOrder } import java.lang.{ Iterable ⇒ JIterable } -import scala.annotation.varargs + +import scala.annotation.{ tailrec, varargs } import scala.collection.IndexedSeqOptimized import scala.collection.mutable.{ Builder, WrappedArray } import scala.collection.immutable @@ -147,6 +148,20 @@ object ByteString { private[akka] def writeToOutputStream(os: ObjectOutputStream): Unit = toByteString1.writeToOutputStream(os) + + override def copyToBuffer(buffer: ByteBuffer): Int = + writeToBuffer(buffer, offset = 0) + + /** INTERNAL API: Specialized for internal use, writing multiple ByteString1C into the same ByteBuffer. */ + private[akka] def writeToBuffer(buffer: ByteBuffer, offset: Int): Int = { + val copyLength = math.min(buffer.remaining, offset + length) + if (copyLength > 0) { + buffer.put(bytes, offset, copyLength) + drop(copyLength) + } + copyLength + } + } private[akka] object ByteString1 extends Companion { @@ -189,6 +204,19 @@ object ByteString { private[akka] def byteStringCompanion = ByteString1 + override def copyToBuffer(buffer: ByteBuffer): Int = + writeToBuffer(buffer) + + /** INTERNAL API: Specialized for internal use, writing multiple ByteString1C into the same ByteBuffer. */ + private[akka] def writeToBuffer(buffer: ByteBuffer): Int = { + val copyLength = math.min(buffer.remaining, length) + if (copyLength > 0) { + buffer.put(bytes, startIndex, copyLength) + drop(copyLength) + } + copyLength + } + def compact: CompactByteString = if (isCompact) ByteString1C(bytes) else ByteString1C(toArray) @@ -312,6 +340,14 @@ object ByteString { def isCompact: Boolean = if (bytestrings.length == 1) bytestrings.head.isCompact else false + override def copyToBuffer(buffer: ByteBuffer): Int = { + @tailrec def copyItToTheBuffer(buffer: ByteBuffer, i: Int, written: Int): Int = + if (i < bytestrings.length) copyItToTheBuffer(buffer, i + 1, written + bytestrings(i).writeToBuffer(buffer)) + else written + + copyItToTheBuffer(buffer, 0, 0) + } + def compact: CompactByteString = { if (isCompact) bytestrings.head.compact else { @@ -452,7 +488,11 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz * @param buffer a ByteBuffer to copy bytes to * @return the number of bytes actually copied */ - def copyToBuffer(buffer: ByteBuffer): Int = iterator.copyToBuffer(buffer) + def copyToBuffer(buffer: ByteBuffer): Int = { + // TODO: remove this impl, make it an abstract method when possible + // specialized versions of this method exist in sub-classes, we keep this impl for binary compatibility, it never is actually invoked + iterator.copyToBuffer(buffer) + } /** * Create a new ByteString with all contents compacted into a single, diff --git a/akka-bench-jmh/src/main/scala/akka/util/ByteStringBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/util/ByteStringBenchmark.scala new file mode 100644 index 0000000000..ec62572d8c --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/util/ByteStringBenchmark.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2014-2016 Lightbend Inc. + */ +package akka.util + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit + +import akka.util.ByteString.{ ByteString1, ByteString1C, ByteStrings } +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole + +@State(Scope.Benchmark) +@Measurement(timeUnit = TimeUnit.MILLISECONDS) +class ByteStringBenchmark { + + val _bs_mini = ByteString(Array.ofDim[Byte](128 * 4)) + val _bs_small = ByteString(Array.ofDim[Byte](1024 * 1)) + val _bs_large = ByteString(Array.ofDim[Byte](1024 * 4)) + + val bs_mini = ByteString(Array.ofDim[Byte](128 * 4 * 4)) + val bs_small = ByteString(Array.ofDim[Byte](1024 * 1 * 4)) + val bs_large = ByteString(Array.ofDim[Byte](1024 * 4 * 4)) + + val bss_mini = ByteStrings(Vector.fill(4)(bs_mini.asInstanceOf[ByteString1C].toByteString1), 4 * bs_mini.length) + val bss_small = ByteStrings(Vector.fill(4)(bs_small.asInstanceOf[ByteString1C].toByteString1), 4 * bs_small.length) + val bss_large = ByteStrings(Vector.fill(4)(bs_large.asInstanceOf[ByteString1C].toByteString1), 4 * bs_large.length) + val bss_pc_large = bss_large.compact + + val buf = ByteBuffer.allocate(1024 * 4 * 4) + + /* + BEFORE + + [info] Benchmark Mode Cnt Score Error Units + [info] ByteStringBenchmark.bs_large_copyToBuffer thrpt 40 142 163 289.866 ± 21751578.294 ops/s + [info] ByteStringBenchmark.bss_large_copyToBuffer thrpt 40 1 489 195.631 ± 209165.487 ops/s << that's the interesting case, we needlessly fold and allocate tons of Stream etc + [info] ByteStringBenchmark.bss_large_pc_copyToBuffer thrpt 40 184 466 756.364 ± 9169108.378 ops/s // "can't beat that" + + + [info] ....[Thread state: RUNNABLE]........................................................................ + [info] 35.9% 35.9% scala.collection.Iterator$class.toStream + [info] 20.2% 20.2% scala.collection.immutable.Stream.foldLeft + [info] 11.6% 11.6% scala.collection.immutable.Stream$StreamBuilder. + [info] 10.9% 10.9% akka.util.ByteIterator. + [info] 6.1% 6.1% scala.collection.mutable.ListBuffer. + [info] 5.2% 5.2% akka.util.ByteString.copyToBuffer + [info] 5.2% 5.2% scala.collection.AbstractTraversable. + [info] 2.2% 2.2% scala.collection.immutable.VectorIterator.initFrom + [info] 1.2% 1.2% akka.util.generated.ByteStringBenchmark_bss_large_copyToBuffer.bss_large_copyToBuffer_thrpt_jmhStub + [info] 0.3% 0.3% akka.util.ByteIterator$MultiByteArrayIterator.copyToBuffer + [info] 1.2% 1.2% + + + AFTER specializing impls + + [info] ....[Thread state: RUNNABLE]........................................................................ + [info] 99.5% 99.6% akka.util.generated.ByteStringBenchmark_bss_large_copyToBuffer_jmhTest.bss_large_copyToBuffer_thrpt_jmhStub + [info] 0.1% 0.1% java.util.concurrent.CountDownLatch.countDown + [info] 0.1% 0.1% sun.reflect.NativeMethodAccessorImpl.invoke0 + [info] 0.1% 0.1% sun.misc.Unsafe.putObject + [info] 0.1% 0.1% org.openjdk.jmh.infra.IterationParamsL2.getBatchSize + [info] 0.1% 0.1% java.lang.Thread.currentThread + [info] 0.1% 0.1% sun.misc.Unsafe.compareAndSwapInt + [info] 0.1% 0.1% sun.reflect.AccessorGenerator.internalize + + [info] Benchmark Mode Cnt Score Error Units + [info] ByteStringBenchmark.bs_large_copyToBuffer thrpt 40 177 328 585.473 ± 7742067.648 ops/s + [info] ByteStringBenchmark.bss_large_copyToBuffer thrpt 40 113 535 003.488 ± 3899763.124 ops/s // previous bad case now very good (was 2M/s) + [info] ByteStringBenchmark.bss_large_pc_copyToBuffer thrpt 40 203 590 896.493 ± 7582752.024 ops/s // "can't beat that" + + */ + + @Benchmark + def bs_large_copyToBuffer(): Int = { + buf.flip() + bs_large.copyToBuffer(buf) + } + + @Benchmark + def bss_large_copyToBuffer(): Int = { + buf.flip() + bss_large.copyToBuffer(buf) + } + + // /** compact + copy */ + // @Benchmark + // def bss_large_c_copyToBuffer: Int = + // bss_large.compact.copyToBuffer(buf) + + /** Pre-compacted */ + @Benchmark + def bss_large_pc_copyToBuffer(): Int = { + buf.flip() + bss_pc_large.copyToBuffer(buf) + } + +}