Deliver buffered messages after passivation, #26957

* This problem was introduced in the optimization in PR #26878,
  and that regression has not been released.
* While waiting for the ddata update response it buffers messages
  for the entity that is stopped/started and in the case of passivation
  those buffered messages were not delivered afterwards. Therefore
  the test failed when waiting for the expected response.
This commit is contained in:
Patrik Nordwall 2019-05-20 08:37:55 +02:00
parent a2658cc8a0
commit 1ef65091f6
3 changed files with 27 additions and 6 deletions

View file

@ -430,9 +430,18 @@ private[akka] class Shard(
// EntityStopped handler
def passivateCompleted(event: EntityStopped): Unit = {
log.debug("Entity stopped after passivation [{}]", event.entityId)
val hasBufferedMessages = messageBuffers.getOrEmpty(event.entityId).nonEmpty
state = state.copy(state.entities - event.entityId)
messageBuffers.remove(event.entityId)
if (hasBufferedMessages) {
log.debug(
"Entity stopped after passivation [{}], but will be started again due to buffered messages.",
event.entityId)
processChange(EntityStarted(event.entityId))(sendMsgBuffer)
} else {
log.debug("Entity stopped after passivation [{}]", event.entityId)
messageBuffers.remove(event.entityId)
}
}
// EntityStarted handler

View file

@ -1171,8 +1171,14 @@ class DDataShardCoordinator(
unstashAll()
}
private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit =
private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit = {
log.debug(
"GetShardHome [{}] request from [{}] stashed, because waiting for initial state or update of state. " +
"It will be handled afterwards.",
request.shard,
sender)
getShardHomeRequests += (sender -> request)
}
private def unstashGetShardHomeRequests(): Unit = {
getShardHomeRequests.foreach {

View file

@ -238,10 +238,16 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf
//Test the Shard passivate works after a journal failure
shard2.tell(Passivate(PoisonPill), entity21)
awaitCond({
awaitAssert {
// Note that the order between this Get message to 21 and the above Passivate to 21 is undefined.
// If this Get arrives first the reply will be Value("21", 3) and then it is retried by the
// awaitAssert.
// Also note that there is no timeout parameter on below expectMsg because messages should not
// be lost here. They should be buffered and delivered also after Passivate completed.
region ! Get("21")
expectMsgType[Value] == Value("21", 0)
}, message = "Passivating did not reset Value down to 0")
// counter reset to 0 when started again
expectMsg(Value("21", 0))
}
region ! Add("21", 1)