Turn verbose logging into a config flag (#29168)
parent 5a5468dd4a
commit 224fb1592d

18 changed files with 61 additions and 43 deletions
@@ -206,6 +206,10 @@ akka.cluster.sharding {
   # The interval between retries for acquiring the lease
   lease-retry-interval = 5s
+
+  # Provide a higher level of details in the debug logs, often per routed message. Be careful about enabling
+  # in production systems.
+  verbose-debug-logging = off
 }
 # //#sharding-ext-config

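For context, a minimal sketch of how an application could opt in to the new flag; the object name and config layout here are illustrative (not part of this commit), mirroring how the test specs below enable it:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Illustrative only: enable the new flag for one ActorSystem.
    // The extra logging is emitted at debug level, so the loglevel must allow it.
    object VerboseShardingExample extends App {
      val config = ConfigFactory.parseString("""
          akka.loglevel = DEBUG
          akka.cluster.sharding.verbose-debug-logging = on
        """).withFallback(ConfigFactory.load())

      val system = ActorSystem("Example", config)
    }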
@@ -111,10 +111,6 @@ private[akka] object Shard {
   private val RememberEntityTimeoutKey = "RememberEntityTimeout"
   final case class RememberEntityTimeout(operation: RememberEntitiesShardStore.Command)

-  // FIXME Leaving this on while we are working on the remember entities refactor
-  // should it go in settings perhaps, useful for tricky sharding bugs?
-  final val VerboseDebug = true
-
   /**
    * State machine for an entity:
    * {{{
@@ -253,7 +249,7 @@ private[akka] object Shard {
     }
   }

-  final class Entities(log: LoggingAdapter, rememberingEntities: Boolean) {
+  final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) {
     private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]()
     // needed to look up entity by reg when a Passivating is received
     private val byRef = new util.HashMap[ActorRef, EntityId]()
@@ -323,7 +319,7 @@ private[akka] object Shard {
       }
     }
     def entityPassivating(entityId: EntityId): Unit = {
-      if (VerboseDebug) log.debug("[{}] passivating", entityId)
+      if (verboseDebug) log.debug("[{}] passivating", entityId)
       entities.get(entityId) match {
         case wf: WithRef =>
           val state = entityState(entityId).transition(Passivating(wf.ref))
@@ -410,6 +406,8 @@ private[akka] class Shard(

   import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage

+  final val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")
+
   private val rememberEntitiesStore: Option[ActorRef] =
     rememberEntitiesProvider.map { provider =>
       val store = context.actorOf(provider.shardStoreProps(shardId).withDeploy(Deploy.local), "RememberEntitiesStore")
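This is the pattern the rest of the diff switches to: the flag is read once when the actor starts, and each detailed log statement is guarded by a plain boolean check, so a disabled flag costs almost nothing per routed message. A standalone sketch of the same idea, with illustrative names that are not part of the Shard internals:

    import akka.actor.{ Actor, ActorLogging }

    // Illustrative only: gate high-volume debug logging behind a config flag,
    // reading the setting once at actor startup rather than per message.
    class ChattyActor extends Actor with ActorLogging {
      private val verboseDebug =
        context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")

      def receive: Receive = {
        case msg =>
          // a cheap guard instead of an unconditional log.debug per message
          if (verboseDebug) log.debug("Got message of type [{}]", msg.getClass)
      }
    }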
@@ -419,7 +417,7 @@ private[akka] class Shard(

   private val flightRecorder = ShardingFlightRecorder(context.system)

-  private val entities = new Entities(log, settings.rememberEntities)
+  private val entities = new Entities(log, settings.rememberEntities, verboseDebug)

   private var lastMessageTimestamp = Map.empty[EntityId, Long]

@@ -527,7 +525,7 @@ private[akka] class Shard(
     case RememberEntityTimeout(GetEntities) =>
       loadingEntityIdsFailed()
     case msg =>
-      if (VerboseDebug)
+      if (verboseDebug)
        log.debug("Got msg of type [{}] from [{}] while waiting for remember entitites", msg.getClass, sender())
       stash()
   }
@@ -597,7 +595,7 @@ private[akka] class Shard(
   }

   def sendToRememberStore(store: ActorRef, storingStarts: Set[EntityId], storingStops: Set[EntityId]): Unit = {
-    if (VerboseDebug)
+    if (verboseDebug)
       log.debug(
         "Remember update [{}] and stops [{}] triggered",
         storingStarts.mkString(", "),
@@ -626,7 +624,7 @@ private[akka] class Shard(
       // none of the current impls will send back a partial update, yet!
       case RememberEntitiesShardStore.UpdateDone(storedStarts, storedStops) =>
         val duration = System.nanoTime() - startTimeNanos
-        if (VerboseDebug)
+        if (verboseDebug)
           log.debug(
             "Update done for ids, started [{}], stopped [{}]. Duration {} ms",
             storedStarts.mkString(", "),
@@ -647,7 +645,7 @@ private[akka] class Shard(
       case l: LeaseLost => receiveLeaseLost(l)
       case ack: ShardRegion.StartEntityAck => receiveStartEntityAck(ack)
       case ShardRegion.Passivate(stopMessage) =>
-        if (VerboseDebug)
+        if (verboseDebug)
           log.debug(
             "Passivation of [{}] arrived while updating",
             entities.entityId(sender()).getOrElse(s"Unknown actor ${sender()}"))
@@ -699,13 +697,13 @@ private[akka] class Shard(

     val (pendingStarts, pendingStops) = entities.pendingRememberEntities()
     if (pendingStarts.isEmpty && pendingStops.isEmpty) {
-      if (VerboseDebug) log.debug("Update complete, no pending updates, going to idle")
+      if (verboseDebug) log.debug("Update complete, no pending updates, going to idle")
       unstashAll()
       context.become(idle)
     } else {
       // Note: no unstashing as long as we are batching, is that a problem?
       val pendingStartIds = pendingStarts.keySet
-      if (VerboseDebug)
+      if (verboseDebug)
         log.debug(
           "Update complete, pending updates, doing another write. Starts [{}], stops [{}]",
           pendingStartIds.mkString(", "),
@@ -733,11 +731,11 @@ private[akka] class Shard(
       case RestartTerminatedEntity(entityId) =>
         entities.entityState(entityId) match {
           case WaitingForRestart =>
-            if (VerboseDebug) log.debug("Restarting entity unexpectedly terminated entity [{}]", entityId)
+            if (verboseDebug) log.debug("Restarting entity unexpectedly terminated entity [{}]", entityId)
             getOrCreateEntity(entityId)
           case Active(_) =>
             // it up could already have been started, that's fine
-            if (VerboseDebug) log.debug("Got RestartTerminatedEntity for [{}] but it is already running")
+            if (verboseDebug) log.debug("Got RestartTerminatedEntity for [{}] but it is already running")
           case other =>
             throw new IllegalStateException(
               s"Unexpected state for [$entityId] when getting RestartTerminatedEntity: [$other]")
@@ -814,7 +812,7 @@ private[akka] class Shard(

   def receiveShardQuery(msg: ShardQuery): Unit = msg match {
     case GetCurrentShardState =>
-      if (VerboseDebug)
+      if (verboseDebug)
        log.debug("GetCurrentShardState, full state: [{}], active: [{}]", entities, entities.activeEntityIds())
       sender() ! CurrentShardState(shardId, entities.activeEntityIds())
     case GetShardStats => sender() ! ShardStats(shardId, entities.size)
@@ -865,7 +863,7 @@ private[akka] class Shard(
     }
     entities.entityState(entityId) match {
       case RememberingStop(_) =>
-        if (VerboseDebug)
+        if (verboseDebug)
           log.debug("Stop of [{}] arrived, already is among the pending stops", entityId)
       case Active(_) =>
         log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId)
@@ -876,7 +874,7 @@ private[akka] class Shard(
       case Passivating(_) =>
         if (entities.pendingRememberedEntitiesExist()) {
           // will go in next batch update
-          if (VerboseDebug)
+          if (verboseDebug)
             log.debug(
               "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops",
               entityId)
@@ -904,7 +902,7 @@ private[akka] class Shard(
           log.debug("Passivation when there are buffered messages for [{}], ignoring", id)
           // FIXME should we buffer the stop message then?
         } else {
-          if (VerboseDebug)
+          if (verboseDebug)
             log.debug("Passivation started for [{}]", id)
           entities.entityPassivating(id)
           entity ! stopMessage
@@ -961,25 +959,25 @@ private[akka] class Shard(

         // we can only start a new entity if we are not currently waiting for another write
         if (entities.pendingRememberedEntitiesExist()) {
-          if (VerboseDebug)
+          if (verboseDebug)
             log.debug("StartEntity({}) from [{}], adding to batch", start.entityId, snd)
           entities.rememberingStart(entityId, ackTo = Some(snd))
         } else {
-          if (VerboseDebug)
+          if (verboseDebug)
             log.debug("StartEntity({}) from [{}], starting", start.entityId, snd)
           startEntity(start.entityId, Some(sender()))
         }
       case _ =>
         entities.entityState(entityId) match {
           case Active(ref) =>
-            if (VerboseDebug)
+            if (verboseDebug)
               log.debug("Delivering message of type [{}] to [{}]", payload.getClass, entityId)
             touchLastMessageTimestamp(entityId)
             ref.tell(payload, snd)
           case RememberingStart(_) | RememberingStop(_) | Passivating(_) =>
             appendToMessageBuffer(entityId, msg, snd)
           case state @ (WaitingForRestart | RememberedButNotCreated) =>
-            if (VerboseDebug)
+            if (verboseDebug)
               log.debug(
                 "Delivering message of type [{}] to [{}] (starting because [{}])",
                 payload.getClass,
@@ -991,7 +989,7 @@ private[akka] class Shard(
           case NoState =>
             if (entities.pendingRememberedEntitiesExist()) {
               // No actor running and write in progress for some other entity id (can only happen with remember entities enabled)
-              if (VerboseDebug)
+              if (verboseDebug)
                 log.debug(
                   "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]",
                   payload.getClass,
@@ -1001,7 +999,7 @@ private[akka] class Shard(
               entities.rememberingStart(entityId, ackTo = None)
             } else {
               // No actor running and no write in progress, start actor and deliver message when started
-              if (VerboseDebug)
+              if (verboseDebug)
                 log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId)
               appendToMessageBuffer(entityId, msg, snd)
               entities.rememberingStart(entityId, ackTo = None)
@@ -32,6 +32,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
     |akka.remote.classic.netty.tcp.port = 0
     |akka.remote.artery.canonical.port = 0
     |akka.loglevel = DEBUG
+    |akka.cluster.sharding.verbose-debug-logging = on
     |akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
     |""".stripMargin) with WithLogCapturing {
   import ClusterShardingInternalsSpec._
@@ -28,6 +28,7 @@ object ClusterShardingLeaseSpec {
       distributed-data.durable {
         keys = []
       }
+      verbose-debug-logging = on
     }
     """).withFallback(TestLease.config)

@@ -26,7 +26,7 @@ object ConcurrentStartupShardingSpec {
       akka.remote.artery.canonical.port = 0
       akka.log-dead-letters = off
       akka.log-dead-letters-during-shutdown = off
-
+      akka.cluster.sharding.verbose-debug-logging = on
       akka.actor {
         default-dispatcher {
           executor = "fork-join-executor"
@@ -27,6 +27,7 @@ object CoordinatedShutdownShardingSpec {
       akka.actor.provider = "cluster"
       akka.remote.classic.netty.tcp.port = 0
       akka.remote.artery.canonical.port = 0
+      akka.cluster.sharding.verbose-debug-logging = on
     """

   val extractEntityId: ShardRegion.ExtractEntityId = {
@@ -5,15 +5,13 @@
 package akka.cluster.sharding
 import akka.actor.ActorRef
 import akka.cluster.sharding
-import akka.cluster.sharding.Shard.{
-  Active,
-  NoState,
-  Passivating,
-  PassivationComplete,
-  RememberedButNotCreated,
-  RememberingStart,
-  RememberingStop
-}
+import akka.cluster.sharding.Shard.Active
+import akka.cluster.sharding.Shard.NoState
+import akka.cluster.sharding.Shard.Passivating
+import akka.cluster.sharding.Shard.PassivationComplete
+import akka.cluster.sharding.Shard.RememberedButNotCreated
+import akka.cluster.sharding.Shard.RememberingStart
+import akka.cluster.sharding.Shard.RememberingStop
 import akka.event.NoLogging
 import akka.util.OptionVal
 import org.scalatest.matchers.should.Matchers
@@ -21,15 +19,18 @@ import org.scalatest.wordspec.AnyWordSpec

 class EntitiesSpec extends AnyWordSpec with Matchers {

+  private def newEntities(rememberingEntities: Boolean) =
+    new sharding.Shard.Entities(NoLogging, rememberingEntities = rememberingEntities, false)
+
   "Entities" should {
     "start empty" in {
-      val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = false)
+      val entities = newEntities(rememberingEntities = false)
       entities.activeEntityIds() shouldEqual Set.empty
       entities.size shouldEqual 0
       entities.activeEntities() shouldEqual Set.empty
     }
     "set already remembered entities to state RememberedButNotStarted" in {
-      val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
+      val entities = newEntities(rememberingEntities = true)
       val ids = Set("a", "b", "c")
       entities.alreadyRemembered(ids)
       entities.activeEntities() shouldEqual Set.empty
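The `newEntities` helper above centralizes construction of `Entities`, so the new `verboseDebug` constructor parameter is supplied in one place; the remaining test cases in this spec simply swap their direct constructor calls for the helper.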
@@ -39,7 +40,7 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
       }
     }
     "set state to remembering start" in {
-      val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
+      val entities = newEntities(rememberingEntities = true)
       entities.rememberingStart("a", None)
       entities.entityState("a") shouldEqual RememberingStart(None)
       entities.pendingRememberedEntitiesExist() should ===(true)
@@ -53,7 +54,7 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
       entities.pendingRememberEntities()._1 should be(empty)
     }
     "set state to remembering stop" in {
-      val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
+      val entities = newEntities(rememberingEntities = true)
       entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate
       entities.entityPassivating("a") // need to go through passivate to remember stop
       entities.rememberingStop("a", PassivationComplete)
@@ -70,7 +71,7 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
     }

     "fully remove an entity" in {
-      val entities = new sharding.Shard.Entities(NoLogging, rememberingEntities = true)
+      val entities = newEntities(rememberingEntities = true)
       val ref = ActorRef.noSender
       entities.addEntity("a", ref)
       entities.entityPassivating("a") // needs to go through passivating to be removed
@@ -81,19 +82,19 @@ class EntitiesSpec extends AnyWordSpec with Matchers {

     }
     "add an entity as active" in {
-      val entities = new sharding.Shard.Entities(NoLogging, false)
+      val entities = newEntities(rememberingEntities = false)
       val ref = ActorRef.noSender
       entities.addEntity("a", ref)
       entities.entityState("a") shouldEqual Active(ref)
     }
     "look up actor ref by id" in {
-      val entities = new sharding.Shard.Entities(NoLogging, false)
+      val entities = newEntities(rememberingEntities = false)
       val ref = ActorRef.noSender
       entities.addEntity("a", ref)
       entities.entityId(ref) shouldEqual OptionVal.Some("a")
     }
     "set state to passivating" in {
-      val entities = new sharding.Shard.Entities(NoLogging, false)
+      val entities = newEntities(rememberingEntities = false)
       val ref = ActorRef.noSender
       entities.addEntity("a", ref)
       entities.entityPassivating("a")
@@ -23,6 +23,7 @@ object InactiveEntityPassivationSpec {
       akka.actor.provider = "cluster"
       akka.remote.classic.netty.tcp.port = 0
       akka.remote.artery.canonical.port = 0
+      akka.cluster.sharding.verbose-debug-logging = on
     """)

   val enabledConfig = ConfigFactory.parseString("""
@@ -32,6 +32,7 @@ class JoinConfigCompatCheckShardingSpec extends AkkaSpec() with WithLogCapturing
       akka.coordinated-shutdown.terminate-actor-system = on
       akka.remote.classic.netty.tcp.port = 0
       akka.remote.artery.canonical.port = 0
+      akka.cluster.sharding.verbose-debug-logging = on
     """)

   "A Joining Node" must {
@@ -34,6 +34,8 @@ object PersistentShardingMigrationSpec {

       # make sure we test snapshots
       snapshot-after = 5
+
+      verbose-debug-logging = on
     }

     akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
@@ -46,6 +46,7 @@ object PersistentStartEntitySpec {
       akka.remote.artery.canonical.port = 0
       akka.remote.classic.netty.tcp.port = 0
       akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
+      akka.cluster.sharding.verbose-debug-logging = on
     """.stripMargin)
 }

@@ -20,6 +20,7 @@ object ProxyShardingSpec {
       akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
       akka.remote.classic.netty.tcp.port = 0
       akka.remote.artery.canonical.port = 0
+      akka.cluster.sharding.verbose-debug-logging = on
     """
 }

@@ -49,6 +49,7 @@ object RememberEntitiesBatchedUpdatesSpec {
       akka.cluster.sharding.remember-entities = on
       # no leaks between test runs thank you
       akka.cluster.sharding.distributed-data.durable.keys = []
+      akka.cluster.sharding.verbose-debug-logging = on
     """.stripMargin)
 }
 class RememberEntitiesBatchedUpdatesSpec
@@ -40,6 +40,7 @@ object RememberEntitiesFailureSpec {
       akka.cluster.sharding.shard-failure-backoff = 1s
       akka.cluster.sharding.coordinator-failure-backoff = 1s
       akka.cluster.sharding.updating-state-timeout = 1s
+      akka.cluster.sharding.verbose-debug-logging = on
     """)

   class EntityActor extends Actor with ActorLogging {
@@ -44,6 +44,7 @@ object RemoveInternalClusterShardingDataSpec {
       akka.cluster.sharding.snapshot-after = 5
       akka.cluster.sharding.state-store-mode = persistence
       akka.cluster.sharding.keep-nr-of-batches = 0
+      akka.cluster.sharding.verbose-debug-logging = on
     """

   val extractEntityId: ShardRegion.ExtractEntityId = {
@@ -37,6 +37,7 @@ object ShardRegionSpec {
       }
       akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
       akka.cluster.jmx.enabled = off
+      akka.cluster.sharding.verbose-debug-logging = on
     """))

   val shardTypeName = "Caat"
@@ -36,6 +36,7 @@ object ShardWithLeaseSpec {
         heartbeat-timeout = 120s
         lease-operation-timeout = 3s
       }
+      akka.cluster.sharding.verbose-debug-logging = on
     """

   class EntityActor extends Actor with ActorLogging {
@@ -23,6 +23,7 @@ object SupervisionSpec {
       akka.remote.classic.netty.tcp.port = 0
       akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
       akka.loglevel = DEBUG
+      akka.cluster.sharding.verbose-debug-logging = on
     """)

   case class Msg(id: Long, msg: Any)