diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index da582e149e..31b0fbfb65 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -23,6 +23,11 @@ akka.cluster.sharding { # due to rebalance or crash. remember-entities = off + # When 'remember-entities' is enabled and the state store mode is ddata this controls + # how the remembered entities and shards are stored. Possible values are "eventsourced" and "ddata" + # Default is ddata for backwards compatibility. + remember-entities-store = "ddata" + # Set this to a time duration to have sharding passivate entities when they have not # received any message in this length of time. Set to 'off' to disable. # It is always disabled if `remember-entities` is enabled. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 52205ddbb2..40bc16aec7 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -721,7 +721,7 @@ private[akka] class ClusterShardingGuardian extends Actor { private def replicator(settings: ClusterShardingSettings): ActorRef = { if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData || - settings.stateStoreMode == ClusterShardingSettings.StateStoreModeCustom) { + settings.stateStoreMode == ClusterShardingSettings.RememberEntitiesStoreCustom) { // one Replicator per role replicatorByRole.get(settings.role) match { case Some(ref) => ref @@ -754,16 +754,23 @@ private[akka] class ClusterShardingGuardian extends Actor { val rep = replicator(settings) val rememberEntitiesStoreProvider: Option[RememberEntitiesProvider] = if (!settings.rememberEntities) None - else - // FIXME separate setting for state and remember entities store https://github.com/akka/akka/issues/28961 - Some(settings.stateStoreMode match { - case ClusterShardingSettings.StateStoreModeDData => + else { + // with the deprecated persistence state store mode we always use the event sourced provider for shard regions + // and no store for coordinator (the coordinator is a PersistentActor in that case) + val rememberEntitiesProvider = + if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) + ClusterShardingSettings.RememberEntitiesStoreEventsourced + // FIXME move to setting + else context.system.settings.config.getString("akka.cluster.sharding.remember-entities-store") + Some(rememberEntitiesProvider match { + case ClusterShardingSettings.RememberEntitiesStoreDData => new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, rep) - case ClusterShardingSettings.StateStoreModePersistence => + case ClusterShardingSettings.RememberEntitiesStoreEventsourced => new EventSourcedRememberEntitiesProvider(typeName, settings) - case ClusterShardingSettings.StateStoreModeCustom => + case ClusterShardingSettings.RememberEntitiesStoreCustom => new CustomStateStoreModeProvider(typeName, context.system, settings) }) + } val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index f9d994d06e..5aac6121ef 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -20,7 +20,24 @@ object ClusterShardingSettings { val StateStoreModePersistence = "persistence" val StateStoreModeDData = "ddata" - val StateStoreModeCustom = "custom" + + /** + * INTERNAL API + */ + @InternalApi + private[akka] val RememberEntitiesStoreCustom = "custom" + + /** + * INTERNAL API + */ + @InternalApi + private[akka] val RememberEntitiesStoreDData = "ddata" + + /** + * INTERNAL API + */ + @InternalApi + private[akka] val RememberEntitiesStoreEventsourced = "eventsourced" /** * Create settings from the default configuration @@ -301,9 +318,9 @@ final class ClusterShardingSettings( tuningParameters, coordinatorSingletonSettings) - import ClusterShardingSettings.{ StateStoreModeCustom, StateStoreModeDData, StateStoreModePersistence } + import ClusterShardingSettings.{ RememberEntitiesStoreCustom, StateStoreModeDData, StateStoreModePersistence } require( - stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData || stateStoreMode == StateStoreModeCustom, + stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData || stateStoreMode == RememberEntitiesStoreCustom, s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'") /** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */ diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala index 70b65589ab..293ab548ef 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala @@ -23,8 +23,8 @@ private[akka] final class CustomStateStoreModeProvider( private val log = Logging(system, getClass) log.warning("Using custom remember entities store for [{}], not intended for production use.", typeName) - val customStore = if (system.settings.config.hasPath("akka.cluster.sharding.custom-store")) { - val customClassName = system.settings.config.getString("akka.cluster.sharding.custom-store") + val customStore = if (system.settings.config.hasPath("akka.cluster.sharding.remember-entities-custom-store")) { + val customClassName = system.settings.config.getString("akka.cluster.sharding.remember-entities-custom-store") val store = system .asInstanceOf[ExtendedActorSystem] diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala index 05af784b61..f5484c121a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala @@ -34,10 +34,10 @@ private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, override def shardStoreProps(shardId: ShardId): Props = EventSourcedRememberEntitiesStore.props(typeName, shardId, settings) - // FIXME persistent state store deprecated but we are adding a remember entities store that is not deprecated - // We need a new impl for this to allow ddata + persistent remember entities - // For now it is anyways not possible to configure state store and remember entities store separately so this is never used - override def coordinatorStoreProps(): Props = ??? + // Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata + // combined with eventsourced remember entities storage + override def coordinatorStoreProps(): Props = + EventSourcedRememberShards.props(typeName) } /** diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala new file mode 100644 index 0000000000..cdd4264943 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.internal + +import akka.actor.Props +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardRegion.ShardId +import akka.persistence.PersistentActor + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object EventSourcedRememberShards { + def props(typeName: String): Props = + Props(new EventSourcedRememberShards(typeName)) +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class EventSourcedRememberShards(typeName: String) extends PersistentActor { + + override val persistenceId: String = s"$typeName-remember-entitites" + + private var shards = Set.empty[ShardId] + + override def receiveRecover: Receive = { + case shardId: ShardId => + // FIXME optimize for adding rather than reading (which is done only once) + shards += shardId + } + + override def receiveCommand: Receive = { + case RememberEntitiesCoordinatorStore.GetShards => + sender() ! RememberEntitiesCoordinatorStore.RememberedShards(shards) + + case RememberEntitiesCoordinatorStore.AddShard(shardId: ShardId) => + persistAsync(shardId) { shardId => + shards += shardId + sender() ! RememberEntitiesCoordinatorStore.UpdateDone(shardId) + } + } + +} diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 851b7aa460..d4780396bb 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -118,7 +118,7 @@ abstract class ClusterShardingCustomShardAllocationSpec(multiNodeConfig: Cluster s"Cluster sharding ($mode) with custom allocation strategy" must { "use specified region" in within(30.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second)) + startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second)) join(first, first) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index e8a665b69f..91d4161dc0 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -117,7 +117,7 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur s"Cluster sharding ($mode) with flaky journal/network" must { "join cluster" in within(20.seconds) { - startPersistenceIfNotDdataMode(startOn = controller, setStoreOn = Seq(first, second)) + startPersistenceIfNeeded(startOn = controller, setStoreOn = Seq(first, second)) join(first, first) join(second, first) @@ -139,11 +139,11 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur "recover after journal/network failure" in within(20.seconds) { runOn(controller) { - if (isDdataMode) - testConductor.blackhole(first, second, Direction.Both).await - else { + if (persistenceIsNeeded) { testConductor.blackhole(controller, first, Direction.Both).await testConductor.blackhole(controller, second, Direction.Both).await + } else { + testConductor.blackhole(first, second, Direction.Both).await } } enterBarrier("journal-blackholed") @@ -159,11 +159,11 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur enterBarrier("first-delayed") runOn(controller) { - if (isDdataMode) - testConductor.passThrough(first, second, Direction.Both).await - else { + if (persistenceIsNeeded) { testConductor.passThrough(controller, first, Direction.Both).await testConductor.passThrough(controller, second, Direction.Both).await + } else { + testConductor.passThrough(first, second, Direction.Both).await } } enterBarrier("journal-ok") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 1a67f3e187..20cf8840a6 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -68,7 +68,7 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard s"Cluster sharding ($mode)" must { "start some shards in both regions" in within(30.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second)) + startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second)) join(first, first, typeName) join(second, first, typeName) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index b59718a954..9210ea0495 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -86,7 +86,7 @@ abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavin s"Cluster sharding ($mode) with leaving member" must { "join cluster" in within(20.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = roles) + startPersistenceIfNeeded(startOn = first, setStoreOn = roles) join(first, first, onJoinedRunOnFrom = startSharding()) join(second, first, onJoinedRunOnFrom = startSharding()) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index 77b578c022..f18991e222 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -67,7 +67,7 @@ abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMin s"Cluster with min-nr-of-members using sharding ($mode)" must { "use all nodes" in within(30.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third)) + startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third)) // the only test not asserting join status before starting to shard join(first, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala index cfa8ac381f..9797b5c822 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesNewExtractorSpec.scala @@ -136,7 +136,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec( s"Cluster with min-nr-of-members using sharding ($mode)" must { "start up first cluster and sharding" in within(15.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(second, third)) + startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(second, third)) join(first, first) join(second, first) @@ -199,7 +199,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec( val sys2 = ActorSystem(system.name, system.settings.config) val probe2 = TestProbe()(sys2) - if (!isDdataMode) { + if (persistenceIsNeeded) { sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref) val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get SharedLeveldbJournal.setStore(sharedStore, sys2) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 696a9b6185..a68ac89b30 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -19,18 +19,21 @@ object ClusterShardingRememberEntitiesSpec { case id: Int => (id.toString, id) } - val extractShardId: ShardRegion.ExtractShardId = msg => - msg match { - case id: Int => id.toString - case ShardRegion.StartEntity(id) => id - } + val extractShardId: ShardRegion.ExtractShardId = { + case id: Int => id.toString + case ShardRegion.StartEntity(id) => id + } } -abstract class ClusterShardingRememberEntitiesSpecConfig(mode: String, rememberEntities: Boolean) +abstract class ClusterShardingRememberEntitiesSpecConfig( + mode: String, + rememberEntities: Boolean, + rememberEntitiesStore: String = ClusterShardingSettings.RememberEntitiesStoreDData) extends MultiNodeClusterShardingConfig( mode, rememberEntities, + rememberEntitiesStore = rememberEntitiesStore, additionalConfig = s""" akka.testconductor.barrier-timeout = 60 s akka.test.single-expect-default = 60 s @@ -56,13 +59,23 @@ class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Bool rememberEntities) class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities) +class DDataClusterShardingEventSourcedRememberEntitiesSpecConfig(rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpecConfig( + ClusterShardingSettings.StateStoreModeDData, + rememberEntities, + ClusterShardingSettings.RememberEntitiesStoreEventsourced) abstract class PersistentClusterShardingRememberEntitiesSpec(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpec( new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities)) + abstract class DDataClusterShardingRememberEntitiesSpec(rememberEntities: Boolean) extends ClusterShardingRememberEntitiesSpec(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities)) +abstract class DDataClusterShardingEventSourcedRememberEntitiesSpec(rememberEntities: Boolean) + extends ClusterShardingRememberEntitiesSpec( + new DDataClusterShardingEventSourcedRememberEntitiesSpecConfig(rememberEntities)) + class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesSpec(true) class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode2 @@ -85,6 +98,13 @@ class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode1 extends DDataClus class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(false) class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(false) +class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode1 + extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true) +class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode2 + extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true) +class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode3 + extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true) + abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShardingRememberEntitiesSpecConfig) extends MultiNodeClusterShardingSpec(multiNodeConfig) with ImplicitSender { @@ -122,7 +142,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard s"Cluster sharding with remember entities ($mode)" must { "start remembered entities when coordinator fail over" in within(30.seconds) { - startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third)) + startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third)) val entityProbe = TestProbe() val probe = TestProbe() @@ -181,7 +201,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard val entityProbe2 = TestProbe()(sys2) val probe2 = TestProbe()(sys2) - if (!isDdataMode) setStore(sys2, storeOn = first) + if (persistenceIsNeeded) setStore(sys2, storeOn = first) Cluster(sys2).join(Cluster(sys2).selfAddress) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala index f074f56467..87f27f5148 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala @@ -85,6 +85,7 @@ object MultiNodeClusterShardingConfig { abstract class MultiNodeClusterShardingConfig( val mode: String = ClusterShardingSettings.StateStoreModeDData, val rememberEntities: Boolean = false, + val rememberEntitiesStore: String = ClusterShardingSettings.RememberEntitiesStoreDData, additionalConfig: String = "", loglevel: String = "INFO") extends MultiNodeConfig { @@ -95,7 +96,8 @@ abstract class MultiNodeClusterShardingConfig( s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig]).replace("Config", "").replace("_", "")}" val persistenceConfig: Config = - if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty + if (mode == ClusterShardingSettings.StateStoreModeDData && rememberEntitiesStore != ClusterShardingSettings.RememberEntitiesStoreEventsourced) + ConfigFactory.empty else MultiNodeClusterShardingConfig.persistenceConfig(targetDir) val common: Config = @@ -105,6 +107,8 @@ abstract class MultiNodeClusterShardingConfig( akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning akka.cluster.testkit.auto-down-unreachable-after = 0s akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.remember-entities = $rememberEntities + akka.cluster.sharding.remember-entities-store = "$rememberEntitiesStore" akka.cluster.sharding.distributed-data.durable.lmdb { dir = $targetDir/sharding-ddata map-size = 10 MiB diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala index 218aed25ab..e2a2f71eaa 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala @@ -170,10 +170,14 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding ClusterSharding(sys).startProxy(typeName, role, extractEntityId, extractShardId) } - protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData + protected def isDdataMode = mode == ClusterShardingSettings.StateStoreModeDData + protected def persistenceIsNeeded: Boolean = + mode == ClusterShardingSettings.StateStoreModePersistence || + system.settings.config + .getString("akka.cluster.sharding.remember-entities-store") == ClusterShardingSettings.RememberEntitiesStoreEventsourced - protected def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit = - if (!isDdataMode) setStore(sys, storeOn) + protected def setStoreIfNeeded(sys: ActorSystem, storeOn: RoleName): Unit = + if (persistenceIsNeeded) setStore(sys, storeOn) protected def setStore(sys: ActorSystem, storeOn: RoleName): Unit = { val probe = TestProbe()(sys) @@ -190,8 +194,8 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding * @param startOn the node to start the `SharedLeveldbStore` store on * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on */ - protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = - if (!isDdataMode) startPersistence(startOn, setStoreOn) + protected def startPersistenceIfNeeded(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = + if (persistenceIsNeeded) startPersistence(startOn, setStoreOn) /** * {{{ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala similarity index 91% rename from akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesSpec.scala rename to akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala index 02acce0688..e84033b24d 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala @@ -20,7 +20,7 @@ import akka.testkit.WithLogCapturing import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike -object RememberEntitiesSpec { +object PersistentStartEntitySpec { class EntityActor extends Actor { override def receive: Receive = { case "give-me-shard" => sender() ! context.parent @@ -49,13 +49,14 @@ object RememberEntitiesSpec { """.stripMargin) } -class RememberEntitiesSpec - extends AkkaSpec(RememberEntitiesSpec.config) +// this test covers remember entities + StartEntity for the deprecated persistent state store +class PersistentStartEntitySpec + extends AkkaSpec(PersistentStartEntitySpec.config) with AnyWordSpecLike with ImplicitSender with WithLogCapturing { - import RememberEntitiesSpec._ + import PersistentStartEntitySpec._ override def atStartup(): Unit = { // Form a one node cluster diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala index 0f3404832e..7edb69b0bc 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala @@ -34,8 +34,11 @@ object RememberEntitiesFailureSpec { akka.remote.artery.canonical.port = 0 akka.remote.classic.netty.tcp.port = 0 akka.cluster.sharding.distributed-data.durable.keys = [] - akka.cluster.sharding.state-store-mode = custom - akka.cluster.sharding.custom-store = "akka.cluster.sharding.RememberEntitiesFailureSpec$$FakeStore" + # must be ddata or else remember entities store is ignored + akka.cluster.sharding.state-store-mode = ddata + akka.cluster.sharding.remember-entities = on + akka.cluster.sharding.remember-entities-store = custom + akka.cluster.sharding.remember-entities-custom-store = "akka.cluster.sharding.RememberEntitiesFailureSpec$$FakeStore" # quick backoffs akka.cluster.sharding.entity-restart-backoff = 1s akka.cluster.sharding.shard-failure-backoff = 1s