From b5c11f189f737f292be692891876e2052c321641 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 4 Jul 2017 09:15:25 +0200 Subject: [PATCH] Better use of Artery lanes for ActorSelection, #23150 * Looked into the alternativies described in the ticket, but they were complicated so ended up with simply including the uid of the sending system in the hash for selecting inbound lane. That should be good enough, until we have any real demand for something else. * This means that different lanes can be used ActorSelection messages from different sending systems, i.e. good in a cluster, but same lane will be used for all messages originating from the same system. * Added possibility to run the benchmarks with ActorSelection * Added ActorSelection to the send consistency test --- .../remote/artery/FanInThrougputSpec.scala | 19 +++++-- .../remote/artery/FanOutThrougputSpec.scala | 15 +++-- .../remote/artery/MaxThroughputSpec.scala | 50 ++++++++++++----- .../akka/remote/artery/ArteryTransport.scala | 13 ++++- .../scala/akka/remote/artery/BufferPool.scala | 10 ++-- .../artery/RemoteSendConsistencySpec.scala | 56 +++++++++++++++++-- 6 files changed, 125 insertions(+), 38 deletions(-) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala index 5659044bc9..c12d7a4482 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala @@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._ object FanInThroughputSpec extends MultiNodeConfig { val totalNumberOfNodes = - System.getProperty("MultiJvm.akka.test.FanInThroughputSpec.nrOfNodes") match { + System.getProperty("akka.test.FanInThroughputSpec.nrOfNodes") match { case null ⇒ 4 case value ⇒ value.toInt } @@ -41,6 +41,10 @@ object FanInThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.FanInThroughputSpec.totalMessagesFactor = 10.0 akka.test.FanInThroughputSpec.real-message = off + akka.test.FanInThroughputSpec.actor-selection = off + akka.remote.artery.advanced { + inbound-lanes = 4 + } """)) .withFallback(MaxThroughputSpec.cfg) .withFallback(RemotingMultiNodeSpec.commonConfig)) @@ -61,6 +65,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanInThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.actor-selection") var plot = PlotResult() @@ -85,9 +90,12 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput super.afterAll() } - def identifyReceiver(name: String, r: RoleName): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val scenarios = List( @@ -146,7 +154,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput val ignore = TestProbe() val receivers = (1 to sendingNodes.size).map { n ⇒ identifyReceiver(receiverName + "-" + n, roles.head) - }.toArray[ActorRef] + }.toArray[Target] val idx = roles.indexOf(myself) - 1 val receiver = receivers(idx) @@ -171,7 +179,6 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput } "Max throughput of fan-in" must { - pending val reporter = BenchmarkFileReporter("FanInThroughputSpec", system) for (s ← scenarios) { s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s, reporter) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala index 74b5440ca0..3fe83e6e3b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala @@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._ object FanOutThroughputSpec extends MultiNodeConfig { val totalNumberOfNodes = - System.getProperty("MultiJvm.akka.test.FanOutThroughputSpec.nrOfNodes") match { + System.getProperty("akka.test.FanOutThroughputSpec.nrOfNodes") match { case null ⇒ 4 case value ⇒ value.toInt } @@ -41,6 +41,7 @@ object FanOutThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.FanOutThroughputSpec.totalMessagesFactor = 10.0 akka.test.FanOutThroughputSpec.real-message = off + akka.test.FanOutThroughputSpec.actor-selection = off """)) .withFallback(MaxThroughputSpec.cfg) .withFallback(RemotingMultiNodeSpec.commonConfig)) @@ -61,6 +62,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanOutThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.actor-selection") var plot = PlotResult() @@ -85,9 +87,12 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp super.afterAll() } - def identifyReceiver(name: String, r: RoleName): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val burstSize = 2000 / senderReceiverPairs @@ -143,7 +148,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp runOn(roles.head) { enterBarrier(receiverName + "-started") val ignore = TestProbe() - val receivers = targetNodes.map(target ⇒ identifyReceiver(receiverName, target)).toArray[ActorRef] + val receivers = targetNodes.map(target ⇒ identifyReceiver(receiverName, target)).toArray[Target] val senders = for ((target, i) ← targetNodes.zipWithIndex) yield { val receiver = receivers(i) val plotProbe = TestProbe() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 20718b1195..2475a15805 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -33,6 +33,7 @@ object MaxThroughputSpec extends MultiNodeConfig { # for serious measurements you should increase the totalMessagesFactor (20) akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0 akka.test.MaxThroughputSpec.real-message = off + akka.test.MaxThroughputSpec.actor-selection = off akka { loglevel = INFO log-dead-letters = 10000 @@ -78,8 +79,8 @@ object MaxThroughputSpec extends MultiNodeConfig { akka.remote.default-remote-dispatcher { fork-join-executor { # parallelism-factor = 0.5 - parallelism-min = 2 - parallelism-max = 2 + parallelism-min = 4 + parallelism-max = 4 } # Set to 10 by default. Might be worthwhile to experiment with. # throughput = 100 @@ -97,6 +98,19 @@ object MaxThroughputSpec extends MultiNodeConfig { final case class EndResult(totalReceived: Long) extends JavaSerializable final case class FlowControl(burstStartTime: Long) extends Echo + sealed trait Target { + def tell(msg: Any, sender: ActorRef): Unit + def ref: ActorRef + } + + final case class ActorRefTarget(override val ref: ActorRef) extends Target { + override def tell(msg: Any, sender: ActorRef) = ref.tell(msg, sender) + } + + final case class ActorSelectionTarget(sel: ActorSelection, override val ref: ActorRef) extends Target { + override def tell(msg: Any, sender: ActorRef) = sel.tell(msg, sender) + } + def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props = Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher") @@ -137,11 +151,11 @@ object MaxThroughputSpec extends MultiNodeConfig { } } - def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, + def senderProps(mainTarget: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props = Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter)) - class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) + class Sender(target: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) extends Actor { val numTargets = targets.size @@ -161,7 +175,7 @@ object MaxThroughputSpec extends MultiNodeConfig { def receive = { case Run ⇒ if (compressionEnabled) { - target ! Warmup(payload) + target.tell(Warmup(payload), self) context.setReceiveTimeout(1.second) context.become(waitingForCompression) } else runWarmup() @@ -169,18 +183,22 @@ object MaxThroughputSpec extends MultiNodeConfig { def waitingForCompression: Receive = { case ReceivedActorRefCompressionTable(_, table) ⇒ - if (table.dictionary.contains(target)) { + val ref = target match { + case ActorRefTarget(ref) ⇒ ref + case ActorSelectionTarget(sel, _) ⇒ sel.anchor + } + if (table.dictionary.contains(ref)) { context.setReceiveTimeout(Duration.Undefined) runWarmup() } else - target ! Warmup(payload) + target.tell(Warmup(payload), self) case ReceiveTimeout ⇒ - target ! Warmup(payload) + target.tell(Warmup(payload), self) } def runWarmup(): Unit = { sendBatch(warmup = true) // first some warmup - targets.foreach(_ ! Start(target)) // then Start, which will echo back here + targets.foreach(_.tell(Start(target.ref), self)) // then Start, which will echo back here context.become(warmup) } @@ -268,9 +286,9 @@ object MaxThroughputSpec extends MultiNodeConfig { def sendFlowControl(t0: Long): Unit = { if (remaining <= 0) { context.become(waitingForEndResult) - targets.foreach(_ ! End) + targets.foreach(_.tell(End, self)) } else - target ! FlowControl(t0) + target.tell(FlowControl(t0), self) } } @@ -331,6 +349,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor") val realMessage = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.real-message") + val actorSelection = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.actor-selection") var plot = PlotResult() @@ -355,9 +374,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec super.afterAll() } - def identifyReceiver(name: String, r: RoleName = second): ActorRef = { - system.actorSelection(node(r) / "user" / name) ! Identify(None) - expectMsgType[ActorIdentity](10.seconds).ref.get + def identifyReceiver(name: String, r: RoleName = second): Target = { + val sel = system.actorSelection(node(r) / "user" / name) + sel ! Identify(None) + val ref = expectMsgType[ActorIdentity](10.seconds).ref.get + if (actorSelection) ActorSelectionTarget(sel, ref) + else ActorRefTarget(ref) } val scenarios = List( diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 51e2d3af27..54a531e388 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -734,11 +734,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(hubKillSwitch.flow) .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) - // select lane based on destination, to preserve message order + // Select lane based on destination to preserve message order, + // Also include the uid of the sending system in the hash to spread + // "hot" destinations, e.g. ActorSelection anchor. val partitioner: InboundEnvelope ⇒ Int = env ⇒ { env.recipient match { - case OptionVal.Some(r) ⇒ math.abs(r.path.uid) % inboundLanes - case OptionVal.None ⇒ 0 + case OptionVal.Some(r) ⇒ + val a = r.path.uid + val b = env.originUid + val hashA = 23 + a + val hash: Int = 23 * hashA + java.lang.Long.hashCode(b) + math.abs(hash) % inboundLanes + case OptionVal.None ⇒ 0 } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala index e00bfc456d..909930e664 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala @@ -377,12 +377,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Write fixed length parts byteBuffer.put(VersionOffset, header.version) byteBuffer.put(FlagsOffset, header.flags) - byteBuffer.putLong(UidOffset, header.uid) - byteBuffer.putInt(SerializerOffset, header.serializer) - // compression table version numbers byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) byteBuffer.put(ClassManifestCompressionTableVersionOffset, header.outboundClassManifestCompression.version) + byteBuffer.putLong(UidOffset, header.uid) + byteBuffer.putInt(SerializerOffset, header.serializer) // maybe write some metadata // after metadata is written (or not), buffer is at correct position to continue writing literals @@ -421,12 +420,11 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Read fixed length parts header.setVersion(byteBuffer.get(VersionOffset)) header.setFlags(byteBuffer.get(FlagsOffset)) - header.setUid(byteBuffer.getLong(UidOffset)) - header.setSerializer(byteBuffer.getInt(SerializerOffset)) - // compression table versions (stored in the Tag) header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) header._inboundClassManifestCompressionTableVersion = byteBuffer.get(ClassManifestCompressionTableVersionOffset) + header.setUid(byteBuffer.getLong(UidOffset)) + header.setSerializer(byteBuffer.getInt(SerializerOffset)) byteBuffer.position(MetadataContainerAndLiteralSectionOffset) if (header.flag(MetadataPresentFlag)) { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 9eac28541e..1b3fdc5b43 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -9,6 +9,7 @@ import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe } import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ +import akka.actor.ActorSelection class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig) @@ -33,18 +34,26 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryM } }), "echo") - val remoteRef = { + val echoSel = system.actorSelection(rootB / "user" / "echo") + val echoRef = { system.actorSelection(rootB / "user" / "echo") ! Identify(None) expectMsgType[ActorIdentity](5.seconds).ref.get } - remoteRef ! "ping" + echoRef ! "ping" expectMsg("pong") - remoteRef ! "ping" + echoRef ! "ping" expectMsg("pong") - remoteRef ! "ping" + echoRef ! "ping" + expectMsg("pong") + + // and actorSelection + echoSel ! "ping" + expectMsg("pong") + + echoSel ! "ping" expectMsg("pong") } @@ -116,6 +125,45 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryM } } + "be able to send messages with actorSelection concurrently preserving order" in { + systemB.actorOf(TestActors.echoActorProps, "echoA2") + systemB.actorOf(TestActors.echoActorProps, "echoB2") + systemB.actorOf(TestActors.echoActorProps, "echoC2") + + val selA = system.actorSelection(rootB / "user" / "echoA2") + val selB = system.actorSelection(rootB / "user" / "echoB2") + val selC = system.actorSelection(rootB / "user" / "echoC2") + + def senderProps(sel: ActorSelection) = Props(new Actor { + var counter = 1000 + sel ! counter + + override def receive: Receive = { + case i: Int ⇒ + if (i != counter) testActor ! s"Failed, expected $counter got $i" + else if (counter == 0) { + testActor ! "success2" + context.stop(self) + } else { + counter -= 1 + sel ! counter + } + } + }).withDeploy(Deploy.local) + + system.actorOf(senderProps(selA)) + system.actorOf(senderProps(selB)) + system.actorOf(senderProps(selC)) + system.actorOf(senderProps(selA)) + + within(10.seconds) { + expectMsg("success2") + expectMsg("success2") + expectMsg("success2") + expectMsg("success2") + } + } + } }