Merge pull request #17871 from smlin/sharding-with-ddata
=cls #17846 Use CRDTs instead of PersistentActor to remember the state of the ShardCoordinator
commit 453a554de2
14 changed files with 654 additions and 360 deletions
@@ -66,7 +66,11 @@ akka.cluster.sharding {
   # persistence used by the entity actors.
   snapshot-plugin-id = ""
 
-  # The coordinator saves persistent snapshots after this number of persistent
+  # Parameter which determines how the coordinator stores its state.
+  # Valid values are either "persistence" or "ddata".
+  state-store-mode = "persistence"
+
+  # The shard saves persistent snapshots after this number of persistent
   # events. Snapshots are used to reduce recovery times.
   snapshot-after = 1000
 
@@ -80,6 +84,14 @@ akka.cluster.sharding {
     max-simultaneous-rebalance = 3
   }
 
+  # Timeout of waiting for the initial distributed state (the initial state will be queried again if the timeout happened)
+  # works only for state-store-mode = "ddata"
+  waiting-for-state-timeout = 5 s
+
+  # Timeout of waiting for update of the distributed state (the update will be retried if the timeout happened)
+  # works only for state-store-mode = "ddata"
+  updating-state-timeout = 5 s
+
   # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
   coordinator-singleton = ${akka.cluster.singleton}
 
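For orientation only (this block is not part of the pull request): a minimal Scala sketch of how an application could opt into the new coordinator state store once this change is available, assuming the reference settings shown above; the system name and override values are illustrative.

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory

object StateStoreModeExample extends App {
  // Override the defaults from the reference configuration above:
  // "ddata" makes the coordinator keep its state in a replicated CRDT
  // instead of an event-sourced PersistentActor.
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.cluster.sharding.state-store-mode = "ddata"
    akka.cluster.sharding.waiting-for-state-timeout = 5 s
    akka.cluster.sharding.updating-state-timeout = 5 s
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("ShardingExample", config)

  // The settings object now carries the chosen mode and the two new timeouts.
  val settings = ClusterShardingSettings(system)
  println(settings.stateStoreMode)                          // "ddata"
  println(settings.tuningParameters.waitingForStateTimeout) // 5 seconds

  system.terminate()
}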
@@ -19,6 +19,7 @@ import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.PoisonPill
 import akka.actor.Props
 import akka.cluster.Cluster
+import akka.cluster.ddata.DistributedData
 import akka.cluster.singleton.ClusterSingletonManager
 import akka.pattern.BackoffSupervisor
 import akka.util.ByteString
@@ -414,6 +415,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
 
   val cluster = Cluster(context.system)
   val sharding = ClusterSharding(context.system)
+  lazy val replicator = DistributedData(context.system).replicator
 
   private def coordinatorSingletonManagerName(encName: String): String =
     encName + "Coordinator"
@@ -425,12 +427,17 @@ private[akka] class ClusterShardingGuardian extends Actor {
     case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) ⇒
       import settings.role
       import settings.tuningParameters.coordinatorFailureBackoff
 
       val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
       val cName = coordinatorSingletonManagerName(encName)
       val cPath = coordinatorPath(encName)
       val shardRegion = context.child(encName).getOrElse {
         if (context.child(cName).isEmpty) {
-          val coordinatorProps = ShardCoordinator.props(typeName, settings, allocationStrategy)
+          val coordinatorProps =
+            if (settings.stateStoreMode == "persistence")
+              ShardCoordinator.props(typeName, settings, allocationStrategy)
+            else
+              ShardCoordinator.props(typeName, settings, allocationStrategy, replicator)
           val singletonProps = BackoffSupervisor.props(
             childProps = coordinatorProps,
             childName = "coordinator",
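Whichever coordinator implementation the guardian selects above, the user-facing API stays the same: switching state-store-mode is purely a configuration change. A minimal sketch (not part of the diff) with an assumed Envelope protocol and type name:

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object StartRegionExample {
  // Illustrative message protocol and extractors; any entity actor works here.
  final case class Envelope(entityId: String, payload: Any)

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(id, payload) ⇒ (id, payload)
  }
  val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(id, _) ⇒ (math.abs(id.hashCode) % 10).toString
  }

  def startRegion(system: ActorSystem, entityProps: Props): ActorRef =
    ClusterSharding(system).start(
      typeName = "Counter",
      entityProps = entityProps,
      settings = ClusterShardingSettings(system), // state-store-mode is read from config
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}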
@@ -36,7 +36,9 @@ object ClusterShardingSettings {
       leastShardAllocationRebalanceThreshold =
         config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
       leastShardAllocationMaxSimultaneousRebalance =
-        config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))
+        config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"),
+      waitingForStateTimeout = config.getDuration("waiting-for-state-timeout", MILLISECONDS).millis,
+      updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis)
 
     val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
 
@@ -45,6 +47,7 @@ object ClusterShardingSettings {
       rememberEntities = config.getBoolean("remember-entities"),
       journalPluginId = config.getString("journal-plugin-id"),
       snapshotPluginId = config.getString("snapshot-plugin-id"),
+      stateStoreMode = config.getString("state-store-mode"),
       tuningParameters,
       coordinatorSingletonSettings)
   }
@@ -78,7 +81,9 @@ object ClusterShardingSettings {
     val rebalanceInterval: FiniteDuration,
     val snapshotAfter: Int,
     val leastShardAllocationRebalanceThreshold: Int,
-    val leastShardAllocationMaxSimultaneousRebalance: Int)
+    val leastShardAllocationMaxSimultaneousRebalance: Int,
+    val waitingForStateTimeout: FiniteDuration,
+    val updatingStateTimeout: FiniteDuration)
 }
 
 /**
@@ -101,6 +106,7 @@ final class ClusterShardingSettings(
   val rememberEntities: Boolean,
   val journalPluginId: String,
   val snapshotPluginId: String,
+  val stateStoreMode: String,
   val tuningParameters: ClusterShardingSettings.TuningParameters,
   val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
 
@@ -127,6 +133,7 @@ final class ClusterShardingSettings(
     rememberEntities: Boolean = rememberEntities,
     journalPluginId: String = journalPluginId,
     snapshotPluginId: String = snapshotPluginId,
+    stateStoreMode: String = stateStoreMode,
     tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
     coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
     new ClusterShardingSettings(
@@ -134,6 +141,7 @@ final class ClusterShardingSettings(
       rememberEntities,
       journalPluginId,
       snapshotPluginId,
+      stateStoreMode,
       tuningParameters,
       coordinatorSingletonSettings)
 }
@@ -98,7 +98,7 @@ private[akka] class Shard(
   extractShardId: ShardRegion.ExtractShardId,
   handOffStopMessage: Any) extends Actor with ActorLogging {
 
-  import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate }
+  import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized }
   import ShardCoordinator.Internal.{ HandOff, ShardStopped }
   import Shard.{ State, RestartEntity, EntityStopped, EntityStarted }
   import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
@@ -113,6 +113,10 @@ private[akka] class Shard(
 
   var handOffStopper: Option[ActorRef] = None
 
+  initialized()
+
+  def initialized(): Unit = context.parent ! ShardInitialized(shardId)
+
   def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size }
 
   def processChange[A](event: A)(handler: A ⇒ Unit): Unit =
@@ -297,6 +301,9 @@ private[akka] class PersistentShard(
 
   var persistCount = 0
 
+  // will be initialized after recovery is completed
+  override def initialized(): Unit = {}
+
   override def receive = receiveCommand
 
   override def processChange[A](event: A)(handler: A ⇒ Unit): Unit = {
@@ -316,7 +323,10 @@ private[akka] class PersistentShard(
     case EntityStarted(id) ⇒ state = state.copy(state.entities + id)
     case EntityStopped(id) ⇒ state = state.copy(state.entities - id)
     case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot
-    case RecoveryCompleted ⇒ state.entities foreach getEntity
+    case RecoveryCompleted ⇒
+      state.entities foreach getEntity
+      super.initialized()
+      log.debug("Shard recovery completed {}", shardId)
   }
 
   override def entityTerminated(ref: ActorRef): Unit = {
@@ -3,6 +3,8 @@
  */
 package akka.cluster.sharding
 
+import akka.persistence._
+
 import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -15,16 +17,15 @@ import akka.actor.Deploy
 import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.Props
 import akka.actor.ReceiveTimeout
+import akka.actor.Stash
 import akka.actor.Terminated
 import akka.cluster.Cluster
 import akka.cluster.ClusterEvent.ClusterShuttingDown
 import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ddata.LWWRegisterKey
+import akka.cluster.ddata.LWWRegister
+import akka.cluster.ddata.Replicator._
 import akka.dispatch.ExecutionContexts
-import akka.persistence.PersistentActor
-import akka.persistence.RecoveryCompleted
-import akka.persistence.SaveSnapshotFailure
-import akka.persistence.SaveSnapshotSuccess
-import akka.persistence.SnapshotOffer
 import akka.pattern.pipe
 
 /**
@@ -40,7 +41,16 @@ object ShardCoordinator {
    */
   private[akka] def props(typeName: String, settings: ClusterShardingSettings,
                           allocationStrategy: ShardAllocationStrategy): Props =
-    Props(new ShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local)
+    Props(new PersistentShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local)
+
+  /**
+   * INTERNAL API
+   * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata.
+   */
+  private[akka] def props(typeName: String, settings: ClusterShardingSettings,
+                          allocationStrategy: ShardAllocationStrategy,
+                          replicator: ActorRef): Props =
+    Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator)).withDeploy(Deploy.local)
 
   /**
    * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@@ -358,29 +368,22 @@ object ShardCoordinator {
  *
  * @see [[ClusterSharding$ ClusterSharding extension]]
  */
-class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
+abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
                        allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
-  extends PersistentActor with ActorLogging {
+  extends Actor with ActorLogging {
   import ShardCoordinator._
   import ShardCoordinator.Internal._
   import ShardRegion.ShardId
   import settings.tuningParameters._
 
-  override def persistenceId = s"/sharding/${typeName}Coordinator"
-
-  override def journalPluginId: String = settings.journalPluginId
-
-  override def snapshotPluginId: String = settings.snapshotPluginId
-
   val removalMargin = Cluster(context.system).settings.DownRemovalMargin
 
-  var persistentState = State.empty
+  var state = State.empty
   var rebalanceInProgress = Set.empty[ShardId]
   var unAckedHostShards = Map.empty[ShardId, Cancellable]
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
   var aliveRegions = Set.empty[ActorRef]
-  var persistCount = 0
 
   import context.dispatcher
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -393,62 +396,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
     Cluster(context.system).unsubscribe(self)
   }
 
-  override def receiveRecover: Receive = {
-    case evt: DomainEvent ⇒
-      log.debug("receiveRecover {}", evt)
-      evt match {
-        case ShardRegionRegistered(region) ⇒
-          persistentState = persistentState.updated(evt)
-        case ShardRegionProxyRegistered(proxy) ⇒
-          persistentState = persistentState.updated(evt)
-        case ShardRegionTerminated(region) ⇒
-          if (persistentState.regions.contains(region))
-            persistentState = persistentState.updated(evt)
-          else {
-            log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
-              " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
-              "removed by later watch.", region)
-          }
-        case ShardRegionProxyTerminated(proxy) ⇒
-          if (persistentState.regionProxies.contains(proxy))
-            persistentState = persistentState.updated(evt)
-        case ShardHomeAllocated(shard, region) ⇒
-          persistentState = persistentState.updated(evt)
-        case _: ShardHomeDeallocated ⇒
-          persistentState = persistentState.updated(evt)
-      }
-
-    case SnapshotOffer(_, state: State) ⇒
-      log.debug("receiveRecover SnapshotOffer {}", state)
-      //Old versions of the state object may not have unallocatedShard set,
-      // thus it will be null.
-      if (state.unallocatedShards == null)
-        persistentState = state.copy(unallocatedShards = Set.empty)
-      else
-        persistentState = state
-
-    case RecoveryCompleted ⇒
-      persistentState.regionProxies.foreach(context.watch)
-      persistentState.regions.foreach { case (a, _) ⇒ context.watch(a) }
-      persistentState.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
-      allocateShardHomes()
-  }
-
-  override def receiveCommand: Receive = {
+  def active: Receive = {
     case Register(region) ⇒
       log.debug("ShardRegion registered: [{}]", region)
       aliveRegions += region
-      if (persistentState.regions.contains(region))
-        sender() ! RegisterAck(self)
+      if (state.regions.contains(region))
+        region ! RegisterAck(self)
       else {
         gracefulShutdownInProgress -= region
-        saveSnapshotWhenNeeded()
-        persist(ShardRegionRegistered(region)) { evt ⇒
-          val firstRegion = persistentState.regions.isEmpty
-          persistentState = persistentState.updated(evt)
+        update(ShardRegionRegistered(region)) { evt ⇒
+          val firstRegion = state.regions.isEmpty
+          state = state.updated(evt)
           context.watch(region)
-          sender() ! RegisterAck(self)
+          region ! RegisterAck(self)
           if (firstRegion)
             allocateShardHomes()
@@ -457,28 +417,26 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
 
     case RegisterProxy(proxy) ⇒
       log.debug("ShardRegion proxy registered: [{}]", proxy)
-      if (persistentState.regionProxies.contains(proxy))
-        sender() ! RegisterAck(self)
+      if (state.regionProxies.contains(proxy))
+        proxy ! RegisterAck(self)
       else {
-        saveSnapshotWhenNeeded()
-        persist(ShardRegionProxyRegistered(proxy)) { evt ⇒
-          persistentState = persistentState.updated(evt)
+        update(ShardRegionProxyRegistered(proxy)) { evt ⇒
+          state = state.updated(evt)
           context.watch(proxy)
-          sender() ! RegisterAck(self)
+          proxy ! RegisterAck(self)
         }
       }
 
     case t @ Terminated(ref) ⇒
-      if (persistentState.regions.contains(ref)) {
+      if (state.regions.contains(ref)) {
         if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
           context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
         else
           regionTerminated(ref)
-      } else if (persistentState.regionProxies.contains(ref)) {
+      } else if (state.regionProxies.contains(ref)) {
         log.debug("ShardRegion proxy terminated: [{}]", ref)
-        saveSnapshotWhenNeeded()
-        persist(ShardRegionProxyTerminated(ref)) { evt ⇒
-          persistentState = persistentState.updated(evt)
+        update(ShardRegionProxyTerminated(ref)) { evt ⇒
+          state = state.updated(evt)
         }
       }
 
@@ -487,10 +445,10 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
 
     case GetShardHome(shard) ⇒
       if (!rebalanceInProgress.contains(shard)) {
-        persistentState.shards.get(shard) match {
+        state.shards.get(shard) match {
           case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
           case None ⇒
-            val activeRegions = persistentState.regions -- gracefulShutdownInProgress
+            val activeRegions = state.regions -- gracefulShutdownInProgress
             if (activeRegions.nonEmpty) {
               val getShardHomeSender = sender()
               val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
@@ -524,14 +482,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
       }
 
     case ResendShardHost(shard, region) ⇒
-      persistentState.shards.get(shard) match {
+      state.shards.get(shard) match {
        case Some(`region`) ⇒ sendHostShardMsg(shard, region)
        case _ ⇒ //Reallocated to another region
       }
 
     case RebalanceTick ⇒
-      if (persistentState.regions.nonEmpty) {
-        val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
+      if (state.regions.nonEmpty) {
+        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress)
         shardsFuture.value match {
           case Some(Success(shards)) ⇒
             continueRebalance(shards)
@@ -551,20 +509,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
       rebalanceInProgress -= shard
       log.debug("Rebalance shard [{}] done [{}]", shard, ok)
       // The shard could have been removed by ShardRegionTerminated
-      if (persistentState.shards.contains(shard))
+      if (state.shards.contains(shard))
         if (ok) {
-          saveSnapshotWhenNeeded()
-          persist(ShardHomeDeallocated(shard)) { evt ⇒
-            persistentState = persistentState.updated(evt)
+          update(ShardHomeDeallocated(shard)) { evt ⇒
+            state = state.updated(evt)
             log.debug("Shard [{}] deallocated", evt.shard)
             allocateShardHomes()
           }
         } else // rebalance not completed, graceful shutdown will be retried
-          gracefulShutdownInProgress -= persistentState.shards(shard)
+          gracefulShutdownInProgress -= state.shards(shard)
 
     case GracefulShutdownReq(region) ⇒
       if (!gracefulShutdownInProgress(region))
-        persistentState.regions.get(region) match {
+        state.regions.get(region) match {
          case Some(shards) ⇒
            log.debug("Graceful shutdown of region [{}] with shards [{}]", region, shards)
            gracefulShutdownInProgress += region
@@ -572,12 +529,6 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
          case None ⇒
         }
 
-    case SaveSnapshotSuccess(_) ⇒
-      log.debug("Persistent snapshot saved successfully")
-
-    case SaveSnapshotFailure(_, reason) ⇒
-      log.warning("Persistent snapshot failure: {}", reason.getMessage)
-
     case ShardHome(_, _) ⇒
       //On rebalance, we send ourselves a GetShardHome message to reallocate a
       // shard. This receive handles the "response" from that message. i.e. ignores it.
@@ -589,7 +540,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
       context.become(shuttingDown)
 
     case ShardRegion.GetCurrentRegions ⇒
-      val reply = ShardRegion.CurrentRegions(persistentState.regions.keySet.map { ref ⇒
+      val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref ⇒
         if (ref.path.address.host.isEmpty) Cluster(context.system).selfAddress
         else ref.path.address
       })
|
@ -598,16 +549,17 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
|
||||||
case _: CurrentClusterState ⇒
|
case _: CurrentClusterState ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit
|
||||||
|
|
||||||
def regionTerminated(ref: ActorRef): Unit =
|
def regionTerminated(ref: ActorRef): Unit =
|
||||||
if (persistentState.regions.contains(ref)) {
|
if (state.regions.contains(ref)) {
|
||||||
log.debug("ShardRegion terminated: [{}]", ref)
|
log.debug("ShardRegion terminated: [{}]", ref)
|
||||||
persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
|
state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
|
||||||
|
|
||||||
gracefulShutdownInProgress -= ref
|
gracefulShutdownInProgress -= ref
|
||||||
|
|
||||||
saveSnapshotWhenNeeded()
|
update(ShardRegionTerminated(ref)) { evt ⇒
|
||||||
persist(ShardRegionTerminated(ref)) { evt ⇒
|
state = state.updated(evt)
|
||||||
persistentState = persistentState.updated(evt)
|
|
||||||
allocateShardHomes()
|
allocateShardHomes()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@@ -616,31 +568,22 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
     case _ ⇒ // ignore all
   }
 
-  def saveSnapshotWhenNeeded(): Unit = {
-    persistCount += 1
-    if (persistCount % snapshotAfter == 0) {
-      log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
-      saveSnapshot(persistentState)
-    }
-  }
-
   def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {
     region ! HostShard(shard)
     val cancel = context.system.scheduler.scheduleOnce(shardStartTimeout, self, ResendShardHost(shard, region))
     unAckedHostShards = unAckedHostShards.updated(shard, cancel)
   }
 
-  def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
+  def allocateShardHomes(): Unit = state.unallocatedShards.foreach { self ! GetShardHome(_) }
 
   def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =
     if (!rebalanceInProgress.contains(shard)) {
-      persistentState.shards.get(shard) match {
+      state.shards.get(shard) match {
         case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
         case None ⇒
-          if (persistentState.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
-            saveSnapshotWhenNeeded()
-            persist(ShardHomeAllocated(shard, region)) { evt ⇒
-              persistentState = persistentState.updated(evt)
+          if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
+            update(ShardHomeAllocated(shard, region)) { evt ⇒
+              state = state.updated(evt)
               log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
 
               sendHostShardMsg(evt.shard, evt.region)
@@ -648,19 +591,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
             }
           } else
             log.debug("Allocated region {} for shard [{}] is not (any longer) one of the registered regions: {}",
-              region, shard, persistentState)
+              region, shard, state)
       }
     }
 
   def continueRebalance(shards: Set[ShardId]): Unit =
     shards.foreach { shard ⇒
       if (!rebalanceInProgress(shard)) {
-        persistentState.shards.get(shard) match {
+        state.shards.get(shard) match {
          case Some(rebalanceFromRegion) ⇒
            rebalanceInProgress += shard
            log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
            context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
-              persistentState.regions.keySet ++ persistentState.regionProxies)
+              state.regions.keySet ++ state.regionProxies)
              .withDispatcher(context.props.dispatcher))
          case None ⇒
            log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
@@ -670,3 +613,182 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
     }
 
 }
+
+/**
+ * Singleton coordinator that decides where to allocate shards.
+ *
+ * @see [[ClusterSharding$ ClusterSharding extension]]
+ */
+class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSettings,
+                                 allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
+  extends ShardCoordinator(typeName, settings, allocationStrategy) with PersistentActor {
+  import ShardCoordinator.Internal._
+  import settings.tuningParameters._
+
+  override def persistenceId = s"/sharding/${typeName}Coordinator"
+
+  override def journalPluginId: String = settings.journalPluginId
+
+  override def snapshotPluginId: String = settings.snapshotPluginId
+
+  var persistCount = 0
+
+  override def receiveRecover: Receive = {
+    case evt: DomainEvent ⇒
+      log.debug("receiveRecover {}", evt)
+      evt match {
+        case ShardRegionRegistered(region) ⇒
+          state = state.updated(evt)
+        case ShardRegionProxyRegistered(proxy) ⇒
+          state = state.updated(evt)
+        case ShardRegionTerminated(region) ⇒
+          if (state.regions.contains(region))
+            state = state.updated(evt)
+          else {
+            log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
+              " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
+              "removed by later watch.", region)
+          }
+        case ShardRegionProxyTerminated(proxy) ⇒
+          if (state.regionProxies.contains(proxy))
+            state = state.updated(evt)
+        case ShardHomeAllocated(shard, region) ⇒
+          state = state.updated(evt)
+        case _: ShardHomeDeallocated ⇒
+          state = state.updated(evt)
+      }
+
+    case SnapshotOffer(_, st: State) ⇒
+      log.debug("receiveRecover SnapshotOffer {}", st)
+      //Old versions of the state object may not have unallocatedShard set,
+      // thus it will be null.
+      if (st.unallocatedShards == null)
+        state = st.copy(unallocatedShards = Set.empty)
+      else
+        state = st
+
+    case RecoveryCompleted ⇒
+      state.regionProxies.foreach(context.watch)
+      state.regions.foreach { case (a, _) ⇒ context.watch(a) }
+      state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
+      allocateShardHomes()
+  }
+
+  override def receiveCommand: Receive = active orElse {
+    case SaveSnapshotSuccess(_) ⇒
+      log.debug("Persistent snapshot saved successfully")
+
+    case SaveSnapshotFailure(_, reason) ⇒
+      log.warning("Persistent snapshot failure: {}", reason.getMessage)
+  }
+
+  def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
+    saveSnapshotWhenNeeded()
+    persist(evt)(f)
+  }
+
+  def saveSnapshotWhenNeeded(): Unit = {
+    persistCount += 1
+    if (persistCount % snapshotAfter == 0) {
+      log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
+      saveSnapshot(state)
+    }
+  }
+}
+
+/**
+ * Singleton coordinator (with state based on ddata) that decides where to allocate shards.
+ *
+ * @see [[ClusterSharding$ ClusterSharding extension]]
+ */
+class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
+                            allocationStrategy: ShardCoordinator.ShardAllocationStrategy,
+                            replicator: ActorRef)
+  extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash {
+  import ShardCoordinator.Internal._
+  import akka.cluster.ddata.Replicator.Update
+
+  val waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout
+  val updatingStateTimeout = settings.tuningParameters.updatingStateTimeout
+
+  Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
+
+  implicit val node = Cluster(context.system)
+  val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
+
+  var afterUpdateCallback: DomainEvent ⇒ Unit = _
+
+  getState()
+
+  override def receive: Receive = waitingForState
+
+  // This state will drop all other messages since they will be retried
+  def waitingForState: Receive = {
+    case g @ GetSuccess(CoordinatorStateKey, _) ⇒
+      state = g.get(CoordinatorStateKey).value
+      state.regionProxies.foreach(context.watch)
+      state.regions.foreach { case (a, _) ⇒ context.watch(a) }
+      state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
+      allocateShardHomes()
+      activate()
+
+    case GetFailure(CoordinatorStateKey, _) ⇒
+      log.error(
+        "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout' (was retrying): {} millis",
+        waitingForStateTimeout.toMillis)
+      getState()
+
+    case NotFound(CoordinatorStateKey, _) ⇒ activate()
+  }
+
+  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = {
+    case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
+      log.debug("The coordinator state was successfully updated with {}", evt)
+      updateSuccess(evt)
+
+    case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒
+      log.error(
+        "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout'={} millis (was retrying), event={}",
+        updatingStateTimeout.toMillis,
+        evt)
+      sendUpdate(evt)
+
+    case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒
+      log.error(
+        cause,
+        "The ShardCoordinator was unable to update a distributed state with error {} and event {}. Coordinator will be restarted",
+        error,
+        evt)
+      throw cause
+    case _ ⇒ stash()
+  }
+
+  def activate() = {
+    context.become(active)
+    log.info("Sharding Coordinator was moved to the active state {}", state)
+  }
+
+  def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
+    afterUpdateCallback = f.asInstanceOf[DomainEvent ⇒ Unit]
+    context.become(waitingForUpdate(evt))
+    sendUpdate(evt)
+  }
+
+  def updateSuccess(evt: DomainEvent): Unit = {
+    afterUpdateCallback(evt)
+    afterUpdateCallback = null
+    context.become(active)
+    unstashAll()
+  }
+
+  def getState(): Unit =
+    replicator ! Get(CoordinatorStateKey, ReadMajority(waitingForStateTimeout))
+
+  def sendUpdate(evt: DomainEvent) = {
+    val s = state.updated(evt)
+    replicator ! Update(CoordinatorStateKey, LWWRegister(State.empty), WriteMajority(updatingStateTimeout), Some(evt)) { reg ⇒
+      reg.withValue(s)
+    }
+  }
+
+}
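For readers who have not used the Distributed Data API that DDataShardCoordinator builds on, here is a self-contained sketch (not part of the diff) of the same Get/Update pattern against an LWWRegister, using a plain String instead of the internal coordinator State; the actor and key names are made up for illustration.

import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, LWWRegister, LWWRegisterKey }
import akka.cluster.ddata.Replicator._
import scala.concurrent.duration._

class RegisterExample extends Actor with ActorLogging {
  val replicator = DistributedData(context.system).replicator
  implicit val node = Cluster(context.system) // needed to create/modify an LWWRegister
  val Key = LWWRegisterKey[String]("example-state")

  // Read the current value from a majority of nodes, as the coordinator does at start-up.
  replicator ! Get(Key, ReadMajority(5.seconds))

  def receive = {
    case g @ GetSuccess(Key, _) ⇒
      log.info("current value: {}", g.get(Key).value)
    case NotFound(Key, _) ⇒
      log.info("nothing stored yet")
    case GetFailure(Key, _) ⇒
      replicator ! Get(Key, ReadMajority(5.seconds)) // retry, like waitingForState

    case newValue: String ⇒
      // Write with WriteMajority and carry the value as request context,
      // mirroring how the coordinator ships the DomainEvent along with the Update.
      replicator ! Update(Key, LWWRegister("initial"), WriteMajority(5.seconds), Some(newValue)) { reg ⇒
        reg.withValue(newValue)
      }
    case UpdateSuccess(Key, Some(v)) ⇒
      log.info("stored {}", v)
    case UpdateTimeout(Key, Some(v: String)) ⇒
      self ! v // retry, like waitingForUpdate
  }
}

object RegisterExample {
  def props: Props = Props[RegisterExample]
}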
@@ -154,6 +154,12 @@ object ShardRegion {
    */
   @SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand
 
+  /**
+   * We must be sure that a shard is initialized before we start sending messages to it.
+   * A shard could be terminated during initialization.
+   */
+  final case class ShardInitialized(shardId: ShardId)
+
   /**
    * Java API: Send this message to the `ShardRegion` actor to handoff all shards that are hosted by
    * the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch`
@@ -302,11 +308,13 @@ class ShardRegion(
 
   def receive = {
     case Terminated(ref)                         ⇒ receiveTerminated(ref)
+    case ShardInitialized(shardId)               ⇒ initializeShard(shardId, sender())
     case evt: ClusterDomainEvent                 ⇒ receiveClusterEvent(evt)
     case state: CurrentClusterState              ⇒ receiveClusterState(state)
     case msg: CoordinatorMessage                 ⇒ receiveCoordinatorMessage(msg)
     case cmd: ShardRegionCommand                 ⇒ receiveCommand(cmd)
     case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
+    case msg: RestartShard                       ⇒ deliverMessage(msg, sender())
   }
 
   def receiveClusterState(state: CurrentClusterState): Unit = {
@@ -336,7 +344,6 @@ class ShardRegion(
 
       //Start the shard, if already started this does nothing
       getShard(shard)
-      deliverBufferedMessages(shard)
 
       sender() ! ShardStarted(shard)
 
@@ -354,7 +361,10 @@ class ShardRegion(
       if (ref != self)
         context.watch(ref)
 
-      deliverBufferedMessages(shard)
+      if (ref == self)
+        getShard(shard).foreach(deliverBufferedMessages(shard, _))
+      else
+        deliverBufferedMessages(shard, ref)
 
     case RegisterAck(coord) ⇒
       context.watch(coord)
@@ -472,12 +482,44 @@ class ShardRegion(
       }
     }
 
-  def deliverBufferedMessages(shard: String): Unit = {
-    shardBuffers.get(shard) match {
+  def initializeShard(id: ShardId, shard: ActorRef): Unit = {
+    log.debug("Shard was initialized {}", id)
+    shards = shards.updated(id, shard)
+    deliverBufferedMessages(id, shard)
+  }
+
+  def bufferMessage(shardId: ShardId, msg: Any, snd: ActorRef) = {
+    val totBufSize = totalBufferSize
+    if (totBufSize >= bufferSize) {
+      if (loggedFullBufferWarning)
+        log.debug("Buffer is full, dropping message for shard [{}]", shardId)
+      else {
+        log.warning("Buffer is full, dropping message for shard [{}]", shardId)
+        loggedFullBufferWarning = true
+      }
+      context.system.deadLetters ! msg
+    } else {
+      val buf = shardBuffers.getOrElse(shardId, Vector.empty)
+      shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
+
+      // log some insight to how buffers are filled up every 10% of the buffer capacity
+      val tot = totBufSize + 1
+      if (tot % (bufferSize / 10) == 0) {
+        val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
+        if (tot <= bufferSize / 2)
+          log.info(logMsg)
+        else
+          log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
+      }
+    }
+  }
+
+  def deliverBufferedMessages(shardId: ShardId, receiver: ActorRef): Unit = {
+    shardBuffers.get(shardId) match {
       case Some(buf) ⇒
-        log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shard)
-        buf.foreach { case (msg, snd) ⇒ deliverMessage(msg, snd) }
-        shardBuffers -= shard
+        log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shardId)
+        buf.foreach { case (msg, snd) ⇒ receiver.tell(msg, snd) }
+        shardBuffers -= shardId
       case None ⇒
     }
     loggedFullBufferWarning = false
@@ -505,7 +547,18 @@ class ShardRegion(
     val shardId = extractShardId(msg)
     regionByShard.get(shardId) match {
       case Some(ref) if ref == self ⇒
-        getShard(shardId).tell(msg, snd)
+        getShard(shardId) match {
+          case Some(shard) ⇒
+            shardBuffers.get(shardId) match {
+              case Some(buf) ⇒
+                // messages to this shard are already buffered, so buffer this one too to keep them in the right order
+                bufferMessage(shardId, msg, snd)
+                deliverBufferedMessages(shardId, shard)
+              case None ⇒
+                shard.tell(msg, snd)
+            }
+          case None ⇒ bufferMessage(shardId, msg, snd)
+        }
       case Some(ref) ⇒
         log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref)
         ref.tell(msg, snd)
@@ -517,36 +570,13 @@ class ShardRegion(
           log.debug("Request shard [{}] home", shardId)
           coordinator.foreach(_ ! GetShardHome(shardId))
         }
-        val totBufSize = totalBufferSize
-        if (totBufSize >= bufferSize) {
-          if (loggedFullBufferWarning)
-            log.debug("Buffer is full, dropping message for shard [{}]", shardId)
-          else {
-            log.warning("Buffer is full, dropping message for shard [{}]", shardId)
-            loggedFullBufferWarning = true
-          }
-          context.system.deadLetters ! msg
-        } else {
-          val buf = shardBuffers.getOrElse(shardId, Vector.empty)
-          shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
-
-          // log some insight to how buffers are filled up every 10% of the buffer capacity
-          val tot = totBufSize + 1
-          if (tot % (bufferSize / 10) == 0) {
-            val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
-            if (tot <= bufferSize / 2)
-              log.info(logMsg)
-            else
-              log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
-          }
-        }
+        bufferMessage(shardId, msg, snd)
     }
   }
 
-  def getShard(id: ShardId): ActorRef = shards.getOrElse(
-    id,
+  def getShard(id: ShardId): Option[ActorRef] = shards.get(id).orElse(
     entityProps match {
-      case Some(props) ⇒
+      case Some(props) if !shardsByRef.values.exists(_ == id) ⇒
         log.debug("Starting shard [{}] in region", id)
 
         val name = URLEncoder.encode(id, "utf-8")
@@ -560,9 +590,10 @@ class ShardRegion(
           extractShardId,
           handOffStopMessage).withDispatcher(context.props.dispatcher),
           name))
-        shards = shards.updated(id, shard)
         shardsByRef = shardsByRef.updated(shard, id)
-        shard
+        None
+      case Some(props) ⇒
+        None
       case None ⇒
         throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
     })
@@ -26,26 +26,7 @@ import scala.concurrent.Future
 import akka.util.Timeout
 import akka.pattern.ask
 
-object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
-  val first = role("first")
-  val second = role("second")
-
-  commonConfig(ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
-    akka.remote.log-remote-lifecycle-events = off
-    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-    akka.persistence.journal.leveldb-shared {
-      timeout = 5s
-      store {
-        native = off
-        dir = "target/journal-ClusterShardingCustomShardAllocationSpec"
-      }
-    }
-    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec"
-    """))
-
+object ClusterShardingCustomShardAllocationSpec {
   class Entity extends Actor {
     def receive = {
       case id: Int ⇒ sender() ! id
@@ -98,11 +79,43 @@ object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
 
 }
 
-class ClusterShardingCustomShardAllocationMultiJvmNode1 extends ClusterShardingCustomShardAllocationSpec
-class ClusterShardingCustomShardAllocationMultiJvmNode2 extends ClusterShardingCustomShardAllocationSpec
+abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
 
-class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShardingCustomShardAllocationSpec) with STMultiNodeSpec with ImplicitSender {
+  commonConfig(ConfigFactory.parseString(s"""
+    akka.loglevel = INFO
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
+    akka.persistence.journal.leveldb-shared {
+      timeout = 5s
+      store {
+        native = off
+        dir = "target/journal-ClusterShardingCustomShardAllocationSpec"
+      }
+    }
+    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec"
+    akka.cluster.sharding.state-store-mode = "$mode"
+    """))
+}
+
+object PersistentClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("persistence")
+object DDataClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("ddata")
+
+class PersistentClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(PersistentClusterShardingCustomShardAllocationSpecConfig)
+class DDataClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(DDataClusterShardingCustomShardAllocationSpecConfig)
+
+class PersistentClusterShardingCustomShardAllocationMultiJvmNode1 extends PersistentClusterShardingCustomShardAllocationSpec
+class PersistentClusterShardingCustomShardAllocationMultiJvmNode2 extends PersistentClusterShardingCustomShardAllocationSpec
+
+class DDataClusterShardingCustomShardAllocationMultiJvmNode1 extends DDataClusterShardingCustomShardAllocationSpec
+class DDataClusterShardingCustomShardAllocationMultiJvmNode2 extends DDataClusterShardingCustomShardAllocationSpec
+
+abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
   import ClusterShardingCustomShardAllocationSpec._
+  import config._
 
   override def initialParticipants = roles.size
 
@@ -146,7 +159,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
 
   lazy val allocator = system.actorOf(Props[Allocator], "allocator")
 
-  "Cluster sharding with custom allocation strategy" must {
+  s"Cluster sharding ($mode) with custom allocation strategy" must {
 
     "setup shared journal" in {
       // start the Persistence extension
@@ -22,34 +22,7 @@ import akka.remote.testkit.STMultiNodeSpec
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
 import akka.testkit._
 
-object ClusterShardingFailureSpec extends MultiNodeConfig {
-  val controller = role("controller")
-  val first = role("first")
-  val second = role("second")
-
-  commonConfig(ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
-    akka.remote.log-remote-lifecycle-events = off
-    akka.cluster.auto-down-unreachable-after = 0s
-    akka.cluster.down-removal-margin = 5s
-    akka.cluster.roles = ["backend"]
-    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-    akka.persistence.journal.leveldb-shared {
-      timeout = 5s
-      store {
-        native = off
-        dir = "target/journal-ClusterShardingFailureSpec"
-      }
-    }
-    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
-    akka.cluster.sharding.coordinator-failure-backoff = 3s
-    akka.cluster.sharding.shard-failure-backoff = 3s
-    """))
-
-  testTransport(on = true)
-
+object ClusterShardingFailureSpec {
   case class Get(id: String)
   case class Add(id: String, i: Int)
   case class Value(id: String, n: Int)
@ -75,12 +48,55 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class ClusterShardingFailureMultiJvmNode1 extends ClusterShardingFailureSpec
|
abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||||
class ClusterShardingFailureMultiJvmNode2 extends ClusterShardingFailureSpec
|
val controller = role("controller")
|
||||||
class ClusterShardingFailureMultiJvmNode3 extends ClusterShardingFailureSpec
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
|
||||||
class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec) with STMultiNodeSpec with ImplicitSender {
|
commonConfig(ConfigFactory.parseString(s"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
|
akka.cluster.auto-down-unreachable-after = 0s
|
||||||
|
akka.cluster.down-removal-margin = 5s
|
||||||
|
akka.cluster.roles = ["backend"]
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||||
|
akka.persistence.journal.leveldb-shared {
|
||||||
|
timeout = 5s
|
||||||
|
store {
|
||||||
|
native = off
|
||||||
|
dir = "target/journal-ClusterShardingFailureSpec"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
|
||||||
|
akka.cluster.sharding {
|
||||||
|
coordinator-failure-backoff = 3s
|
||||||
|
shard-failure-backoff = 3s
|
||||||
|
state-store-mode = "$mode"
|
||||||
|
}
|
||||||
|
"""))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
|
}
|
||||||
|
|
||||||
|
object PersistentClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("persistence")
|
||||||
|
object DDataClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("ddata")
|
||||||
|
|
||||||
|
class PersistentClusterShardingFailureSpec extends ClusterShardingFailureSpec(PersistentClusterShardingFailureSpecConfig)
|
||||||
|
class DDataClusterShardingFailureSpec extends ClusterShardingFailureSpec(DDataClusterShardingFailureSpecConfig)
|
||||||
|
|
||||||
|
class PersistentClusterShardingFailureMultiJvmNode1 extends PersistentClusterShardingFailureSpec
|
||||||
|
class PersistentClusterShardingFailureMultiJvmNode2 extends PersistentClusterShardingFailureSpec
|
||||||
|
class PersistentClusterShardingFailureMultiJvmNode3 extends PersistentClusterShardingFailureSpec
|
||||||
|
|
||||||
|
class DDataClusterShardingFailureMultiJvmNode1 extends DDataClusterShardingFailureSpec
|
||||||
|
class DDataClusterShardingFailureMultiJvmNode2 extends DDataClusterShardingFailureSpec
|
||||||
|
class DDataClusterShardingFailureMultiJvmNode3 extends DDataClusterShardingFailureSpec
|
||||||
|
|
||||||
|
abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||||
import ClusterShardingFailureSpec._
|
import ClusterShardingFailureSpec._
|
||||||
|
import config._
|
||||||
|
|
||||||
override def initialParticipants = roles.size
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
|
@ -120,7 +136,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
|
||||||
|
|
||||||
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
||||||
|
|
||||||
"Cluster sharding with flaky journal" must {
|
s"Cluster sharding ($mode) with flaky journal" must {
|
||||||
|
|
||||||
"setup shared journal" in {
|
"setup shared journal" in {
|
||||||
// start the Persistence extension
|
// start the Persistence extension
|
||||||
|
|
|
||||||
|
|
@ -26,26 +26,7 @@ import scala.concurrent.Future
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
||||||
object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
|
object ClusterShardingGracefulShutdownSpec {
|
||||||
val first = role("first")
|
|
||||||
val second = role("second")
|
|
||||||
|
|
||||||
commonConfig(ConfigFactory.parseString("""
|
|
||||||
akka.loglevel = INFO
|
|
||||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
|
||||||
akka.remote.log-remote-lifecycle-events = off
|
|
||||||
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
|
||||||
akka.persistence.journal.leveldb-shared {
|
|
||||||
timeout = 5s
|
|
||||||
store {
|
|
||||||
native = off
|
|
||||||
dir = "target/journal-ClusterShardingGracefulShutdownSpec"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
|
||||||
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec"
|
|
||||||
"""))
|
|
||||||
|
|
||||||
case object StopEntity
|
case object StopEntity
|
||||||
|
|
||||||
class Entity extends Actor {
|
class Entity extends Actor {
|
||||||
|
|
@ -84,11 +65,43 @@ object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class ClusterShardingGracefulShutdownMultiJvmNode1 extends ClusterShardingGracefulShutdownSpec
|
abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||||
class ClusterShardingGracefulShutdownMultiJvmNode2 extends ClusterShardingGracefulShutdownSpec
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
|
||||||
class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingGracefulShutdownSpec) with STMultiNodeSpec with ImplicitSender {
|
commonConfig(ConfigFactory.parseString(s"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||||
|
akka.persistence.journal.leveldb-shared {
|
||||||
|
timeout = 5s
|
||||||
|
store {
|
||||||
|
native = off
|
||||||
|
dir = "target/journal-ClusterShardingGracefulShutdownSpec"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec"
|
||||||
|
akka.cluster.sharding.state-store-mode = "$mode"
|
||||||
|
"""))
|
||||||
|
}
|
||||||
|
|
||||||
|
object PersistentClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("persistence")
|
||||||
|
object DDataClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("ddata")
|
||||||
|
|
||||||
|
class PersistentClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(PersistentClusterShardingGracefulShutdownSpecConfig)
|
||||||
|
class DDataClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(DDataClusterShardingGracefulShutdownSpecConfig)
|
||||||
|
|
||||||
|
class PersistentClusterShardingGracefulShutdownMultiJvmNode1 extends PersistentClusterShardingGracefulShutdownSpec
|
||||||
|
class PersistentClusterShardingGracefulShutdownMultiJvmNode2 extends PersistentClusterShardingGracefulShutdownSpec
|
||||||
|
|
||||||
|
class DDataClusterShardingGracefulShutdownMultiJvmNode1 extends DDataClusterShardingGracefulShutdownSpec
|
||||||
|
class DDataClusterShardingGracefulShutdownMultiJvmNode2 extends DDataClusterShardingGracefulShutdownSpec
|
||||||
|
|
||||||
|
abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||||
import ClusterShardingGracefulShutdownSpec._
|
import ClusterShardingGracefulShutdownSpec._
|
||||||
|
import config._
|
||||||
|
|
||||||
override def initialParticipants = roles.size
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
|
@ -131,7 +144,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG
|
||||||
|
|
||||||
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
||||||
|
|
||||||
"Cluster sharding" must {
|
s"Cluster sharding ($mode)" must {
|
||||||
|
|
||||||
"setup shared journal" in {
|
"setup shared journal" in {
|
||||||
// start the Persistence extension
|
// start the Persistence extension
|
||||||
|
|
|
||||||
|
|
@ -26,30 +26,7 @@ import akka.testkit._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
|
|
||||||
object ClusterShardingLeavingSpec extends MultiNodeConfig {
|
object ClusterShardingLeavingSpec {
|
||||||
val first = role("first")
|
|
||||||
val second = role("second")
|
|
||||||
val third = role("third")
|
|
||||||
val fourth = role("fourth")
|
|
||||||
|
|
||||||
commonConfig(ConfigFactory.parseString("""
|
|
||||||
akka.loglevel = INFO
|
|
||||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
|
||||||
akka.remote.log-remote-lifecycle-events = off
|
|
||||||
akka.cluster.auto-down-unreachable-after = 0s
|
|
||||||
akka.cluster.down-removal-margin = 5s
|
|
||||||
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
|
||||||
akka.persistence.journal.leveldb-shared {
|
|
||||||
timeout = 5s
|
|
||||||
store {
|
|
||||||
native = off
|
|
||||||
dir = "target/journal-ClusterShardingLeavingSpec"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
|
||||||
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
|
|
||||||
"""))
|
|
||||||
|
|
||||||
case class Ping(id: String)
|
case class Ping(id: String)
|
||||||
|
|
||||||
class Entity extends Actor {
|
class Entity extends Actor {
|
||||||
|
|
@ -76,16 +53,53 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
|
||||||
val extractShardId: ShardRegion.ExtractShardId = {
|
val extractShardId: ShardRegion.ExtractShardId = {
|
||||||
case Ping(id: String) ⇒ id.charAt(0).toString
|
case Ping(id: String) ⇒ id.charAt(0).toString
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec
|
abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||||
class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec
|
val first = role("first")
|
||||||
class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec
|
val second = role("second")
|
||||||
class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec
|
val third = role("third")
|
||||||
|
val fourth = role("fourth")
|
||||||
|
|
||||||
class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender {
|
commonConfig(ConfigFactory.parseString(s"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
|
akka.cluster.auto-down-unreachable-after = 0s
|
||||||
|
akka.cluster.down-removal-margin = 5s
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||||
|
akka.persistence.journal.leveldb-shared {
|
||||||
|
timeout = 5s
|
||||||
|
store {
|
||||||
|
native = off
|
||||||
|
dir = "target/journal-ClusterShardingLeavingSpec"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
|
||||||
|
akka.cluster.sharding.state-store-mode = "$mode"
|
||||||
|
"""))
|
||||||
|
}
|
||||||
|
|
||||||
|
object PersistentClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("persistence")
|
||||||
|
object DDataClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("ddata")
|
||||||
|
|
||||||
|
class PersistentClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig)
|
||||||
|
class DDataClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(DDataClusterShardingLeavingSpecConfig)
|
||||||
|
|
||||||
|
class PersistentClusterShardingLeavingMultiJvmNode1 extends PersistentClusterShardingLeavingSpec
|
||||||
|
class PersistentClusterShardingLeavingMultiJvmNode2 extends PersistentClusterShardingLeavingSpec
|
||||||
|
class PersistentClusterShardingLeavingMultiJvmNode3 extends PersistentClusterShardingLeavingSpec
|
||||||
|
class PersistentClusterShardingLeavingMultiJvmNode4 extends PersistentClusterShardingLeavingSpec
|
||||||
|
|
||||||
|
class DDataClusterShardingLeavingMultiJvmNode1 extends DDataClusterShardingLeavingSpec
|
||||||
|
class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec
|
||||||
|
class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec
|
||||||
|
class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec
|
||||||
|
|
||||||
|
abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||||
import ClusterShardingLeavingSpec._
|
import ClusterShardingLeavingSpec._
|
||||||
|
import config._
|
||||||
|
|
||||||
override def initialParticipants = roles.size
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
|
@ -132,7 +146,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe
|
||||||
|
|
||||||
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
lazy val region = ClusterSharding(system).shardRegion("Entity")
|
||||||
|
|
||||||
"Cluster sharding with leaving member" must {
|
s"Cluster sharding ($mode) with leaving member" must {
|
||||||
|
|
||||||
"setup shared journal" in {
|
"setup shared journal" in {
|
||||||
// start the Persistence extension
|
// start the Persistence extension
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
*/
|
*/
|
||||||
package akka.cluster.sharding
|
package akka.cluster.sharding
|
||||||
|
|
||||||
|
import akka.cluster.ddata.{ ReplicatorSettings, Replicator }
|
||||||
import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
|
import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
|
||||||
import akka.cluster.sharding.ShardRegion.Passivate
|
import akka.cluster.sharding.ShardRegion.Passivate
|
||||||
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
|
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
|
||||||
|
|
@ -11,7 +12,6 @@ import language.postfixOps
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.pattern.BackoffSupervisor
|
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
import akka.persistence.PersistentActor
|
import akka.persistence.PersistentActor
|
||||||
import akka.persistence.Persistence
|
import akka.persistence.Persistence
|
||||||
|
|
@ -29,46 +29,7 @@ import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
||||||
import akka.pattern.BackoffSupervisor
|
import akka.pattern.BackoffSupervisor
|
||||||
|
|
||||||
object ClusterShardingSpec extends MultiNodeConfig {
|
object ClusterShardingSpec {
|
||||||
val controller = role("controller")
|
|
||||||
val first = role("first")
|
|
||||||
val second = role("second")
|
|
||||||
val third = role("third")
|
|
||||||
val fourth = role("fourth")
|
|
||||||
val fifth = role("fifth")
|
|
||||||
val sixth = role("sixth")
|
|
||||||
|
|
||||||
commonConfig(ConfigFactory.parseString("""
|
|
||||||
akka.loglevel = INFO
|
|
||||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
|
||||||
akka.remote.log-remote-lifecycle-events = off
|
|
||||||
akka.cluster.auto-down-unreachable-after = 0s
|
|
||||||
akka.cluster.down-removal-margin = 5s
|
|
||||||
akka.cluster.roles = ["backend"]
|
|
||||||
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
|
||||||
akka.persistence.journal.leveldb-shared.store {
|
|
||||||
native = off
|
|
||||||
dir = "target/journal-ClusterShardingSpec"
|
|
||||||
}
|
|
||||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
|
||||||
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
|
|
||||||
akka.cluster.sharding {
|
|
||||||
retry-interval = 1 s
|
|
||||||
handoff-timeout = 10 s
|
|
||||||
shard-start-timeout = 5s
|
|
||||||
entity-restart-backoff = 1s
|
|
||||||
rebalance-interval = 2 s
|
|
||||||
least-shard-allocation-strategy {
|
|
||||||
rebalance-threshold = 2
|
|
||||||
max-simultaneous-rebalance = 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"""))
|
|
||||||
|
|
||||||
nodeConfig(sixth) {
|
|
||||||
ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
|
|
||||||
}
|
|
||||||
|
|
||||||
//#counter-actor
|
//#counter-actor
|
||||||
case object Increment
|
case object Increment
|
||||||
case object Decrement
|
case object Decrement
|
||||||
|
|
@ -136,6 +97,48 @@ object ClusterShardingSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||||
|
val controller = role("controller")
|
||||||
|
val first = role("first")
|
||||||
|
val second = role("second")
|
||||||
|
val third = role("third")
|
||||||
|
val fourth = role("fourth")
|
||||||
|
val fifth = role("fifth")
|
||||||
|
val sixth = role("sixth")
|
||||||
|
|
||||||
|
commonConfig(ConfigFactory.parseString(s"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||||
|
akka.remote.log-remote-lifecycle-events = off
|
||||||
|
akka.cluster.auto-down-unreachable-after = 0s
|
||||||
|
akka.cluster.down-removal-margin = 5s
|
||||||
|
akka.cluster.roles = ["backend"]
|
||||||
|
akka.cluster.distributed-data.gossip-interval = 1s
|
||||||
|
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||||
|
akka.persistence.journal.leveldb-shared.store {
|
||||||
|
native = off
|
||||||
|
dir = "target/journal-ClusterShardingSpec"
|
||||||
|
}
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||||
|
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
|
||||||
|
akka.cluster.sharding {
|
||||||
|
retry-interval = 1 s
|
||||||
|
handoff-timeout = 10 s
|
||||||
|
shard-start-timeout = 5s
|
||||||
|
entity-restart-backoff = 1s
|
||||||
|
rebalance-interval = 2 s
|
||||||
|
state-store-mode = "$mode"
|
||||||
|
least-shard-allocation-strategy {
|
||||||
|
rebalance-threshold = 2
|
||||||
|
max-simultaneous-rebalance = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""))
|
||||||
|
nodeConfig(sixth) {
|
||||||
|
ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// only used in documentation
|
// only used in documentation
|
||||||
object ClusterShardingDocCode {
|
object ClusterShardingDocCode {
|
||||||
import ClusterShardingSpec._
|
import ClusterShardingSpec._
|
||||||
|
|
@ -156,16 +159,31 @@ object ClusterShardingDocCode {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class ClusterShardingMultiJvmNode1 extends ClusterShardingSpec
|
object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")
|
||||||
class ClusterShardingMultiJvmNode2 extends ClusterShardingSpec
|
object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata")
|
||||||
class ClusterShardingMultiJvmNode3 extends ClusterShardingSpec
|
|
||||||
class ClusterShardingMultiJvmNode4 extends ClusterShardingSpec
|
|
||||||
class ClusterShardingMultiJvmNode5 extends ClusterShardingSpec
|
|
||||||
class ClusterShardingMultiJvmNode6 extends ClusterShardingSpec
|
|
||||||
class ClusterShardingMultiJvmNode7 extends ClusterShardingSpec
|
|
||||||
|
|
||||||
class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec with ImplicitSender {
|
class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig)
|
||||||
|
class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig)
|
||||||
|
|
||||||
|
class PersistentClusterShardingMultiJvmNode1 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode2 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode3 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode4 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode5 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode6 extends PersistentClusterShardingSpec
|
||||||
|
class PersistentClusterShardingMultiJvmNode7 extends PersistentClusterShardingSpec
|
||||||
|
|
||||||
|
class DDataClusterShardingMultiJvmNode1 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode2 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode3 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode4 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode5 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode6 extends DDataClusterShardingSpec
|
||||||
|
class DDataClusterShardingMultiJvmNode7 extends DDataClusterShardingSpec
|
||||||
|
|
||||||
|
abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||||
import ClusterShardingSpec._
|
import ClusterShardingSpec._
|
||||||
|
import config._
|
||||||
|
|
||||||
override def initialParticipants = roles.size
|
override def initialParticipants = roles.size
|
||||||
|
|
||||||
|
|
@ -195,6 +213,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
|
||||||
}
|
}
|
||||||
|
|
||||||
def createCoordinator(): Unit = {
|
def createCoordinator(): Unit = {
|
||||||
|
val replicator = system.actorOf(Replicator.props(
|
||||||
|
ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
|
||||||
|
|
||||||
def coordinatorProps(typeName: String, rebalanceEnabled: Boolean) = {
|
def coordinatorProps(typeName: String, rebalanceEnabled: Boolean) = {
|
||||||
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
|
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
|
||||||
val cfg = ConfigFactory.parseString(s"""
|
val cfg = ConfigFactory.parseString(s"""
|
||||||
|
|
@ -203,7 +224,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
|
||||||
rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"}
|
rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"}
|
||||||
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
|
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
|
||||||
val settings = ClusterShardingSettings(cfg)
|
val settings = ClusterShardingSettings(cfg)
|
||||||
|
if (settings.stateStoreMode == "persistence")
|
||||||
ShardCoordinator.props(typeName, settings, allocationStrategy)
|
ShardCoordinator.props(typeName, settings, allocationStrategy)
|
||||||
|
else
|
||||||
|
ShardCoordinator.props(typeName, settings, allocationStrategy, replicator)
|
||||||
}
|
}
|
||||||
|
|
||||||
List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter",
|
List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter",
|
||||||
|
|
@ -252,7 +276,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
|
||||||
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true)
|
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true)
|
||||||
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true)
|
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true)
|
||||||
|
|
||||||
"Cluster sharding" must {
|
s"Cluster sharding ($mode)" must {
|
||||||
|
|
||||||
"setup shared journal" in {
|
"setup shared journal" in {
|
||||||
// start the Persistence extension
|
// start the Persistence extension
|
||||||
|
|
|
||||||
|
|
@ -176,6 +176,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
|
||||||
also add latency. This should be considered when designing the application specific
|
also add latency. This should be considered when designing the application specific
|
||||||
shard resolution, e.g. to avoid too fine grained shards.
|
shard resolution, e.g. to avoid too fine grained shards.
|
||||||
|
|
||||||
|
Distributed Data Mode
|
||||||
|
---------------------
|
||||||
|
|
||||||
|
Instead of ``akka-persistence`` it is possible to use the ``akka-distributed-data`` module. In that case
|
||||||
|
the state of the ``ShardCoordinator`` is replicated within the cluster by the ``akka-distributed-data``
|
||||||
|
module with ``WriteMajority`` consistency. This mode is enabled by setting
|
||||||
|
``akka.cluster.sharding.state-store-mode`` to ``ddata``.
|
||||||
|
This makes it possible to remove the ``akka-persistence`` dependency from a project, provided it
|
||||||
|
is not used in user code and ``remember-entities`` is ``off``.
|
||||||
|
Note that this option can also lead to duplicated shards if the cluster becomes fragmented
|
||||||
|
due to broken replication between nodes.
|
||||||
|
|
||||||
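For illustration only (not part of this diff): a minimal ``application.conf`` sketch that enables the mode described above, using only the ``state-store-mode`` key introduced by this change and leaving everything else at its defaults::

    akka.cluster.sharding {
      # replicate coordinator state with akka-distributed-data instead of akka-persistence
      state-store-mode = "ddata"
    }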
Proxy Only Mode
|
Proxy Only Mode
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -179,6 +179,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
|
||||||
also add latency. This should be considered when designing the application specific
|
also add latency. This should be considered when designing the application specific
|
||||||
shard resolution, e.g. to avoid too fine grained shards.
|
shard resolution, e.g. to avoid too fine grained shards.
|
||||||
|
|
||||||
|
Distributed Data Mode
|
||||||
|
---------------------
|
||||||
|
|
||||||
|
Instead of ``akka-persistence`` it is possible to use the ``akka-distributed-data`` module. In that case
|
||||||
|
the state of the ``ShardCoordinator`` is replicated within the cluster by the ``akka-distributed-data``
|
||||||
|
module with ``WriteMajority`` consistency. This mode is enabled by setting
|
||||||
|
``akka.cluster.sharding.state-store-mode`` to ``ddata``.
|
||||||
|
This makes it possible to remove the ``akka-persistence`` dependency from a project, provided it
|
||||||
|
is not used in user code and ``remember-entities`` is ``off``.
|
||||||
|
Note that this option can also lead to duplicated shards if the cluster becomes fragmented
|
||||||
|
due to broken replication between nodes.
|
||||||
|
|
||||||
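As a hedged companion sketch (not part of this commit), the same switch can be applied programmatically the way the updated specs do, by layering a config override on top of the default configuration; the system name ``ClusterSystem`` is a placeholder::

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // overlay the ddata state-store mode on top of the default configuration
    val config = ConfigFactory.parseString("akka.cluster.sharding.state-store-mode = ddata")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)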
Proxy Only Mode
|
Proxy Only Mode
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -136,7 +136,7 @@ object AkkaBuild extends Build {
|
||||||
id = "akka-cluster-sharding",
|
id = "akka-cluster-sharding",
|
||||||
base = file("akka-cluster-sharding"),
|
base = file("akka-cluster-sharding"),
|
||||||
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
|
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
|
||||||
persistence % "compile;test->provided", clusterTools)
|
persistence % "compile;test->provided", distributedData % "compile;test->provided", clusterTools)
|
||||||
) configs (MultiJvm)
|
) configs (MultiJvm)
|
||||||
|
|
||||||
lazy val distributedData = Project(
|
lazy val distributedData = Project(
|
||||||
|
|
|
||||||
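The build change above adds ``akka-distributed-data`` to the dependencies of ``akka-cluster-sharding``. As a hedged sketch of what a user project might add to use the ``ddata`` mode, the artifact name and version below are assumptions, not taken from this diff::

    // build.sbt sketch -- artifact name and version are placeholders
    libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data-experimental" % "2.4-SNAPSHOT"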