=cls #17846 Use CRDTs instead of PersistentActor to remember the state of the ShardCoordinator #17871

Ostapenko Evgeniy 2015-08-20 13:24:39 +03:00
parent d2f08a3456
commit 6814d08ef1
14 changed files with 654 additions and 360 deletions


@@ -13,77 +13,89 @@ akka.cluster.sharding {
# The extension creates a top level actor with this name in top level system scope,
# e.g. '/system/sharding'
guardian-name = sharding
# Specifies that entities run on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
role = ""
# When this is set to 'on' the active entity actors will automatically be restarted
# upon Shard restart, i.e. if the Shard is started on a different ShardRegion
# due to rebalance or crash.
remember-entities = off
# If the coordinator can't store state changes it will be stopped
# and started again after this duration, with an exponential back-off
# of up to 5 times this duration.
coordinator-failure-backoff = 5 s
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.
buffer-size = 100000
# Timeout of the shard rebalancing process.
handoff-timeout = 60 s
# Time given to a region to acknowledge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard is remembering entities and can't store state changes
# it will be stopped and then started again after this duration. Any messages
# sent to an affected entity may be lost in this process.
shard-failure-backoff = 10 s
# If the shard is remembering entities and an entity stops itself without
# using passivate, the entity will be restarted after this duration or when
# the next message for it is received, whichever occurs first.
entity-restart-backoff = 10 s
# Rebalance check is performed periodically with this interval.
rebalance-interval = 10 s
# Absolute path to the journal plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default journal plugin is used. Note that this is not related to
# persistence used by the entity actors.
journal-plugin-id = ""
# Absolute path to the snapshot plugin configuration entity that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default snapshot plugin is used. Note that this is not related to
# persistence used by the entity actors.
snapshot-plugin-id = ""
# The coordinator saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# Parameter which determines how the coordinator stores its state,
# valid values are either "persistence" or "ddata"
state-store-mode = "persistence"
# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
snapshot-after = 1000
# Setting for the default shard allocation strategy
least-shard-allocation-strategy {
# Threshold of how large the difference between most and least number of
# allocated shards must be to begin the rebalancing.
rebalance-threshold = 10
# The number of ongoing rebalancing processes is limited to this number.
max-simultaneous-rebalance = 3
}
# Timeout of waiting for the initial distributed state (the initial state will be queried again if the timeout is hit)
# works only for state-store-mode = "ddata"
waiting-for-state-timeout = 5 s
# Timeout of waiting for an update of the distributed state (the update will be retried if the timeout is hit)
# works only for state-store-mode = "ddata"
updating-state-timeout = 5 s
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
coordinator-singleton = ${akka.cluster.singleton}
# The id of the dispatcher to use for ClusterSharding actors.
# If not specified, the default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
# This dispatcher for the entity actors is defined by the user provided
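To make the new setting concrete: a minimal sketch (assuming a running ActorSystem; the overlay-with-fallback pattern is the same one the specs later in this commit use) of selecting the ddata mode programmatically:

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory

val system = ActorSystem("Example")
// overlay the new setting on top of the defaults from akka.cluster.sharding
val cfg = ConfigFactory.parseString("state-store-mode = ddata")
  .withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg) // settings.stateStoreMode == "ddata"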


@@ -19,6 +19,7 @@ import akka.actor.NoSerializationVerificationNeeded
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.singleton.ClusterSingletonManager
import akka.pattern.BackoffSupervisor
import akka.util.ByteString
@@ -414,6 +415,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
val cluster = Cluster(context.system)
val sharding = ClusterSharding(context.system)
lazy val replicator = DistributedData(context.system).replicator
private def coordinatorSingletonManagerName(encName: String): String =
encName + "Coordinator"
@@ -425,12 +427,17 @@ private[akka] class ClusterShardingGuardian extends Actor {
case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) ⇒
import settings.role
import settings.tuningParameters.coordinatorFailureBackoff
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
if (context.child(cName).isEmpty) {
val coordinatorProps = ShardCoordinator.props(typeName, settings, allocationStrategy)
val coordinatorProps =
if (settings.stateStoreMode == "persistence")
ShardCoordinator.props(typeName, settings, allocationStrategy)
else
ShardCoordinator.props(typeName, settings, allocationStrategy, replicator)
val singletonProps = BackoffSupervisor.props(
childProps = coordinatorProps,
childName = "coordinator",


@@ -36,7 +36,9 @@ object ClusterShardingSettings {
leastShardAllocationRebalanceThreshold =
config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance =
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"),
waitingForStateTimeout = config.getDuration("waiting-for-state-timeout", MILLISECONDS).millis,
updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis)
val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton"))
@@ -45,6 +47,7 @@ object ClusterShardingSettings {
rememberEntities = config.getBoolean("remember-entities"),
journalPluginId = config.getString("journal-plugin-id"),
snapshotPluginId = config.getString("snapshot-plugin-id"),
stateStoreMode = config.getString("state-store-mode"),
tuningParameters,
coordinatorSingletonSettings)
}
@@ -78,7 +81,9 @@ object ClusterShardingSettings {
val rebalanceInterval: FiniteDuration,
val snapshotAfter: Int,
val leastShardAllocationRebalanceThreshold: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int)
val leastShardAllocationMaxSimultaneousRebalance: Int,
val waitingForStateTimeout: FiniteDuration,
val updatingStateTimeout: FiniteDuration)
}
/**
@@ -101,6 +106,7 @@ final class ClusterShardingSettings(
val rememberEntities: Boolean,
val journalPluginId: String,
val snapshotPluginId: String,
val stateStoreMode: String,
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
@@ -127,6 +133,7 @@ final class ClusterShardingSettings(
rememberEntities: Boolean = rememberEntities,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
stateStoreMode: String = stateStoreMode,
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
new ClusterShardingSettings(
@@ -134,6 +141,7 @@ final class ClusterShardingSettings(
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
tuningParameters,
coordinatorSingletonSettings)
}


@@ -98,7 +98,7 @@ private[akka] class Shard(
extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any) extends Actor with ActorLogging {
import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate }
import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized }
import ShardCoordinator.Internal.{ HandOff, ShardStopped }
import Shard.{ State, RestartEntity, EntityStopped, EntityStarted }
import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
@@ -113,6 +113,10 @@ private[akka] class Shard(
var handOffStopper: Option[ActorRef] = None
initialized()
def initialized(): Unit = context.parent ! ShardInitialized(shardId)
def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size }
def processChange[A](event: A)(handler: A ⇒ Unit): Unit =
@@ -297,6 +301,9 @@ private[akka] class PersistentShard(
var persistCount = 0
// would be initialized after recovery completed
override def initialized(): Unit = {}
override def receive = receiveCommand
override def processChange[A](event: A)(handler: A ⇒ Unit): Unit = {
@@ -316,7 +323,10 @@ private[akka] class PersistentShard(
case EntityStarted(id) ⇒ state = state.copy(state.entities + id)
case EntityStopped(id) ⇒ state = state.copy(state.entities - id)
case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot
case RecoveryCompleted ⇒ state.entities foreach getEntity
case RecoveryCompleted ⇒
state.entities foreach getEntity
super.initialized()
log.debug("Shard recovery completed {}", shardId)
}
override def entityTerminated(ref: ActorRef): Unit = {


@@ -3,6 +3,8 @@
*/
package akka.cluster.sharding
import akka.persistence._
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -15,16 +17,15 @@ import akka.actor.Deploy
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import akka.actor.ReceiveTimeout
import akka.actor.Stash
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ddata.LWWRegisterKey
import akka.cluster.ddata.LWWRegister
import akka.cluster.ddata.Replicator._
import akka.dispatch.ExecutionContexts
import akka.persistence.PersistentActor
import akka.persistence.RecoveryCompleted
import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess
import akka.persistence.SnapshotOffer
import akka.pattern.pipe
/**
@@ -40,7 +41,16 @@ object ShardCoordinator {
*/
private[akka] def props(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardAllocationStrategy): Props =
Props(new ShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local)
Props(new PersistentShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local)
/**
* INTERNAL API
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata.
*/
private[akka] def props(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardAllocationStrategy,
replicator: ActorRef): Props =
Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator)).withDeploy(Deploy.local)
/**
* Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@@ -51,12 +61,12 @@ object ShardCoordinator {
/**
* Invoked when the location of a new shard is to be decided.
* @param requester actor reference to the [[ShardRegion]] that requested the location of the
* shard, can be returned if preference should be given to the node where the shard was first accessed
* @param shardId the id of the shard to allocate
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
* in the order they were allocated
* @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
* the references included in the `currentShardAllocations` parameter
*/
def allocateShard(requester: ActorRef, shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef]
@@ -64,9 +74,9 @@ object ShardCoordinator {
/**
* Invoked periodically to decide which shards to rebalance to another location.
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
* in the order they were allocated
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
* you should not include these in the returned set
* @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
*/
def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
@@ -95,12 +105,12 @@ object ShardCoordinator {
/**
* Invoked when the location of a new shard is to be decided.
* @param requester actor reference to the [[ShardRegion]] that requested the location of the
* shard, can be returned if preference should be given to the node where the shard was first accessed
* @param shardId the id of the shard to allocate
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
* in the order they were allocated
* @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
* the references included in the `currentShardAllocations` parameter
*/
def allocateShard(requester: ActorRef, shardId: String,
currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef]
@@ -108,9 +118,9 @@ object ShardCoordinator {
/**
* Invoked periodically to decide which shards to rebalance to another location.
* @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
* in the order they were allocated
* @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
* you should not include these in the returned set
* @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
*/
def rebalance(currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]],
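The allocation contract documented above is unchanged by this commit, so custom strategies keep working against either coordinator implementation. A minimal sketch against the Scala interface (the class name and the fewest-shards policy are illustrative, not part of the commit):

import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId

// hypothetical strategy: allocate to the region with the fewest shards, never rebalance
class FewestShardsStrategy extends ShardAllocationStrategy {
  override def allocateShard(requester: ActorRef, shardId: ShardId,
    currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
    Future.successful(currentShardAllocations.minBy { case (_, shards) ⇒ shards.size }._1)

  override def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
    rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
    Future.successful(Set.empty)
}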
@@ -358,29 +368,22 @@ object ShardCoordinator {
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends PersistentActor with ActorLogging {
abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends Actor with ActorLogging {
import ShardCoordinator._
import ShardCoordinator.Internal._
import ShardRegion.ShardId
import settings.tuningParameters._
override def persistenceId = s"/sharding/${typeName}Coordinator"
override def journalPluginId: String = settings.journalPluginId
override def snapshotPluginId: String = settings.snapshotPluginId
val removalMargin = Cluster(context.system).settings.DownRemovalMargin
var persistentState = State.empty
var state = State.empty
var rebalanceInProgress = Set.empty[ShardId]
var unAckedHostShards = Map.empty[ShardId, Cancellable]
// regions that have requested handoff, for graceful shutdown
var gracefulShutdownInProgress = Set.empty[ActorRef]
var aliveRegions = Set.empty[ActorRef]
var persistCount = 0
import context.dispatcher
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -393,62 +396,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
Cluster(context.system).unsubscribe(self)
}
override def receiveRecover: Receive = {
case evt: DomainEvent ⇒
log.debug("receiveRecover {}", evt)
evt match {
case ShardRegionRegistered(region) ⇒
persistentState = persistentState.updated(evt)
case ShardRegionProxyRegistered(proxy) ⇒
persistentState = persistentState.updated(evt)
case ShardRegionTerminated(region) ⇒
if (persistentState.regions.contains(region))
persistentState = persistentState.updated(evt)
else {
log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
" some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
"removed by later watch.", region)
}
case ShardRegionProxyTerminated(proxy) ⇒
if (persistentState.regionProxies.contains(proxy))
persistentState = persistentState.updated(evt)
case ShardHomeAllocated(shard, region) ⇒
persistentState = persistentState.updated(evt)
case _: ShardHomeDeallocated ⇒
persistentState = persistentState.updated(evt)
}
case SnapshotOffer(_, state: State) ⇒
log.debug("receiveRecover SnapshotOffer {}", state)
//Old versions of the state object may not have unallocatedShard set,
// thus it will be null.
if (state.unallocatedShards == null)
persistentState = state.copy(unallocatedShards = Set.empty)
else
persistentState = state
case RecoveryCompleted ⇒
persistentState.regionProxies.foreach(context.watch)
persistentState.regions.foreach { case (a, _) ⇒ context.watch(a) }
persistentState.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
allocateShardHomes()
}
override def receiveCommand: Receive = {
def active: Receive = {
case Register(region) ⇒
log.debug("ShardRegion registered: [{}]", region)
aliveRegions += region
if (persistentState.regions.contains(region))
sender() ! RegisterAck(self)
if (state.regions.contains(region))
region ! RegisterAck(self)
else {
gracefulShutdownInProgress -= region
saveSnapshotWhenNeeded()
persist(ShardRegionRegistered(region)) { evt ⇒
val firstRegion = persistentState.regions.isEmpty
persistentState = persistentState.updated(evt)
update(ShardRegionRegistered(region)) { evt ⇒
val firstRegion = state.regions.isEmpty
state = state.updated(evt)
context.watch(region)
sender() ! RegisterAck(self)
region ! RegisterAck(self)
if (firstRegion)
allocateShardHomes()
@@ -457,28 +417,26 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case RegisterProxy(proxy) ⇒
log.debug("ShardRegion proxy registered: [{}]", proxy)
if (persistentState.regionProxies.contains(proxy))
sender() ! RegisterAck(self)
if (state.regionProxies.contains(proxy))
proxy ! RegisterAck(self)
else {
saveSnapshotWhenNeeded()
persist(ShardRegionProxyRegistered(proxy)) { evt ⇒
persistentState = persistentState.updated(evt)
update(ShardRegionProxyRegistered(proxy)) { evt ⇒
state = state.updated(evt)
context.watch(proxy)
sender() ! RegisterAck(self)
proxy ! RegisterAck(self)
}
}
case t @ Terminated(ref) ⇒
if (persistentState.regions.contains(ref)) {
if (state.regions.contains(ref)) {
if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref))
context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref))
else
regionTerminated(ref)
} else if (persistentState.regionProxies.contains(ref)) {
} else if (state.regionProxies.contains(ref)) {
log.debug("ShardRegion proxy terminated: [{}]", ref)
saveSnapshotWhenNeeded()
persist(ShardRegionProxyTerminated(ref)) { evt ⇒
persistentState = persistentState.updated(evt)
update(ShardRegionProxyTerminated(ref)) { evt ⇒
state = state.updated(evt)
}
}
@@ -487,10 +445,10 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case GetShardHome(shard) ⇒
if (!rebalanceInProgress.contains(shard)) {
persistentState.shards.get(shard) match {
state.shards.get(shard) match {
case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
case None ⇒
val activeRegions = persistentState.regions -- gracefulShutdownInProgress
val activeRegions = state.regions -- gracefulShutdownInProgress
if (activeRegions.nonEmpty) {
val getShardHomeSender = sender()
val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
@@ -524,14 +482,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
}
case ResendShardHost(shard, region) ⇒
persistentState.shards.get(shard) match {
state.shards.get(shard) match {
case Some(`region`) ⇒ sendHostShardMsg(shard, region)
case _ ⇒ //Reallocated to another region
}
case RebalanceTick ⇒
if (persistentState.regions.nonEmpty) {
val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
if (state.regions.nonEmpty) {
val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress)
shardsFuture.value match {
case Some(Success(shards)) ⇒
continueRebalance(shards)
@@ -551,20 +509,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
rebalanceInProgress -= shard
log.debug("Rebalance shard [{}] done [{}]", shard, ok)
// The shard could have been removed by ShardRegionTerminated
if (persistentState.shards.contains(shard))
if (state.shards.contains(shard))
if (ok) {
saveSnapshotWhenNeeded()
persist(ShardHomeDeallocated(shard)) { evt ⇒
persistentState = persistentState.updated(evt)
update(ShardHomeDeallocated(shard)) { evt ⇒
state = state.updated(evt)
log.debug("Shard [{}] deallocated", evt.shard)
allocateShardHomes()
}
} else // rebalance not completed, graceful shutdown will be retried
gracefulShutdownInProgress -= persistentState.shards(shard)
gracefulShutdownInProgress -= state.shards(shard)
case GracefulShutdownReq(region) ⇒
if (!gracefulShutdownInProgress(region))
persistentState.regions.get(region) match {
state.regions.get(region) match {
case Some(shards) ⇒
log.debug("Graceful shutdown of region [{}] with shards [{}]", region, shards)
gracefulShutdownInProgress += region
@@ -572,12 +529,6 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case None ⇒
}
case SaveSnapshotSuccess(_) ⇒
log.debug("Persistent snapshot saved successfully")
case SaveSnapshotFailure(_, reason) ⇒
log.warning("Persistent snapshot failure: {}", reason.getMessage)
case ShardHome(_, _) ⇒
//On rebalance, we send ourselves a GetShardHome message to reallocate a
// shard. This receive handles the "response" from that message. i.e. ignores it.
@@ -589,7 +540,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
context.become(shuttingDown)
case ShardRegion.GetCurrentRegions ⇒
val reply = ShardRegion.CurrentRegions(persistentState.regions.keySet.map { ref ⇒
val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref ⇒
if (ref.path.address.host.isEmpty) Cluster(context.system).selfAddress
else ref.path.address
})
@@ -598,16 +549,17 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case _: CurrentClusterState ⇒
}
def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit
def regionTerminated(ref: ActorRef): Unit =
if (persistentState.regions.contains(ref)) {
if (state.regions.contains(ref)) {
log.debug("ShardRegion terminated: [{}]", ref)
persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
gracefulShutdownInProgress -= ref
saveSnapshotWhenNeeded()
persist(ShardRegionTerminated(ref)) { evt ⇒
persistentState = persistentState.updated(evt)
update(ShardRegionTerminated(ref)) { evt ⇒
state = state.updated(evt)
allocateShardHomes()
}
}
@@ -616,31 +568,22 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
case _ ⇒ // ignore all
}
def saveSnapshotWhenNeeded(): Unit = {
persistCount += 1
if (persistCount % snapshotAfter == 0) {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(persistentState)
}
}
def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {
region ! HostShard(shard)
val cancel = context.system.scheduler.scheduleOnce(shardStartTimeout, self, ResendShardHost(shard, region))
unAckedHostShards = unAckedHostShards.updated(shard, cancel)
}
def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) }
def allocateShardHomes(): Unit = state.unallocatedShards.foreach { self ! GetShardHome(_) }
def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =
if (!rebalanceInProgress.contains(shard)) {
persistentState.shards.get(shard) match {
state.shards.get(shard) match {
case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
case None ⇒
if (persistentState.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
saveSnapshotWhenNeeded()
persist(ShardHomeAllocated(shard, region)) { evt ⇒
persistentState = persistentState.updated(evt)
if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
update(ShardHomeAllocated(shard, region)) { evt ⇒
state = state.updated(evt)
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
sendHostShardMsg(evt.shard, evt.region)
@@ -648,19 +591,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
}
} else
log.debug("Allocated region {} for shard [{}] is not (any longer) one of the registered regions: {}",
region, shard, persistentState)
region, shard, state)
}
}
def continueRebalance(shards: Set[ShardId]): Unit =
shards.foreach { shard ⇒
if (!rebalanceInProgress(shard)) {
persistentState.shards.get(shard) match {
state.shards.get(shard) match {
case Some(rebalanceFromRegion) ⇒
rebalanceInProgress += shard
log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
persistentState.regions.keySet ++ persistentState.regionProxies)
state.regions.keySet ++ state.regionProxies)
.withDispatcher(context.props.dispatcher))
case None ⇒
log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
@@ -670,3 +613,182 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
}
}
/**
* Singleton coordinator that decides where to allocate shards.
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends ShardCoordinator(typeName, settings, allocationStrategy) with PersistentActor {
import ShardCoordinator.Internal._
import settings.tuningParameters._
override def persistenceId = s"/sharding/${typeName}Coordinator"
override def journalPluginId: String = settings.journalPluginId
override def snapshotPluginId: String = settings.snapshotPluginId
var persistCount = 0
override def receiveRecover: Receive = {
case evt: DomainEvent ⇒
log.debug("receiveRecover {}", evt)
evt match {
case ShardRegionRegistered(region) ⇒
state = state.updated(evt)
case ShardRegionProxyRegistered(proxy) ⇒
state = state.updated(evt)
case ShardRegionTerminated(region) ⇒
if (state.regions.contains(region))
state = state.updated(evt)
else {
log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
" some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
"removed by later watch.", region)
}
case ShardRegionProxyTerminated(proxy) ⇒
if (state.regionProxies.contains(proxy))
state = state.updated(evt)
case ShardHomeAllocated(shard, region) ⇒
state = state.updated(evt)
case _: ShardHomeDeallocated ⇒
state = state.updated(evt)
}
case SnapshotOffer(_, st: State) ⇒
log.debug("receiveRecover SnapshotOffer {}", st)
//Old versions of the state object may not have unallocatedShard set,
// thus it will be null.
if (st.unallocatedShards == null)
state = st.copy(unallocatedShards = Set.empty)
else
state = st
case RecoveryCompleted ⇒
state.regionProxies.foreach(context.watch)
state.regions.foreach { case (a, _) ⇒ context.watch(a) }
state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
allocateShardHomes()
}
override def receiveCommand: Receive = active orElse {
case SaveSnapshotSuccess(_) ⇒
log.debug("Persistent snapshot saved successfully")
case SaveSnapshotFailure(_, reason) ⇒
log.warning("Persistent snapshot failure: {}", reason.getMessage)
}
def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
saveSnapshotWhenNeeded()
persist(evt)(f)
}
def saveSnapshotWhenNeeded(): Unit = {
persistCount += 1
if (persistCount % snapshotAfter == 0) {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
}
}
}
/**
* Singleton coordinator (with state based on ddata) that decides where to allocate shards.
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy,
replicator: ActorRef)
extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash {
import ShardCoordinator.Internal._
import akka.cluster.ddata.Replicator.Update
val waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout
val updatingStateTimeout = settings.tuningParameters.updatingStateTimeout
Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
implicit val node = Cluster(context.system)
val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
var afterUpdateCallback: DomainEvent ⇒ Unit = _
getState()
override def receive: Receive = waitingForState
// While waiting for the initial state all other messages are dropped, since their senders will retry
def waitingForState: Receive = {
case g @ GetSuccess(CoordinatorStateKey, _) ⇒
state = g.get(CoordinatorStateKey).value
state.regionProxies.foreach(context.watch)
state.regions.foreach { case (a, _) ⇒ context.watch(a) }
state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
allocateShardHomes()
activate()
case GetFailure(CoordinatorStateKey, _) ⇒
log.error(
"The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {} millis (retrying)",
waitingForStateTimeout.toMillis)
getState()
case NotFound(CoordinatorStateKey, _) ⇒ activate()
}
def waitingForUpdate[E <: DomainEvent](evt: E): Receive = {
case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
log.debug("The coordinator state was successfully updated with {}", evt)
updateSuccess(evt)
case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒
log.error(
"The ShardCoordinator was unable to update the distributed state within 'updating-state-timeout'={} millis (retrying), event={}",
updatingStateTimeout.toMillis,
evt)
sendUpdate(evt)
case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒
log.error(
cause,
"The ShardCoordinator was unable to update the distributed state with error {} and event {}. Coordinator will be restarted",
error,
evt)
throw cause
case _ ⇒ stash()
}
def activate() = {
context.become(active)
log.info("Sharding Coordinator was moved to the active state {}", state)
}
def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
afterUpdateCallback = f.asInstanceOf[DomainEvent ⇒ Unit]
context.become(waitingForUpdate(evt))
sendUpdate(evt)
}
def updateSuccess(evt: DomainEvent): Unit = {
afterUpdateCallback(evt)
afterUpdateCallback = null
context.become(active)
unstashAll()
}
def getState(): Unit =
replicator ! Get(CoordinatorStateKey, ReadMajority(waitingForStateTimeout))
def sendUpdate(evt: DomainEvent) = {
val s = state.updated(evt)
replicator ! Update(CoordinatorStateKey, LWWRegister(State.empty), WriteMajority(updatingStateTimeout), Some(evt)) { reg ⇒
reg.withValue(s)
}
}
}
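The Get/Update conversation the DDataShardCoordinator holds with the replicator can be sketched in isolation; a minimal, hypothetical actor storing a String in an LWWRegister (key name and values are illustrative, not part of the commit):

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, LWWRegister, LWWRegisterKey }
import akka.cluster.ddata.Replicator._

class RegisterClient extends Actor with ActorLogging {
  implicit val node = Cluster(context.system)
  val replicator = DistributedData(context.system).replicator
  val Key = LWWRegisterKey[String]("example")

  // ask for the current value with majority consistency, like getState() above
  replicator ! Get(Key, ReadMajority(5.seconds))

  def receive = {
    case g @ GetSuccess(Key, _) ⇒ log.info("current value: {}", g.get(Key).value)
    case NotFound(Key, _) ⇒
      // nothing stored yet; write an initial value, like sendUpdate() above
      replicator ! Update(Key, LWWRegister("init"), WriteMajority(5.seconds))(_.withValue("hello"))
    case _: UpdateSuccess[_] ⇒ log.info("write committed on a majority of nodes")
  }
}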


@@ -154,6 +154,12 @@ object ShardRegion {
*/
@SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand
/**
* We must be sure that a shard is initialized before we start sending messages to it.
* The shard could be terminated during initialization.
*/
final case class ShardInitialized(shardId: ShardId)
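A simplified sketch of the handshake this message enables (not the actual ShardRegion code; the string-keyed envelope is a toy): the child Shard announces itself with context.parent ! ShardInitialized(shardId), and only then does the parent switch that shard from buffering to direct delivery:

import akka.actor.{ Actor, ActorRef }
import akka.cluster.sharding.ShardRegion

class RegionLike extends Actor {
  var shards = Map.empty[String, ActorRef]                  // initialized shards
  var buffers = Map.empty[String, Vector[(Any, ActorRef)]]  // messages awaiting init

  def receive = {
    case ShardRegion.ShardInitialized(id) ⇒
      shards += id -> sender()
      // flush in arrival order, preserving the original senders
      buffers.getOrElse(id, Vector.empty).foreach { case (m, s) ⇒ sender().tell(m, s) }
      buffers -= id
    case (id: String, msg: Any) ⇒ // toy envelope, for illustration only
      shards.get(id) match {
        case Some(shard) ⇒ shard forward msg
        case None        ⇒ buffers += id -> (buffers.getOrElse(id, Vector.empty) :+ (msg -> sender()))
      }
  }
}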
/**
* Java API: Send this message to the `ShardRegion` actor to handoff all shards that are hosted by
* the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch`
@@ -302,11 +308,13 @@ class ShardRegion(
def receive = {
case Terminated(ref) ⇒ receiveTerminated(ref)
case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender())
case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt)
case state: CurrentClusterState ⇒ receiveClusterState(state)
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
case cmd: ShardRegionCommand ⇒ receiveCommand(cmd)
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
case msg: RestartShard ⇒ deliverMessage(msg, sender())
}
def receiveClusterState(state: CurrentClusterState): Unit = {
@@ -336,7 +344,6 @@ class ShardRegion(
//Start the shard, if already started this does nothing
getShard(shard)
deliverBufferedMessages(shard)
sender() ! ShardStarted(shard)
@@ -354,7 +361,10 @@ class ShardRegion(
if (ref != self)
context.watch(ref)
deliverBufferedMessages(shard)
if (ref == self)
getShard(shard).foreach(deliverBufferedMessages(shard, _))
else
deliverBufferedMessages(shard, ref)
case RegisterAck(coord) ⇒
context.watch(coord)
@@ -472,12 +482,44 @@ class ShardRegion(
}
}
def deliverBufferedMessages(shard: String): Unit = {
shardBuffers.get(shard) match {
def initializeShard(id: ShardId, shard: ActorRef): Unit = {
log.debug("Shard was initialized {}", id)
shards = shards.updated(id, shard)
deliverBufferedMessages(id, shard)
}
def bufferMessage(shardId: ShardId, msg: Any, snd: ActorRef) = {
val totBufSize = totalBufferSize
if (totBufSize >= bufferSize) {
if (loggedFullBufferWarning)
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
else {
log.warning("Buffer is full, dropping message for shard [{}]", shardId)
loggedFullBufferWarning = true
}
context.system.deadLetters ! msg
} else {
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
// log some insight into how buffers fill up, every 10% of the buffer capacity
val tot = totBufSize + 1
if (tot % (bufferSize / 10) == 0) {
val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
if (tot <= bufferSize / 2)
log.info(logMsg)
else
log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
}
}
}
def deliverBufferedMessages(shardId: ShardId, receiver: ActorRef): Unit = {
shardBuffers.get(shardId) match {
case Some(buf) ⇒
log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shard)
buf.foreach { case (msg, snd) ⇒ deliverMessage(msg, snd) }
shardBuffers -= shard
log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shardId)
buf.foreach { case (msg, snd) ⇒ receiver.tell(msg, snd) }
shardBuffers -= shardId
case None ⇒
}
loggedFullBufferWarning = false
@@ -505,7 +547,18 @@ class ShardRegion(
val shardId = extractShardId(msg)
regionByShard.get(shardId) match {
case Some(ref) if ref == self ⇒
getShard(shardId).tell(msg, snd)
getShard(shardId) match {
case Some(shard) ⇒
shardBuffers.get(shardId) match {
case Some(buf) ⇒
// messages for this shard are already buffered, so append the new
// message to the buffer to preserve delivery order
bufferMessage(shardId, msg, snd)
deliverBufferedMessages(shardId, shard)
case None ⇒
shard.tell(msg, snd)
}
case None ⇒ bufferMessage(shardId, msg, snd)
}
case Some(ref) ⇒
log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref)
ref.tell(msg, snd)
@@ -517,36 +570,13 @@ class ShardRegion(
log.debug("Request shard [{}] home", shardId)
coordinator.foreach(_ ! GetShardHome(shardId))
}
val totBufSize = totalBufferSize
if (totBufSize >= bufferSize) {
if (loggedFullBufferWarning)
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
else {
log.warning("Buffer is full, dropping message for shard [{}]", shardId)
loggedFullBufferWarning = true
}
context.system.deadLetters ! msg
} else {
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
// log some insight into how buffers fill up, every 10% of the buffer capacity
val tot = totBufSize + 1
if (tot % (bufferSize / 10) == 0) {
val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
if (tot <= bufferSize / 2)
log.info(logMsg)
else
log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
}
}
bufferMessage(shardId, msg, snd)
}
}
def getShard(id: ShardId): ActorRef = shards.getOrElse(
id,
def getShard(id: ShardId): Option[ActorRef] = shards.get(id).orElse(
entityProps match {
case Some(props) ⇒
case Some(props) if !shardsByRef.values.exists(_ == id) ⇒
log.debug("Starting shard [{}] in region", id)
val name = URLEncoder.encode(id, "utf-8")
@@ -560,9 +590,10 @@ class ShardRegion(
extractShardId,
handOffStopMessage).withDispatcher(context.props.dispatcher),
name))
shards = shards.updated(id, shard)
shardsByRef = shardsByRef.updated(shard, id)
shard
None
case Some(props) ⇒
None
case None ⇒
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
})


@@ -26,26 +26,7 @@ import scala.concurrent.Future
import akka.util.Timeout
import akka.pattern.ask
object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingCustomShardAllocationSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec"
"""))
object ClusterShardingCustomShardAllocationSpec {
class Entity extends Actor {
def receive = {
case id: Int ⇒ sender() ! id
@@ -98,11 +79,43 @@ object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig {
}
class ClusterShardingCustomShardAllocationMultiJvmNode1 extends ClusterShardingCustomShardAllocationSpec
class ClusterShardingCustomShardAllocationMultiJvmNode2 extends ClusterShardingCustomShardAllocationSpec
abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShardingCustomShardAllocationSpec) with STMultiNodeSpec with ImplicitSender {
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingCustomShardAllocationSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec"
akka.cluster.sharding.state-store-mode = "$mode"
"""))
}
object PersistentClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("persistence")
object DDataClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("ddata")
class PersistentClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(PersistentClusterShardingCustomShardAllocationSpecConfig)
class DDataClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(DDataClusterShardingCustomShardAllocationSpecConfig)
class PersistentClusterShardingCustomShardAllocationMultiJvmNode1 extends PersistentClusterShardingCustomShardAllocationSpec
class PersistentClusterShardingCustomShardAllocationMultiJvmNode2 extends PersistentClusterShardingCustomShardAllocationSpec
class DDataClusterShardingCustomShardAllocationMultiJvmNode1 extends DDataClusterShardingCustomShardAllocationSpec
class DDataClusterShardingCustomShardAllocationMultiJvmNode2 extends DDataClusterShardingCustomShardAllocationSpec
abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingCustomShardAllocationSpec._
import config._
override def initialParticipants = roles.size
@@ -146,7 +159,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
lazy val allocator = system.actorOf(Props[Allocator], "allocator")
"Cluster sharding with custom allocation strategy" must {
s"Cluster sharding ($mode) with custom allocation strategy" must {
"setup shared journal" in {
// start the Persistence extension


@@ -22,34 +22,7 @@ import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
object ClusterShardingFailureSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingFailureSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
akka.cluster.sharding.coordinator-failure-backoff = 3s
akka.cluster.sharding.shard-failure-backoff = 3s
"""))
testTransport(on = true)
object ClusterShardingFailureSpec {
case class Get(id: String)
case class Add(id: String, i: Int)
case class Value(id: String, n: Int)
@@ -75,12 +48,55 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
}
class ClusterShardingFailureMultiJvmNode1 extends ClusterShardingFailureSpec
class ClusterShardingFailureMultiJvmNode2 extends ClusterShardingFailureSpec
class ClusterShardingFailureMultiJvmNode3 extends ClusterShardingFailureSpec
abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec) with STMultiNodeSpec with ImplicitSender {
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingFailureSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec"
akka.cluster.sharding {
coordinator-failure-backoff = 3s
shard-failure-backoff = 3s
state-store-mode = "$mode"
}
"""))
testTransport(on = true)
}
object PersistentClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("persistence")
object DDataClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("ddata")
class PersistentClusterShardingFailureSpec extends ClusterShardingFailureSpec(PersistentClusterShardingFailureSpecConfig)
class DDataClusterShardingFailureSpec extends ClusterShardingFailureSpec(DDataClusterShardingFailureSpecConfig)
class PersistentClusterShardingFailureMultiJvmNode1 extends PersistentClusterShardingFailureSpec
class PersistentClusterShardingFailureMultiJvmNode2 extends PersistentClusterShardingFailureSpec
class PersistentClusterShardingFailureMultiJvmNode3 extends PersistentClusterShardingFailureSpec
class DDataClusterShardingFailureMultiJvmNode1 extends DDataClusterShardingFailureSpec
class DDataClusterShardingFailureMultiJvmNode2 extends DDataClusterShardingFailureSpec
class DDataClusterShardingFailureMultiJvmNode3 extends DDataClusterShardingFailureSpec
abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingFailureSpec._
import config._
override def initialParticipants = roles.size
@@ -120,7 +136,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding with flaky journal" must {
s"Cluster sharding ($mode) with flaky journal" must {
"setup shared journal" in {
// start the Persistence extension


@@ -26,26 +26,7 @@ import scala.concurrent.Future
import akka.util.Timeout
import akka.pattern.ask
object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingGracefulShutdownSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec"
"""))
object ClusterShardingGracefulShutdownSpec {
case object StopEntity
class Entity extends Actor {
@@ -84,11 +65,43 @@ object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
}
class ClusterShardingGracefulShutdownMultiJvmNode1 extends ClusterShardingGracefulShutdownSpec
class ClusterShardingGracefulShutdownMultiJvmNode2 extends ClusterShardingGracefulShutdownSpec
abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingGracefulShutdownSpec) with STMultiNodeSpec with ImplicitSender {
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingGracefulShutdownSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec"
akka.cluster.sharding.state-store-mode = "$mode"
"""))
}
object PersistentClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("persistence")
object DDataClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("ddata")
class PersistentClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(PersistentClusterShardingGracefulShutdownSpecConfig)
class DDataClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(DDataClusterShardingGracefulShutdownSpecConfig)
class PersistentClusterShardingGracefulShutdownMultiJvmNode1 extends PersistentClusterShardingGracefulShutdownSpec
class PersistentClusterShardingGracefulShutdownMultiJvmNode2 extends PersistentClusterShardingGracefulShutdownSpec
class DDataClusterShardingGracefulShutdownMultiJvmNode1 extends DDataClusterShardingGracefulShutdownSpec
class DDataClusterShardingGracefulShutdownMultiJvmNode2 extends DDataClusterShardingGracefulShutdownSpec
abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingGracefulShutdownSpec._
import config._
override def initialParticipants = roles.size
@@ -131,7 +144,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding" must {
s"Cluster sharding ($mode)" must {
"setup shared journal" in {
// start the Persistence extension


@@ -26,30 +26,7 @@ import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
object ClusterShardingLeavingSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingLeavingSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
"""))
object ClusterShardingLeavingSpec {
case class Ping(id: String)
class Entity extends Actor {
@@ -76,16 +53,53 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig {
val extractShardId: ShardRegion.ExtractShardId = {
case Ping(id: String) ⇒ id.charAt(0).toString
}
}
class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec
abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender {
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingLeavingSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
akka.cluster.sharding.state-store-mode = "$mode"
"""))
}
object PersistentClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("persistence")
object DDataClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("ddata")
class PersistentClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig)
class DDataClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(DDataClusterShardingLeavingSpecConfig)
class PersistentClusterShardingLeavingMultiJvmNode1 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode2 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode3 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode4 extends PersistentClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode1 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec
abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingLeavingSpec._
import config._
override def initialParticipants = roles.size
@@ -132,7 +146,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding with leaving member" must {
s"Cluster sharding ($mode) with leaving member" must {
"setup shared journal" in {
// start the Persistence extension


@@ -3,6 +3,7 @@
*/
package akka.cluster.sharding
import akka.cluster.ddata.{ ReplicatorSettings, Replicator }
import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
import akka.cluster.sharding.ShardRegion.Passivate
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
@@ -11,7 +12,6 @@ import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.pattern.BackoffSupervisor
import akka.cluster.Cluster
import akka.persistence.PersistentActor
import akka.persistence.Persistence
@@ -29,46 +29,7 @@ import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.pattern.BackoffSupervisor
object ClusterShardingSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store {
native = off
dir = "target/journal-ClusterShardingSpec"
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
akka.cluster.sharding {
retry-interval = 1 s
handoff-timeout = 10 s
shard-start-timeout = 5s
entity-restart-backoff = 1s
rebalance-interval = 2 s
least-shard-allocation-strategy {
rebalance-threshold = 2
max-simultaneous-rebalance = 1
}
}
"""))
nodeConfig(sixth) {
ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
}
object ClusterShardingSpec {
//#counter-actor
case object Increment
case object Decrement
@@ -136,6 +97,48 @@ object ClusterShardingSpec extends MultiNodeConfig {
}
abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
val sixth = role("sixth")
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.down-removal-margin = 5s
akka.cluster.roles = ["backend"]
akka.cluster.distributed-data.gossip-interval = 1s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.store {
native = off
dir = "target/journal-ClusterShardingSpec"
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
akka.cluster.sharding {
retry-interval = 1 s
handoff-timeout = 10 s
shard-start-timeout = 5s
entity-restart-backoff = 1s
rebalance-interval = 2 s
state-store-mode = "$mode"
least-shard-allocation-strategy {
rebalance-threshold = 2
max-simultaneous-rebalance = 1
}
}
"""))
nodeConfig(sixth) {
ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
}
}
// only used in documentation
object ClusterShardingDocCode {
import ClusterShardingSpec._
@ -156,16 +159,31 @@ object ClusterShardingDocCode {
}
class ClusterShardingMultiJvmNode1 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode2 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode3 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode4 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode5 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode6 extends ClusterShardingSpec
class ClusterShardingMultiJvmNode7 extends ClusterShardingSpec
object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")
object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata")
class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec with ImplicitSender {
class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig)
class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig)
class PersistentClusterShardingMultiJvmNode1 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode2 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode3 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode4 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode5 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode6 extends PersistentClusterShardingSpec
class PersistentClusterShardingMultiJvmNode7 extends PersistentClusterShardingSpec
class DDataClusterShardingMultiJvmNode1 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode2 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode3 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode4 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode5 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode6 extends DDataClusterShardingSpec
class DDataClusterShardingMultiJvmNode7 extends DDataClusterShardingSpec
abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingSpec._
import config._
override def initialParticipants = roles.size
@ -195,6 +213,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
def createCoordinator(): Unit = {
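// the ddata coordinator mode stores its state through this Replicator;
// gossip interval and max delta elements are tuned down to speed up the test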
val replicator = system.actorOf(Replicator.props(
ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
def coordinatorProps(typeName: String, rebalanceEnabled: Boolean) = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
val cfg = ConfigFactory.parseString(s"""
@ -203,7 +224,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"}
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
ShardCoordinator.props(typeName, settings, allocationStrategy)
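// persistence mode stores the coordinator state via the journal, while
// ddata mode replicates it through the Replicator created in createCoordinator()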
if (settings.stateStoreMode == "persistence")
ShardCoordinator.props(typeName, settings, allocationStrategy)
else
ShardCoordinator.props(typeName, settings, allocationStrategy, replicator)
}
List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter",
@ -252,7 +276,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true)
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true)
"Cluster sharding" must {
s"Cluster sharding ($mode)" must {
"setup shared journal" in {
// start the Persistence extension

View file

@ -176,6 +176,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
also add latency. This should be considered when designing the application specific
shard resolution, e.g. to avoid too fine grained shards.
Distributed Data Mode
---------------------
Instead of using ``akka-persistence`` it is possible to use the ``akka-distributed-data`` module. In that case
the state of the ``ShardCoordinator`` is replicated inside the cluster by the ``akka-distributed-data``
module with ``WriteMajority`` consistency. This mode is enabled by setting
``akka.cluster.sharding.state-store-mode`` to ``ddata``.
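For example, a minimal sketch of the relevant setting in ``application.conf`` (all other
settings are assumed to keep their defaults)::

  akka.cluster.sharding {
    # replicate coordinator state as Distributed Data instead of persisting it
    state-store-mode = "ddata"
  }

Each state change of the coordinator is then acknowledged by a majority of the cluster
nodes before it is considered successful.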
This makes it possible to remove the ``akka-persistence`` dependency from a project, provided that it
is not used in user code and ``remember-entities`` is ``off``.
Note that this option can also lead to duplicated shards in case of cluster fragmentation
caused by broken replication between nodes.
Proxy Only Mode
---------------

View file

@ -179,6 +179,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma
also add latency. This should be considered when designing the application specific
shard resolution, e.g. to avoid too fine grained shards.
Distributed Data Mode
---------------------
Instead of using ``akka-persistence`` it is possible to use the ``akka-distributed-data`` module. In that case
the state of the ``ShardCoordinator`` is replicated inside the cluster by the ``akka-distributed-data``
module with ``WriteMajority`` consistency. This mode is enabled by setting
``akka.cluster.sharding.state-store-mode`` to ``ddata``.
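For example, a minimal sketch of the relevant setting in ``application.conf`` (all other
settings are assumed to keep their defaults)::

  akka.cluster.sharding {
    # replicate coordinator state as Distributed Data instead of persisting it
    state-store-mode = "ddata"
  }

Each state change of the coordinator is then acknowledged by a majority of the cluster
nodes before it is considered successful.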
This makes it possible to remove the ``akka-persistence`` dependency from a project, provided that it
is not used in user code and ``remember-entities`` is ``off``.
Note that this option can also lead to duplicated shards in case of cluster fragmentation
caused by broken replication between nodes.
Proxy Only Mode
---------------

View file

@ -52,7 +52,7 @@ object AkkaBuild extends Build {
archivesPathFinder.get.map(file => (file -> ("akka/" + file.getName)))
}
),
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed)
)
@ -62,7 +62,7 @@ object AkkaBuild extends Build {
base = file("akka-scala-nightly"),
// remove dependencies that we have to build ourselves (Scala STM)
// samples don't work with dbuild right now
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel,
cluster, clusterMetrics, clusterTools, clusterSharding, distributedData,
slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed)
).disablePlugins(ValidatePullRequest)
@ -136,9 +136,9 @@ object AkkaBuild extends Build {
id = "akka-cluster-sharding",
base = file("akka-cluster-sharding"),
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
persistence % "compile;test->provided", clusterTools)
persistence % "compile;test->provided", distributedData % "compile;test->provided", clusterTools)
) configs (MultiJvm)
lazy val distributedData = Project(
id = "akka-distributed-data-experimental",
base = file("akka-distributed-data"),
@ -244,7 +244,7 @@ object AkkaBuild extends Build {
lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala")
lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda")
lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala")
lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java")