diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 13d743c45f..8a5062118c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -118,7 +118,7 @@ private[akka] object Shard { /** * State machine for an entity: * {{{ - * Entity id remembered on shard start +-------------------------+ restart (via region) + * Entity id remembered on shard start +-------------------------+ restart (via region) StartEntity * +--------------------------------->| RememberedButNotCreated |------------------------------+ * | +-------------------------+ | * | | | @@ -636,50 +636,12 @@ private[akka] class Shard( log.error("Remember entity store did not respond, crashing shard") throw new RuntimeException( s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") - case ShardRegion.StartEntity(entityId) => - if (!entities.entityIdExists(entityId)) { - if (VerboseDebug) - log.debug( - "StartEntity([{}]) from [{}] while a write already in progress. Marking as pending", - entityId, - sender()) - entities.rememberingStart(entityId, ackTo = Some(sender())) - } else { - // it's already running, ack immediately - sender() ! ShardRegion.StartEntityAck(entityId, shardId) - } - - case Terminated(ref) => receiveTerminated(ref) - case _: CoordinatorMessage => stash() - case RestartTerminatedEntity(entityId) => - entities.entityState(entityId) match { - case WaitingForRestart => - if (VerboseDebug) - log.debug("Restarting terminated entity [{}]", entityId) - getOrCreateEntity(entityId) - case other => - throw new IllegalStateException( - s"Got RestartTerminatedEntity for [$entityId] but it's not waiting to be restarted. Actual state [$other]") - } - case RestartRememberedEntities(entities) => restartEntities(entities) - case l: LeaseLost => receiveLeaseLost(l) - case ShardRegion.StartEntityAck(entityId, _) => - if (update.started.contains(entityId)) { - // currently in progress of starting, so we'll need to stop it when that is done - entities.rememberingStop(entityId, StartedElsewhere) - } else { - entities.entityState(entityId) match { - case _: RememberingStart => - // queued up for batched start, let's just not start it - entities.removeEntity(entityId) - case _: Active => - // add to stop batch - entities.rememberingStop(entityId, StartedElsewhere) - case _ => - // not sure for other states, so deal with it later - stash() - } - } + case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender())) + case Terminated(ref) => receiveTerminated(ref) + case _: CoordinatorMessage => stash() + case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd) + case l: LeaseLost => receiveLeaseLost(l) + case ack: ShardRegion.StartEntityAck => receiveStartEntityAck(ack) case ShardRegion.Passivate(stopMessage) => if (VerboseDebug) log.debug( @@ -807,12 +769,37 @@ private[akka] class Shard( } } + // FIXME in what scenario do we get this to the shard? private def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = { - if (ack.shardId != shardId && entities.entityIdExists(ack.entityId)) { - log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId) - - entities.rememberingStop(ack.entityId, StartedElsewhere) - rememberUpdate(remove = Set(ack.entityId)) + if (ack.shardId != shardId) { + entities.entityState(shardId) match { + case RememberingStart(_) | RememberingStop(_) => + log.debug( + "Entity [{}] previously owned by shard [{}] started in shard [{}] while waiting to be written, stashing for later handling", + ack.entityId, + shardId, + ack.shardId) + stash() + case Active(ref) => + log.debug( + "Entity [{}] previously owned by shard [{}] started in shard [{}]", + ack.entityId, + shardId, + ack.shardId) + entities.entityPassivating(ack.entityId) + ref ! handOffStopMessage + case RememberedButNotCreated => + // FIXME could be pending with the starting strategy though, what happens when that arrives? + log.debug( + "Entity [{}] (remembered but not yet created) previously owned by shard [{}] started in shard [{}]", + ack.entityId, + shardId, + ack.shardId) + entities.removeEntity(ack.entityId) + case other => + throw new IllegalStateException( + s"Unexpected state [$other] when start entity ack of entity [${ack.entityId}] was seen from shard [${ack.shardId}]") + } } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 451ccb0ca5..2b3205368a 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -109,8 +109,11 @@ object ClusterShardingSpec { } -abstract class ClusterShardingSpecConfig(mode: String, val entityRecoveryStrategy: String = "all") - extends MultiNodeClusterShardingConfig(mode) { +abstract class ClusterShardingSpecConfig( + mode: String, + rememberEntitiesStore: String, + val entityRecoveryStrategy: String = "all") + extends MultiNodeClusterShardingConfig(mode = mode, rememberEntitiesStore = rememberEntitiesStore) { val controller = role("controller") val first = role("first") @@ -205,12 +208,24 @@ object ClusterShardingDocCode { } object PersistentClusterShardingSpecConfig - extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence) -object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData) + extends ClusterShardingSpecConfig( + ClusterShardingSettings.StateStoreModePersistence, + ClusterShardingSettings.RememberEntitiesStoreEventsourced) +object DDataClusterShardingSpecConfig + extends ClusterShardingSpecConfig( + ClusterShardingSettings.StateStoreModeDData, + ClusterShardingSettings.RememberEntitiesStoreDData) + object PersistentClusterShardingWithEntityRecoverySpecConfig - extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence, "constant") + extends ClusterShardingSpecConfig( + ClusterShardingSettings.StateStoreModePersistence, + ClusterShardingSettings.RememberEntitiesStoreEventsourced, + "constant") object DDataClusterShardingWithEntityRecoverySpecConfig - extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData, "constant") + extends ClusterShardingSpecConfig( + ClusterShardingSettings.StateStoreModeDData, + ClusterShardingSettings.RememberEntitiesStoreDData, + "constant") class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig) class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig) @@ -270,7 +285,7 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, replicator) } - def persistenceRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) = { + def eventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) = { new EventSourcedRememberEntitiesProvider(typeName, settings) } @@ -344,10 +359,10 @@ abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig) val rememberEntitiesProvider = if (!rememberEntities) None else - settings.stateStoreMode match { - case ClusterShardingSettings.StateStoreModeDData => Some(ddataRememberEntitiesProvider(typeName)) - case ClusterShardingSettings.StateStoreModePersistence => - Some(persistenceRememberEntitiesProvider(typeName, settings)) + settings.rememberEntitiesStore match { + case ClusterShardingSettings.RememberEntitiesStoreDData => Some(ddataRememberEntitiesProvider(typeName)) + case ClusterShardingSettings.RememberEntitiesStoreEventsourced => + Some(eventSourcedRememberEntitiesProvider(typeName, settings)) } system.actorOf(