Problems with StartEntityAck, StartEntity and the persistent test running with ddata remember entities (#29161)
This commit is contained in:
parent
13aed055fd
commit
5643f7e194
2 changed files with 63 additions and 61 deletions
|
|
@ -118,7 +118,7 @@ private[akka] object Shard {
|
||||||
/**
|
/**
|
||||||
* State machine for an entity:
|
* State machine for an entity:
|
||||||
* {{{
|
* {{{
|
||||||
* Entity id remembered on shard start +-------------------------+ restart (via region)
|
* Entity id remembered on shard start +-------------------------+ restart (via region) StartEntity
|
||||||
* +--------------------------------->| RememberedButNotCreated |------------------------------+
|
* +--------------------------------->| RememberedButNotCreated |------------------------------+
|
||||||
* | +-------------------------+ |
|
* | +-------------------------+ |
|
||||||
* | | |
|
* | | |
|
||||||
|
|
@ -636,50 +636,12 @@ private[akka] class Shard(
|
||||||
log.error("Remember entity store did not respond, crashing shard")
|
log.error("Remember entity store did not respond, crashing shard")
|
||||||
throw new RuntimeException(
|
throw new RuntimeException(
|
||||||
s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
|
s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
|
||||||
case ShardRegion.StartEntity(entityId) =>
|
case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))
|
||||||
if (!entities.entityIdExists(entityId)) {
|
case Terminated(ref) => receiveTerminated(ref)
|
||||||
if (VerboseDebug)
|
case _: CoordinatorMessage => stash()
|
||||||
log.debug(
|
case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd)
|
||||||
"StartEntity([{}]) from [{}] while a write already in progress. Marking as pending",
|
case l: LeaseLost => receiveLeaseLost(l)
|
||||||
entityId,
|
case ack: ShardRegion.StartEntityAck => receiveStartEntityAck(ack)
|
||||||
sender())
|
|
||||||
entities.rememberingStart(entityId, ackTo = Some(sender()))
|
|
||||||
} else {
|
|
||||||
// it's already running, ack immediately
|
|
||||||
sender() ! ShardRegion.StartEntityAck(entityId, shardId)
|
|
||||||
}
|
|
||||||
|
|
||||||
case Terminated(ref) => receiveTerminated(ref)
|
|
||||||
case _: CoordinatorMessage => stash()
|
|
||||||
case RestartTerminatedEntity(entityId) =>
|
|
||||||
entities.entityState(entityId) match {
|
|
||||||
case WaitingForRestart =>
|
|
||||||
if (VerboseDebug)
|
|
||||||
log.debug("Restarting terminated entity [{}]", entityId)
|
|
||||||
getOrCreateEntity(entityId)
|
|
||||||
case other =>
|
|
||||||
throw new IllegalStateException(
|
|
||||||
s"Got RestartTerminatedEntity for [$entityId] but it's not waiting to be restarted. Actual state [$other]")
|
|
||||||
}
|
|
||||||
case RestartRememberedEntities(entities) => restartEntities(entities)
|
|
||||||
case l: LeaseLost => receiveLeaseLost(l)
|
|
||||||
case ShardRegion.StartEntityAck(entityId, _) =>
|
|
||||||
if (update.started.contains(entityId)) {
|
|
||||||
// currently in progress of starting, so we'll need to stop it when that is done
|
|
||||||
entities.rememberingStop(entityId, StartedElsewhere)
|
|
||||||
} else {
|
|
||||||
entities.entityState(entityId) match {
|
|
||||||
case _: RememberingStart =>
|
|
||||||
// queued up for batched start, let's just not start it
|
|
||||||
entities.removeEntity(entityId)
|
|
||||||
case _: Active =>
|
|
||||||
// add to stop batch
|
|
||||||
entities.rememberingStop(entityId, StartedElsewhere)
|
|
||||||
case _ =>
|
|
||||||
// not sure for other states, so deal with it later
|
|
||||||
stash()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case ShardRegion.Passivate(stopMessage) =>
|
case ShardRegion.Passivate(stopMessage) =>
|
||||||
if (VerboseDebug)
|
if (VerboseDebug)
|
||||||
log.debug(
|
log.debug(
|
||||||
|
|
@ -807,12 +769,37 @@ private[akka] class Shard(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME in what scenario do we get this to the shard?
|
||||||
private def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
|
private def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
|
||||||
if (ack.shardId != shardId && entities.entityIdExists(ack.entityId)) {
|
if (ack.shardId != shardId) {
|
||||||
log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId)
|
entities.entityState(shardId) match {
|
||||||
|
case RememberingStart(_) | RememberingStop(_) =>
|
||||||
entities.rememberingStop(ack.entityId, StartedElsewhere)
|
log.debug(
|
||||||
rememberUpdate(remove = Set(ack.entityId))
|
"Entity [{}] previously owned by shard [{}] started in shard [{}] while waiting to be written, stashing for later handling",
|
||||||
|
ack.entityId,
|
||||||
|
shardId,
|
||||||
|
ack.shardId)
|
||||||
|
stash()
|
||||||
|
case Active(ref) =>
|
||||||
|
log.debug(
|
||||||
|
"Entity [{}] previously owned by shard [{}] started in shard [{}]",
|
||||||
|
ack.entityId,
|
||||||
|
shardId,
|
||||||
|
ack.shardId)
|
||||||
|
entities.entityPassivating(ack.entityId)
|
||||||
|
ref ! handOffStopMessage
|
||||||
|
case RememberedButNotCreated =>
|
||||||
|
// FIXME could be pending with the starting strategy though, what happens when that arrives?
|
||||||
|
log.debug(
|
||||||
|
"Entity [{}] (remembered but not yet created) previously owned by shard [{}] started in shard [{}]",
|
||||||
|
ack.entityId,
|
||||||
|
shardId,
|
||||||
|
ack.shardId)
|
||||||
|
entities.removeEntity(ack.entityId)
|
||||||
|
case other =>
|
||||||
|
throw new IllegalStateException(
|
||||||
|
s"Unexpected state [$other] when start entity ack of entity [${ack.entityId}] was seen from shard [${ack.shardId}]")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -109,8 +109,11 @@ object ClusterShardingSpec {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class ClusterShardingSpecConfig(mode: String, val entityRecoveryStrategy: String = "all")
|
abstract class ClusterShardingSpecConfig(
|
||||||
extends MultiNodeClusterShardingConfig(mode) {
|
mode: String,
|
||||||
|
rememberEntitiesStore: String,
|
||||||
|
val entityRecoveryStrategy: String = "all")
|
||||||
|
extends MultiNodeClusterShardingConfig(mode = mode, rememberEntitiesStore = rememberEntitiesStore) {
|
||||||
|
|
||||||
val controller = role("controller")
|
val controller = role("controller")
|
||||||
val first = role("first")
|
val first = role("first")
|
||||||
|
|
@ -205,12 +208,24 @@ object ClusterShardingDocCode {
|
||||||
}
|
}
|
||||||
|
|
||||||
object PersistentClusterShardingSpecConfig
|
object PersistentClusterShardingSpecConfig
|
||||||
extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
|
extends ClusterShardingSpecConfig(
|
||||||
object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData)
|
ClusterShardingSettings.StateStoreModePersistence,
|
||||||
|
ClusterShardingSettings.RememberEntitiesStoreEventsourced)
|
||||||
|
object DDataClusterShardingSpecConfig
|
||||||
|
extends ClusterShardingSpecConfig(
|
||||||
|
ClusterShardingSettings.StateStoreModeDData,
|
||||||
|
ClusterShardingSettings.RememberEntitiesStoreDData)
|
||||||
|
|
||||||
object PersistentClusterShardingWithEntityRecoverySpecConfig
|
object PersistentClusterShardingWithEntityRecoverySpecConfig
|
||||||
extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence, "constant")
|
extends ClusterShardingSpecConfig(
|
||||||
|
ClusterShardingSettings.StateStoreModePersistence,
|
||||||
|
ClusterShardingSettings.RememberEntitiesStoreEventsourced,
|
||||||
|
"constant")
|
||||||
object DDataClusterShardingWithEntityRecoverySpecConfig
|
object DDataClusterShardingWithEntityRecoverySpecConfig
|
||||||
extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData, "constant")
|
extends ClusterShardingSpecConfig(
|
||||||
|
ClusterShardingSettings.StateStoreModeDData,
|
||||||
|
ClusterShardingSettings.RememberEntitiesStoreDData,
|
||||||
|
"constant")
|
||||||
|
|
||||||
class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig)
|
class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig)
|
||||||
class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig)
|
class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig)
|
||||||
|
|
@ -270,7 +285,7 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
|
||||||
new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, replicator)
|
new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, replicator)
|
||||||
}
|
}
|
||||||
|
|
||||||
def persistenceRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) = {
|
def eventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) = {
|
||||||
new EventSourcedRememberEntitiesProvider(typeName, settings)
|
new EventSourcedRememberEntitiesProvider(typeName, settings)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -344,10 +359,10 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
|
||||||
val rememberEntitiesProvider =
|
val rememberEntitiesProvider =
|
||||||
if (!rememberEntities) None
|
if (!rememberEntities) None
|
||||||
else
|
else
|
||||||
settings.stateStoreMode match {
|
settings.rememberEntitiesStore match {
|
||||||
case ClusterShardingSettings.StateStoreModeDData => Some(ddataRememberEntitiesProvider(typeName))
|
case ClusterShardingSettings.RememberEntitiesStoreDData => Some(ddataRememberEntitiesProvider(typeName))
|
||||||
case ClusterShardingSettings.StateStoreModePersistence =>
|
case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
|
||||||
Some(persistenceRememberEntitiesProvider(typeName, settings))
|
Some(eventSourcedRememberEntitiesProvider(typeName, settings))
|
||||||
}
|
}
|
||||||
|
|
||||||
system.actorOf(
|
system.actorOf(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue