add JMH benchmark for encoder decoder stage

* CodecBenchmark that tests encode, decode and combined
  encode + decode
* refactoring of codec stages to make it possible to
  run them without real ArteryTransport
* also fixed a bug in inbound stream for large messages,
  it was using wrong envelope pool
This commit is contained in:
Patrik Nordwall 2016-05-26 10:42:08 +02:00
parent 5477b4e99b
commit 5b7c978844
6 changed files with 353 additions and 45 deletions

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
/**
* Emits integers from 1 to the given `elementCount`. The `java.lang.Integer`
* objects are allocated in the constructor of the stage, so it should be created
* before the benchmark is started.
*/
class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] {
private val elements = Array.ofDim[java.lang.Integer](elementCount)
(1 to elementCount).map(n => elements(n - 1) = n)
val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource")
override val shape: SourceShape[java.lang.Integer] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
var n = 0
override def onPull(): Unit = {
n += 1
if (n > elementCount)
complete(out)
else
push(out, elements(n - 1))
}
setHandler(out, this)
}
}
class BenchTestSourceSameElement[T](elements: Int, elem: T) extends GraphStage[SourceShape[T]] {
val out: Outlet[T] = Outlet("BenchTestSourceSameElement")
override val shape: SourceShape[T] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
var n = 0
override def onPull(): Unit = {
n += 1
if (n > elements)
complete(out)
else
push(out, elem)
}
setHandler(out, this)
}
}

View file

@ -0,0 +1,198 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
import akka.actor.Props
import akka.actor.RootActorPath
import akka.remote.AddressUidExtension
import akka.remote.EndpointManager.Send
import akka.remote.RARP
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Fork(2)
@Warmup(iterations = 4)
@Measurement(iterations = 5)
class CodecBenchmark {
val config = ConfigFactory.parseString(
"""
akka {
loglevel = WARNING
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.artery.enabled = on
remote.artery.hostname = localhost
remote.artery.port = 0
}
""")
implicit val system = ActorSystem("CodecBenchmark", config)
val systemB = ActorSystem("systemB", system.settings.config)
val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val compression = new Compression(system)
val headerIn = HeaderBuilder(compression)
val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN)
val uniqueLocalAddress = UniqueAddress(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
AddressUidExtension(system).addressUid)
val payload = Array.ofDim[Byte](1000)
private var materializer: ActorMaterializer = _
private var remoteRefB: RemoteActorRef = _
private var resolvedRef: InternalActorRef = _
private var senderStringA: String = _
private var recipientStringB: String = _
@Setup
def setup(): Unit = {
val settings = ActorMaterializerSettings(system)
materializer = ActorMaterializer(settings)
val actorOnSystemA = system.actorOf(Props.empty, "a")
senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val actorOnSystemB = systemB.actorOf(Props.empty, "b")
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
remoteRefB =
Await.result(system.actorSelection(rootB / "user" / "b").resolveOne(5.seconds), 5.seconds)
.asInstanceOf[RemoteActorRef]
resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef]
recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB)
val envelope = new EnvelopeBuffer(envelopeTemplateBuffer)
headerIn.version = 1
headerIn.uid = 42
headerIn.senderActorRef = senderStringA
headerIn.recipientActorRef = recipientStringB
headerIn.serializer = "4"
headerIn.classManifest = ""
envelope.writeHeader(headerIn)
envelope.byteBuffer.put(payload)
envelope.byteBuffer.flip()
}
@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
Await.result(systemB.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def reference(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
}
@Benchmark
@OperationsPerInvocation(100000)
def encode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val encoder: Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(_ Send(payload, None, remoteRefB, None))
.via(encoder)
.map(envelope => envelopePool.release(envelope))
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
}
@Benchmark
@OperationsPerInvocation(100000)
def decode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val provider = RARP(system).provider
val resolveActorRefWithLocalAddress: String InternalActorRef = {
recipient
// juggling with the refs, since we don't run the real thing
val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address)
resolved
}
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, compression, envelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map { _ =>
val envelope = envelopePool.acquire()
envelopeTemplateBuffer.rewind()
envelope.byteBuffer.put(envelopeTemplateBuffer)
envelope.byteBuffer.flip()
envelope
}
.via(decoder)
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
}
@Benchmark
@OperationsPerInvocation(100000)
def encode_decode(): Unit = {
val latch = new CountDownLatch(1)
val N = 100000
val encoder: Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool))
val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address)
val provider = RARP(system).provider
val resolveActorRefWithLocalAddress: String InternalActorRef = {
recipient
// juggling with the refs, since we don't run the real thing
val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address)
resolved
}
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, compression, envelopePool))
Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(_ Send(payload, None, remoteRefB, None))
.via(encoder)
.via(decoder)
.runWith(new LatchSink(N, latch))(materializer)
latch.await(30, TimeUnit.SECONDS)
}
}

View file

@ -0,0 +1,36 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.CountDownLatch
import akka.stream.Attributes
import akka.stream.Inlet
import akka.stream.SinkShape
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[SinkShape[Any]] {
val in: Inlet[Any] = Inlet("LatchSink")
override val shape: SinkShape[Any] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler {
var n = 0
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
n += 1
if (n == countDownAfter)
latch.countDown()
grab(in)
pull(in)
}
setHandler(in, this)
}
}

View file

