diff --git a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala index 8f90d0eea7..a6aefde120 100644 --- a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala +++ b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala @@ -14,7 +14,7 @@ import akka.annotation.InternalApi * INTERNAL API */ @InternalApi -private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) extends ActorFlightRecorder { +private[akka] final class JFRActorFlightRecorder() extends ActorFlightRecorder { override val delivery: DeliveryFlightRecorder = new JFRDeliveryFlightRecorder } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala index faf8956b81..0ba4d2cc18 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala @@ -4,15 +4,10 @@ package akka.actor.typed.internal -import scala.util.Failure -import scala.util.Success - import akka.actor.ActorPath -import akka.actor.typed.ActorSystem -import akka.actor.typed.Extension -import akka.actor.typed.ExtensionId +import akka.actor.typed.{ ActorSystem, Extension, ExtensionId } import akka.annotation.InternalApi -import akka.util.JavaVersion +import akka.util.FlightRecorderLoader /** * INTERNAL API @@ -21,20 +16,10 @@ import akka.util.JavaVersion object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] { override def createExtension(system: ActorSystem[_]): ActorFlightRecorder = - if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) { - // Dynamic instantiation to not trigger class load on earlier JDKs - import scala.language.existentials - system.dynamicAccess.createInstanceFor[ActorFlightRecorder]( - "akka.actor.typed.internal.jfr.JFRActorFlightRecorder", - (classOf[ActorSystem[_]], system) :: Nil) match { - case Success(jfr) => jfr - case Failure(ex) => - system.log.warn("Failed to load JFR Actor flight recorder, falling back to noop. Exception: {}", ex.toString) - NoOpActorFlightRecorder - } // fallback if not possible to dynamically load for some reason - } else - // JFR not available on Java 8 - NoOpActorFlightRecorder + FlightRecorderLoader.load[ActorFlightRecorder]( + system, + "akka.actor.typed.internal.jfr.JFRActorFlightRecorder", + NoOpActorFlightRecorder) } /** @@ -43,7 +28,6 @@ object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] { @InternalApi private[akka] trait ActorFlightRecorder extends Extension { val delivery: DeliveryFlightRecorder - } /** diff --git a/akka-actor/src/main/scala/akka/util/FlightRecorderLoader.scala b/akka-actor/src/main/scala/akka/util/FlightRecorderLoader.scala new file mode 100644 index 0000000000..4cdc4e93bf --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/FlightRecorderLoader.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package akka.util +import akka.actor.{ ClassicActorSystemProvider, ExtendedActorSystem } +import akka.annotation.InternalApi + +import scala.reflect.ClassTag +import scala.util.{ Failure, Success } + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object FlightRecorderLoader { + def load[T: ClassTag](casp: ClassicActorSystemProvider, fqcn: String, fallback: T): T = { + val system = casp.classicSystem.asInstanceOf[ExtendedActorSystem] + if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) { + // Dynamic instantiation to not trigger class load on earlier JDKs + system.dynamicAccess.createInstanceFor[T](fqcn, Nil) match { + case Success(jfr) => + jfr + case Failure(ex) => + system.log.warning("Failed to load JFR flight recorder, falling back to noop. Exception: {}", ex.toString) + fallback + } // fallback if not possible to dynamically load for some reason + } else + // JFR not available on Java 8 + fallback + } +} diff --git a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml index e36652a969..6819d3ed3d 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml +++ b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml @@ -9,7 +9,9 @@ - + + + diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala new file mode 100644 index 0000000000..81f27cecac --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import java.nio.file.Paths +import java.util.concurrent.TimeUnit.NANOSECONDS +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor._ +import akka.cluster.MemberStatus +import akka.cluster.sharding.ShardRegion.{ CurrentShardRegionState, GetShardRegionState, Passivate } +import akka.testkit._ +import akka.util.ccompat._ +import com.typesafe.config.ConfigFactory +import org.HdrHistogram.Histogram + +import scala.concurrent.duration._ + +@ccompatUsedUntil213 +object ClusterShardingRememberEntitiesPerfSpec { + val NrRegions = 6 + // use 5 for "real" testing + val NrIterations = 2 + // use 5 for "real" testing + val NrOfMessagesFactor = 1 + + case class In(id: Long, created: Long = System.currentTimeMillis()) + case class Out(latency: Long) + + class LatencyEntity extends Actor with ActorLogging { + + override def receive: Receive = { + case In(_, created) => + sender() ! Out(System.currentTimeMillis() - created) + case _: Stop => +// log.debug("Stop received {}", self.path.name) + context.parent ! 
Passivate("stop") + case "stop" => +// log.debug("Final Stop received {}", self.path.name) + context.stop(self) + case msg => throw new RuntimeException("unexpected msg " + msg) + } + } + + object LatencyEntity { + + val extractEntityId: ShardRegion.ExtractEntityId = { + case in: In => (in.id.toString, in) + case msg @ Stop(id) => (id.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = _ => "0" + } + + case class Stop(id: Int) + +} + +object ClusterShardingRememberEntitiesPerfSpecConfig + extends MultiNodeClusterShardingConfig( + rememberEntities = true, + additionalConfig = s""" + akka.loglevel = DEBUG + akka.testconductor.barrier-timeout = 3 minutes + akka.remote.artery.advanced.outbound-message-queue-size = 10000 + akka.remote.artery.advanced.maximum-frame-size = 512 KiB + # comment next line to enable durable lmdb storage + akka.cluster.sharding.distributed-data.durable.keys = [] + akka.cluster.sharding { + remember-entities = on + } + """) { + + val first = role("first") + val second = role("second") + val third = role("third") + + nodeConfig(third)(ConfigFactory.parseString(s""" + akka.cluster.sharding.distributed-data.durable.lmdb { + # use same directory when starting new node on third (not used at same time) + dir = "$targetDir/sharding-third" + } + """)) + +} + +class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode1 extends ClusterShardingRememberEntitiesPerfSpec +class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardingRememberEntitiesPerfSpec +class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec + +abstract class ClusterShardingRememberEntitiesPerfSpec + extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig) + with ImplicitSender { + import ClusterShardingRememberEntitiesPerfSpec._ + import ClusterShardingRememberEntitiesPerfSpecConfig._ + + import ClusterShardingRememberEntitiesPerfSpec._ + + def startSharding(): Unit = { + (1 to NrRegions).foreach { n => + startSharding( + system, + typeName = s"EntityLatency$n", + entityProps = Props(new LatencyEntity()), + extractEntityId = LatencyEntity.extractEntityId, + extractShardId = LatencyEntity.extractShardId) + } + + } + + var latencyRegions = Vector.empty[ActorRef] + + val latencyCount = new AtomicInteger(0) + + override protected def atStartup(): Unit = { + super.atStartup() + join(first, first) + + startSharding() + + // this will make it run on first + runOn(first) { + latencyRegions = (1 to NrRegions).map { n => + val region = ClusterSharding(system).shardRegion(s"EntityLatency$n") + region ! 
In(0) + expectMsgType[Out] + region + }.toVector + } + enterBarrier("allocated-on-first") + + join(second, first) + join(third, first) + + within(20.seconds) { + awaitAssert { + cluster.state.members.size should ===(3) + cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + + enterBarrier("all-up") + } + + "Cluster sharding with remember entities performance" must { + + val percentiles = List(99.9, 99.0, 95.0, 50.0) + + def runBench(name: String)(logic: (Int, ActorRef, Histogram) => Long): Unit = { + val testRun = latencyCount.getAndIncrement() + runOn(first) { + val recording = new FlightRecording(system) + recording.start() + val region = latencyRegions(testRun) + val fullHistogram = new Histogram(10L * 1000L, 2) + val throughputs = (1 to NrIterations).map { iteration => + val histogram: Histogram = new Histogram(10L * 1000L, 2) + val startTime = System.nanoTime() + val numberOfMessages = logic(iteration, region, histogram) + val took = NANOSECONDS.toMillis(System.nanoTime - startTime) + val throughput = numberOfMessages * 1000.0 / took + println( + s"### Test [${name}] stop with $numberOfMessages took $took ms, " + + f"throughput $throughput%,.0f msg/s") + // println("Iteration Latencies: ") + // histogram.outputPercentileDistribution(System.out, 1.0) + fullHistogram.add(histogram) + throughput + } + println(f"Average throughput: ${throughputs.sum / NrIterations}%,.0f msg/s") + println("Combined latency figures:") + println(s"total ${fullHistogram.getTotalCount} max ${fullHistogram.getMaxValue} ${percentiles + .map(p => s"$p% ${fullHistogram.getValueAtPercentile(p)}ms") + .mkString(" ")}") + recording.endAndDump(Paths.get("target", s"${name.replace(" ", "-")}.jfr")) + } + enterBarrier(s"after-start-stop-${testRun}") + } + + "test when starting new entity" in { + val numberOfMessages = 200 * NrOfMessagesFactor + runBench("start new entities") { (iteration, region, histogram) => + (1 to numberOfMessages).foreach { n => + region ! In(iteration * 100000 + n) + } + for (_ <- 1 to numberOfMessages) { + histogram.recordValue(expectMsgType[Out].latency) + } + numberOfMessages + } + } + + "test latency when starting new entity and sending a few messages" in { + val numberOfMessages = 800 * NrOfMessagesFactor + runBench("start, few messages") { (iteration, region, histogram) => + for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) { + region ! In(iteration * 100000 + n) + } + for (_ <- 1 to numberOfMessages) { + histogram.recordValue(expectMsgType[Out].latency) + } + numberOfMessages + } + } + + "test latency when starting new entity and sending a few messages to it and stopping" in { + val numberOfMessages = 800 * NrOfMessagesFactor + // 160 entities, and an extra one for the intialization + // all but the first one are not removed + runBench("start, few messages, stop") { (iteration, region, histogram) => + for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) { + val id = iteration * 100000 + n + region ! In(id) + if (m == 5) { + region ! Stop(id) + } + } + for (_ <- 1 to numberOfMessages) { + try { + histogram.recordValue(expectMsgType[Out].latency) + } catch { + case e: AssertionError => + log.error(s"Received ${histogram.getTotalCount} out of $numberOfMessages") + throw e + } + } + + awaitAssert({ + region ! 
GetShardRegionState + val stats = expectMsgType[CurrentShardRegionState] + stats.shards.head.shardId shouldEqual "0" + stats.shards.head.entityIds.toList.sorted shouldEqual List("0") // the init entity + }, 2.seconds) + + numberOfMessages + } + } + + "test latency when starting, few messages, stopping, few messages" in { + val numberOfMessages = 800 * NrOfMessagesFactor + runBench("start, few messages, stop, few messages") { (iteration, region, histogram) => + for (n <- 1 to numberOfMessages / 5; m <- 1 to 5) { + val id = iteration * 100000 + n + region ! In(id) + if (m == 2) { + region ! Stop(id) + } + } + for (_ <- 1 to numberOfMessages) { + try { + histogram.recordValue(expectMsgType[Out].latency) + } catch { + case e: AssertionError => + log.error(s"Received ${histogram.getTotalCount} out of $numberOfMessages") + throw e + } + } + numberOfMessages + } + } + + "test when starting some new entities mixed with sending to started" in { + runBench("starting mixed with sending to started") { (iteration, region, histogram) => + val numberOfMessages = 1600 * NrOfMessagesFactor + (1 to numberOfMessages).foreach { n => + val msg = + if (n % 20 == 0) + -(iteration * 100000 + n) // unique, will start new entity + else + iteration * 100000 + (n % 10) // these will go to same 10 started entities + region ! In(msg) + + if (n == 10) { + for (_ <- 1 to 10) { + histogram.recordValue(expectMsgType[Out].latency) + } + } + } + for (_ <- 1 to numberOfMessages - 10) { + histogram.recordValue(expectMsgType[Out].latency) + } + numberOfMessages + } + } + + "test sending to started" in { + runBench("sending to started") { (iteration, region, histogram) => + val numberOfMessages = 1600 * NrOfMessagesFactor + (1 to numberOfMessages).foreach { n => + region ! In(iteration * 100000 + (n % 10)) // these will go to same 10 started entities + + if (n == 10) { + for (_ <- 1 to 10) { + histogram.recordValue(expectMsgType[Out].latency) + } + } + } + for (_ <- 1 to numberOfMessages - 10) { + histogram.recordValue(expectMsgType[Out].latency) + } + numberOfMessages + } + } + } +} diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/FlightRecording.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/FlightRecording.scala new file mode 100644 index 0000000000..733ce73017 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/FlightRecording.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding +import java.nio.file.Path + +import akka.actor.{ ActorSystem, ExtendedActorSystem } + +/** + * This will work on JDK11 and JDK8 built with the enable-jfr flag (8u262+). + * + * For Akka JRF recordings you may need to run a publish for multi jvm tests + * to get the ComileJDK9 things compiled. 
+ */ +class FlightRecording(system: ActorSystem) { + + private val dynamic = system.asInstanceOf[ExtendedActorSystem].dynamicAccess + private val recording = + dynamic.createInstanceFor[AnyRef]("jdk.jfr.Recording", Nil).toOption + private val clazz = recording.map(_.getClass) + private val startMethod = clazz.map(_.getDeclaredMethod("start")) + private val stopMethod = clazz.map(_.getDeclaredMethod("stop")) + private val dumpMethod = clazz.map(_.getDeclaredMethod("dump", classOf[Path])) + + def start() = { + for { + r <- recording + m <- startMethod + } yield m.invoke(r) + } + + def endAndDump(location: Path) = { + for { + r <- recording + stop <- stopMethod + dump <- dumpMethod + } yield { + stop.invoke(r) + dump.invoke(r, location) + } + } +} diff --git a/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/Events.scala b/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/Events.scala new file mode 100644 index 0000000000..d6662a75c1 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/Events.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.internal.jfr +import akka.annotation.InternalApi +import jdk.jfr.{ Category, Enabled, Event, Label, StackTrace, Timespan } + +// requires jdk9+ to compile +// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module + +/** INTERNAL API */ + +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Operation") +final class RememberEntityWrite(@Timespan(Timespan.NANOSECONDS) val timeTaken: Long) extends Event + +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Add") +final class RememberEntityAdd(val entityId: String) extends Event + +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Sharding", "Shard")) @Label("Remember Entity Remove") +final class RememberEntityRemove(val entityId: String) extends Event + +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Sharding", "Shard")) @Label("Passivate") +final class Passivate(val entityId: String) extends Event + +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Sharding", "Shard")) @Label("Passivate Restart") +final class PassivateRestart(val entityId: String) extends Event diff --git a/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/JFRShardingFlightRecorder.scala b/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/JFRShardingFlightRecorder.scala new file mode 100644 index 0000000000..1153f63661 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala-jdk-9/akka/cluster/sharding/internal/jfr/JFRShardingFlightRecorder.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package akka.cluster.sharding.internal.jfr + +import akka.cluster.sharding.ShardingFlightRecorder + +class JFRShardingFlightRecorder extends ShardingFlightRecorder { + override def rememberEntityOperation(duration: Long): Unit = + new RememberEntityWrite(duration).commit() + override def rememberEntityAdd(entityId: String): Unit = + new RememberEntityAdd(entityId).commit() + override def rememberEntityRemove(entityId: String): Unit = + new RememberEntityRemove(entityId).commit() + override def entityPassivate(entityId: String): Unit = + new Passivate(entityId).commit() + override def entityPassivateRestart(entityId: String): Unit = + new PassivateRestart(entityId).commit() +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index a703952319..c2505c045f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -114,6 +114,8 @@ private[akka] object Shard { // should it go in settings perhaps, useful for tricky sharding bugs? final val VerboseDebug = true + final case class StartEntityInternal(id: EntityId) + sealed trait EntityState /** @@ -277,6 +279,8 @@ private[akka] class Shard( } } + private val flightRecorder = ShardingFlightRecorder(context.system) + private val entities = new Entities(log) private var lastMessageTimestamp = Map.empty[EntityId, Long] @@ -403,7 +407,7 @@ private[akka] class Shard( if (ids.nonEmpty) { entities.alreadyRemembered(ids) restartRememberedEntities(ids) - } else {} + } context.parent ! ShardInitialized(shardId) context.become(idle) unstashAll() @@ -434,6 +438,7 @@ private[akka] class Shard( case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) case msg: ShardRegion.StartEntity => startEntities(Map(msg.entityId -> Some(sender()))) case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg) + case msg: StartEntityInternal => startEntities(Map(msg.id -> None)) case msg: ShardRegionCommand => receiveShardRegionCommand(msg) case msg: ShardQuery => receiveShardQuery(msg) case PassivateIdleTick => passivateIdleEntities() @@ -442,90 +447,105 @@ private[akka] class Shard( case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender(), OptionVal.None) } - def sendToRememberStore(entityId: EntityId, command: RememberEntitiesShardStore.Command)( - whenDone: () => Unit): Unit = { - sendToRememberStore(Set(entityId), command)(() => whenDone()) - } - - def sendToRememberStore(entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)( - whenDone: () => Unit): Unit = { + def rememberRemove(entityId: String)(whenDone: () => Unit): Unit = { rememberEntitiesStore match { case None => whenDone() - case Some(store) => - if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command) - - entityIds.foreach(entities.remembering) - store ! command - timers.startSingleTimer( - RememberEntityTimeoutKey, - RememberEntityTimeout(command), - // FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times - // and this could always fail before ddata store completes retrying writes - settings.tuningParameters.updatingStateTimeout) - - context.become(waitingForUpdate(Map.empty)) - - def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = { - // none of the current impls will send back a partial update, yet! 
- case RememberEntitiesShardStore.UpdateDone(ids) => - if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", ")) - timers.cancel(RememberEntityTimeoutKey) - whenDone() - if (pendingStarts.isEmpty) { - if (VerboseDebug) log.debug("No pending entities, going to idle") - context.become(idle) - unstashAll() - } else { - if (VerboseDebug) - log.debug( - "New entities encountered while waiting starting those: [{}]", - pendingStarts.keys.mkString(", ")) - startEntities(pendingStarts) - } - case RememberEntityTimeout(`command`) => - throw new RuntimeException( - s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") - case msg: ShardRegion.StartEntity => - if (VerboseDebug) - log.debug( - "Start entity while a write already in progress. Pending writes [{}]. Writes in progress [{}]", - pendingStarts.keys.mkString(", "), - entityIds.mkString(", ")) - if (!entities.entityIdExists(msg.entityId)) - context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender())))) - - // below cases should handle same messages as in idle - case _: Terminated => stash() - case _: EntityTerminated => stash() - case _: CoordinatorMessage => stash() - case _: RememberEntityCommand => stash() - case _: ShardRegion.StartEntityAck => stash() - case _: ShardRegionCommand => stash() - case msg: ShardQuery => receiveShardQuery(msg) - case PassivateIdleTick => stash() - case msg: LeaseLost => receiveLeaseLost(msg) - case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) - case msg if extractEntityId.isDefinedAt(msg) => - // FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage? - val (id, _) = extractEntityId(msg) - if (entities.entityIdExists(id)) { - if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id) - deliverMessage(msg, sender(), OptionVal.Some(entityIds)) - } else { - if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. [{}]", id) - appendToMessageBuffer(id, msg, sender()) - context.become(waitingForUpdate(pendingStarts + (id -> None))) - } - case msg => - // shouldn't be any other message types, but just in case - log.warning( - "Stashing unexpected message [{}] while waiting for remember entities update of [{}]", - msg.getClass, - entityIds.mkString(", ")) - stash() + flightRecorder.rememberEntityRemove(entityId) + sendToRememberStore(store, Set(entityId), RememberEntitiesShardStore.RemoveEntity(entityId))(whenDone) + } + } + def rememberAdd(entityIds: Set[EntityId])(whenDone: () => Unit): Unit = { + rememberEntitiesStore match { + case None => + whenDone() + case Some(store) => + entityIds.foreach { id => + entities.remembering(id) + flightRecorder.rememberEntityAdd(id) } + sendToRememberStore(store, entityIds, RememberEntitiesShardStore.AddEntities(entityIds))(whenDone) + } + } + + /** + * The whenDone callback should not call become either directly or by calling this method again + * as it uses become. + */ + def sendToRememberStore(store: ActorRef, entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)( + whenDone: () => Unit): Unit = { + if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command) + val startTime = System.nanoTime() + store ! 
command + timers.startSingleTimer( + RememberEntityTimeoutKey, + RememberEntityTimeout(command), + // FIXME this timeout needs to match the timeout used in the ddata shard write since that tries 3 times + // and this could always fail before ddata store completes retrying writes + settings.tuningParameters.updatingStateTimeout) + + context.become(waitingForUpdate(Map.empty)) + + def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = { + // none of the current impls will send back a partial update, yet! + case RememberEntitiesShardStore.UpdateDone(ids) => + val duration = System.nanoTime() - startTime + if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", ")) + flightRecorder.rememberEntityOperation(duration) + timers.cancel(RememberEntityTimeoutKey) + whenDone() + if (pendingStarts.isEmpty) { + if (VerboseDebug) log.debug("No pending entities, going to idle") + unstashAll() + context.become(idle) + } else { + if (VerboseDebug) + log.debug("New entities encountered while waiting starting those: [{}]", pendingStarts.keys.mkString(", ")) + startEntities(pendingStarts) + } + case RememberEntityTimeout(`command`) => + throw new RuntimeException( + s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") + case msg: ShardRegion.StartEntity => + if (VerboseDebug) + log.debug( + "Start entity while a write already in progress. Pending writes {}. Writes in progress {}", + pendingStarts, + entityIds) + if (!entities.entityIdExists(msg.entityId)) + context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender())))) + + // below cases should handle same messages as in idle + case _: Terminated => stash() + case _: EntityTerminated => stash() + case _: CoordinatorMessage => stash() + case _: RememberEntityCommand => stash() + case _: ShardRegion.StartEntityAck => stash() + case _: StartEntityInternal => stash() + case _: ShardRegionCommand => stash() + case msg: ShardQuery => receiveShardQuery(msg) + case PassivateIdleTick => stash() + case msg: LeaseLost => receiveLeaseLost(msg) + case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) + case msg if extractEntityId.isDefinedAt(msg) => + // FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage? + val (id, _) = extractEntityId(msg) + if (entities.entityIdExists(id)) { + if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id) + deliverMessage(msg, sender(), OptionVal.Some(entityIds)) + } else { + if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. 
[{}]", id) + appendToMessageBuffer(id, msg, sender()) + context.become(waitingForUpdate(pendingStarts + (id -> None))) + } + case msg => + // shouldn't be any other message types, but just in case + log.warning( + "Stashing unexpected message [{}] while waiting for remember entities update of [{}]", + msg.getClass, + entityIds.mkString(", ")) + stash() } } @@ -572,7 +592,7 @@ private[akka] class Shard( } if (needStarting.nonEmpty) { - sendToRememberStore(needStarting.keySet, RememberEntitiesShardStore.AddEntities(needStarting.keySet)) { () => + rememberAdd(needStarting.keySet) { () => needStarting.foreach { case (entityId, requestor) => getOrCreateEntity(entityId) @@ -587,7 +607,7 @@ private[akka] class Shard( if (ack.shardId != shardId && entities.entityIdExists(ack.entityId)) { log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId) - sendToRememberStore(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { () => + rememberRemove(ack.entityId) { () => entities.removeEntity(ack.entityId) messageBuffers.remove(ack.entityId) } @@ -658,6 +678,7 @@ private[akka] class Shard( // Note; because we're not persisting the EntityStopped, we don't need // to persist the EntityStarted either. log.debug("Starting entity [{}] again, there are buffered messages for it", id) + flightRecorder.entityPassivateRestart(id) // this will re-add the entity back sendMsgBuffer(id) } else { @@ -668,7 +689,7 @@ private[akka] class Shard( context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id)) } else { // FIXME optional wait for completion as optimization where stops are not critical - sendToRememberStore(id, RememberEntitiesShardStore.RemoveEntity(id))(() => passivateCompleted(id)) + rememberRemove(id)(() => passivateCompleted(id)) } } } @@ -683,6 +704,7 @@ private[akka] class Shard( entities.entityPassivating(id) messageBuffers.add(id) entity ! stopMessage + flightRecorder.entityPassivate(id) } else { log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity", entity) } @@ -714,9 +736,9 @@ private[akka] class Shard( entities.removeEntity(entityId) if (hasBufferedMessages) { log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId) - sendToRememberStore(entityId, RememberEntitiesShardStore.AddEntities(Set(entityId))) { () => - sendMsgBuffer(entityId) - } + // in case we're already writing to the remember store + flightRecorder.entityPassivateRestart(entityId) + self ! 
StartEntityInternal(entityId) } else { log.debug("Entity stopped after passivation [{}]", entityId) dropBufferFor(entityId) @@ -755,7 +777,6 @@ private[akka] class Shard( "Delivering message of type [{}] to [{}] (starting because known but not running)", payload.getClass, id) - val actor = getOrCreateEntity(id) touchLastMessageTimestamp(id) actor.tell(payload, snd) @@ -764,7 +785,7 @@ private[akka] class Shard( if (VerboseDebug) log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id) appendToMessageBuffer(id, msg, snd) - sendToRememberStore(id, RememberEntitiesShardStore.AddEntities(Set(id)))(() => sendMsgBuffer(id)) + rememberAdd(Set(id))(() => sendMsgBuffer(id)) case OptionVal.None => // No actor running and write in progress for some other entity id, stash message for deliver when // unstash happens on async write complete diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingFlightRecorder.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingFlightRecorder.scala new file mode 100644 index 0000000000..3c0c52cab3 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingFlightRecorder.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding +import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.annotation.InternalApi +import akka.util.FlightRecorderLoader + +/** + * INTERNAL API + */ +@InternalApi +object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider { + + override def lookup(): ExtensionId[_ <: Extension] = this + + override def createExtension(system: ExtendedActorSystem): ShardingFlightRecorder = + FlightRecorderLoader.load[ShardingFlightRecorder]( + system, + "akka.cluster.sharding.internal.jfr.JFRShardingFlightRecorder", + NoOpShardingFlightRecorder) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] trait ShardingFlightRecorder extends Extension { + def rememberEntityOperation(duration: Long): Unit + def rememberEntityAdd(entityId: String): Unit + def rememberEntityRemove(entityId: String): Unit + def entityPassivate(entityId: String): Unit + def entityPassivateRestart(entityId: String): Unit +} + +/** + * INTERNAL + */ +@InternalApi +private[akka] case object NoOpShardingFlightRecorder extends ShardingFlightRecorder { + override def rememberEntityOperation(duration: Long): Unit = () + override def rememberEntityAdd(entityId: String): Unit = () + override def rememberEntityRemove(entityId: String): Unit = () + override def entityPassivate(entityId: String): Unit = () + override def entityPassivateRestart(entityId: String): Unit = () +} diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala index 240c1adac8..8b13789179 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesPerfSpec.scala @@ -1,223 +1 @@ -/* - * Copyright (C) 2019-2020 Lightbend Inc. 
- */ -package akka.cluster.sharding - -import java.util.concurrent.TimeUnit.NANOSECONDS - -import scala.concurrent.duration._ - -import com.typesafe.config.ConfigFactory - -import akka.actor._ -import akka.cluster.MemberStatus -import akka.testkit._ -import akka.util.ccompat._ - -@ccompatUsedUntil213 -object ClusterShardingRememberEntitiesPerfSpec { - - def props(): Props = Props(new TestEntity) - - class TestEntity extends Actor with ActorLogging { - - log.debug("Started TestEntity: {}", self) - - def receive = { - case m => sender() ! m - } - } - - val extractEntityId: ShardRegion.ExtractEntityId = { - case id: Int => (id.toString, id) - } - - val extractShardId: ShardRegion.ExtractShardId = { - case _: Int => "0" // only one shard - case ShardRegion.StartEntity(_) => "0" - } - -} - -object ClusterShardingRememberEntitiesPerfSpecConfig - extends MultiNodeClusterShardingConfig( - rememberEntities = true, - additionalConfig = s""" - akka.testconductor.barrier-timeout = 3 minutes - akka.remote.artery.advanced.outbound-message-queue-size = 10000 - akka.remote.artery.advanced.maximum-frame-size = 512 KiB - # comment next line to enable durable lmdb storage - akka.cluster.sharding.distributed-data.durable.keys = [] - akka.cluster.sharding { - remember-entities = on - } - """) { - - val first = role("first") - val second = role("second") - val third = role("third") - - nodeConfig(third)(ConfigFactory.parseString(s""" - akka.cluster.sharding.distributed-data.durable.lmdb { - # use same directory when starting new node on third (not used at same time) - dir = "$targetDir/sharding-third" - } - """)) -} - -class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode1 extends ClusterShardingRememberEntitiesPerfSpec -class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardingRememberEntitiesPerfSpec -class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec - -abstract class ClusterShardingRememberEntitiesPerfSpec - extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig) - with ImplicitSender { - import ClusterShardingRememberEntitiesPerfSpec._ - import ClusterShardingRememberEntitiesPerfSpecConfig._ - - def startSharding(): Unit = { - (1 to 3).foreach { n => - startSharding( - system, - typeName = s"Entity$n", - entityProps = ClusterShardingRememberEntitiesPerfSpec.props(), - extractEntityId = extractEntityId, - extractShardId = extractShardId) - } - } - - lazy val region1 = ClusterSharding(system).shardRegion("Entity1") - lazy val region2 = ClusterSharding(system).shardRegion("Entity2") - lazy val region3 = ClusterSharding(system).shardRegion("Entity3") - - // use 5 for "real" testing - private val nrIterations = 2 - // use 5 for "real" testing - private val numberOfMessagesFactor = 1 - - s"Cluster sharding with remember entities performance" must { - - "form cluster" in within(20.seconds) { - join(first, first) - - startSharding() - - // this will make it run on first - runOn(first) { - region1 ! 0 - expectMsg(0) - region2 ! 0 - expectMsg(0) - region3 ! 
0 - expectMsg(0) - } - enterBarrier("allocated-on-first") - - join(second, first) - join(third, first) - - within(remaining) { - awaitAssert { - cluster.state.members.size should ===(3) - cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up)) - } - } - - enterBarrier("all-up") - } - - "test when starting new entity" in { - runOn(first) { - val numberOfMessages = 200 * numberOfMessagesFactor - (1 to nrIterations).foreach { iteration => - val startTime = System.nanoTime() - (1 to numberOfMessages).foreach { n => - region1 ! (iteration * 100000 + n) - } - receiveN(numberOfMessages, 20.seconds) - val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughput = numberOfMessages * 1000.0 / took - println( - s"### Test1 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " + - f"throughput $throughput%,.0f msg/s") - } - } - enterBarrier("after-1") - } - - "test when starting new entity and sending a few messages to it" in { - runOn(first) { - val numberOfMessages = 800 * numberOfMessagesFactor - (1 to nrIterations).foreach { iteration => - val startTime = System.nanoTime() - for (n <- 1 to numberOfMessages / 5; _ <- 1 to 5) { - region2 ! (iteration * 100000 + n) - } - receiveN(numberOfMessages, 20.seconds) - val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughput = numberOfMessages * 1000.0 / took - println( - s"### Test2 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " + - f"throughput $throughput%,.0f msg/s") - } - } - enterBarrier("after-2") - } - - "test when starting some new entities mixed with sending to started" in { - runOn(first) { - val numberOfMessages = 1600 * numberOfMessagesFactor - (1 to nrIterations).foreach { iteration => - val startTime = System.nanoTime() - (1 to numberOfMessages).foreach { n => - val msg = - if (n % 20 == 0) - -(iteration * 100000 + n) // unique, will start new entity - else - iteration * 100000 + (n % 10) // these will go to same 10 started entities - region3 ! msg - - if (n == 10) { - // wait for the first 10 to avoid filling up stash - receiveN(10, 5.seconds) - } - } - receiveN(numberOfMessages - 10, 20.seconds) - val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughput = numberOfMessages * 1000.0 / took - println( - s"### Test3 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " + - f"throughput $throughput%,.0f msg/s") - } - } - enterBarrier("after-3") - } - - "test sending to started" in { - runOn(first) { - val numberOfMessages = 1600 * numberOfMessagesFactor - (1 to nrIterations).foreach { iteration => - var startTime = System.nanoTime() - (1 to numberOfMessages).foreach { n => - region3 ! 
(iteration * 100000 + (n % 10)) // these will go to same 10 started entities - - if (n == 10) { - // wait for the first 10 and then start the clock - receiveN(10, 5.seconds) - startTime = System.nanoTime() - } - } - receiveN(numberOfMessages - 10, 20.seconds) - val took = NANOSECONDS.toMillis(System.nanoTime - startTime) - val throughput = numberOfMessages * 1000.0 / took - println( - s"### Test4 with $numberOfMessages took ${(System.nanoTime() - startTime) / 1000 / 1000} ms, " + - f"throughput $throughput%,.0f msg/s") - } - } - enterBarrier("after-4") - } - } - -} diff --git a/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala index fa2c15b9aa..c3228c469c 100644 --- a/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala +++ b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala @@ -16,7 +16,7 @@ import akka.remote.artery.RemotingFlightRecorder * INTERNAL API */ @InternalApi -private[akka] final class JFRRemotingFlightRecorder(system: ExtendedActorSystem) extends RemotingFlightRecorder { +private[akka] final class JFRRemotingFlightRecorder() extends RemotingFlightRecorder { override def transportMediaDriverStarted(directoryName: String): Unit = new TransportMediaDriverStarted(directoryName).commit() diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala index cb4a1c5ff0..9363f3f7f0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala @@ -6,17 +6,10 @@ package akka.remote.artery import java.net.InetSocketAddress -import scala.util.Failure -import scala.util.Success - -import akka.actor.Address -import akka.actor.ExtendedActorSystem -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider +import akka.actor.{ Address, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi import akka.remote.UniqueAddress -import akka.util.JavaVersion +import akka.util.FlightRecorderLoader /** * INTERNAL API @@ -25,19 +18,10 @@ import akka.util.JavaVersion object RemotingFlightRecorder extends ExtensionId[RemotingFlightRecorder] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): RemotingFlightRecorder = - if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) { - // Dynamic instantiation to not trigger class load on earlier JDKs - system.dynamicAccess.createInstanceFor[RemotingFlightRecorder]( - "akka.remote.artery.jfr.JFRRemotingFlightRecorder", - (classOf[ExtendedActorSystem], system) :: Nil) match { - case Success(jfr) => jfr - case Failure(ex) => - system.log.warning("Failed to load JFR remoting flight recorder, falling back to noop. 
Exception: {}", ex) - NoOpRemotingFlightRecorder - } // fallback if not possible to dynamically load for some reason - } else - // JFR not available on Java 8 - NoOpRemotingFlightRecorder + FlightRecorderLoader.load[RemotingFlightRecorder]( + system, + "akka.remote.artery.jfr.JFRRemotingFlightRecorder", + NoOpRemotingFlightRecorder) override def lookup(): ExtensionId[_ <: Extension] = this } diff --git a/build.sbt b/build.sbt index df345dbc1f..ab18301e93 100644 --- a/build.sbt +++ b/build.sbt @@ -164,6 +164,7 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding") .settings(Protobuf.settings) .configs(MultiJvm) .enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams) + .enablePlugins(Jdk9) lazy val clusterTools = akkaModule("akka-cluster-tools") .dependsOn( @@ -463,7 +464,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .dependsOn( actorTyped % "compile->CompileJdk9", clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm", - clusterSharding, + clusterSharding % "compile->compile;compile->CompileJdk9;multi-jvm->multi-jvm", actorTestkitTyped % "test->test", actorTypedTests % "test->test", persistenceTyped % "test->test", diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 0c42b3a8d7..4810617e0e 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -30,6 +30,7 @@ object AkkaDisciplinePlugin extends AutoPlugin { "akka-cluster", "akka-cluster-metrics", "akka-cluster-sharding", + "akka-cluster-sharding-typed", "akka-distributed-data", "akka-persistence", "akka-persistence-tck",
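
For reference, a minimal sketch of how another module-specific recorder extension could follow the same pattern as ShardingFlightRecorder and RemotingFlightRecorder above, assuming only the FlightRecorderLoader.load signature added in this diff; MyFlightRecorder, NoOpMyFlightRecorder and the fully qualified JFR class name are hypothetical:

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.util.FlightRecorderLoader

// Hypothetical recorder API; a real module would define the hooks it needs.
trait MyFlightRecorder extends Extension {
  def somethingHappened(id: String): Unit
}

// Fallback used on JDK 8, or when JFR loading is disabled or fails.
case object NoOpMyFlightRecorder extends MyFlightRecorder {
  override def somethingHappened(id: String): Unit = ()
}

object MyFlightRecorder extends ExtensionId[MyFlightRecorder] with ExtensionIdProvider {
  override def lookup(): ExtensionId[_ <: Extension] = this

  override def createExtension(system: ExtendedActorSystem): MyFlightRecorder =
    // Instantiates the JDK9+ implementation reflectively, and only on JDK 11+ with
    // akka.java-flight-recorder.enabled = on; otherwise returns the no-op fallback.
    FlightRecorderLoader.load[MyFlightRecorder](
      system,
      "com.example.jfr.JFRMyFlightRecorder", // hypothetical scala-jdk-9 implementation
      NoOpMyFlightRecorder)
}

The point of centralizing this in FlightRecorderLoader is that the JFR-backed class is referenced only by name and instantiated reflectively, so jdk.jfr types are never class-loaded on JDK 8 and callers always receive a usable recorder, falling back to the no-op implementation when JFR is unavailable.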