diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala new file mode 100644 index 0000000000..dc4f12f1bb --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/BenchTestSource.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.stream.Attributes +import akka.stream.Outlet +import akka.stream.SourceShape +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.OutHandler + +/** + * Emits integers from 1 to the given `elementCount`. The `java.lang.Integer` + * objects are allocated in the constructor of the stage, so it should be created + * before the benchmark is started. + */ +class BenchTestSource(elementCount: Int) extends GraphStage[SourceShape[java.lang.Integer]] { + + private val elements = Array.ofDim[java.lang.Integer](elementCount) + (1 to elementCount).map(n => elements(n - 1) = n) + + val out: Outlet[java.lang.Integer] = Outlet("BenchTestSource") + override val shape: SourceShape[java.lang.Integer] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + + var n = 0 + + override def onPull(): Unit = { + n += 1 + if (n > elementCount) + complete(out) + else + push(out, elements(n - 1)) + } + + setHandler(out, this) + } +} + +class BenchTestSourceSameElement[T](elements: Int, elem: T) extends GraphStage[SourceShape[T]] { + + val out: Outlet[T] = Outlet("BenchTestSourceSameElement") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + + var n = 0 + + override def onPull(): Unit = { + n += 1 + if (n > elements) + complete(out) + else + push(out, elem) + } + + setHandler(out, this) + } +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala new file mode 100644 index 0000000000..39e1de3ab9 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -0,0 +1,198 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.nio.ByteBuffer +import java.nio.ByteOrder + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.InternalActorRef +import akka.actor.Props +import akka.actor.RootActorPath +import akka.remote.AddressUidExtension +import akka.remote.EndpointManager.Send +import akka.remote.RARP +import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl._ +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(2) +@Warmup(iterations = 4) +@Measurement(iterations = 5) +class CodecBenchmark { + + val config = ConfigFactory.parseString( + """ + akka { + loglevel = WARNING + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery.enabled = on + remote.artery.hostname = localhost + remote.artery.port = 0 + } + """) + + implicit val system = ActorSystem("CodecBenchmark", config) + val systemB = ActorSystem("systemB", system.settings.config) + + val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + val compression = new Compression(system) + val headerIn = HeaderBuilder(compression) + val envelopeTemplateBuffer = ByteBuffer.allocate(ArteryTransport.MaximumFrameSize).order(ByteOrder.LITTLE_ENDIAN) + + val uniqueLocalAddress = UniqueAddress(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + AddressUidExtension(system).addressUid) + val payload = Array.ofDim[Byte](1000) + + private var materializer: ActorMaterializer = _ + private var remoteRefB: RemoteActorRef = _ + private var resolvedRef: InternalActorRef = _ + private var senderStringA: String = _ + private var recipientStringB: String = _ + + @Setup + def setup(): Unit = { + val settings = ActorMaterializerSettings(system) + materializer = ActorMaterializer(settings) + + val actorOnSystemA = system.actorOf(Props.empty, "a") + senderStringA = actorOnSystemA.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) + + val actorOnSystemB = systemB.actorOf(Props.empty, "b") + val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val rootB = RootActorPath(addressB) + remoteRefB = + Await.result(system.actorSelection(rootB / "user" / "b").resolveOne(5.seconds), 5.seconds) + .asInstanceOf[RemoteActorRef] + resolvedRef = actorOnSystemA.asInstanceOf[InternalActorRef] + recipientStringB = remoteRefB.path.toSerializationFormatWithAddress(addressB) + + val envelope = new EnvelopeBuffer(envelopeTemplateBuffer) + headerIn.version = 1 + headerIn.uid = 42 + headerIn.senderActorRef = senderStringA + headerIn.recipientActorRef = recipientStringB + headerIn.serializer = "4" + headerIn.classManifest = "" + envelope.writeHeader(headerIn) + envelope.byteBuffer.put(payload) + envelope.byteBuffer.flip() + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + Await.result(systemB.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(100000) + def reference(): Unit = { + val latch = new CountDownLatch(1) + val N = 100000 + + Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) + .runWith(new LatchSink(N, latch))(materializer) + + latch.await(30, TimeUnit.SECONDS) + } + + @Benchmark + @OperationsPerInvocation(100000) + def encode(): Unit = { + val latch = new CountDownLatch(1) + val N = 100000 + + val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool)) + + Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) + .map(_ ⇒ Send(payload, None, remoteRefB, None)) + .via(encoder) + .map(envelope => envelopePool.release(envelope)) + .runWith(new LatchSink(N, latch))(materializer) + + latch.await(30, TimeUnit.SECONDS) + } + + @Benchmark + @OperationsPerInvocation(100000) + def decode(): Unit = { + val latch = new CountDownLatch(1) + val N = 100000 + + val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) + val provider = RARP(system).provider + val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = { + recipient ⇒ + // juggling with the refs, since we don't run the real thing + val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address) + resolved + } + + val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = + Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], + resolveActorRefWithLocalAddress, compression, envelopePool)) + + Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) + .map { _ => + val envelope = envelopePool.acquire() + envelopeTemplateBuffer.rewind() + envelope.byteBuffer.put(envelopeTemplateBuffer) + envelope.byteBuffer.flip() + envelope + } + .via(decoder) + .runWith(new LatchSink(N, latch))(materializer) + + latch.await(30, TimeUnit.SECONDS) + } + + @Benchmark + @OperationsPerInvocation(100000) + def encode_decode(): Unit = { + val latch = new CountDownLatch(1) + val N = 100000 + + val encoder: Flow[Send, EnvelopeBuffer, NotUsed] = + Flow.fromGraph(new Encoder(uniqueLocalAddress, system, compression, envelopePool)) + + val localRecipient = resolvedRef.path.toSerializationFormatWithAddress(uniqueLocalAddress.address) + val provider = RARP(system).provider + val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = { + recipient ⇒ + // juggling with the refs, since we don't run the real thing + val resolved = provider.resolveActorRefWithLocalAddress(localRecipient, uniqueLocalAddress.address) + resolved + } + + val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = + Flow.fromGraph(new Decoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], + resolveActorRefWithLocalAddress, compression, envelopePool)) + + Source.fromGraph(new BenchTestSourceSameElement(N, "elem")) + .map(_ ⇒ Send(payload, None, remoteRefB, None)) + .via(encoder) + .via(decoder) + .runWith(new LatchSink(N, latch))(materializer) + + latch.await(30, TimeUnit.SECONDS) + } + +} diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala new file mode 100644 index 0000000000..c60bead1ce --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/LatchSink.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.CountDownLatch + +import akka.stream.Attributes +import akka.stream.Inlet +import akka.stream.SinkShape +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler + +class LatchSink(countDownAfter: Int, latch: CountDownLatch) extends GraphStage[SinkShape[Any]] { + val in: Inlet[Any] = Inlet("LatchSink") + override val shape: SinkShape[Any] = SinkShape(in) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler { + + var n = 0 + + override def preStart(): Unit = pull(in) + + override def onPush(): Unit = { + n += 1 + if (n == countDownAfter) + latch.countDown() + grab(in) + pull(in) + } + + setHandler(in, this) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index ece54fa867..d399264299 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -260,9 +260,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val restartCounter = new RestartCounter(maxRestarts, restartTimeout) val envelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) - val largeEnvelopePool: Option[EnvelopeBufferPool] = - if (largeMessageDestinationsEnabled) Some(new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers)) - else None + val largeEnvelopePool = new EnvelopeBufferPool(ArteryTransport.MaximumLargeFrameSize, ArteryTransport.MaximumPooledBuffers) // FIXME: Compression table must be owned by each channel instead // of having a global one @@ -403,15 +401,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundLargeMessagesStream(): Unit = { - largeEnvelopePool.foreach { largePool ⇒ - // TODO just cargo-cult programming here - val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largePool)) - .async // FIXME measure - .via(inboundFlow) - .runWith(Sink.ignore)(materializer) + val completed = Source.fromGraph(new AeronSource(inboundChannel, largeStreamId, aeron, taskRunner, largeEnvelopePool)) + .async // FIXME measure + .via(inboundLargeFlow) + .runWith(Sink.ignore)(materializer) - attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) - } + attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) } private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { @@ -491,15 +486,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { - largeEnvelopePool match { - case Some(pool) ⇒ - Flow.fromGraph(killSwitch.flow[Send]) - .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) - .via(createEncoder(pool)) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, - envelopePool, giveUpSendAfter))(Keep.right) - case None ⇒ throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured") - } + Flow.fromGraph(killSwitch.flow[Send]) + .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) + .via(createEncoder(largeEnvelopePool)) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, + envelopePool, giveUpSendAfter))(Keep.right) } def outboundControl(outboundContext: OutboundContext): Sink[Send, (OutboundControlIngress, Future[Done])] = { @@ -518,25 +509,41 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val dummySender = system.systemActorOf(Props.empty, "dummy") def createEncoder(pool: EnvelopeBufferPool): Flow[Send, EnvelopeBuffer, NotUsed] = - Flow.fromGraph(new Encoder(this, compression, pool)) - val encoder = createEncoder(envelopePool) + Flow.fromGraph(new Encoder(localAddress, system, compression, pool)) + + def encoder: Flow[Send, EnvelopeBuffer, NotUsed] = createEncoder(envelopePool) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) } - val decoder = Flow.fromGraph(new Decoder(this, compression)) + def createDecoder(pool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { + val resolveActorRefWithLocalAddress: String ⇒ InternalActorRef = + recipient ⇒ provider.resolveActorRefWithLocalAddress(recipient, localAddress.address) + Flow.fromGraph(new Decoder(localAddress, system, resolveActorRefWithLocalAddress, compression, pool)) + } - val inboundFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = { + def decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = createDecoder(envelopePool) + + def inboundSink: Sink[InboundEnvelope, NotUsed] = + Flow[InboundEnvelope] + .via(new InboundHandshake(this, inControlStream = false)) + .via(new InboundQuarantineCheck(this)) + .to(messageDispatcherSink) + + def inboundFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = { Flow.fromSinkAndSource( - decoder - .via(new InboundHandshake(this, inControlStream = false)) - .via(new InboundQuarantineCheck(this)) - .to(messageDispatcherSink), + decoder.to(inboundSink), Source.maybe[ByteString].via(killSwitch.flow)) } - val inboundControlFlow: Flow[EnvelopeBuffer, ByteString, ControlMessageSubject] = { + def inboundLargeFlow: Flow[EnvelopeBuffer, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + createDecoder(largeEnvelopePool).to(inboundSink), + Source.maybe[ByteString].via(killSwitch.flow)) + } + + def inboundControlFlow: Flow[EnvelopeBuffer, ByteString, ControlMessageSubject] = { Flow.fromSinkAndSourceMat( decoder .via(new InboundHandshake(this, inControlStream = true)) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 7a3d8b2f66..32482ff07f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -150,7 +150,7 @@ private[akka] class Association( private def isLargeMessageDestination(recipient: ActorRef): Boolean = { recipient match { - case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null ⇒ r.cachedLargeMessageDestinationFlag == LargeDestination + case r: RemoteActorRef if r.cachedLargeMessageDestinationFlag ne null ⇒ r.cachedLargeMessageDestinationFlag eq LargeDestination case r: RemoteActorRef ⇒ if (largeMessageDestinations.find(r.path.elements.iterator).data.isEmpty) { r.cachedLargeMessageDestinationFlag = RegularDestination diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 25e6fdba3a..fe30e36920 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -6,10 +6,13 @@ import akka.remote.{ MessageSerializer, UniqueAddress } import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem // TODO: Long UID class Encoder( - transport: ArteryTransport, + uniqueLocalAddress: UniqueAddress, + system: ActorSystem, compressionTable: LiteralCompressionTable, pool: EnvelopeBufferPool) extends GraphStage[FlowShape[Send, EnvelopeBuffer]] { @@ -23,11 +26,11 @@ class Encoder( private val headerBuilder = HeaderBuilder(compressionTable) headerBuilder.version = ArteryTransport.Version - headerBuilder.uid = transport.localAddress.uid - private val localAddress = transport.localAddress.address - private val serialization = SerializationExtension(transport.system) + headerBuilder.uid = uniqueLocalAddress.uid + private val localAddress = uniqueLocalAddress.address + private val serialization = SerializationExtension(system) - private val noSender = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress) + private val noSender = system.deadLetters.path.toSerializationFormatWithAddress(localAddress) private val senderCache = new java.util.HashMap[ActorRef, String] private var recipientCache = new java.util.HashMap[ActorRef, String] @@ -66,7 +69,7 @@ class Encoder( } // FIXME: Thunk allocation - Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, transport.system)) { + Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, system)) { MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope) } @@ -83,19 +86,20 @@ class Encoder( } class Decoder( - transport: ArteryTransport, - compressionTable: LiteralCompressionTable) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { + uniqueLocalAddress: UniqueAddress, + system: ExtendedActorSystem, + resolveActorRefWithLocalAddress: String ⇒ InternalActorRef, + compressionTable: LiteralCompressionTable, + pool: EnvelopeBufferPool) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] { val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - private val pool = transport.envelopePool - private val localAddress = transport.localAddress.address - private val provider = transport.provider + private val localAddress = uniqueLocalAddress.address private val headerBuilder = HeaderBuilder(compressionTable) - private val serialization = SerializationExtension(transport.system) + private val serialization = SerializationExtension(system) private val recipientCache = new java.util.HashMap[String, InternalActorRef] private val senderCache = new java.util.HashMap[String, Option[ActorRef]] @@ -111,7 +115,7 @@ class Decoder( // FIXME: Is localAddress really needed? val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match { case null ⇒ - val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress) + val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef) // FIXME we might need an efficient LRU cache, or replaced by compression table if (recipientCache.size() >= 1000) recipientCache.clear() @@ -122,7 +126,7 @@ class Decoder( val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match { case null ⇒ - val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress) + val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef) // FIXME this cache will be replaced by compression table if (senderCache.size() >= 1000) senderCache.clear() @@ -135,7 +139,7 @@ class Decoder( val decoded = InboundEnvelope( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? - MessageSerializer.deserializeForArtery(transport.system, serialization, headerBuilder, envelope), + MessageSerializer.deserializeForArtery(system, serialization, headerBuilder, envelope), senderOption, // FIXME: No need for an option, decode simply to deadLetters instead UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568