Batch entity start updates for remembered entities (#29064)

* Initial prototype for batching entity creates
This commit is contained in:
Christopher Batey 2020-05-14 11:38:09 +01:00 committed by GitHub
parent dba6eec460
commit 33226dbffc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 902 additions and 169 deletions

View file

@ -326,7 +326,7 @@ private[akka] class Shard(
case Terminated(ref) => receiveTerminated(ref)
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
case msg: ShardRegion.StartEntity => receiveStartEntity(msg)
case msg: ShardRegion.StartEntity => startEntities(Map(msg.entityId -> Some(sender())))
case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg)
case msg: ShardRegionCommand => receiveShardRegionCommand(msg)
case msg: ShardQuery => receiveShardQuery(msg)
@ -338,12 +338,18 @@ private[akka] class Shard(
def waitForAsyncWrite(entityId: EntityId, command: RememberEntitiesShardStore.Command)(
whenDone: EntityId => Unit): Unit = {
waitForAsyncWrite(Set(entityId), command)(_ => whenDone(entityId))
}
def waitForAsyncWrite(entityIds: Set[EntityId], command: RememberEntitiesShardStore.Command)(
whenDone: Set[EntityId] => Unit): Unit = {
rememberEntitiesStore match {
case None =>
whenDone(entityId)
whenDone(entityIds)
case Some(store) =>
if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityId, command)
if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds, command)
store ! command
timers.startSingleTimer(
RememberEntityTimeoutKey,
@ -352,35 +358,51 @@ private[akka] class Shard(
// and this could always fail before ddata store completes retrying writes
settings.tuningParameters.updatingStateTimeout)
context.become {
case RememberEntitiesShardStore.UpdateDone(`entityId`) =>
if (VerboseDebug) log.debug("Update of [{}] {} done", entityId, command)
context.become(waitingForUpdate(Map.empty))
def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = {
// none of the current impls will send back a partial update, yet!
case RememberEntitiesShardStore.UpdateDone(ids) if ids == ids =>
timers.cancel(RememberEntityTimeoutKey)
whenDone(entityId)
context.become(idle)
unstashAll()
whenDone(entityIds)
if (pendingStarts.isEmpty) {
context.become(idle)
unstashAll()
} else {
startEntities(pendingStarts)
// FIXME what if all these are already in entities? Need a become idle/unstashAll
}
case RememberEntityTimeout(`command`) =>
throw new RuntimeException(
s"Async write for entityId $entityId timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
s"Async write for entityIds $entityIds timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case msg: ShardRegion.StartEntity =>
log.error("Start entity while a write already in progress")
context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender()))))
// below cases should handle same messages as in idle
case _: Terminated => stash()
case _: CoordinatorMessage => stash()
case _: RememberEntityCommand => stash()
case _: ShardRegion.StartEntity => stash()
case _: ShardRegion.StartEntityAck => stash()
case _: ShardRegionCommand => stash()
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => stash()
case msg: LeaseLost => receiveLeaseLost(msg)
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender(), OptionVal.Some(entityId))
case msg =>
case _: Terminated => stash()
case _: CoordinatorMessage => stash()
case _: RememberEntityCommand => stash()
case _: ShardRegion.StartEntityAck => stash()
case _: ShardRegionCommand => stash()
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => stash()
case msg: LeaseLost => receiveLeaseLost(msg)
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) =>
val (id, _) = extractEntityId(msg)
if (entityIds.contains(id)) {
deliverMessage(msg, sender(), OptionVal.Some(entityIds))
} else {
stash()
context.become(waitingForUpdate(pendingStarts + (id -> None)))
}
case msg =>
// shouldn't be any other message types, but just in case
log.debug(
log.warning(
"Stashing unexpected message [{}] while waiting for remember entities update of {}",
msg.getClass,
entityId)
entityIds)
stash()
}
}
@ -410,19 +432,32 @@ private[akka] class Shard(
case RestartEntities(ids) => restartEntities(ids)
}
private def receiveStartEntity(start: ShardRegion.StartEntity): Unit = {
val requester = sender()
log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", requester, start.entityId, shardId)
touchLastMessageTimestamp(start.entityId)
// this could be because of a start message or due to a new message for the entity
// if it is a start entity then start entity ack is sent after it is created
private def startEntities(entities: Map[EntityId, Option[ActorRef]]): Unit = {
val alreadyStarted = entities.filterKeys(entity => entityIds(entity))
val needStarting = entities -- alreadyStarted.keySet
log.debug(
"Request to start entities {}. Already started {}. Need starting {}",
entities,
alreadyStarted,
needStarting)
if (entityIds(start.entityId)) {
getOrCreateEntity(start.entityId)
requester ! ShardRegion.StartEntityAck(start.entityId, shardId)
} else {
waitForAsyncWrite(start.entityId, RememberEntitiesShardStore.AddEntity(start.entityId)) { id =>
getOrCreateEntity(id)
sendMsgBuffer(id)
requester ! ShardRegion.StartEntityAck(id, shardId)
alreadyStarted.foreach {
case (entityId, requestor) =>
getOrCreateEntity(entityId)
touchLastMessageTimestamp(entityId)
requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
}
if (needStarting.nonEmpty) {
waitForAsyncWrite(needStarting.keySet, RememberEntitiesShardStore.AddEntities(needStarting.keySet)) { _ =>
needStarting.foreach {
case (entityId, requestor) =>
getOrCreateEntity(entityId)
sendMsgBuffer(entityId)
requestor.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
}
}
}
}
@ -431,9 +466,9 @@ private[akka] class Shard(
if (ack.shardId != shardId && entityIds(ack.entityId)) {
log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId)
waitForAsyncWrite(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { id =>
entityIds = entityIds - id
messageBuffers.remove(id)
waitForAsyncWrite(ack.entityId, RememberEntitiesShardStore.RemoveEntity(ack.entityId)) { _ =>
entityIds = entityIds - ack.entityId
messageBuffers.remove(ack.entityId)
}
}
}
@ -505,7 +540,7 @@ private[akka] class Shard(
context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id))
} else {
// FIXME optional wait for completion as optimization where stops are not critical
waitForAsyncWrite(id, RememberEntitiesShardStore.RemoveEntity(id))(passivateCompleted)
waitForAsyncWrite(id, RememberEntitiesShardStore.RemoveEntity(id))(_ => passivateCompleted(id))
}
}
@ -551,7 +586,9 @@ private[akka] class Shard(
entityIds = entityIds - entityId
if (hasBufferedMessages) {
log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId)
waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntity(entityId))(sendMsgBuffer)
waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntities(Set(entityId))) { _ =>
sendMsgBuffer(entityId)
}
} else {
log.debug("Entity stopped after passivation [{}]", entityId)
dropBufferFor(entityId)
@ -559,10 +596,10 @@ private[akka] class Shard(
}
/**
* @param entityIdWaitingForWrite an id for an remember entity write in progress, if non empty messages for that id
* @param entityIdsWaitingForWrite ids for remembered entities that have a write in progress; if non-empty, messages for those ids
* will be buffered
*/
private def deliverMessage(msg: Any, snd: ActorRef, entityIdWaitingForWrite: OptionVal[EntityId]): Unit = {
private def deliverMessage(msg: Any, snd: ActorRef, entityIdsWaitingForWrite: OptionVal[Set[EntityId]]): Unit = {
val (id, payload) = extractEntityId(msg)
if (id == null || id == "") {
log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
@ -571,17 +608,18 @@ private[akka] class Shard(
payload match {
case start: ShardRegion.StartEntity =>
// we can only start a new entity if we are not currently waiting for another write
if (entityIdWaitingForWrite.isEmpty) receiveStartEntity(start)
if (entityIdsWaitingForWrite.isEmpty) startEntities(Map(start.entityId -> Some(sender())))
// write in progress, see waitForAsyncWrite for unstash
else stash()
case _ =>
if (messageBuffers.contains(id) || entityIdWaitingForWrite.contains(id)) {
if (messageBuffers.contains(id) || (entityIdsWaitingForWrite.isDefined && entityIdsWaitingForWrite.get
.contains(id))) {
// either:
// 1. entity is passivating, buffer until passivation complete (id in message buffers)
// 2. we are waiting for storing entity start or stop with remember entities to complete
// and want to buffer until write completes
if (VerboseDebug) {
if (entityIdWaitingForWrite.contains(id))
if (entityIdsWaitingForWrite.isDefined && entityIdsWaitingForWrite.get.contains(id))
log.debug("Buffering message [{}] to [{}] because of write in progress for it", msg.getClass, id)
else
log.debug("Buffering message [{}] to [{}] because passivation in progress for it", msg.getClass, id)
@ -614,16 +652,16 @@ private[akka] class Shard(
actor.tell(payload, snd)
} else {
entityIdWaitingForWrite match {
entityIdsWaitingForWrite match {
case OptionVal.None =>
// No actor running and no write in progress, start actor and deliver message when started
// Note; we only do this if remembering, otherwise the buffer is an overhead
if (VerboseDebug)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id)
appendToMessageBuffer(id, msg, snd)
waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer)
waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntities(Set(id)))(sendMsgBuffer)
case OptionVal.Some(`id`) =>
case OptionVal.Some(ids) if ids.contains(id) =>
// No actor running and write in progress for this particular id, buffer message for deliver when
// write completes
if (VerboseDebug)
@ -633,7 +671,7 @@ private[akka] class Shard(
id)
appendToMessageBuffer(id, msg, snd)
case OptionVal.Some(otherId) =>
case OptionVal.Some(otherIds) =>
// No actor running and write in progress for some other entity id, stash message for deliver when
// unstash happens on async write complete
if (VerboseDebug)
@ -641,7 +679,7 @@ private[akka] class Shard(
"Stashing message [{}] to [{}] because of write in progress for [{}]",
payload.getClass,
id,
otherId)
otherIds)
stash()
}
}