Log changes to the ShardCoordinator ddata state (#27444)
The number of shards is configurable, in the order of magnitude of the number of nodes in the cluster. Logging the ActorRef for each allocated shard is useful to see on which node the shard is allocated.
This commit is contained in:
parent
1fe2659203
commit
75de45d9ef
1 changed files with 5 additions and 0 deletions
|
|
@ -329,6 +329,8 @@ object ShardCoordinator {
|
||||||
rememberEntities: Boolean = false)
|
rememberEntities: Boolean = false)
|
||||||
extends ClusterShardingSerializable {
|
extends ClusterShardingSerializable {
|
||||||
|
|
||||||
|
override def toString = s"State($shards)"
|
||||||
|
|
||||||
def withRememberEntities(enabled: Boolean): State = {
|
def withRememberEntities(enabled: Boolean): State = {
|
||||||
if (enabled)
|
if (enabled)
|
||||||
copy(rememberEntities = enabled)
|
copy(rememberEntities = enabled)
|
||||||
|
|
@ -1049,6 +1051,7 @@ class DDataShardCoordinator(
|
||||||
({
|
({
|
||||||
case g @ GetSuccess(CoordinatorStateKey, _) =>
|
case g @ GetSuccess(CoordinatorStateKey, _) =>
|
||||||
state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities)
|
state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities)
|
||||||
|
log.debug("Received initial coordinator state [{}]", state)
|
||||||
val newRemainingKeys = remainingKeys - CoordinatorStateKey
|
val newRemainingKeys = remainingKeys - CoordinatorStateKey
|
||||||
if (newRemainingKeys.isEmpty)
|
if (newRemainingKeys.isEmpty)
|
||||||
becomeWaitingForStateInitialized()
|
becomeWaitingForStateInitialized()
|
||||||
|
|
@ -1178,6 +1181,7 @@ class DDataShardCoordinator(
|
||||||
private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E => Unit): Unit = {
|
private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E => Unit): Unit = {
|
||||||
context.unbecome()
|
context.unbecome()
|
||||||
afterUpdateCallback(evt)
|
afterUpdateCallback(evt)
|
||||||
|
log.debug("New coordinator state after [{}]: [{}]", evt, state)
|
||||||
unstashGetShardHomeRequests()
|
unstashGetShardHomeRequests()
|
||||||
unstashAll()
|
unstashAll()
|
||||||
}
|
}
|
||||||
|
|
@ -1236,6 +1240,7 @@ class DDataShardCoordinator(
|
||||||
|
|
||||||
def sendCoordinatorStateUpdate(evt: DomainEvent) = {
|
def sendCoordinatorStateUpdate(evt: DomainEvent) = {
|
||||||
val s = state.updated(evt)
|
val s = state.updated(evt)
|
||||||
|
log.debug("Publishing new coordinator state [{}]", state)
|
||||||
replicator ! Update(CoordinatorStateKey, LWWRegister(selfUniqueAddress, initEmptyState), writeMajority, Some(evt)) {
|
replicator ! Update(CoordinatorStateKey, LWWRegister(selfUniqueAddress, initEmptyState), writeMajority, Some(evt)) {
|
||||||
reg =>
|
reg =>
|
||||||
reg.withValueOf(s)
|
reg.withValueOf(s)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue