Latency benchmarks for remembered entities + JFR events (#29103)

Christopher Batey 2020-05-28 08:35:27 +01:00 committed by GitHub
parent 870eef540a
commit 0d1237fd44
15 changed files with 614 additions and 358 deletions


@@ -14,7 +14,7 @@ import akka.annotation.InternalApi
* INTERNAL API
*/
@InternalApi
private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) extends ActorFlightRecorder {
private[akka] final class JFRActorFlightRecorder() extends ActorFlightRecorder {
override val delivery: DeliveryFlightRecorder = new JFRDeliveryFlightRecorder
}


@@ -4,15 +4,10 @@
package akka.actor.typed.internal
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorPath
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.{ ActorSystem, Extension, ExtensionId }
import akka.annotation.InternalApi
import akka.util.JavaVersion
import akka.util.FlightRecorderLoader
/**
* INTERNAL API
@@ -21,20 +16,10 @@ import akka.util.JavaVersion
object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] {
override def createExtension(system: ActorSystem[_]): ActorFlightRecorder =
if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) {
// Dynamic instantiation to not trigger class load on earlier JDKs
import scala.language.existentials
system.dynamicAccess.createInstanceFor[ActorFlightRecorder](
"akka.actor.typed.internal.jfr.JFRActorFlightRecorder",
(classOf[ActorSystem[_]], system) :: Nil) match {
case Success(jfr) => jfr
case Failure(ex) =>
system.log.warn("Failed to load JFR Actor flight recorder, falling back to noop. Exception: {}", ex.toString)
NoOpActorFlightRecorder
} // fallback if not possible to dynamically load for some reason
} else
// JFR not available on Java 8
NoOpActorFlightRecorder
FlightRecorderLoader.load[ActorFlightRecorder](
system,
"akka.actor.typed.internal.jfr.JFRActorFlightRecorder",
NoOpActorFlightRecorder)
}
/**
@@ -43,7 +28,6 @@ object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] {
@InternalApi
private[akka] trait ActorFlightRecorder extends Extension {
val delivery: DeliveryFlightRecorder
}
/**


@@ -0,0 +1,32 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.util
import akka.actor.{ ClassicActorSystemProvider, ExtendedActorSystem }
import akka.annotation.InternalApi
import scala.reflect.ClassTag
import scala.util.{ Failure, Success }
/**
* INTERNAL API
*/
@InternalApi
private[akka] object FlightRecorderLoader {
def load[T: ClassTag](casp: ClassicActorSystemProvider, fqcn: String, fallback: T): T = {
val system = casp.classicSystem.asInstanceOf[ExtendedActorSystem]
if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) {
// Dynamic instantiation to not trigger class load on earlier JDKs
system.dynamicAccess.createInstanceFor[T](fqcn, Nil) match {
case Success(jfr) =>
jfr
case Failure(ex) =>
system.log.warning("Failed to load JFR flight recorder, falling back to noop. Exception: {}", ex.toString)
fallback
} // fallback if not possible to dynamically load for some reason
} else
// JFR not available on Java 8
fallback
}
}
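Note: a module-specific recorder plugs into this loader as in the sketch below. This is a hypothetical example (MyFlightRecorder, the FQCN and the no-op object are invented names); the real wirings in this commit are ShardingFlightRecorder and RemotingFlightRecorder further down.
import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.util.FlightRecorderLoader
// Hypothetical recorder interface with the hooks a module wants to emit as JFR events
trait MyFlightRecorder extends Extension {
  def somethingHappened(id: String): Unit
}
// No-op fallback used on JDK 8 or when akka.java-flight-recorder.enabled is off
case object NoOpMyFlightRecorder extends MyFlightRecorder {
  override def somethingHappened(id: String): Unit = ()
}
object MyFlightRecorder extends ExtensionId[MyFlightRecorder] with ExtensionIdProvider {
  override def lookup(): ExtensionId[_ <: Extension] = this
  override def createExtension(system: ExtendedActorSystem): MyFlightRecorder =
    // reflective load so the JFR-backed class is never touched on older JDKs
    FlightRecorderLoader.load[MyFlightRecorder](
      system,
      "com.example.jfr.JFRMyFlightRecorder", // hypothetical JFR-backed implementation
      NoOpMyFlightRecorder)
}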


@@ -9,7 +9,9 @@
</encoder>
</appender>
<root level="INFO">
<logger name="akka.cluster.sharding" level="INFO"/>
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>


@@ -0,0 +1,307 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import java.nio.file.Paths
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.cluster.MemberStatus
import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, GetShardRegionState, Passivate }
import akka.testkit._
import akka.util.ccompat._
import com.typesafe.config.ConfigFactory
import org.HdrHistogram.Histogram
import scala.concurrent.duration._
@ccompatUsedUntil213
object ClusterShardingRememberEntitiesPerfSpec {
val NrRegions = 6
// use 5 for "real" testing
val NrIterations = 2
// use 5 for "real" testing
val NrOfMessagesFactor = 1
case class In(id: Long, created: Long = System.currentTimeMillis())
case class Out(latency: Long)
class LatencyEntity extends Actor with ActorLogging {
override def receive: Receive = {
case In(_, created) =>
sender() ! Out(System.currentTimeMillis() - created)
case _: Stop =>
// log.debug("Stop received {}", self.path.name)
context.parent ! Passivate("stop")
case "stop" =>
// log.debug("Final Stop received {}", self.path.name)
context.stop(self)
case msg => throw new RuntimeException("unexpected msg " + msg)
}
}
object LatencyEntity {
val extractEntityId: ShardRegion.ExtractEntityId = {
case in: In => (in.id.toString, in)
case msg @ Stop(id) => (id.toString, msg)
}
val extractShardId: ShardRegion.ExtractShardId = _ => "0"
}
case class Stop(id: Int)
}
object ClusterShardingRememberEntitiesPerfSpecConfig
extends MultiNodeClusterShardingConfig(
rememberEntities = true,
additionalConfig = s"""
akka.loglevel = DEBUG
akka.testconductor.barrier-timeout = 3 minutes
akka.remote.artery.advanced.outbound-message-queue-size = 10000
akka.remote.artery.advanced.maximum-frame-size = 512 KiB
# comment next line to enable durable lmdb storage
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding {
remember-entities = on
}
""") {
val first = role("first")
val second = role("second")
val third = role("third")
nodeConfig(third)(ConfigFactory.parseString(s"""
akka.cluster.sharding.distributed-data.durable.lmdb {
# use same directory when starting new node on third (not used at same time)
dir = "$targetDir/sharding-third"
}
"""))
}
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode1 extends ClusterShardingRememberEntitiesPerfSpec
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardingRememberEntitiesPerfSpec
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec
abstract class ClusterShardingRememberEntitiesPerfSpec
extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig)
with ImplicitSender {
import ClusterShardingRememberEntitiesPerfSpec._
import ClusterShardingRememberEntitiesPerfSpecConfig._
def startSharding(): Unit = {
(1 to NrRegions).foreach { n =>
startSharding(
system,
typeName = s"EntityLatency$n",
entityProps = Props(new LatencyEntity()),
extractEntityId = LatencyEntity.extractEntityId,
extractShardId = LatencyEntity.extractShardId)
}
}
var latencyRegions = Vector.empty[ActorRef]
val latencyCount = new AtomicInteger(0)
override protected def atStartup(): Unit = {
super.atStartup()
join(first, first)
startSharding()
// this will make it run on first
runOn(first) {
latencyRegions = (1 to NrRegions).map { n =>
val region = ClusterSharding(system).shardRegion(s"EntityLatency$n")
region ! In(0)
expectMsgType[Out]
region
}.toVector
}
enterBarrier("allocated-on-first")
join(second, first)
join(third, first)
within(20.seconds) {
awaitAssert {
cluster.state.members.size should ===(3)
cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
}
}
enterBarrier("all-up")
}
"Cluster sharding with remember entities performance" must {
val percentiles = List(99.9, 99.0, 95.0, 50.0)
def runBench(name: String)(logic: (Int, ActorRef, Histogram) => Long): Unit = {
val testRun = latencyCount.getAndIncrement()
runOn(first) {
val recording = new FlightRecording(system)
recording.start()
val region = latencyRegions(testRun)
val fullHistogram = new Histogram(10L * 1000L, 2)
val throughputs = (1 to NrIterations).map { iteration =>
val histogram: Histogram = new Histogram(10L * 1000L, 2)
val startTime = System.nanoTime()
val numberOfMessages = logic(iteration, region, histogram)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = numberOfMessages * 1000.0 / took
println(
s"### Test [${name}] stop with $numberOfMessages took $took ms, " +
f"throughput $throughput%,.0f msg/s")
// println("Iteration Latencies: ")
// histogram.outputPercentileDistribution(System.out, 1.0)
fullHistogram.add(histogram)
throughput
}
println(f"Average throughput: ${throughputs.sum / NrIterations}%,.0f msg/s")
println("Combined latency figures:")
println(s"total ${fullHistogram.getTotalCount} max ${fullHistogram.getMaxValue} ${percentiles
.map(p => s"$p% ${fullHistogram.getValueAtPercentile(p)}ms")
.mkString(" ")}")
recording.endAndDump(Paths.get("target", s"${name.replace(" ", "-")}.jfr"))
}
enterBarrier(s"after-start-stop-${testRun}")
}
"test when starting new entity" in {
val numberOfMessages = 200 * NrOfMessagesFactor
runBench("start new entities") { (iteration, region, histogram) =>
(1 to numberOfMessages).foreach { n =>
region ! In(iteration * 100000 + n)
}
for (_ <- 1 to numberOfMessages) {
histogram.recordValue(expectMsgType[Out].latency)
}
numberOfMessages
}
}
"test latency when starting new entity and sending a few messages" in {
val numberOfMessages = 800 * NrOfMessagesFactor
runBench("start, few messages") { (iteration, region, histogram) =>
for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) {
region ! In(iteration * 100000 + n)
}
for (_ <- 1 to numberOfMessages) {
histogram.recordValue(expectMsgType[Out].latency)
}
numberOfMessages
}
}
"test latency when starting new entity and sending a few messages to it and stopping" in {
val numberOfMessages = 800 * NrOfMessagesFactor
// 160 entities per iteration, plus the extra entity "0" created during initialization
// all entities except the init entity are stopped and removed again
runBench("start, few messages, stop") { (iteration, region, histogram) =>
for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) {
val id = iteration * 100000 + n
region ! In(id)
if (m == 5) {
region ! Stop(id)
}
}
for (_ <- 1 to numberOfMessages) {
try {
histogram.recordValue(expectMsgType[Out].latency)
} catch {
case e: AssertionError =>
log.error(s"Received ${histogram.getTotalCount} out of $numberOfMessages")
throw e
}
}
awaitAssert({
region ! GetShardRegionState
val stats = expectMsgType[CurrentShardRegionState]
stats.shards.head.shardId shouldEqual "0"
stats.shards.head.entityIds.toList.sorted shouldEqual List("0") // the init entity
}, 2.seconds)
numberOfMessages
}
}
"test latency when starting, few messages, stopping, few messages" in {
val numberOfMessages = 800 * NrOfMessagesFactor
runBench("start, few messages, stop, few messages") { (iteration, region, histogram) =>
for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) {
val id = iteration * 100000 + n
region ! In(id)
if (m == 2) {
region ! Stop(id)
}
}
for (_ <- 1 to numberOfMessages) {
try {
histogram.recordValue(expectMsgType[Out].latency)
} catch {
case e: AssertionError =>
log.error(s"Received ${histogram.getTotalCount} out of $numberOfMessages")
throw e
}
}
numberOfMessages
}
}
"test when starting some new entities mixed with sending to started" in {
runBench("starting mixed with sending to started") { (iteration, region, histogram) =>
val numberOfMessages = 1600 * NrOfMessagesFactor
(1 to numberOfMessages).foreach { n =>
val msg =
if (n % 20 == 0)
-(iteration * 100000 + n) // unique, will start new entity
else
iteration * 100000 + (n % 10) // these will go to same 10 started entities
region ! In(msg)
if (n == 10) {
for (_ <- 1 to 10) {
histogram.recordValue(expectMsgType[Out].latency)
}
}
}
for (_ <- 1 to numberOfMessages - 10) {
histogram.recordValue(expectMsgType[Out].latency)
}
numberOfMessages
}
}
"test sending to started" in {
runBench("sending to started") { (iteration, region, histogram) =>
val numberOfMessages = 1600 * NrOfMessagesFactor
(1 to numberOfMessages).foreach { n =>
region ! In(iteration * 100000 + (n % 10)) // these will go to same 10 started entities
if (n == 10) {
for (_ <- 1 to 10) {
histogram.recordValue(expectMsgType[Out].latency)
}
}
}
for (_ <- 1 to numberOfMessages - 10) {
histogram.recordValue(expectMsgType[Out].latency)
}
numberOfMessages
}
}
}
}
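Note: each benchmark dumps its JFR recording on the first node to target/<benchmark-name>.jfr via the FlightRecording helper below. A hedged sketch of a local run; the exact sbt invocation is an assumption and not part of this commit (the CompileJdk9 sources may need to be published for the multi-jvm classpath first, see the helper's comment and the build changes below):
sbt "akka-cluster-sharding/multi-jvm:testOnly akka.cluster.sharding.ClusterShardingRememberEntitiesPerfSpec"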


@@ -0,0 +1,43 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import java.nio.file.Path
import akka.actor.{ ActorSystem, ExtendedActorSystem }
/**
* This will work on JDK 11, and on JDK 8 builds that include the enable-jfr flag (8u262+).
*
* For Akka JFR recordings you may need to run a publish for the multi-jvm tests
* to get the CompileJdk9 sources compiled.
*/
class FlightRecording(system: ActorSystem) {
private val dynamic = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
private val recording =
dynamic.createInstanceFor[AnyRef]("jdk.jfr.Recording", Nil).toOption
private val clazz = recording.map(_.getClass)
private val startMethod = clazz.map(_.getDeclaredMethod("start"))
private val stopMethod = clazz.map(_.getDeclaredMethod("stop"))
private val dumpMethod = clazz.map(_.getDeclaredMethod("dump", classOf[Path]))
def start() = {
for {
r <- recording
m <- startMethod
} yield m.invoke(r)
}
def endAndDump(location: Path) = {
for {
r <- recording
stop <- stopMethod
dump <- dumpMethod
} yield {
stop.invoke(r)
dump.invoke(r, location)
}
}
}
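Note: a minimal usage sketch of this helper, following the pattern in the perf spec above (system name and output path are illustrative):
import java.nio.file.Paths
import akka.actor.ActorSystem

val system = ActorSystem("bench")
val recording = new FlightRecording(system) // silently no-ops on JVMs without jdk.jfr.Recording
recording.start()
// ... run the workload that should be captured ...
recording.endAndDump(Paths.get("target", "bench.jfr"))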


@@ -0,0 +1,37 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal.jfr
import akka.annotation.InternalApi
import jdk.jfr.{ Category, Enabled, Event, Label, StackTrace, Timespan }
// requires jdk9+ to compile
// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module
/** INTERNAL API */
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Operation")
final class RememberEntityWrite(@Timespan(Timespan.NANOSECONDS) val timeTaken: Long) extends Event
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Add")
final class RememberEntityAdd(val entityId: String) extends Event
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Remove")
final class RememberEntityRemove(val entityId: String) extends Event
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Sharding", "Shard")) @Label("Passivate")
final class Passivate(val entityId: String) extends Event
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Sharding", "Shard")) @Label("Passivate Restart")
final class PassivateRestart(val entityId: String) extends Event
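Note: since no @Name annotation is used, the JFR event names default to the fully qualified class names; a dumped recording can be inspected with JDK Mission Control or the jfr tool bundled with JDK 11+. A hedged command-line sketch (the recording file name is illustrative):
jfr print --events akka.cluster.sharding.internal.jfr.RememberEntityWrite target/start-new-entities.jfr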


@@ -0,0 +1,20 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal.jfr
import akka.cluster.sharding.ShardingFlightRecorder
class JFRShardingFlightRecorder extends ShardingFlightRecorder {
override def rememberEntityOperation(duration: Long): Unit =
new RememberEntityWrite(duration).commit()
override def rememberEntityAdd(entityId: String): Unit =
new RememberEntityAdd(entityId).commit()
override def rememberEntityRemove(entityId: String): Unit =
new RememberEntityRemove(entityId).commit()
override def entityPassivate(entityId: String): Unit =
new Passivate(entityId).commit()
override def entityPassivateRestart(entityId: String): Unit =
new PassivateRestart(entityId).commit()
}


@@ -114,6 +114,8 @@ private[akka] object Shard {
// should it go in settings perhaps, useful for tricky sharding bugs?
final val VerboseDebug = true
final case class StartEntityInternal(id: EntityId)
sealed trait EntityState
/**
@@ -277,6 +279,8 @@ private[akka] class Shard(
}
}
private val flightRecorder = ShardingFlightRecorder(context.system)
private val entities = new Entities(log)
private var lastMessageTimestamp = Map.empty[EntityId, Long]
@@ -403,7 +407,7 @@ private[akka] class Shard(
if (ids.nonEmpty) {
entities.alreadyRemembered(ids)
restartRememberedEntities(ids)
} else {}
}
context.parent ! ShardInitialized(shardId)
context.become(idle)
unstashAll()
@@ -434,6 +438,7 @@ private[akka] class Shard(
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
case msg: ShardRegion.StartEntity => startEntities(Map(msg.entityId -> Some(sender())))
case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg)
case msg: StartEntityInternal => startEntities(Map(msg.id -> None))
case msg: ShardRegionCommand => receiveShardRegionCommand(msg)
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => passivateIdleEntities()
@@ -442,90 +447,105 @@ private[akka] class Shard(
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender(), OptionVal.None)
}
def sendToRememberStore(entityId: EntityId, command: RememberEntitiesShardStore.Command)(
whenDone: () => Unit): Unit = {
sendToRememberStore(Set(entityId), command)(() => whenDone())
}
def sendToRememberStore(entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)(
whenDone: () => Unit): Unit = {
def rememberRemove(entityId: String)(whenDone: () => Unit): Unit = {
rememberEntitiesStore match {
case None =>
whenDone()
case Some(store) =>
if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command)
entityIds.foreach(entities.remembering)
store ! command
timers.startSingleTimer(
RememberEntityTimeoutKey,
RememberEntityTimeout(command),
// FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times
// and this could always fail before ddata store completes retrying writes
settings.tuningParameters.updatingStateTimeout)
context.become(waitingForUpdate(Map.empty))
def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = {
// none of the current impls will send back a partial update, yet!
case RememberEntitiesShardStore.UpdateDone(ids) =>
if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", "))
timers.cancel(RememberEntityTimeoutKey)
whenDone()
if (pendingStarts.isEmpty) {
if (VerboseDebug) log.debug("No pending entities, going to idle")
context.become(idle)
unstashAll()
} else {
if (VerboseDebug)
log.debug(
"New entities encountered while waiting starting those: [{}]",
pendingStarts.keys.mkString(", "))
startEntities(pendingStarts)
}
case RememberEntityTimeout(`command`) =>
throw new RuntimeException(
s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case msg: ShardRegion.StartEntity =>
if (VerboseDebug)
log.debug(
"Start entity while a write already in progress. Pending writes [{}]. Writes in progress [{}]",
pendingStarts.keys.mkString(", "),
entityIds.mkString(", "))
if (!entities.entityIdExists(msg.entityId))
context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender()))))
// below cases should handle same messages as in idle
case _: Terminated => stash()
case _: EntityTerminated => stash()
case _: CoordinatorMessage => stash()
case _: RememberEntityCommand => stash()
case _: ShardRegion.StartEntityAck => stash()
case _: ShardRegionCommand => stash()
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => stash()
case msg: LeaseLost => receiveLeaseLost(msg)
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) =>
// FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage?
val (id, _) = extractEntityId(msg)
if (entities.entityIdExists(id)) {
if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id)
deliverMessage(msg, sender(), OptionVal.Some(entityIds))
} else {
if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. [{}]", id)
appendToMessageBuffer(id, msg, sender())
context.become(waitingForUpdate(pendingStarts + (id -> None)))
}
case msg =>
// shouldn't be any other message types, but just in case
log.warning(
"Stashing unexpected message [{}] while waiting for remember entities update of [{}]",
msg.getClass,
entityIds.mkString(", "))
stash()
flightRecorder.rememberEntityRemove(entityId)
sendToRememberStore(store, Set(entityId), RememberEntitiesShardStore.RemoveEntity(entityId))(whenDone)
}
}
def rememberAdd(entityIds: Set[EntityId])(whenDone: () => Unit): Unit = {
rememberEntitiesStore match {
case None =>
whenDone()
case Some(store) =>
entityIds.foreach { id =>
entities.remembering(id)
flightRecorder.rememberEntityAdd(id)
}
sendToRememberStore(store, entityIds, RememberEntitiesShardStore.AddEntities(entityIds))(whenDone)
}
}
/**
* The whenDone callback should not call become either directly or by calling this method again
* as it uses become.
*/
def sendToRememberStore(store: ActorRef, entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)(
whenDone: () => Unit): Unit = {
if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command)
val startTime = System.nanoTime()
store ! command
timers.startSingleTimer(
RememberEntityTimeoutKey,
RememberEntityTimeout(command),
// FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times
// and this could always fail before ddata store completes retrying writes
settings.tuningParameters.updatingStateTimeout)
context.become(waitingForUpdate(Map.empty))
def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = {
// none of the current impls will send back a partial update, yet!
case RememberEntitiesShardStore.UpdateDone(ids) =>
val duration = System.nanoTime() - startTime
if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", "))
flightRecorder.rememberEntityOperation(duration)
timers.cancel(RememberEntityTimeoutKey)
whenDone()
if (pendingStarts.isEmpty) {
if (VerboseDebug) log.debug("No pending entities, going to idle")
unstashAll()
context.become(idle)
} else {
if (VerboseDebug)
log.debug("New entities encountered while waiting starting those: [{}]", pendingStarts.keys.mkString(", "))
startEntities(pendingStarts)
}
case RememberEntityTimeout(`command`) =>
throw new RuntimeException(
s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case msg: ShardRegion.StartEntity =>
if (VerboseDebug)
log.debug(
"Start entity while a write already in progress. Pending writes {}. Writes in progress {}",
pendingStarts,
entityIds)
if (!entities.entityIdExists(msg.entityId))
context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender()))))
// below cases should handle same messages as in idle
case _: Terminated => stash()
case _: EntityTerminated => stash()
case _: CoordinatorMessage => stash()
case _: RememberEntityCommand => stash()
case _: ShardRegion.StartEntityAck => stash()
case _: StartEntityInternal => stash()
case _: ShardRegionCommand => stash()
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => stash()
case msg: LeaseLost => receiveLeaseLost(msg)
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) =>
// FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage?
val (id, _) = extractEntityId(msg)
if (entities.entityIdExists(id)) {
if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id)
deliverMessage(msg, sender(), OptionVal.Some(entityIds))
} else {
if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. [{}]", id)
appendToMessageBuffer(id, msg, sender())
context.become(waitingForUpdate(pendingStarts + (id -> None)))
}
case msg =>
// shouldn't be any other message types, but just in case
log.warning(
"Stashing unexpected message [{}] while waiting for remember entities update of [{}]",
msg.getClass,
entityIds.mkString(", "))
stash()
}
}
@@ -572,7 +592,7 @@ private[akka] class Shard(
}
if (needStarting.nonEmpty) {
sendToRememberStore(needStarting.keySet, RememberEntitiesShardStore.AddEntities(needStarting.keySet)) { () =>
rememberAdd(needStarting.keySet) { () =>
needStarting.foreach {
case (entityId, requestor) =>
getOrCreateEntity(entityId)
@@ -587,7 +607,7 @@ private[akka] class Shard(
if (ack.shardId != shardId && entities.entityIdExists(ack.entityId)) {
log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId)
sendToRememberStore(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { () =>
rememberRemove(ack.entityId) { () =>
entities.removeEntity(ack.entityId)
messageBuffers.remove(ack.entityId)
}
@@ -658,6 +678,7 @@ private[akka] class Shard(
// Note; because we're not persisting the EntityStopped, we don't need
// to persist the EntityStarted either.
log.debug("Starting entity [{}] again, there are buffered messages for it", id)
flightRecorder.entityPassivateRestart(id)
// this will re-add the entity back
sendMsgBuffer(id)
} else {
@@ -668,7 +689,7 @@ private[akka] class Shard(
context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id))
} else {
// FIXME optional wait for completion as optimization where stops are not critical
sendToRememberStore(id, RememberEntitiesShardStore.RemoveEntity(id))(() => passivateCompleted(id))
rememberRemove(id)(() => passivateCompleted(id))
}
}
}
@@ -683,6 +704,7 @@ private[akka] class Shard(
entities.entityPassivating(id)
messageBuffers.add(id)
entity ! stopMessage
flightRecorder.entityPassivate(id)
} else {
log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity", entity)
}
@@ -714,9 +736,9 @@ private[akka] class Shard(
entities.removeEntity(entityId)
if (hasBufferedMessages) {
log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId)
sendToRememberStore(entityId, RememberEntitiesShardStore.AddEntities(Set(entityId))) { () =>
sendMsgBuffer(entityId)
}
// in case we're already writing to the remember store
flightRecorder.entityPassivateRestart(entityId)
self ! StartEntityInternal(entityId)
} else {
log.debug("Entity stopped after passivation [{}]", entityId)
dropBufferFor(entityId)
@@ -755,7 +777,6 @@ private[akka] class Shard(
"Delivering message of type [{}] to [{}] (starting because known but not running)",
payload.getClass,
id)
val actor = getOrCreateEntity(id)
touchLastMessageTimestamp(id)
actor.tell(payload, snd)
@@ -764,7 +785,7 @@ private[akka] class Shard(
if (VerboseDebug)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id)
appendToMessageBuffer(id, msg, snd)
sendToRememberStore(id, RememberEntitiesShardStore.AddEntities(Set(id)))(() => sendMsgBuffer(id))
rememberAdd(Set(id))(() => sendMsgBuffer(id))
case OptionVal.None =>
// No actor running and write in progress for some other entity id, stash message for deliver when
// unstash happens on async write complete


@@ -0,0 +1,46 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.util.FlightRecorderLoader
/**
* INTERNAL API
*/
@InternalApi
object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = this
override def createExtension(system: ExtendedActorSystem): ShardingFlightRecorder =
FlightRecorderLoader.load[ShardingFlightRecorder](
system,
"akka.cluster.sharding.internal.jfr.JFRShardingFlightRecorder",
NoOpShardingFlightRecorder)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] trait ShardingFlightRecorder extends Extension {
def rememberEntityOperation(duration: Long): Unit
def rememberEntityAdd(entityId: String): Unit
def rememberEntityRemove(entityId: String): Unit
def entityPassivate(entityId: String): Unit
def entityPassivateRestart(entityId: String): Unit
}
/**
* INTERNAL
*/
@InternalApi
private[akka] case object NoOpShardingFlightRecorder extends ShardingFlightRecorder {
override def rememberEntityOperation(duration: Long): Unit = ()
override def rememberEntityAdd(entityId: String): Unit = ()
override def rememberEntityRemove(entityId: String): Unit = ()
override def entityPassivate(entityId: String): Unit = ()
override def entityPassivateRestart(entityId: String): Unit = ()
}


@@ -1,223 +1 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.MemberStatus
import akka.testkit._
import akka.util.ccompat._
@ccompatUsedUntil213
object ClusterShardingRememberEntitiesPerfSpec {
def props(): Props = Props(new TestEntity)
class TestEntity extends Actor with ActorLogging {
log.debug("Started TestEntity: {}", self)
def receive = {
case m => sender() ! m
}
}
val extractEntityId: ShardRegion.ExtractEntityId = {
case id: Int => (id.toString, id)
}
val extractShardId: ShardRegion.ExtractShardId = {
case _: Int => "0" // only one shard
case ShardRegion.StartEntity(_) => "0"
}
}
object ClusterShardingRememberEntitiesPerfSpecConfig
extends MultiNodeClusterShardingConfig(
rememberEntities = true,
additionalConfig = s"""
akka.testconductor.barrier-timeout = 3 minutes
akka.remote.artery.advanced.outbound-message-queue-size = 10000
akka.remote.artery.advanced.maximum-frame-size = 512 KiB
# comment next line to enable durable lmdb storage
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding {
remember-entities = on
}
""") {
val first = role("first")
val second = role("second")
val third = role("third")
nodeConfig(third)(ConfigFactory.parseString(s"""
akka.cluster.sharding.distributed-data.durable.lmdb {
# use same directory when starting new node on third (not used at same time)
dir = "$targetDir/sharding-third"
}
"""))
}
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode1 extends ClusterShardingRememberEntitiesPerfSpec
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardingRememberEntitiesPerfSpec
class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec
abstract class ClusterShardingRememberEntitiesPerfSpec
extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig)
with ImplicitSender {
import ClusterShardingRememberEntitiesPerfSpec._
import ClusterShardingRememberEntitiesPerfSpecConfig._
def startSharding(): Unit = {
(1 to 3).foreach { n =>
startSharding(
system,
typeName = s"Entity$n",
entityProps = ClusterShardingRememberEntitiesPerfSpec.props(),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
}
lazy val region1 = ClusterSharding(system).shardRegion("Entity1")
lazy val region2 = ClusterSharding(system).shardRegion("Entity2")
lazy val region3 = ClusterSharding(system).shardRegion("Entity3")
// use 5 for "real" testing
private val nrIterations = 2
// use 5 for "real" testing
private val numberOfMessagesFactor = 1
s"Cluster sharding with remember entities performance" must {
"form cluster" in within(20.seconds) {
join(first, first)
startSharding()
// this will make it run on first
runOn(first) {
region1 ! 0
expectMsg(0)
region2 ! 0
expectMsg(0)
region3 ! 0
expectMsg(0)
}
enterBarrier("allocated-on-first")
join(second, first)
join(third, first)
within(remaining) {
awaitAssert {
cluster.state.members.size should ===(3)
cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
}
}
enterBarrier("all-up")
}
"test when starting new entity" in {
runOn(first) {
val numberOfMessages = 200 * numberOfMessagesFactor
(1 to nrIterations).foreach { iteration =>
val startTime = System.nanoTime()
(1 to numberOfMessages).foreach { n =>
region1 ! (iteration * 100000 + n)
}
receiveN(numberOfMessages, 20.seconds)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = numberOfMessages * 1000.0 / took
println(
s"### Test1 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " +
f"throughput $throughput%,.0f msg/s")
}
}
enterBarrier("after-1")
}
"test when starting new entity and sending a few messages to it" in {
runOn(first) {
val numberOfMessages = 800 * numberOfMessagesFactor
(1 to nrIterations).foreach { iteration =>
val startTime = System.nanoTime()
for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) {
region2 ! (iteration * 100000 + n)
}
receiveN(numberOfMessages, 20.seconds)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = numberOfMessages * 1000.0 / took
println(
s"### Test2 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " +
f"throughput $throughput%,.0f msg/s")
}
}
enterBarrier("after-2")
}
"test when starting some new entities mixed with sending to started" in {
runOn(first) {
val numberOfMessages = 1600 * numberOfMessagesFactor
(1 to nrIterations).foreach { iteration =>
val startTime = System.nanoTime()
(1 to numberOfMessages).foreach { n =>
val msg =
if (n % 20 == 0)
-(iteration * 100000 + n) // unique, will start new entity
else
iteration * 100000 + (n % 10) // these will go to same 10 started entities
region3 ! msg
if (n == 10) {
// wait for the first 10 to avoid filling up stash
receiveN(10, 5.seconds)
}
}
receiveN(numberOfMessages - 10, 20.seconds)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = numberOfMessages * 1000.0 / took
println(
s"### Test3 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " +
f"throughput $throughput%,.0f msg/s")
}
}
enterBarrier("after-3")
}
"test sending to started" in {
runOn(first) {
val numberOfMessages = 1600 * numberOfMessagesFactor
(1 to nrIterations).foreach { iteration =>
var startTime = System.nanoTime()
(1 to numberOfMessages).foreach { n =>
region3 ! (iteration * 100000 + (n % 10)) // these will go to same 10 started entities
if (n == 10) {
// wait for the first 10 and then start the clock
receiveN(10, 5.seconds)
startTime = System.nanoTime()
}
}
receiveN(numberOfMessages - 10, 20.seconds)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = numberOfMessages * 1000.0 / took
println(
s"### Test4 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " +
f"throughput $throughput%,.0f msg/s")
}
}
enterBarrier("after-4")
}
}
}


@@ -16,7 +16,7 @@ import akka.remote.artery.RemotingFlightRecorder
* INTERNAL API
*/
@InternalApi
private[akka] final class JFRRemotingFlightRecorder(system: ExtendedActorSystem) extends RemotingFlightRecorder {
private[akka] final class JFRRemotingFlightRecorder() extends RemotingFlightRecorder {
override def transportMediaDriverStarted(directoryName: String): Unit =
new TransportMediaDriverStarted(directoryName).commit()


@@ -6,17 +6,10 @@ package akka.remote.artery
import java.net.InetSocketAddress
import scala.util.Failure
import scala.util.Success
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.{ Address, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
import akka.util.JavaVersion
import akka.util.FlightRecorderLoader
/**
* INTERNAL API
@@ -25,19 +18,10 @@ import akka.util.JavaVersion
object RemotingFlightRecorder extends ExtensionId[RemotingFlightRecorder] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): RemotingFlightRecorder =
if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) {
// Dynamic instantiation to not trigger class load on earlier JDKs
system.dynamicAccess.createInstanceFor[RemotingFlightRecorder](
"akka.remote.artery.jfr.JFRRemotingFlightRecorder",
(classOf[ExtendedActorSystem], system) :: Nil) match {
case Success(jfr) => jfr
case Failure(ex) =>
system.log.warning("Failed to load JFR remoting flight recorder, falling back to noop. Exception: {}", ex)
NoOpRemotingFlightRecorder
} // fallback if not possible to dynamically load for some reason
} else
// JFR not available on Java 8
NoOpRemotingFlightRecorder
FlightRecorderLoader.load[RemotingFlightRecorder](
system,
"akka.remote.artery.jfr.JFRRemotingFlightRecorder",
NoOpRemotingFlightRecorder)
override def lookup(): ExtensionId[_ <: Extension] = this
}


@@ -164,6 +164,7 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding")
.settings(Protobuf.settings)
.configs(MultiJvm)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
.enablePlugins(Jdk9)
lazy val clusterTools = akkaModule("akka-cluster-tools")
.dependsOn(
@@ -463,7 +464,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.dependsOn(
actorTyped % "compile->CompileJdk9",
clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm",
clusterSharding,
clusterSharding % "compile->compile;compile->CompileJdk9;multi-jvm->multi-jvm",
actorTestkitTyped % "test->test",
actorTypedTests % "test->test",
persistenceTyped % "test->test",


@@ -30,6 +30,7 @@ object AkkaDisciplinePlugin extends AutoPlugin {
"akka-cluster",
"akka-cluster-metrics",
"akka-cluster-sharding",
"akka-cluster-sharding-typed",
"akka-distributed-data",
"akka-persistence",
"akka-persistence-tck",