@ -260,9 +260,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val restartCounter = new RestartCounter(maxRestarts, restartTimeout)
val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers)
val largeEnvelopePool: Option[EnvelopeBufferPool] =
if (largeMessageDestinationsEnabled) Some(new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers))
else None
val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers)
// FIXME: Compression table must be owned by each channel instead
// of having a global one
@ -403,16 +401,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
private def runInboundLargeMessagesStream(): Unit = {
largeEnvelopePool.foreach { largePool
// TODO just cargo-cult programming here
val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largePool))
val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largeEnvelopePool))
.async // FIXME measure
.via(inboundFlow)
.via(inboundLargeFlow)
.runWith(Sink.ignore)(materializer)
attachStreamRestart("Inbound large message stream", completed, () runInboundLargeMessagesStream())
}
}
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () Unit): Unit = {
implicit val ec = materializer.executionContext
@ -491,15 +486,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = {
largeEnvelopePool match {
case Some(pool)
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval))
.via(createEncoder(pool))
.via(createEncoder(largeEnvelopePool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter))(Keep.right)
case None throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured")
}
}
def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = {
@ -518,25 +509,41 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private val dummySender = system.systemActorOf(Props.empty, "dummy")
def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] =
Flow.fromGraph(new Encoder(this, compression, pool))
val encoder = createEncoder(envelopePool)
Flow.fromGraph(new Encoder(localAddress, system, compression, pool))
def encoder: Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool)
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption)
}
val decoder = Flow.fromGraph(new Decoder(this, compression))
def createDecoder(pool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
val resolveActorRefWithLocalAddress: String InternalActorRef =
recipient provider.resolveActorRefWithLocalAddress(recipient, localAddress.address)
Flow.fromGraph(new Decoder(localAddress, system, resolveActorRefWithLocalAddress, compression, pool))
}
val inboundFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
decoder
def decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = createDecoder(envelopePool)
def inboundSink: Sink[InboundEnvelope, NotUsed] =
Flow[InboundEnvelope]
.via(new InboundHandshake(this, inControlStream = false))
.via(new InboundQuarantineCheck(this))
.to(messageDispatcherSink),
.to(messageDispatcherSink)
def inboundFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
decoder.to(inboundSink),
Source.maybe[ByteString].via(killSwitch.flow))
}
val inboundControlFlow: Flow[EnvelopeBuffer, ByteString, ControlMessageSubject] = {
def inboundLargeFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
createDecoder(largeEnvelopePool).to(inboundSink),
Source.maybe[ByteString].via(killSwitch.flow))
}
def inboundControlFlow: Flow[EnvelopeBuffer, ByteString, ControlMessageSubject] = {
Flow.fromSinkAndSourceMat(
decoder
.via(new InboundHandshake(this, inControlStream = true))

View file

@ -150,7 +150,7 @@ private[akka] class Association(
private def isLargeMessageDestination(recipient: ActorRef): Boolean = {
recipient match {
case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null r.cachedLargeMessageDestinationFlag == LargeDestination
case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null r.cachedLargeMessageDestinationFlag eq LargeDestination
case r: RemoteActorRef
if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) {
r.cachedLargeMessageDestinationFlag = RegularDestination

View file

@ -6,10 +6,13 @@ import akka.remote.{ MessageSerializer, UniqueAddress }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
// TODO: Long UID
class Encoder(
transport: ArteryTransport,
uniqueLocalAddress: UniqueAddress,
system: ActorSystem,
compressionTable: LiteralCompressionTable,
pool: EnvelopeBufferPool)
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
@ -23,11 +26,11 @@ class Encoder(
private val headerBuilder = HeaderBuilder(compressionTable)
headerBuilder.version = ArteryTransport.Version
headerBuilder.uid = transport.localAddress.uid
private val localAddress = transport.localAddress.address
private val serialization = SerializationExtension(transport.system)
headerBuilder.uid = uniqueLocalAddress.uid
private val localAddress = uniqueLocalAddress.address
private val serialization = SerializationExtension(system)
private val noSender = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress)
private val noSender = system.deadLetters.path.toSerializationFormatWithAddress(localAddress)
private val senderCache = new java.util.HashMap[ActorRef, String]
private var recipientCache = new java.util.HashMap[ActorRef, String]
@ -66,7 +69,7 @@ class Encoder(
}
// FIXME: Thunk allocation
Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, transport.system)) {
Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, system)) {
MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope)
}
@ -83,19 +86,20 @@ class Encoder(
}
class Decoder(
transport: ArteryTransport,
compressionTable: LiteralCompressionTable) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
uniqueLocalAddress: UniqueAddress,
system: ExtendedActorSystem,
resolveActorRefWithLocalAddress: String InternalActorRef,
compressionTable: LiteralCompressionTable,
pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val pool = transport.envelopePool
private val localAddress = transport.localAddress.address
private val provider = transport.provider
private val localAddress = uniqueLocalAddress.address
private val headerBuilder = HeaderBuilder(compressionTable)
private val serialization = SerializationExtension(transport.system)
private val serialization = SerializationExtension(system)
private val recipientCache = new java.util.HashMap[String, InternalActorRef]
private val senderCache = new java.util.HashMap[String, Option[ActorRef]]
@ -111,7 +115,7 @@ class Decoder(
// FIXME: Is localAddress really needed?
val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match {
case null
val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress)
val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef)
// FIXME we might need an efficient LRU cache, or replaced by compression table
if (recipientCache.size() >= 1000)
recipientCache.clear()
@ -122,7 +126,7 @@ class Decoder(
val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match {
case null
val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress)
val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef)
// FIXME this cache will be replaced by compression table
if (senderCache.size() >= 1000)
senderCache.clear()
@ -135,7 +139,7 @@ class Decoder(
val decoded = InboundEnvelope(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
MessageSerializer.deserializeForArtery(transport.system, serialization, headerBuilder, envelope),
MessageSerializer.deserializeForArtery(system, serialization, headerBuilder, envelope),
senderOption, // FIXME: No need for an option, decode simply to deadLetters instead
UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568