Coordinator ddata state store with persistent remember entities (#29043)

* WIP

* Test and config update

* Multi-jvm tests updated to cover ddata state + persistent remember entities
This commit is contained in:
Johan Andrén 2020-05-11 12:17:29 +02:00 committed by GitHub
parent bc2671757f
commit 6f8f44b3df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 158 additions and 49 deletions

View file

@ -23,6 +23,11 @@ akka.cluster.sharding {
# due to rebalance or crash.
remember-entities = off
# When 'remember-entities' is enabled and the state store mode is ddata this controls
# how the remembered entities and shards are stored. Possible values are "eventsourced" and "ddata"
# Default is ddata for backwards compatibility.
remember-entities-store = "ddata"
# Set this to a time duration to have sharding passivate entities when they have not
# received any message in this length of time. Set to 'off' to disable.
# It is always disabled if `remember-entities` is enabled.

View file

@ -721,7 +721,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
private def replicator(settings: ClusterShardingSettings): ActorRef = {
if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData ||
settings.stateStoreMode == ClusterShardingSettings.StateStoreModeCustom) {
settings.stateStoreMode == ClusterShardingSettings.RememberEntitiesStoreCustom) {
// one Replicator per role
replicatorByRole.get(settings.role) match {
case Some(ref) => ref
@ -754,16 +754,23 @@ private[akka] class ClusterShardingGuardian extends Actor {
val rep = replicator(settings)
val rememberEntitiesStoreProvider: Option[RememberEntitiesProvider] =
if (!settings.rememberEntities) None
else
// FIXME separate setting for state and remember entities store https://github.com/akka/akka/issues/28961
Some(settings.stateStoreMode match {
case ClusterShardingSettings.StateStoreModeDData =>
else {
// with the deprecated persistence state store mode we always use the event sourced provider for shard regions
// and no store for coordinator (the coordinator is a PersistentActor in that case)
val rememberEntitiesProvider =
if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
ClusterShardingSettings.RememberEntitiesStoreEventsourced
// FIXME move to setting
else context.system.settings.config.getString("akka.cluster.sharding.remember-entities-store")
Some(rememberEntitiesProvider match {
case ClusterShardingSettings.RememberEntitiesStoreDData =>
new DDataRememberEntitiesProvider(typeName, settings, majorityMinCap, rep)
case ClusterShardingSettings.StateStoreModePersistence =>
case ClusterShardingSettings.RememberEntitiesStoreEventsourced =>
new EventSourcedRememberEntitiesProvider(typeName, settings)
case ClusterShardingSettings.StateStoreModeCustom =>
case ClusterShardingSettings.RememberEntitiesStoreCustom =>
new CustomStateStoreModeProvider(typeName, context.system, settings)
})
}
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)

View file

@ -20,7 +20,24 @@ object ClusterShardingSettings {
val StateStoreModePersistence = "persistence"
val StateStoreModeDData = "ddata"
val StateStoreModeCustom = "custom"
/**
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreCustom = "custom"
/**
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreDData = "ddata"
/**
* INTERNAL API
*/
@InternalApi
private[akka] val RememberEntitiesStoreEventsourced = "eventsourced"
/**
* Create settings from the default configuration
@ -301,9 +318,9 @@ final class ClusterShardingSettings(
tuningParameters,
coordinatorSingletonSettings)
import ClusterShardingSettings.{ StateStoreModeCustom, StateStoreModeDData, StateStoreModePersistence }
import ClusterShardingSettings.{ RememberEntitiesStoreCustom, StateStoreModeDData, StateStoreModePersistence }
require(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData || stateStoreMode == StateStoreModeCustom,
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData || stateStoreMode == RememberEntitiesStoreCustom,
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")
/** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */

View file

@ -23,8 +23,8 @@ private[akka] final class CustomStateStoreModeProvider(
private val log = Logging(system, getClass)
log.warning("Using custom remember entities store for [{}], not intended for production use.", typeName)
val customStore = if (system.settings.config.hasPath("akka.cluster.sharding.custom-store")) {
val customClassName = system.settings.config.getString("akka.cluster.sharding.custom-store")
val customStore = if (system.settings.config.hasPath("akka.cluster.sharding.remember-entities-custom-store")) {
val customClassName = system.settings.config.getString("akka.cluster.sharding.remember-entities-custom-store")
val store = system
.asInstanceOf[ExtendedActorSystem]

View file

@ -34,10 +34,10 @@ private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String,
override def shardStoreProps(shardId: ShardId): Props =
EventSourcedRememberEntitiesStore.props(typeName, shardId, settings)
// FIXME persistent state store deprecated but we are adding a remember entities store that is not deprecated
// We need a new impl for this to allow ddata + persistent remember entities
// For now it is anyways not possible to configure state store and remember entities store separately so this is never used
override def coordinatorStoreProps(): Props = ???
// Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata
// combined with eventsourced remember entities storage
override def coordinatorStoreProps(): Props =
EventSourcedRememberShards.props(typeName)
}
/**

View file

@ -0,0 +1,48 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal
import akka.actor.Props
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardRegion.ShardId
import akka.persistence.PersistentActor
/**
* INTERNAL API
*/
@InternalApi
private[akka] object EventSourcedRememberShards {
def props(typeName: String): Props =
Props(new EventSourcedRememberShards(typeName))
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class EventSourcedRememberShards(typeName: String) extends PersistentActor {
override val persistenceId: String = s"$typeName-remember-entitites"
private var shards = Set.empty[ShardId]
override def receiveRecover: Receive = {
case shardId: ShardId =>
// FIXME optimize for adding rather than reading (which is done only once)
shards += shardId
}
override def receiveCommand: Receive = {
case RememberEntitiesCoordinatorStore.GetShards =>
sender() ! RememberEntitiesCoordinatorStore.RememberedShards(shards)
case RememberEntitiesCoordinatorStore.AddShard(shardId: ShardId) =>
persistAsync(shardId) { shardId =>
shards += shardId
sender() ! RememberEntitiesCoordinatorStore.UpdateDone(shardId)
}
}
}

View file

@ -118,7 +118,7 @@ abstract class ClusterShardingCustomShardAllocationSpec(multiNodeConfig: Cluster
s"Cluster sharding ($mode) with custom allocation strategy" must {
"use specified region" in within(30.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second))
join(first, first)

View file

@ -117,7 +117,7 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur
s"Cluster sharding ($mode) with flaky journal/network" must {
"join cluster" in within(20.seconds) {
startPersistenceIfNotDdataMode(startOn = controller, setStoreOn = Seq(first, second))
startPersistenceIfNeeded(startOn = controller, setStoreOn = Seq(first, second))
join(first, first)
join(second, first)
@ -139,11 +139,11 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur
"recover after journal/network failure" in within(20.seconds) {
runOn(controller) {
if (isDdataMode)
testConductor.blackhole(first, second, Direction.Both).await
else {
if (persistenceIsNeeded) {
testConductor.blackhole(controller, first, Direction.Both).await
testConductor.blackhole(controller, second, Direction.Both).await
} else {
testConductor.blackhole(first, second, Direction.Both).await
}
}
enterBarrier("journal-blackholed")
@ -159,11 +159,11 @@ abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailur
enterBarrier("first-delayed")
runOn(controller) {
if (isDdataMode)
testConductor.passThrough(first, second, Direction.Both).await
else {
if (persistenceIsNeeded) {
testConductor.passThrough(controller, first, Direction.Both).await
testConductor.passThrough(controller, second, Direction.Both).await
} else {
testConductor.passThrough(first, second, Direction.Both).await
}
}
enterBarrier("journal-ok")

View file

@ -68,7 +68,7 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard
s"Cluster sharding ($mode)" must {
"start some shards in both regions" in within(30.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second))
join(first, first, typeName)
join(second, first, typeName)

View file

@ -86,7 +86,7 @@ abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavin
s"Cluster sharding ($mode) with leaving member" must {
"join cluster" in within(20.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = roles)
startPersistenceIfNeeded(startOn = first, setStoreOn = roles)
join(first, first, onJoinedRunOnFrom = startSharding())
join(second, first, onJoinedRunOnFrom = startSharding())

View file

@ -67,7 +67,7 @@ abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMin
s"Cluster with min-nr-of-members using sharding ($mode)" must {
"use all nodes" in within(30.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third))
// the only test not asserting join status before starting to shard
join(first, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false)

View file

@ -136,7 +136,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(
s"Cluster with min-nr-of-members using sharding ($mode)" must {
"start up first cluster and sharding" in within(15.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(second, third))
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(second, third))
join(first, first)
join(second, first)
@ -199,7 +199,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(
val sys2 = ActorSystem(system.name, system.settings.config)
val probe2 = TestProbe()(sys2)
if (!isDdataMode) {
if (persistenceIsNeeded) {
sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref)
val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get
SharedLeveldbJournal.setStore(sharedStore, sys2)

View file

@ -19,18 +19,21 @@ object ClusterShardingRememberEntitiesSpec {
case id: Int => (id.toString, id)
}
val extractShardId: ShardRegion.ExtractShardId = msg =>
msg match {
case id: Int => id.toString
case ShardRegion.StartEntity(id) => id
}
val extractShardId: ShardRegion.ExtractShardId = {
case id: Int => id.toString
case ShardRegion.StartEntity(id) => id
}
}
abstract class ClusterShardingRememberEntitiesSpecConfig(mode: String, rememberEntities: Boolean)
abstract class ClusterShardingRememberEntitiesSpecConfig(
mode: String,
rememberEntities: Boolean,
rememberEntitiesStore: String = ClusterShardingSettings.RememberEntitiesStoreDData)
extends MultiNodeClusterShardingConfig(
mode,
rememberEntities,
rememberEntitiesStore = rememberEntitiesStore,
additionalConfig = s"""
akka.testconductor.barrier-timeout = 60 s
akka.test.single-expect-default = 60 s
@ -56,13 +59,23 @@ class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Bool
rememberEntities)
class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean)
extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities)
class DDataClusterShardingEventSourcedRememberEntitiesSpecConfig(rememberEntities: Boolean)
extends ClusterShardingRememberEntitiesSpecConfig(
ClusterShardingSettings.StateStoreModeDData,
rememberEntities,
ClusterShardingSettings.RememberEntitiesStoreEventsourced)
abstract class PersistentClusterShardingRememberEntitiesSpec(rememberEntities: Boolean)
extends ClusterShardingRememberEntitiesSpec(
new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities))
abstract class DDataClusterShardingRememberEntitiesSpec(rememberEntities: Boolean)
extends ClusterShardingRememberEntitiesSpec(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities))
abstract class DDataClusterShardingEventSourcedRememberEntitiesSpec(rememberEntities: Boolean)
extends ClusterShardingRememberEntitiesSpec(
new DDataClusterShardingEventSourcedRememberEntitiesSpecConfig(rememberEntities))
class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode1
extends PersistentClusterShardingRememberEntitiesSpec(true)
class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode2
@ -85,6 +98,13 @@ class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode1 extends DDataClus
class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(false)
class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(false)
class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode1
extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true)
class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode2
extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true)
class DDataClusterShardingEventSourcedRememberEntitiesEnabledMultiJvmNode3
extends DDataClusterShardingEventSourcedRememberEntitiesSpec(true)
abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShardingRememberEntitiesSpecConfig)
extends MultiNodeClusterShardingSpec(multiNodeConfig)
with ImplicitSender {
@ -122,7 +142,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard
s"Cluster sharding with remember entities ($mode)" must {
"start remembered entities when coordinator fail over" in within(30.seconds) {
startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third))
val entityProbe = TestProbe()
val probe = TestProbe()
@ -181,7 +201,7 @@ abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShard
val entityProbe2 = TestProbe()(sys2)
val probe2 = TestProbe()(sys2)
if (!isDdataMode) setStore(sys2, storeOn = first)
if (persistenceIsNeeded) setStore(sys2, storeOn = first)
Cluster(sys2).join(Cluster(sys2).selfAddress)

View file

@ -85,6 +85,7 @@ object MultiNodeClusterShardingConfig {
abstract class MultiNodeClusterShardingConfig(
val mode: String = ClusterShardingSettings.StateStoreModeDData,
val rememberEntities: Boolean = false,
val rememberEntitiesStore: String = ClusterShardingSettings.RememberEntitiesStoreDData,
additionalConfig: String = "",
loglevel: String = "INFO")
extends MultiNodeConfig {
@ -95,7 +96,8 @@ abstract class MultiNodeClusterShardingConfig(
s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig]).replace("Config", "").replace("_", "")}"
val persistenceConfig: Config =
if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty
if (mode == ClusterShardingSettings.StateStoreModeDData && rememberEntitiesStore != ClusterShardingSettings.RememberEntitiesStoreEventsourced)
ConfigFactory.empty
else MultiNodeClusterShardingConfig.persistenceConfig(targetDir)
val common: Config =
@ -105,6 +107,8 @@ abstract class MultiNodeClusterShardingConfig(
akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.testkit.auto-down-unreachable-after = 0s
akka.cluster.sharding.state-store-mode = "$mode"
akka.cluster.sharding.remember-entities = $rememberEntities
akka.cluster.sharding.remember-entities-store = "$rememberEntitiesStore"
akka.cluster.sharding.distributed-data.durable.lmdb {
dir = $targetDir/sharding-ddata
map-size = 10 MiB

View file

@ -170,10 +170,14 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
ClusterSharding(sys).startProxy(typeName, role, extractEntityId, extractShardId)
}
protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
protected def isDdataMode = mode == ClusterShardingSettings.StateStoreModeDData
protected def persistenceIsNeeded: Boolean =
mode == ClusterShardingSettings.StateStoreModePersistence ||
system.settings.config
.getString("akka.cluster.sharding.remember-entities-store") == ClusterShardingSettings.RememberEntitiesStoreEventsourced
protected def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit =
if (!isDdataMode) setStore(sys, storeOn)
protected def setStoreIfNeeded(sys: ActorSystem, storeOn: RoleName): Unit =
if (persistenceIsNeeded) setStore(sys, storeOn)
protected def setStore(sys: ActorSystem, storeOn: RoleName): Unit = {
val probe = TestProbe()(sys)
@ -190,8 +194,8 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
* @param startOn the node to start the `SharedLeveldbStore` store on
* @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on
*/
protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
if (!isDdataMode) startPersistence(startOn, setStoreOn)
protected def startPersistenceIfNeeded(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
if (persistenceIsNeeded) startPersistence(startOn, setStoreOn)
/**
* {{{

View file

@ -20,7 +20,7 @@ import akka.testkit.WithLogCapturing
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object RememberEntitiesSpec {
object PersistentStartEntitySpec {
class EntityActor extends Actor {
override def receive: Receive = {
case "give-me-shard" => sender() ! context.parent
@ -49,13 +49,14 @@ object RememberEntitiesSpec {
""".stripMargin)
}
class RememberEntitiesSpec
extends AkkaSpec(RememberEntitiesSpec.config)
// this test covers remember entities + StartEntity for the deprecated persistent state store
class PersistentStartEntitySpec
extends AkkaSpec(PersistentStartEntitySpec.config)
with AnyWordSpecLike
with ImplicitSender
with WithLogCapturing {
import RememberEntitiesSpec._
import PersistentStartEntitySpec._
override def atStartup(): Unit = {
// Form a one node cluster

View file

@ -34,8 +34,11 @@ object RememberEntitiesFailureSpec {
akka.remote.artery.canonical.port = 0
akka.remote.classic.netty.tcp.port = 0
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.state-store-mode = custom
akka.cluster.sharding.custom-store = "akka.cluster.sharding.RememberEntitiesFailureSpec$$FakeStore"
# must be ddata or else remember entities store is ignored
akka.cluster.sharding.state-store-mode = ddata
akka.cluster.sharding.remember-entities = on
akka.cluster.sharding.remember-entities-store = custom
akka.cluster.sharding.remember-entities-custom-store = "akka.cluster.sharding.RememberEntitiesFailureSpec$$FakeStore"
# quick backoffs
akka.cluster.sharding.entity-restart-backoff = 1s
akka.cluster.sharding.shard-failure-backoff = 1s