diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index f289b9aa77..dd6f5bcbd9 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -453,7 +453,7 @@ private[akka] class Shard(
         whenDone()
       case Some(store) =>
-        if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds, command)
+        if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command)
         entityIds.foreach(entities.remembering)
         store ! command
@@ -469,7 +469,7 @@ private[akka] class Shard(
     def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = {
       // none of the current impls will send back a partial update, yet!
       case RememberEntitiesShardStore.UpdateDone(ids) =>
-        if (VerboseDebug) log.debug("Update done for ids {}", ids)
+        if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", "))
         timers.cancel(RememberEntityTimeoutKey)
         whenDone()
         if (pendingStarts.isEmpty) {
@@ -477,7 +477,10 @@ private[akka] class Shard(
           context.become(idle)
           unstashAll()
         } else {
-          if (VerboseDebug) log.debug("New entities encountered while waiting starting those: {}", pendingStarts)
+          if (VerboseDebug)
+            log.debug(
+              "New entities encountered while waiting, starting those: [{}]",
+              pendingStarts.keys.mkString(", "))
           startEntities(pendingStarts)
         }
       case RememberEntityTimeout(`command`) =>
@@ -486,39 +489,40 @@ private[akka] class Shard(
       case msg: ShardRegion.StartEntity =>
         if (VerboseDebug)
           log.debug(
-            "Start entity while a write already in progress. Pending writes {}. Writes in progress {}",
-            pendingStarts,
-            entityIds)
+            "Start entity while a write already in progress. Pending writes [{}]. Writes in progress [{}]",
+            pendingStarts.keys.mkString(", "),
+            entityIds.mkString(", "))
         if (!entities.entityIdExists(msg.entityId))
           context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender()))))
 
       // below cases should handle same messages as in idle
-      case _: Terminated                 => stash()
-      case _: EntityTerminated           => stash()
-      case _: CoordinatorMessage         => stash()
-      case _: RememberEntityCommand      => stash()
-      case _: ShardRegion.StartEntityAck => stash()
-      case _: ShardRegionCommand         => stash()
-      case msg: ShardQuery               => receiveShardQuery(msg)
-      case PassivateIdleTick             => stash()
-      case msg: LeaseLost                => receiveLeaseLost(msg)
-      case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
+      case _: Terminated                   => stash()
+      case _: EntityTerminated             => stash()
+      case _: CoordinatorMessage           => stash()
+      case _: RememberEntityCommand        => stash()
+      case _: ShardRegion.StartEntityAck   => stash()
+      case _: ShardRegionCommand           => stash()
+      case msg: ShardQuery                 => receiveShardQuery(msg)
+      case PassivateIdleTick               => stash()
+      case msg: LeaseLost                  => receiveLeaseLost(msg)
+      case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
       case msg if extractEntityId.isDefinedAt(msg) =>
+        // FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage?
        val (id, _) = extractEntityId(msg)
        if (entities.entityIdExists(id)) {
-          if (VerboseDebug) log.debug("Entity already known about. Try and deliver. {}", id)
+          if (VerboseDebug) log.debug("Entity already known about, trying to deliver. [{}]", id)
          deliverMessage(msg, sender(), OptionVal.Some(entityIds))
        } else {
-          if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. {}", id)
-          stash()
+          if (VerboseDebug) log.debug("New entity, adding it to batch of pending starts. [{}]", id)
+          appendToMessageBuffer(id, msg, sender())
          context.become(waitingForUpdate(pendingStarts + (id -> None)))
        }
       case msg =>
         // shouldn't be any other message types, but just in case
         log.warning(
-          "Stashing unexpected message [{}] while waiting for remember entities update of {}",
+          "Stashing unexpected message [{}] while waiting for remember entities update of [{}]",
           msg.getClass,
-          entityIds)
+          entityIds.mkString(", "))
         stash()
     }
   }
@@ -553,12 +557,12 @@ private[akka] class Shard(
   private def startEntities(entities: Map[EntityId, Option[ActorRef]]): Unit = {
     val alreadyStarted = entities.filterKeys(entity => this.entities.entityIdExists(entity))
     val needStarting = entities -- alreadyStarted.keySet
-    log.debug(
-      "Request to start entities {}. Already started {}. Need starting {}",
-      entities,
-      alreadyStarted,
-      needStarting)
-
+    if (log.isDebugEnabled) {
+      log.debug(
+        "Request to start entities. Already started [{}]. Need starting [{}]",
+        alreadyStarted.keys.mkString(", "),
+        needStarting.keys.mkString(", "))
+    }
     alreadyStarted.foreach {
       case (entityId, requestor) =>
         getOrCreateEntity(entityId)
@@ -632,6 +636,13 @@ private[akka] class Shard(
   private def receiveTerminated(ref: ActorRef): Unit = {
     if (handOffStopper.contains(ref))
       context.stop(self)
+    else {
+      // workaround for watchWith not working with stash #29101
+      entities.entityId(ref) match {
+        case OptionVal.Some(id) => entityTerminated(ref, id)
+        case _                  =>
+      }
+    }
   }
 
   @InternalStableApi
@@ -758,11 +769,11 @@ private[akka] class Shard(
             // unstash happens on async write complete
             if (VerboseDebug)
               log.debug(
-                "Stashing message [{}] to [{}] because of write in progress for [{}]",
+                "Buffering message [{}] to [{}] (which is not started) because of write in progress for [{}]",
                 payload.getClass,
                 id,
                 entityIdsWaitingForWrite.get)
-            stash()
+            appendToMessageBuffer(id, msg, snd)
           }
         }
       }
@@ -775,7 +786,7 @@ private[akka] class Shard(
       case OptionVal.None =>
         val name = URLEncoder.encode(id, "utf-8")
         val a = context.actorOf(entityProps(id), name)
-        context.watchWith(a, EntityTerminated(id, a))
+        context.watch(a)
         log.debug("Started entity [{}] with entity id [{}] in shard [{}]", a, id, shardId)
         entities.addEntity(id, a)
         touchLastMessageTimestamp(id)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 37a0edb364..3e01d1d32a 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -800,7 +800,10 @@ abstract class ShardCoordinator(
       deferGetShardHomeRequest(shard, sender())
       true
     } else if (!hasAllRegionsRegistered()) {
-      log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
+      log.debug(
+        "GetShardHome [{}] request from [{}] ignored, because not all regions have registered yet.",
+        shard,
+        sender())
       true
     } else {
       state.shards.get(shard) match {
@@ -1328,22 +1331,28 @@ private[akka] class DDataShardCoordinator(
        stash()
 
      case RememberEntitiesCoordinatorStore.UpdateDone(shard) =>
-       require(shardId.contains(shard))
-       if (!waitingForStateWrite) {
-         log.debug("The ShardCoordinator saw remember shard start successfully written {}", evt)
-         if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
-         unbecomeAfterUpdate(evt, afterUpdateCallback)
+       if (!shardId.contains(shard)) {
+         log.warning(
+           "Saw remember entities update complete for shard id [{}] while waiting for [{}]",
+           shard,
+           shardId.getOrElse(""))
        } else {
-         log.debug(
-           "The ShardCoordinator saw remember shard start successfully written {}, waiting for state update",
-           evt)
-         context.become(
-           waitingForUpdate(
-             evt,
-             shardId,
-             waitingForStateWrite = true,
-             waitingForRememberShard = false,
-             afterUpdateCallback = afterUpdateCallback))
+         if (!waitingForStateWrite) {
+           log.debug("The ShardCoordinator saw remember shard start successfully written {}", evt)
+           if (shardId.isDefined) timers.cancel(RememberEntitiesTimeoutKey)
+           unbecomeAfterUpdate(evt, afterUpdateCallback)
+         } else {
+           log.debug(
+             "The ShardCoordinator saw remember shard start successfully written {}, waiting for state update",
+             evt)
+           context.become(
+             waitingForUpdate(
+               evt,
+               shardId,
+               waitingForStateWrite = true,
+               waitingForRememberShard = false,
+               afterUpdateCallback = afterUpdateCallback))
+         }
        }
 
      case RememberEntitiesCoordinatorStore.UpdateFailed(shard) =>
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 5582da142c..3f70540cd4 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -981,7 +981,7 @@ private[akka] class ShardRegion(
   }
 
   def initializeShard(id: ShardId, shard: ActorRef): Unit = {
-    log.debug("{}: Shard was initialized {}", typeName, id)
+    log.debug("{}: Shard was initialized [{}]", typeName, id)
     startingShards -= id
     deliverBufferedMessages(id, shard)
   }
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
index 54455bcb49..8d7f268200 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala
@@ -174,7 +174,8 @@ private[akka] final class DDataRememberEntitiesShardStore(
       allIds: Set[EntityId],
       updates: Map[Set[EntityId], (Update[ORSet[EntityId]], Int)]): Receive = {
     case UpdateSuccess(_, Some(ids: Set[EntityId] @unchecked)) =>
-      log.debug("The DDataShard state was successfully updated for [{}]", ids)
+      if (log.isDebugEnabled)
+        log.debug("The DDataShard state was successfully updated for [{}]", ids.mkString(", "))
       val remaining = updates - ids
       if (remaining.isEmpty) {
         requestor ! RememberEntitiesShardStore.UpdateDone(allIds)
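
The Shard.scala hunks above replace stash() with appendToMessageBuffer for messages addressed to an entity whose remember-entities write is still in flight, flushing once the store acknowledges. The following is a minimal, self-contained sketch of that per-entity buffering pattern, not the actual internals: Envelope, WriteStarted, WriteDone, BufferingShard and the deliver callback are hypothetical stand-ins for the extracted messages, RememberEntitiesShardStore.UpdateDone and deliverMessage.

import akka.actor.{ Actor, ActorRef }

// Hypothetical stand-ins for the real sharding protocol messages.
final case class Envelope(entityId: String, payload: Any)
final case class WriteStarted(entityIds: Set[String])
final case class WriteDone(entityIds: Set[String])

class BufferingShard(deliver: (String, Any, ActorRef) => Unit) extends Actor {
  // Per-entity buffers instead of stash(): only messages for entities with a
  // write in flight are held back; everything else is processed immediately.
  private var buffers = Map.empty[String, Vector[(Any, ActorRef)]]

  def receive: Receive = {
    case WriteStarted(ids) =>
      // open an (empty) buffer for each entity id the write covers
      ids.foreach(id => buffers += id -> buffers.getOrElse(id, Vector.empty))
    case Envelope(id, payload) if buffers.contains(id) =>
      // write still in flight for this entity: buffer rather than stash
      buffers += id -> (buffers(id) :+ (payload -> sender()))
    case Envelope(id, payload) =>
      deliver(id, payload, sender())
    case WriteDone(ids) =>
      // flush exactly the ids the store acknowledged, in arrival order
      ids.foreach { id =>
        buffers.getOrElse(id, Vector.empty).foreach {
          case (payload, snd) => deliver(id, payload, snd)
        }
        buffers -= id
      }
  }
}

Unlike stash(), which can only replay every stashed message at once, per-entity buffers let unrelated messages (such as Terminated) be handled immediately and release buffered messages selectively per entity id, which sidesteps the watchWith/stash interaction referenced in #29101.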
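
Relatedly, the receiveTerminated and getOrCreateEntity hunks swap context.watchWith(a, EntityTerminated(id, a)) for plain context.watch(a) plus a reverse ref-to-id lookup on Terminated. Below is a sketch of that lookup pattern in isolation, assuming a simple bidirectional map; MiniShard and its fields are hypothetical, and only context.watch, Terminated and the classic actor API are real Akka calls.

import akka.actor.{ Actor, ActorRef, Props, Terminated }

class MiniShard(entityProps: String => Props) extends Actor {
  // Keep both directions so a Terminated(ref) can be mapped back to its
  // entity id, mirroring what entities.entityId(ref) does in the patch above.
  private var refById = Map.empty[String, ActorRef]
  private var idByRef = Map.empty[ActorRef, String]

  private def startEntity(id: String): ActorRef = {
    // note: the real code URL-encodes the entity id before using it as a name
    val ref = context.actorOf(entityProps(id), id)
    context.watch(ref) // plain watch; no synthetic watchWith message to interleave with stash
    refById += id -> ref
    idByRef += ref -> id
    ref
  }

  def receive: Receive = {
    case id: String =>
      // treat a String message as "deliver to this entity id"
      refById.getOrElse(id, startEntity(id)).forward(id)
    case Terminated(ref) =>
      // resolve the id from the ref instead of carrying it in a custom message
      idByRef.get(ref).foreach { id =>
        refById -= id
        idByRef -= ref
      }
  }
}

Resolving the id from the Terminated ref means there is no custom terminated message that could be stashed out of order; the internal entities registry in the real code plays the role of idByRef here.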