!cls #17158 Add configuration of sharding journal

* Use same type of settings object as in cluster-tools
* These settings can be different for different entity types
This commit is contained in:
Patrik Nordwall 2015-06-07 14:49:38 +02:00
parent c1d5221bf8
commit 8de24f38ca
11 changed files with 290 additions and 177 deletions

View file

@ -14,6 +14,15 @@ akka.cluster.sharding {
# e.g. '/user/sharding'
guardian-name = sharding
# Specifies that entries runs on cluster nodes with a specific role.
# If the role is not specified (or empty) all nodes in the cluster are used.
role = ""
# When this is set to 'on' the active entry actors will automatically be restarted
# upon Shard restart. i.e. if the Shard is started on a different ShardRegion
# due to rebalance or crash.
remember-entries = off
# If the coordinator can't store state changes it will be stopped
# and started again after this duration.
coordinator-failure-backoff = 10 s
@ -21,13 +30,14 @@ akka.cluster.sharding {
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.
buffer-size = 100000
# Timeout of the shard rebalancing process.
handoff-timeout = 60 s
# Time given to a region to acknowdge it's hosting a shard.
# Time given to a region to acknowledge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard can't store state changes it will retry the action
@ -43,6 +53,18 @@ akka.cluster.sharding {
# Rebalance check is performed periodically with this interval.
rebalance-interval = 10 s
# Absolute path to the journal plugin configuration entry that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default journal plugin is used. Note that this is not related to
# persistence used by the entry actors.
journal-plugin-id = ""
# Absolute path to the snapshot plugin configuration entry that is to be
# used for the internal persistence of ClusterSharding. If not defined
# the default snapshot plugin is used. Note that this is not related to
# persistence used by the entry actors.
snapshot-plugin-id = ""
# How often the coordinator saves persistent snapshots, which are
# used to reduce recovery times
snapshot-interval = 3600 s

View file

@ -5,17 +5,20 @@ package akka.cluster.sharding
import java.net.URLEncoder
import java.util.concurrent.ConcurrentHashMap
import akka.cluster.sharding.Shard.{ ShardCommand, StateChange }
import akka.cluster.sharding.ShardCoordinator.Internal.SnapshotTick
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.Success
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
@ -27,22 +30,22 @@ import akka.actor.ReceiveTimeout
import akka.actor.RootActorPath
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterDomainEvent
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.pattern.ask
import akka.persistence._
import akka.cluster.ClusterEvent.ClusterDomainEvent
import akka.cluster.sharding.Shard.{ ShardCommand, StateChange }
import akka.cluster.sharding.ShardCoordinator.Internal.SnapshotTick
import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings
import scala.concurrent.Future
import akka.dispatch.ExecutionContexts
import akka.persistence._
import akka.pattern.ask
import akka.pattern.pipe
import scala.util.Success
import akka.util.ByteString
/**
@ -165,6 +168,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
override def createExtension(system: ExtendedActorSystem): ClusterSharding =
new ClusterSharding(system)
}
/**
@ -177,31 +181,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
private val cluster = Cluster(system)
/**
* INTERNAL API
*/
private[akka] object Settings {
val config = system.settings.config.getConfig("akka.cluster.sharding")
val GuardianName: String = config.getString("guardian-name")
val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis
val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
val BufferSize: Int = config.getInt("buffer-size")
val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis
val ShardStartTimeout: FiniteDuration = config.getDuration("shard-start-timeout", MILLISECONDS).millis
val ShardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis
val EntryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis
val RebalanceInterval: FiniteDuration = config.getDuration("rebalance-interval", MILLISECONDS).millis
val SnapshotInterval: FiniteDuration = config.getDuration("snapshot-interval", MILLISECONDS).millis
val LeastShardAllocationRebalanceThreshold: Int =
config.getInt("least-shard-allocation-strategy.rebalance-threshold")
val LeastShardAllocationMaxSimultaneousRebalance: Int =
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance")
}
import Settings._
private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap
private lazy val guardian = system.actorOf(Props[ClusterShardingGuardian], Settings.GuardianName)
private lazy val guardian = {
val guardianName: String = system.settings.config.getString("akka.cluster.sharding.guardian-name")
system.actorOf(Props[ClusterShardingGuardian], guardianName)
}
private[akka] def requireClusterRole(role: Option[String]): Unit =
require(role.forall(cluster.selfRoles.contains),
@ -219,11 +203,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
* @param role specifies that this entry type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param idExtractor partial function to extract the entry id and the message to send to the
* entry from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param shardResolver function to determine the shard id for an incoming message, only messages
* that passed the `idExtractor` will be used
* @param allocationStrategy possibility to use a custom shard allocation and
@ -235,16 +215,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start(
typeName: String,
entryProps: Props,
role: Option[String],
rememberEntries: Boolean,
settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
requireClusterRole(role)
requireClusterRole(settings.role)
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entryProps, role, rememberEntries,
val startMsg = Start(typeName, entryProps, settings,
idExtractor, shardResolver, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
@ -264,10 +243,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*
* @param typeName the name of the entry type
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
* @param role specifies that this entry type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param idExtractor partial function to extract the entry id and the message to send to the
* entry from the incoming message, if the partial function does not match the message will
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
@ -278,14 +254,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start(
typeName: String,
entryProps: Props,
role: Option[String],
rememberEntries: Boolean,
settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): ActorRef = {
start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
PoisonPill)
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, PoisonPill)
}
/**
@ -298,10 +275,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*
* @param typeName the name of the entry type
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
* @param role specifies that this entry type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
* entry from the incoming message
* @param allocationStrategy possibility to use a custom shard allocation and
@ -313,13 +287,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start(
typeName: String,
entryProps: Props,
role: Option[String],
rememberEntries: Boolean,
settings: ClusterShardingSettings,
messageExtractor: ShardRegion.MessageExtractor,
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
start(typeName, entryProps, role, rememberEntries,
start(typeName, entryProps, settings,
idExtractor = {
case msg if messageExtractor.entryId(msg) ne null
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
@ -342,10 +315,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*
* @param typeName the name of the entry type
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
* @param role specifies that this entry type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param settings configuration settings, see [[ClusterShardingSettings]]
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
* entry from the incoming message
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
@ -353,13 +323,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
def start(
typeName: String,
entryProps: Props,
role: Option[String],
rememberEntries: Boolean,
settings: ClusterShardingSettings,
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
start(typeName, entryProps, role, rememberEntries, messageExtractor,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
PoisonPill)
val allocationStrategy = new LeastShardAllocationStrategy(
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
start(typeName, entryProps, settings, messageExtractor, allocationStrategy, PoisonPill)
}
/**
@ -388,7 +359,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
shardResolver: ShardRegion.ShardResolver): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val startMsg = StartProxy(typeName, role, idExtractor, shardResolver)
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, settings, idExtractor, shardResolver)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
@ -441,11 +413,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
*/
private[akka] object ClusterShardingGuardian {
import ShardCoordinator.ShardAllocationStrategy
final case class Start(typeName: String, entryProps: Props, role: Option[String], rememberEntries: Boolean,
final case class Start(typeName: String, entryProps: Props, settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver,
allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any)
extends NoSerializationVerificationNeeded
final case class StartProxy(typeName: String, role: Option[String],
final case class StartProxy(typeName: String, settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver)
extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
@ -460,7 +432,6 @@ private[akka] class ClusterShardingGuardian extends Actor {
val cluster = Cluster(context.system)
val sharding = ClusterSharding(context.system)
import sharding.Settings._
private def coordinatorSingletonManagerName(encName: String): String =
encName + "Coordinator"
@ -469,15 +440,16 @@ private[akka] class ClusterShardingGuardian extends Actor {
(self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress
def receive = {
case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage)
case Start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage)
import settings.role
import settings.tuningParameters.coordinatorFailureBackoff
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
if (context.child(cName).isEmpty) {
val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout,
rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
val coordinatorProps = ShardCoordinator.props(settings, allocationStrategy)
val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps)
val singletonSettings = ClusterSingletonManagerSettings(context.system)
.withSingletonName("singleton").withRole(role)
context.actorOf(ClusterSingletonManager.props(
@ -490,14 +462,8 @@ private[akka] class ClusterShardingGuardian extends Actor {
context.actorOf(ShardRegion.props(
typeName = typeName,
entryProps = entryProps,
role = role,
settings = settings,
coordinatorPath = cPath,
retryInterval = RetryInterval,
snapshotInterval = SnapshotInterval,
shardFailureBackoff = ShardFailureBackoff,
entryRestartBackoff = EntryRestartBackoff,
bufferSize = BufferSize,
rememberEntries = rememberEntries,
idExtractor = idExtractor,
shardResolver = shardResolver,
handOffStopMessage = handOffStopMessage),
@ -505,17 +471,15 @@ private[akka] class ClusterShardingGuardian extends Actor {
}
sender() ! Started(shardRegion)
case StartProxy(typeName, role, idExtractor, shardResolver)
case StartProxy(typeName, settings, idExtractor, shardResolver)
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
context.actorOf(ShardRegion.proxyProps(
typeName = typeName,
role = role,
settings = settings,
coordinatorPath = cPath,
retryInterval = RetryInterval,
bufferSize = BufferSize,
idExtractor = idExtractor,
shardResolver = shardResolver),
name = encName)
@ -538,19 +502,13 @@ object ShardRegion {
private[akka] def props(
typeName: String,
entryProps: Props,
role: Option[String],
settings: ClusterShardingSettings,
coordinatorPath: String,
retryInterval: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entryRestartBackoff: FiniteDuration,
snapshotInterval: FiniteDuration,
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any): Props =
Props(new ShardRegion(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage))
Props(new ShardRegion(typeName, Some(entryProps), settings, coordinatorPath, idExtractor,
shardResolver, handOffStopMessage)).withDeploy(Deploy.local)
/**
* INTERNAL API
@ -559,14 +517,12 @@ object ShardRegion {
*/
private[akka] def proxyProps(
typeName: String,
role: Option[String],
settings: ClusterShardingSettings,
coordinatorPath: String,
retryInterval: FiniteDuration,
bufferSize: Int,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): Props =
Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero,
Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill))
Props(new ShardRegion(typeName, None, settings, coordinatorPath, idExtractor, shardResolver, PoisonPill))
.withDeploy(Deploy.local)
/**
* Marker type of entry identifier (`String`).
@ -688,7 +644,7 @@ object ShardRegion {
private[akka] def handOffStopperProps(
shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any): Props =
Props(new HandOffStopper(shard, replyTo, entries, stopMessage))
Props(new HandOffStopper(shard, replyTo, entries, stopMessage)).withDeploy(Deploy.local)
}
/**
@ -701,20 +657,16 @@ object ShardRegion {
class ShardRegion(
typeName: String,
entryProps: Option[Props],
role: Option[String],
settings: ClusterShardingSettings,
coordinatorPath: String,
retryInterval: FiniteDuration,
shardFailureBackoff: FiniteDuration,
entryRestartBackoff: FiniteDuration,
snapshotInterval: FiniteDuration,
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any) extends Actor with ActorLogging {
import ShardCoordinator.Internal._
import ShardRegion._
import settings._
import settings.tuningParameters._
val cluster = Cluster(context.system)
@ -969,11 +921,7 @@ class ShardRegion(
typeName,
id,
props,
shardFailureBackoff,
entryRestartBackoff,
snapshotInterval,
bufferSize,
rememberEntries,
settings,
idExtractor,
shardResolver,
handOffStopMessage),
@ -1050,16 +998,12 @@ private[akka] object Shard {
def props(typeName: String,
shardId: ShardRegion.ShardId,
entryProps: Props,
shardFailureBackoff: FiniteDuration,
entryRestartBackoff: FiniteDuration,
snapshotInterval: FiniteDuration,
bufferSize: Int,
rememberEntries: Boolean,
settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any): Props =
Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval,
bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage))
Props(new Shard(typeName, shardId, entryProps, settings, idExtractor, shardResolver, handOffStopMessage))
.withDeploy(Deploy.local)
}
/**
@ -1074,11 +1018,7 @@ private[akka] class Shard(
typeName: String,
shardId: ShardRegion.ShardId,
entryProps: Props,
shardFailureBackoff: FiniteDuration,
entryRestartBackoff: FiniteDuration,
snapshotInterval: FiniteDuration,
bufferSize: Int,
rememberEntries: Boolean,
settings: ClusterShardingSettings,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any) extends PersistentActor with ActorLogging {
@ -1090,11 +1030,17 @@ private[akka] class Shard(
import akka.cluster.sharding.ShardRegion.ShardRegionCommand
import akka.persistence.RecoveryCompleted
import settings.rememberEntries
import settings.tuningParameters._
import context.dispatcher
val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick)
override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"
override def journalPluginId: String = settings.journalPluginId
override def snapshotPluginId: String = settings.snapshotPluginId
var state = State.Empty
var idByRef = Map.empty[ActorRef, EntryId]
var refById = Map.empty[EntryId, ActorRef]
@ -1309,6 +1255,7 @@ private[akka] object ShardCoordinatorSupervisor {
*/
def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props =
Props(new ShardCoordinatorSupervisor(failureBackoff, coordinatorProps))
.withDeploy(Deploy.local)
/**
* INTERNAL API
@ -1348,9 +1295,9 @@ object ShardCoordinator {
* INTERNAL API
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor.
*/
private[akka] def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration,
private[akka] def props(settings: ClusterShardingSettings,
allocationStrategy: ShardAllocationStrategy): Props =
Props(new ShardCoordinator(handOffTimeout, shardStartTimeout, rebalanceInterval, snapshotInterval, allocationStrategy))
Props(new ShardCoordinator(settings, allocationStrategy)).withDeploy(Deploy.local)
/**
* Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@ -1670,16 +1617,19 @@ object ShardCoordinator {
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration,
snapshotInterval: FiniteDuration, allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
class ShardCoordinator(settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
extends PersistentActor with ActorLogging {
import ShardCoordinator._
import ShardCoordinator.Internal._
import ShardRegion.ShardId
import akka.actor.Cancellable
import settings.tuningParameters._
override def persistenceId = self.path.toStringWithoutAddress
override def journalPluginId: String = settings.journalPluginId
override def snapshotPluginId: String = settings.snapshotPluginId
var persistentState = State.empty
var rebalanceInProgress = Set.empty[ShardId]
var unAckedHostShards = Map.empty[ShardId, Cancellable]

View file

@ -0,0 +1,130 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import com.typesafe.config.Config
object ClusterShardingSettings {
/**
* Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def apply(system: ActorSystem): ClusterShardingSettings =
apply(system.settings.config.getConfig("akka.cluster.sharding"))
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def apply(config: Config): ClusterShardingSettings = {
val tuningParameters = new TuningParameters(
coordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis,
retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"),
handOffTimeout = config.getDuration("handoff-timeout", MILLISECONDS).millis,
shardStartTimeout = config.getDuration("shard-start-timeout", MILLISECONDS).millis,
shardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis,
entryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis,
rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis,
snapshotInterval = config.getDuration("snapshot-interval", MILLISECONDS).millis,
leastShardAllocationRebalanceThreshold =
config.getInt("least-shard-allocation-strategy.rebalance-threshold"),
leastShardAllocationMaxSimultaneousRebalance =
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"))
new ClusterShardingSettings(
role = roleOption(config.getString("role")),
rememberEntries = config.getBoolean("remember-entries"),
journalPluginId = config.getString("journal-plugin-id"),
snapshotPluginId = config.getString("snapshot-plugin-id"),
tuningParameters)
}
/**
* Java API: Create settings from the default configuration
* `akka.cluster.sharding`.
*/
def create(system: ActorSystem): ClusterShardingSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.sharding`.
*/
def create(config: Config): ClusterShardingSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
class TuningParameters(
val coordinatorFailureBackoff: FiniteDuration,
val retryInterval: FiniteDuration,
val bufferSize: Int,
val handOffTimeout: FiniteDuration,
val shardStartTimeout: FiniteDuration,
val shardFailureBackoff: FiniteDuration,
val entryRestartBackoff: FiniteDuration,
val rebalanceInterval: FiniteDuration,
val snapshotInterval: FiniteDuration,
val leastShardAllocationRebalanceThreshold: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int)
}
/**
* @param role specifies that this entry type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntries true if active entry actors shall be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param journalPluginId Absolute path to the journal plugin configuration entry that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* journal plugin is used. Note that this is not related to persistence used by the entry
* actors.
* @param snapshotPluginId Absolute path to the snapshot plugin configuration entry that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* snapshot plugin is used. Note that this is not related to persistence used by the entry
* actors.
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
*/
final class ClusterShardingSettings(
val role: Option[String],
val rememberEntries: Boolean,
val journalPluginId: String,
val snapshotPluginId: String,
val tuningParameters: ClusterShardingSettings.TuningParameters) extends NoSerializationVerificationNeeded {
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
def withRememberEntries(rememberEntries: Boolean): ClusterShardingSettings =
copy(rememberEntries = rememberEntries)
def withJournalPluginId(journalPluginId: String): ClusterShardingSettings =
copy(journalPluginId = journalPluginId)
def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings =
copy(snapshotPluginId = snapshotPluginId)
def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
copy(tuningParameters = tuningParameters)
private def copy(role: Option[String] = role,
rememberEntries: Boolean = rememberEntries,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters): ClusterShardingSettings =
new ClusterShardingSettings(
role,
rememberEntries,
journalPluginId,
snapshotPluginId,
tuningParameters)
}

View file

@ -135,8 +135,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Props[Entity],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver,
allocationStrategy = TestAllocationStrategy(allocator),

View file

@ -112,8 +112,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Props[Entity],
role = None,
rememberEntries = true,
settings = ClusterShardingSettings(system).withRememberEntries(true),
idExtractor = idExtractor,
shardResolver = shardResolver)
}

View file

@ -122,8 +122,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Props[Entity],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver,
allocationStrategy,

View file

@ -124,8 +124,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Props[Entity],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver)
}

View file

@ -48,7 +48,6 @@ object ClusterShardingSpec extends MultiNodeConfig {
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec"
akka.cluster.sharding {
role = backend
retry-interval = 1 s
handoff-timeout = 10 s
shard-start-timeout = 5s
@ -183,11 +182,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
def createCoordinator(): Unit = {
def coordinatorProps(rebalanceEnabled: Boolean) = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
def coordinatorProps(rebalanceEnabled: Boolean) =
ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds,
rebalanceInterval = if (rebalanceEnabled) 2.seconds else 3600.seconds,
snapshotInterval = 3600.seconds, allocationStrategy)
val cfg = ConfigFactory.parseString(s"""
handoff-timeout = 10s
shard-start-timeout = 10s
rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"}
snapshot-interval = 3600s
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
ShardCoordinator.props(settings, allocationStrategy)
}
List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter",
"PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { coordinatorName
@ -200,21 +205,26 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
}
}
def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props(
def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = {
val cfg = ConfigFactory.parseString("""
retry-interval = 1s
shard-failure-backoff = 1s
entry-restart-backoff = 1s
snapshot-interval = 3600s
buffer-size = 1000
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
.withRememberEntries(rememberEntries)
system.actorOf(ShardRegion.props(
typeName = typeName,
entryProps = Props[Counter],
role = None,
settings = settings,
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
retryInterval = 1.second,
shardFailureBackoff = 1.second,
entryRestartBackoff = 1.second,
snapshotInterval = 1.hour,
bufferSize = 1000,
rememberEntries = rememberEntries,
idExtractor = idExtractor,
shardResolver = shardResolver,
handOffStopMessage = PoisonPill),
name = typeName + "Region")
}
lazy val region = createRegion("counter", rememberEntries = false)
lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false)
@ -319,12 +329,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"support proxy only mode" in within(10.seconds) {
runOn(second) {
val cfg = ConfigFactory.parseString("""
retry-interval = 1s
buffer-size = 1000
""").withFallback(system.settings.config.getConfig("akka.cluster.sharding"))
val settings = ClusterShardingSettings(cfg)
val proxy = system.actorOf(ShardRegion.proxyProps(
typeName = "counter",
role = None,
settings,
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
retryInterval = 1.second,
bufferSize = 1000,
idExtractor = idExtractor,
shardResolver = shardResolver),
name = "regionProxy")
@ -491,16 +504,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter",
entryProps = Props[Counter],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver)
//#counter-start
ClusterSharding(system).start(
typeName = "AnotherCounter",
entryProps = Props[Counter],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver)
}
@ -541,8 +552,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
typeName = "ApiTest",
entryProps = Props[Counter],
role = None,
rememberEntries = false,
settings = ClusterShardingSettings(system),
idExtractor = idExtractor,
shardResolver = shardResolver)

View file

@ -70,8 +70,9 @@ public class ClusterShardingTest {
//#counter-start
Option<String> roleOption = Option.none();
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
Props.create(Counter.class), settings, messageExtractor);
//#counter-start
//#counter-usage

View file

@ -230,8 +230,10 @@ The configuration properties changed name to ``akka.cluster.sharding``.
ClusterSharding construction
============================
Role is defined as parameter to the ``start`` method of the ``ClusterSharding`` extension
instead of in configuration, so that it can be defined per entry type.
Several parameters of the ``start`` method of the ``ClusterSharding`` extension are now defined
in a settings object ``ClusterShardingSettings``.
It can be created from system configuration properties and also amended with API.
These settings can be defined differently per entry type if needed.
Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method
of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter.

View file

@ -260,11 +260,13 @@ Remembering Entries
-------------------
The list of entries in each ``Shard`` can be made persistent (durable) by setting
the ``rememberEntries`` flag to true when calling ``ClusterSharding.start``. When configured
to remember entries, whenever a ``Shard`` is rebalanced onto another node or recovers after a
crash it will recreate all the entries which were previously running in that ``Shard``. To
permanently stop entries, a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the
entry will be automatically restarted after the entry restart backoff specified in the configuration.
the ``rememberEntries`` flag to true in ``ClusterShardingSettings`` when calling
``ClusterSharding.start``. When configured to remember entries, whenever a ``Shard``
is rebalanced onto another node or recovers after a crash it will recreate all the
entries which were previously running in that ``Shard``. To permanently stop entries,
a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the
entry will be automatically restarted after the entry restart backoff specified in
the configuration.
When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries
after a rebalance or recovering from a crash. Entries will only be started once the first message