Simplify Artery remote deployment and make inbound-lanes=4 default, #21422

* DaemonMsgCreate is not a system message. We send it over the control
  stream because remote deployment process depends on message ordering
  for DaemonMsgCreate and Watch messages. That is all good.
* We also send DaemonMsgCreate over the ordinary message stream (all
  outbound lanes) so that the first ordinary message that is sent to
  the ref does not arrive before the actor is created. This is not needed,
  since the retried resolve in the Decoder will take care of that anyway.
* Inbound lanes were not covered, but not needed.
* Then the deduplication of DaemonMsgCreate messages in  RemoteSystemDaemon
  is not needed.
* Added some more tests for these things.
* describe lanes in reference docs
This commit is contained in:
Patrik Nordwall 2017-06-13 15:53:12 +02:00
parent be8e4b0276
commit 6b41c80f9b
10 changed files with 141 additions and 60 deletions

View file

@ -629,6 +629,18 @@ together with a tutorial for a more hands-on experience. The source code of this
## Performance tuning
### Lanes
Message serialization and deserialization can be a bottleneck for remote communication. Therefore there is support for parallel inbound and outbound lanes to perform serialization and other tasks for different destination actors in parallel. Using multiple lanes is of most value for the inbound messages, since all inbound messages from all remote systems share the same inbound stream. For outbound messages there is already one stream per remote destination system, so multiple outbound lanes only add value when sending to different actors in same destination system.
The selection of lane is based on consistent hashing of the recipient ActorRef to preserve message ordering per receiver.
Note that lowest latency can be achieved with `inbound-lanes=1` and `outbound-lanes=1` because multiple lanes introduce an asynchronous boundary.
Also note that the total amount of parallel tasks are bound by the `remote-dispatcher` and the thread pool size should not exceed the number of CPU cores minus headroom for actually processing the messages in the application, i.e. in practice the the pool size should be less than half of the number of cores.
See `inbound-lanes` and `outbound-lanes` in the @ref:[reference configuration](general/configuration.md#config-akka-remote-artery) for default values.
### Dedicated subchannel for large messages
All the communication between user defined remote actors are isolated from the channel of Akka internal messages so

View file

@ -43,7 +43,7 @@ object FanInThroughputSpec extends MultiNodeConfig {
akka.test.FanInThroughputSpec.real-message = off
akka.test.FanInThroughputSpec.actor-selection = off
akka.remote.artery.advanced {
inbound-lanes = 4
# inbound-lanes = 4
}
"""))
.withFallback(MaxThroughputSpec.cfg)

View file

@ -175,7 +175,6 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp
}
"Max throughput of fan-out" must {
pending
val reporter = BenchmarkFileReporter("FanOutThroughputSpec", system)
for (s scenarios) {
s"be great for ${s.testName}, burstSize = ${s.burstSize}, payloadSize = ${s.payloadSize}" in test(s, reporter)

View file

@ -47,6 +47,8 @@ object LatencySpec extends MultiNodeConfig {
enabled = on
advanced.idle-cpu-level = 7
advanced.inbound-lanes = 1
# for serious measurements when running this test on only one machine
# it is recommended to use external media driver
# See akka-remote/src/test/resources/aeron.properties

View file

@ -71,7 +71,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
}
advanced {
inbound-lanes = 1
# inbound-lanes = 1
# buffer-pool-size = 512
}
}

View file

@ -861,22 +861,23 @@ akka {
# Level 1 strongly prefer low CPU consumption over low latency.
# Level 10 strongly prefer low latency over low CPU consumption.
idle-cpu-level = 5
# WARNING: This feature is not supported yet. Don't use other value than 1.
# It requires more hardening and performance optimizations.
# Number of outbound lanes for each outbound association. A value greater than 1
# means that serialization can be performed in parallel for different destination
# actors. The selection of lane is based on consistent hashing of the recipient
# ActorRef to preserve message ordering per receiver.
outbound-lanes = 1
# WARNING: This feature is not supported yet. Don't use other value than 1.
# It requires more hardening and performance optimizations.
# Total number of inbound lanes, shared among all inbound associations. A value
# greater than 1 means that deserialization can be performed in parallel for
# different destination actors. The selection of lane is based on consistent
# hashing of the recipient ActorRef to preserve message ordering per receiver.
inbound-lanes = 1
# Lowest latency can be achieved with inbound-lanes=1 because of one less
# asynchronous boundary.
inbound-lanes = 4
# Number of outbound lanes for each outbound association. A value greater than 1
# means that serialization and other work can be performed in parallel for different
# destination actors. The selection of lane is based on consistent hashing of the
# recipient ActorRef to preserve message ordering per receiver. Note that messages
# for different destination systems (hosts) are handled by different streams also
# when outbound-lanes=1. Lowest latency can be achieved with outbound-lanes=1
# because of one less asynchronous boundary.
outbound-lanes = 1
# Size of the send queue for outgoing messages. Messages will be dropped if
# the queue becomes full. This may happen if you send a burst of many messages

View file

@ -60,7 +60,6 @@ private[akka] class RemoteSystemDaemon(
AddressTerminatedTopic(system).subscribe(this)
private val parent2children = new ConcurrentHashMap[ActorRef, Set[ActorRef]]
private val dedupDaemonMsgCreateMessages = new ConcurrentHashMap[String, NotUsed]
private val whitelistEnabled = system.settings.config.getBoolean("akka.remote.deployment.enable-whitelist")
private val remoteDeploymentWhitelist: immutable.Set[String] = {
@ -214,38 +213,30 @@ private[akka] class RemoteSystemDaemon(
}
private def doCreateActor(message: DaemonMsg, props: Props, deploy: Deploy, path: String, supervisor: ActorRef) = {
// Artery sends multiple DaemonMsgCreate over several streams to preserve ordering assumptions,
// DaemonMsgCreate for this unique path is already handled and therefore deduplicated
if (dedupDaemonMsgCreateMessages.putIfAbsent(path, NotUsed) == null) {
// we only need to keep the dedup info for a short period
// this is not a real actor, so no point in scheduling message
system.scheduler.scheduleOnce(5.seconds)(dedupDaemonMsgCreateMessages.remove(path))(system.dispatcher)
path match {
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote"
// TODO RK currently the extracted address is just ignored, is that okay?
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val p = this.path / subpath
val childName = {
val s = subpath.mkString("/")
val i = s.indexOf('#')
if (i < 0) s
else s.substring(0, i)
}
val isTerminating = !terminating.whileOff {
val parent = supervisor.asInstanceOf[InternalActorRef]
val actor = system.provider.actorOf(system, props, parent,
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(childName, actor)
actor.sendSystemMessage(Watch(actor, this))
actor.start()
if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
case _
log.debug("remote path does not match path from message [{}]", message)
}
path match {
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote"
// TODO RK currently the extracted address is just ignored, is that okay?
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val p = this.path / subpath
val childName = {
val s = subpath.mkString("/")
val i = s.indexOf('#')
if (i < 0) s
else s.substring(0, i)
}
val isTerminating = !terminating.whileOff {
val parent = supervisor.asInstanceOf[InternalActorRef]
val actor = system.provider.actorOf(system, props, parent,
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(childName, actor)
actor.sendSystemMessage(Watch(actor, this))
actor.start()
if (addChildParentNeedsWatch(parent, actor)) parent.sendSystemMessage(Watch(parent, this))
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
case _
log.debug("remote path does not match path from message [{}]", message)
}
}

View file

@ -348,16 +348,11 @@ private[remote] class Association(
case _: DaemonMsgCreate
// DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because
// remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages.
// It must also be sent over the ordinary message stream so that it arrives (and creates the
// destination) before the first ordinary message arrives.
val outboundEnvelope1 = createOutboundEnvelope()
if (!controlQueue.offer(outboundEnvelope1))
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope1)
(0 until outboundLanes).foreach { i
val outboundEnvelope2 = createOutboundEnvelope()
if (!queues(OrdinaryQueueIndex + i).offer(outboundEnvelope2))
dropped(OrdinaryQueueIndex + i, queueSize, outboundEnvelope2)
}
// First ordinary message may arrive earlier but then the resolve in the Decoder is retried
// so that the first message can be delivered after the remote actor has been created.
val outboundEnvelope = createOutboundEnvelope()
if (!controlQueue.offer(outboundEnvelope))
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
case _
val outboundEnvelope = createOutboundEnvelope()
val queueIndex = selectQueue(recipient)

View file

@ -30,9 +30,42 @@ object RemoteDeploymentSpec {
target ! "postStop"
}
}
def parentProps(probe: ActorRef): Props =
Props(new Parent(probe))
class Parent(probe: ActorRef) extends Actor {
var target: ActorRef = context.system.deadLetters
override val supervisorStrategy = OneForOneStrategy() {
case e: Exception
probe ! e
SupervisorStrategy.stop
}
def receive = {
case p: Props
sender() ! context.actorOf(p)
case (p: Props, firstMsg: Int)
val child = context.actorOf(p)
sender() ! child
child.tell(firstMsg, sender())
}
}
class DeadOnArrival extends Actor {
throw new Exception("init-crash")
def receive = Actor.emptyBehavior
}
}
class RemoteDeploymentSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) {
class RemoteDeploymentSpec extends ArteryMultiNodeSpec(
ConfigFactory.parseString("""
akka.remote.artery.advanced.inbound-lanes = 10
akka.remote.artery.advanced.outbound-lanes = 3
""").withFallback(ArterySpecSupport.defaultConfig)) {
import RemoteDeploymentSpec._
@ -41,7 +74,10 @@ class RemoteDeploymentSpec extends ArteryMultiNodeSpec(ArterySpecSupport.default
s"""
akka.actor.deployment {
/blub.remote = "akka://${system.name}@localhost:$port"
"/parent*/*".remote = "akka://${system.name}@localhost:$port"
}
akka.remote.artery.advanced.inbound-lanes = 10
akka.remote.artery.advanced.outbound-lanes = 3
"""
val masterSystem = newRemoteSystem(name = Some("Master" + system.name), extraConfig = Some(conf))
@ -66,6 +102,48 @@ class RemoteDeploymentSpec extends ArteryMultiNodeSpec(ArterySpecSupport.default
senderProbe.expectMsg("postStop")
}
"notice immediate death" in {
val parent = masterSystem.actorOf(parentProps(testActor), "parent")
EventFilter[ActorInitializationException](occurrences = 1).intercept {
parent.tell(Props[DeadOnArrival], testActor)
val child = expectMsgType[ActorRef]
expectMsgType[ActorInitializationException]
watch(child)
expectTerminated(child)
}(masterSystem)
}
"deliver all messages" in {
val numParents = 10
val numChildren = 20
val numMessages = 5
val parents = (0 until numParents).map { i
masterSystem.actorOf(parentProps(testActor), s"parent-$i")
}.toVector
val probes = Vector.fill(numParents, numChildren)(TestProbe()(masterSystem))
val childProps = Props[Echo1]
for (p (0 until numParents); c (0 until numChildren)) {
parents(p).tell((childProps, 0), probes(p)(c).ref)
}
for (p (0 until numParents); c (0 until numChildren)) {
val probe = probes(p)(c)
val child = probe.expectMsgType[ActorRef]
// message 0 was sent by parent when child was created (stress as quick send as possible)
(1 until numMessages).foreach(n child.tell(n, probe.ref))
}
val expectedMessages = (0 until numMessages).toVector
for (p (0 until numParents); c (0 until numChildren)) {
val probe = probes(p)(c)
probe.receiveN(numMessages) should equal(expectedMessages)
}
}
}
}

View file

@ -11,7 +11,10 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import akka.actor.ActorSelection
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig)
class RemoteSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString("""
akka.remote.artery.advanced.outbound-lanes = 1
akka.remote.artery.advanced.inbound-lanes = 1
""").withFallback(ArterySpecSupport.defaultConfig))
class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
ConfigFactory.parseString("""