Better use of Artery lanes for ActorSelection, #23150

* Looked into the alternativies described in the ticket, but
  they were complicated so ended up with simply including the
  uid of the sending system in the hash for selecting inbound
  lane. That should be good enough, until we have any real demand
  for something else.
* This means that different lanes can be used ActorSelection messages
  from different sending systems, i.e. good in a cluster, but same lane
  will be used for all messages originating from the same system.
* Added possibility to run the benchmarks with ActorSelection
* Added ActorSelection to the send consistency test
This commit is contained in:
Patrik Nordwall 2017-07-04 09:15:25 +02:00
parent 945ade245e
commit b5c11f189f
6 changed files with 125 additions and 38 deletions

View file

@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._
object FanInThroughputSpec extends MultiNodeConfig {
val totalNumberOfNodes =
System.getProperty("MultiJvm.akka.test.FanInThroughputSpec.nrOfNodes") match {
System.getProperty("akka.test.FanInThroughputSpec.nrOfNodes") match {
case null 4
case value value.toInt
}
@ -41,6 +41,10 @@ object FanInThroughputSpec extends MultiNodeConfig {
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.FanInThroughputSpec.totalMessagesFactor = 10.0
akka.test.FanInThroughputSpec.real-message = off
akka.test.FanInThroughputSpec.actor-selection = off
akka.remote.artery.advanced {
inbound-lanes = 4
}
"""))
.withFallback(MaxThroughputSpec.cfg)
.withFallback(RemotingMultiNodeSpec.commonConfig))
@ -61,6 +65,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput
val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanInThroughputSpec.totalMessagesFactor")
val realMessage = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.real-message")
val actorSelection = system.settings.config.getBoolean("akka.test.FanInThroughputSpec.actor-selection")
var plot = PlotResult()
@ -85,9 +90,12 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput
super.afterAll()
}
def identifyReceiver(name: String, r: RoleName): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity](10.seconds).ref.get
def identifyReceiver(name: String, r: RoleName): Target = {
val sel = system.actorSelection(node(r) / "user" / name)
sel ! Identify(None)
val ref = expectMsgType[ActorIdentity](10.seconds).ref.get
if (actorSelection) ActorSelectionTarget(sel, ref)
else ActorRefTarget(ref)
}
val scenarios = List(
@ -146,7 +154,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput
val ignore = TestProbe()
val receivers = (1 to sendingNodes.size).map { n
identifyReceiver(receiverName + "-" + n, roles.head)
}.toArray[ActorRef]
}.toArray[Target]
val idx = roles.indexOf(myself) - 1
val receiver = receivers(idx)
@ -171,7 +179,6 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput
}
"Max throughput of fan-in" must {
pending
val reporter = BenchmarkFileReporter("FanInThroughputSpec", system)
for (s scenarios) {
s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s, reporter)

View file

@ -26,7 +26,7 @@ import akka.remote.artery.MaxThroughputSpec._
object FanOutThroughputSpec extends MultiNodeConfig {
val totalNumberOfNodes =
System.getProperty("MultiJvm.akka.test.FanOutThroughputSpec.nrOfNodes") match {
System.getProperty("akka.test.FanOutThroughputSpec.nrOfNodes") match {
case null 4
case value value.toInt
}
@ -41,6 +41,7 @@ object FanOutThroughputSpec extends MultiNodeConfig {
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.FanOutThroughputSpec.totalMessagesFactor = 10.0
akka.test.FanOutThroughputSpec.real-message = off
akka.test.FanOutThroughputSpec.actor-selection = off
"""))
.withFallback(MaxThroughputSpec.cfg)
.withFallback(RemotingMultiNodeSpec.commonConfig))
@ -61,6 +62,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp
val totalMessagesFactor = system.settings.config.getDouble("akka.test.FanOutThroughputSpec.totalMessagesFactor")
val realMessage = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.real-message")
val actorSelection = system.settings.config.getBoolean("akka.test.FanOutThroughputSpec.actor-selection")
var plot = PlotResult()
@ -85,9 +87,12 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp
super.afterAll()
}
def identifyReceiver(name: String, r: RoleName): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity](10.seconds).ref.get
def identifyReceiver(name: String, r: RoleName): Target = {
val sel = system.actorSelection(node(r) / "user" / name)
sel ! Identify(None)
val ref = expectMsgType[ActorIdentity](10.seconds).ref.get
if (actorSelection) ActorSelectionTarget(sel, ref)
else ActorRefTarget(ref)
}
val burstSize = 2000 / senderReceiverPairs
@ -143,7 +148,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp
runOn(roles.head) {
enterBarrier(receiverName + "-started")
val ignore = TestProbe()
val receivers = targetNodes.map(target identifyReceiver(receiverName, target)).toArray[ActorRef]
val receivers = targetNodes.map(target identifyReceiver(receiverName, target)).toArray[Target]
val senders = for ((target, i) targetNodes.zipWithIndex) yield {
val receiver = receivers(i)
val plotProbe = TestProbe()

View file

@ -33,6 +33,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0
akka.test.MaxThroughputSpec.real-message = off
akka.test.MaxThroughputSpec.actor-selection = off
akka {
loglevel = INFO
log-dead-letters = 10000
@ -78,8 +79,8 @@ object MaxThroughputSpec extends MultiNodeConfig {
akka.remote.default-remote-dispatcher {
fork-join-executor {
# parallelism-factor = 0.5
parallelism-min = 2
parallelism-max = 2
parallelism-min = 4
parallelism-max = 4
}
# Set to 10 by default. Might be worthwhile to experiment with.
# throughput = 100
@ -97,6 +98,19 @@ object MaxThroughputSpec extends MultiNodeConfig {
final case class EndResult(totalReceived: Long) extends JavaSerializable
final case class FlowControl(burstStartTime: Long) extends Echo
sealed trait Target {
def tell(msg: Any, sender: ActorRef): Unit
def ref: ActorRef
}
final case class ActorRefTarget(override val ref: ActorRef) extends Target {
override def tell(msg: Any, sender: ActorRef) = ref.tell(msg, sender)
}
final case class ActorSelectionTarget(sel: ActorSelection, override val ref: ActorRef) extends Target {
override def tell(msg: Any, sender: ActorRef) = sel.tell(msg, sender)
}
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher")
@ -137,11 +151,11 @@ object MaxThroughputSpec extends MultiNodeConfig {
}
}
def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef,
def senderProps(mainTarget: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef,
printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props =
Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter))
class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
class Sender(target: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
extends Actor {
val numTargets = targets.size
@ -161,7 +175,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
def receive = {
case Run
if (compressionEnabled) {
target ! Warmup(payload)
target.tell(Warmup(payload), self)
context.setReceiveTimeout(1.second)
context.become(waitingForCompression)
} else runWarmup()
@ -169,18 +183,22 @@ object MaxThroughputSpec extends MultiNodeConfig {
def waitingForCompression: Receive = {
case ReceivedActorRefCompressionTable(_, table)
if (table.dictionary.contains(target)) {
val ref = target match {
case ActorRefTarget(ref) ref
case ActorSelectionTarget(sel, _) sel.anchor
}
if (table.dictionary.contains(ref)) {
context.setReceiveTimeout(Duration.Undefined)
runWarmup()
} else
target ! Warmup(payload)
target.tell(Warmup(payload), self)
case ReceiveTimeout
target ! Warmup(payload)
target.tell(Warmup(payload), self)
}
def runWarmup(): Unit = {
sendBatch(warmup = true) // first some warmup
targets.foreach(_ ! Start(target)) // then Start, which will echo back here
targets.foreach(_.tell(Start(target.ref), self)) // then Start, which will echo back here
context.become(warmup)
}
@ -268,9 +286,9 @@ object MaxThroughputSpec extends MultiNodeConfig {
def sendFlowControl(t0: Long): Unit = {
if (remaining <= 0) {
context.become(waitingForEndResult)
targets.foreach(_ ! End)
targets.foreach(_.tell(End, self))
} else
target ! FlowControl(t0)
target.tell(FlowControl(t0), self)
}
}
@ -331,6 +349,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
val totalMessagesFactor = system.settings.config.getDouble("akka.test.MaxThroughputSpec.totalMessagesFactor")
val realMessage = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.real-message")
val actorSelection = system.settings.config.getBoolean("akka.test.MaxThroughputSpec.actor-selection")
var plot = PlotResult()
@ -355,9 +374,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
super.afterAll()
}
def identifyReceiver(name: String, r: RoleName = second): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity](10.seconds).ref.get
def identifyReceiver(name: String, r: RoleName = second): Target = {
val sel = system.actorSelection(node(r) / "user" / name)
sel ! Identify(None)
val ref = expectMsgType[ActorIdentity](10.seconds).ref.get
if (actorSelection) ActorSelectionTarget(sel, ref)
else ActorRefTarget(ref)
}
val scenarios = List(