diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 92447dfaad..0216c56102 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -14,6 +14,15 @@ akka.cluster.sharding { # e.g. '/user/sharding' guardian-name = sharding + # Specifies that entries runs on cluster nodes with a specific role. + # If the role is not specified (or empty) all nodes in the cluster are used. + role = "" + + # When this is set to 'on' the active entry actors will automatically be restarted + # upon Shard restart. i.e. if the Shard is started on a different ShardRegion + # due to rebalance or crash. + remember-entries = off + # If the coordinator can't store state changes it will be stopped # and started again after this duration. coordinator-failure-backoff = 10 s @@ -21,13 +30,14 @@ akka.cluster.sharding { # The ShardRegion retries registration and shard location requests to the # ShardCoordinator with this interval if it does not reply. retry-interval = 2 s + # Maximum number of messages that are buffered by a ShardRegion actor. buffer-size = 100000 # Timeout of the shard rebalancing process. handoff-timeout = 60 s - # Time given to a region to acknowdge it's hosting a shard. + # Time given to a region to acknowledge it's hosting a shard. shard-start-timeout = 10 s # If the shard can't store state changes it will retry the action @@ -43,6 +53,18 @@ akka.cluster.sharding { # Rebalance check is performed periodically with this interval. rebalance-interval = 10 s + # Absolute path to the journal plugin configuration entry that is to be + # used for the internal persistence of ClusterSharding. If not defined + # the default journal plugin is used. Note that this is not related to + # persistence used by the entry actors. + journal-plugin-id = "" + + # Absolute path to the snapshot plugin configuration entry that is to be + # used for the internal persistence of ClusterSharding. If not defined + # the default snapshot plugin is used. Note that this is not related to + # persistence used by the entry actors. + snapshot-plugin-id = "" + # How often the coordinator saves persistent snapshots, which are # used to reduce recovery times snapshot-interval = 3600 s diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index a39c441716..90d8a5341d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -5,17 +5,20 @@ package akka.cluster.sharding import java.net.URLEncoder import java.util.concurrent.ConcurrentHashMap -import akka.cluster.sharding.Shard.{ ShardCommand, StateChange } -import akka.cluster.sharding.ShardCoordinator.Internal.SnapshotTick + import scala.collection.immutable import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.util.Success + import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.ActorSelection import akka.actor.ActorSystem +import akka.actor.Cancellable +import akka.actor.Deploy import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.ExtensionId @@ -27,22 +30,22 @@ import akka.actor.ReceiveTimeout import akka.actor.RootActorPath import akka.actor.Terminated import akka.cluster.Cluster +import akka.cluster.ClusterEvent.ClusterDomainEvent +import akka.cluster.ClusterEvent.ClusterShuttingDown import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberUp -import akka.cluster.ClusterEvent.ClusterShuttingDown import akka.cluster.Member import akka.cluster.MemberStatus -import akka.pattern.ask -import akka.persistence._ -import akka.cluster.ClusterEvent.ClusterDomainEvent +import akka.cluster.sharding.Shard.{ ShardCommand, StateChange } +import akka.cluster.sharding.ShardCoordinator.Internal.SnapshotTick import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings -import scala.concurrent.Future import akka.dispatch.ExecutionContexts +import akka.persistence._ +import akka.pattern.ask import akka.pattern.pipe -import scala.util.Success import akka.util.ByteString /** @@ -165,6 +168,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv override def createExtension(system: ExtendedActorSystem): ClusterSharding = new ClusterSharding(system) + } /** @@ -177,31 +181,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { private val cluster = Cluster(system) - /** - * INTERNAL API - */ - private[akka] object Settings { - val config = system.settings.config.getConfig("akka.cluster.sharding") - - val GuardianName: String = config.getString("guardian-name") - val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis - val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis - val BufferSize: Int = config.getInt("buffer-size") - val HandOffTimeout: FiniteDuration = config.getDuration("handoff-timeout", MILLISECONDS).millis - val ShardStartTimeout: FiniteDuration = config.getDuration("shard-start-timeout", MILLISECONDS).millis - val ShardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis - val EntryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis - val RebalanceInterval: FiniteDuration = config.getDuration("rebalance-interval", MILLISECONDS).millis - val SnapshotInterval: FiniteDuration = config.getDuration("snapshot-interval", MILLISECONDS).millis - val LeastShardAllocationRebalanceThreshold: Int = - config.getInt("least-shard-allocation-strategy.rebalance-threshold") - val LeastShardAllocationMaxSimultaneousRebalance: Int = - config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance") - } - - import Settings._ private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap - private lazy val guardian = system.actorOf(Props[ClusterShardingGuardian], Settings.GuardianName) + private lazy val guardian = { + val guardianName: String = system.settings.config.getString("akka.cluster.sharding.guardian-name") + system.actorOf(Props[ClusterShardingGuardian], guardianName) + } private[akka] def requireClusterRole(role: Option[String]): Unit = require(role.forall(cluster.selfRoles.contains), @@ -219,11 +203,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` * @param role specifies that this entry type requires cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` - * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. - * @param idExtractor partial function to extract the entry id and the message to send to the - * entry from the incoming message, if the partial function does not match the message will - * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + * @param settings configuration settings, see [[ClusterShardingSettings]] * @param shardResolver function to determine the shard id for an incoming message, only messages * that passed the `idExtractor` will be used * @param allocationStrategy possibility to use a custom shard allocation and @@ -235,16 +215,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - role: Option[String], - rememberEntries: Boolean, + settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { - requireClusterRole(role) + requireClusterRole(settings.role) implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entryProps, role, rememberEntries, + val startMsg = Start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) @@ -264,10 +243,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * * @param typeName the name of the entry type * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` - * @param role specifies that this entry type requires cluster nodes with a specific role. - * If the role is not specified all nodes in the cluster are used. - * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` - * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. + * @param settings configuration settings, see [[ClusterShardingSettings]] * @param idExtractor partial function to extract the entry id and the message to send to the * entry from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream @@ -278,14 +254,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - role: Option[String], - rememberEntries: Boolean, + settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): ActorRef = { - start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, - new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), - PoisonPill) + val allocationStrategy = new LeastShardAllocationStrategy( + settings.tuningParameters.leastShardAllocationRebalanceThreshold, + settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) + + start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, PoisonPill) } /** @@ -298,10 +275,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * * @param typeName the name of the entry type * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` - * @param role specifies that this entry type requires cluster nodes with a specific role. - * If the role is not specified all nodes in the cluster are used. - * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` - * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. + * @param settings configuration settings, see [[ClusterShardingSettings]] * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the * entry from the incoming message * @param allocationStrategy possibility to use a custom shard allocation and @@ -313,13 +287,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - role: Option[String], - rememberEntries: Boolean, + settings: ClusterShardingSettings, messageExtractor: ShardRegion.MessageExtractor, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { - start(typeName, entryProps, role, rememberEntries, + start(typeName, entryProps, settings, idExtractor = { case msg if messageExtractor.entryId(msg) ne null ⇒ (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) @@ -342,10 +315,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * * @param typeName the name of the entry type * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` - * @param role specifies that this entry type requires cluster nodes with a specific role. - * If the role is not specified all nodes in the cluster are used. - * @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard` - * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. + * @param settings configuration settings, see [[ClusterShardingSettings]] * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the * entry from the incoming message * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard @@ -353,13 +323,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { def start( typeName: String, entryProps: Props, - role: Option[String], - rememberEntries: Boolean, + settings: ClusterShardingSettings, messageExtractor: ShardRegion.MessageExtractor): ActorRef = { - start(typeName, entryProps, role, rememberEntries, messageExtractor, - new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), - PoisonPill) + val allocationStrategy = new LeastShardAllocationStrategy( + settings.tuningParameters.leastShardAllocationRebalanceThreshold, + settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) + + start(typeName, entryProps, settings, messageExtractor, allocationStrategy, PoisonPill) } /** @@ -388,7 +359,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { shardResolver: ShardRegion.ShardResolver): ActorRef = { implicit val timeout = system.settings.CreationTimeout - val startMsg = StartProxy(typeName, role, idExtractor, shardResolver) + val settings = ClusterShardingSettings(system).withRole(role) + val startMsg = StartProxy(typeName, settings, idExtractor, shardResolver) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) shardRegion @@ -441,11 +413,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy - final case class Start(typeName: String, entryProps: Props, role: Option[String], rememberEntries: Boolean, + final case class Start(typeName: String, entryProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded - final case class StartProxy(typeName: String, role: Option[String], + final case class StartProxy(typeName: String, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded @@ -460,7 +432,6 @@ private[akka] class ClusterShardingGuardian extends Actor { val cluster = Cluster(context.system) val sharding = ClusterSharding(context.system) - import sharding.Settings._ private def coordinatorSingletonManagerName(encName: String): String = encName + "Coordinator" @@ -469,15 +440,16 @@ private[akka] class ClusterShardingGuardian extends Actor { (self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress def receive = { - case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ + case Start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ + import settings.role + import settings.tuningParameters.coordinatorFailureBackoff val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) val shardRegion = context.child(encName).getOrElse { if (context.child(cName).isEmpty) { - val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout, - rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy) - val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps) + val coordinatorProps = ShardCoordinator.props(settings, allocationStrategy) + val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps) val singletonSettings = ClusterSingletonManagerSettings(context.system) .withSingletonName("singleton").withRole(role) context.actorOf(ClusterSingletonManager.props( @@ -490,14 +462,8 @@ private[akka] class ClusterShardingGuardian extends Actor { context.actorOf(ShardRegion.props( typeName = typeName, entryProps = entryProps, - role = role, + settings = settings, coordinatorPath = cPath, - retryInterval = RetryInterval, - snapshotInterval = SnapshotInterval, - shardFailureBackoff = ShardFailureBackoff, - entryRestartBackoff = EntryRestartBackoff, - bufferSize = BufferSize, - rememberEntries = rememberEntries, idExtractor = idExtractor, shardResolver = shardResolver, handOffStopMessage = handOffStopMessage), @@ -505,17 +471,15 @@ private[akka] class ClusterShardingGuardian extends Actor { } sender() ! Started(shardRegion) - case StartProxy(typeName, role, idExtractor, shardResolver) ⇒ + case StartProxy(typeName, settings, idExtractor, shardResolver) ⇒ val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) val shardRegion = context.child(encName).getOrElse { context.actorOf(ShardRegion.proxyProps( typeName = typeName, - role = role, + settings = settings, coordinatorPath = cPath, - retryInterval = RetryInterval, - bufferSize = BufferSize, idExtractor = idExtractor, shardResolver = shardResolver), name = encName) @@ -538,19 +502,13 @@ object ShardRegion { private[akka] def props( typeName: String, entryProps: Props, - role: Option[String], + settings: ClusterShardingSettings, coordinatorPath: String, - retryInterval: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any): Props = - Props(new ShardRegion(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, - snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)) + Props(new ShardRegion(typeName, Some(entryProps), settings, coordinatorPath, idExtractor, + shardResolver, handOffStopMessage)).withDeploy(Deploy.local) /** * INTERNAL API @@ -559,14 +517,12 @@ object ShardRegion { */ private[akka] def proxyProps( typeName: String, - role: Option[String], + settings: ClusterShardingSettings, coordinatorPath: String, - retryInterval: FiniteDuration, - bufferSize: Int, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): Props = - Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, - Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill)) + Props(new ShardRegion(typeName, None, settings, coordinatorPath, idExtractor, shardResolver, PoisonPill)) + .withDeploy(Deploy.local) /** * Marker type of entry identifier (`String`). @@ -688,7 +644,7 @@ object ShardRegion { private[akka] def handOffStopperProps( shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any): Props = - Props(new HandOffStopper(shard, replyTo, entries, stopMessage)) + Props(new HandOffStopper(shard, replyTo, entries, stopMessage)).withDeploy(Deploy.local) } /** @@ -701,20 +657,16 @@ object ShardRegion { class ShardRegion( typeName: String, entryProps: Option[Props], - role: Option[String], + settings: ClusterShardingSettings, coordinatorPath: String, - retryInterval: FiniteDuration, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any) extends Actor with ActorLogging { import ShardCoordinator.Internal._ import ShardRegion._ + import settings._ + import settings.tuningParameters._ val cluster = Cluster(context.system) @@ -969,11 +921,7 @@ class ShardRegion( typeName, id, props, - shardFailureBackoff, - entryRestartBackoff, - snapshotInterval, - bufferSize, - rememberEntries, + settings, idExtractor, shardResolver, handOffStopMessage), @@ -1050,16 +998,12 @@ private[akka] object Shard { def props(typeName: String, shardId: ShardRegion.ShardId, entryProps: Props, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, + settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any): Props = - Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval, - bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)) + Props(new Shard(typeName, shardId, entryProps, settings, idExtractor, shardResolver, handOffStopMessage)) + .withDeploy(Deploy.local) } /** @@ -1074,11 +1018,7 @@ private[akka] class Shard( typeName: String, shardId: ShardRegion.ShardId, entryProps: Props, - shardFailureBackoff: FiniteDuration, - entryRestartBackoff: FiniteDuration, - snapshotInterval: FiniteDuration, - bufferSize: Int, - rememberEntries: Boolean, + settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any) extends PersistentActor with ActorLogging { @@ -1090,11 +1030,17 @@ private[akka] class Shard( import akka.cluster.sharding.ShardRegion.ShardRegionCommand import akka.persistence.RecoveryCompleted + import settings.rememberEntries + import settings.tuningParameters._ import context.dispatcher val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick) override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" + override def journalPluginId: String = settings.journalPluginId + + override def snapshotPluginId: String = settings.snapshotPluginId + var state = State.Empty var idByRef = Map.empty[ActorRef, EntryId] var refById = Map.empty[EntryId, ActorRef] @@ -1309,6 +1255,7 @@ private[akka] object ShardCoordinatorSupervisor { */ def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props = Props(new ShardCoordinatorSupervisor(failureBackoff, coordinatorProps)) + .withDeploy(Deploy.local) /** * INTERNAL API @@ -1348,9 +1295,9 @@ object ShardCoordinator { * INTERNAL API * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. */ - private[akka] def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration, + private[akka] def props(settings: ClusterShardingSettings, allocationStrategy: ShardAllocationStrategy): Props = - Props(new ShardCoordinator(handOffTimeout, shardStartTimeout, rebalanceInterval, snapshotInterval, allocationStrategy)) + Props(new ShardCoordinator(settings, allocationStrategy)).withDeploy(Deploy.local) /** * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]]. @@ -1670,16 +1617,19 @@ object ShardCoordinator { * * @see [[ClusterSharding$ ClusterSharding extension]] */ -class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, - snapshotInterval: FiniteDuration, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) +class ShardCoordinator(settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) extends PersistentActor with ActorLogging { import ShardCoordinator._ import ShardCoordinator.Internal._ import ShardRegion.ShardId - import akka.actor.Cancellable + import settings.tuningParameters._ override def persistenceId = self.path.toStringWithoutAddress + override def journalPluginId: String = settings.journalPluginId + + override def snapshotPluginId: String = settings.snapshotPluginId + var persistentState = State.empty var rebalanceInProgress = Set.empty[ShardId] var unAckedHostShards = Map.empty[ShardId, Cancellable] diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala new file mode 100644 index 0000000000..1601386578 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -0,0 +1,130 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.duration._ +import scala.concurrent.duration.FiniteDuration + +import akka.actor.ActorSystem +import akka.actor.NoSerializationVerificationNeeded +import com.typesafe.config.Config + +object ClusterShardingSettings { + /** + * Create settings from the default configuration + * `akka.cluster.sharding`. + */ + def apply(system: ActorSystem): ClusterShardingSettings = + apply(system.settings.config.getConfig("akka.cluster.sharding")) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.sharding`. + */ + def apply(config: Config): ClusterShardingSettings = { + val tuningParameters = new TuningParameters( + coordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis, + retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis, + bufferSize = config.getInt("buffer-size"), + handOffTimeout = config.getDuration("handoff-timeout", MILLISECONDS).millis, + shardStartTimeout = config.getDuration("shard-start-timeout", MILLISECONDS).millis, + shardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis, + entryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis, + rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis, + snapshotInterval = config.getDuration("snapshot-interval", MILLISECONDS).millis, + leastShardAllocationRebalanceThreshold = + config.getInt("least-shard-allocation-strategy.rebalance-threshold"), + leastShardAllocationMaxSimultaneousRebalance = + config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance")) + + new ClusterShardingSettings( + role = roleOption(config.getString("role")), + rememberEntries = config.getBoolean("remember-entries"), + journalPluginId = config.getString("journal-plugin-id"), + snapshotPluginId = config.getString("snapshot-plugin-id"), + tuningParameters) + } + + /** + * Java API: Create settings from the default configuration + * `akka.cluster.sharding`. + */ + def create(system: ActorSystem): ClusterShardingSettings = apply(system) + + /** + * Java API: Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.sharding`. + */ + def create(config: Config): ClusterShardingSettings = apply(config) + + /** + * INTERNAL API + */ + private[akka] def roleOption(role: String): Option[String] = + if (role == "") None else Option(role) + + class TuningParameters( + val coordinatorFailureBackoff: FiniteDuration, + val retryInterval: FiniteDuration, + val bufferSize: Int, + val handOffTimeout: FiniteDuration, + val shardStartTimeout: FiniteDuration, + val shardFailureBackoff: FiniteDuration, + val entryRestartBackoff: FiniteDuration, + val rebalanceInterval: FiniteDuration, + val snapshotInterval: FiniteDuration, + val leastShardAllocationRebalanceThreshold: Int, + val leastShardAllocationMaxSimultaneousRebalance: Int) +} + +/** + * @param role specifies that this entry type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param rememberEntries true if active entry actors shall be automatically restarted upon `Shard` + * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. + * @param journalPluginId Absolute path to the journal plugin configuration entry that is to + * be used for the internal persistence of ClusterSharding. If not defined the default + * journal plugin is used. Note that this is not related to persistence used by the entry + * actors. + * @param snapshotPluginId Absolute path to the snapshot plugin configuration entry that is to + * be used for the internal persistence of ClusterSharding. If not defined the default + * snapshot plugin is used. Note that this is not related to persistence used by the entry + * actors. + * @param tuningParameters additional tuning parameters, see descriptions in reference.conf + */ +final class ClusterShardingSettings( + val role: Option[String], + val rememberEntries: Boolean, + val journalPluginId: String, + val snapshotPluginId: String, + val tuningParameters: ClusterShardingSettings.TuningParameters) extends NoSerializationVerificationNeeded { + + def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role)) + + def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role) + + def withRememberEntries(rememberEntries: Boolean): ClusterShardingSettings = + copy(rememberEntries = rememberEntries) + + def withJournalPluginId(journalPluginId: String): ClusterShardingSettings = + copy(journalPluginId = journalPluginId) + + def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings = + copy(snapshotPluginId = snapshotPluginId) + + def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings = + copy(tuningParameters = tuningParameters) + + private def copy(role: Option[String] = role, + rememberEntries: Boolean = rememberEntries, + journalPluginId: String = journalPluginId, + snapshotPluginId: String = snapshotPluginId, + tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters): ClusterShardingSettings = + new ClusterShardingSettings( + role, + rememberEntries, + journalPluginId, + snapshotPluginId, + tuningParameters) +} diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 65f35d23e7..36ee134ebb 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -135,8 +135,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar ClusterSharding(system).start( typeName = "Entity", entryProps = Props[Entity], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver, allocationStrategy = TestAllocationStrategy(allocator), diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index 7448c73c4b..edaeaf6170 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -112,8 +112,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe ClusterSharding(system).start( typeName = "Entity", entryProps = Props[Entity], - role = None, - rememberEntries = true, + settings = ClusterShardingSettings(system).withRememberEntries(true), idExtractor = idExtractor, shardResolver = shardResolver) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 5530f32794..b6e3043687 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -122,8 +122,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG ClusterSharding(system).start( typeName = "Entity", entryProps = Props[Entity], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver, allocationStrategy, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index e39fbe99eb..4376d3a069 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -124,8 +124,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe ClusterSharding(system).start( typeName = "Entity", entryProps = Props[Entity], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index c00d9bb70d..97814ca3d0 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -48,7 +48,6 @@ object ClusterShardingSpec extends MultiNodeConfig { akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec" akka.cluster.sharding { - role = backend retry-interval = 1 s handoff-timeout = 10 s shard-start-timeout = 5s @@ -183,11 +182,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } def createCoordinator(): Unit = { - val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - def coordinatorProps(rebalanceEnabled: Boolean) = - ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, - rebalanceInterval = if (rebalanceEnabled) 2.seconds else 3600.seconds, - snapshotInterval = 3600.seconds, allocationStrategy) + def coordinatorProps(rebalanceEnabled: Boolean) = { + val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + val cfg = ConfigFactory.parseString(s""" + handoff-timeout = 10s + shard-start-timeout = 10s + rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"} + snapshot-interval = 3600s + """).withFallback(system.settings.config.getConfig("akka.cluster.sharding")) + val settings = ClusterShardingSettings(cfg) + ShardCoordinator.props(settings, allocationStrategy) + } List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter", "PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { coordinatorName ⇒ @@ -200,21 +205,26 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } } - def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props( - typeName = typeName, - entryProps = Props[Counter], - role = None, - coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", - retryInterval = 1.second, - shardFailureBackoff = 1.second, - entryRestartBackoff = 1.second, - snapshotInterval = 1.hour, - bufferSize = 1000, - rememberEntries = rememberEntries, - idExtractor = idExtractor, - shardResolver = shardResolver, - handOffStopMessage = PoisonPill), - name = typeName + "Region") + def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = { + val cfg = ConfigFactory.parseString(""" + retry-interval = 1s + shard-failure-backoff = 1s + entry-restart-backoff = 1s + snapshot-interval = 3600s + buffer-size = 1000 + """).withFallback(system.settings.config.getConfig("akka.cluster.sharding")) + val settings = ClusterShardingSettings(cfg) + .withRememberEntries(rememberEntries) + system.actorOf(ShardRegion.props( + typeName = typeName, + entryProps = Props[Counter], + settings = settings, + coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", + idExtractor = idExtractor, + shardResolver = shardResolver, + handOffStopMessage = PoisonPill), + name = typeName + "Region") + } lazy val region = createRegion("counter", rememberEntries = false) lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false) @@ -319,12 +329,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "support proxy only mode" in within(10.seconds) { runOn(second) { + val cfg = ConfigFactory.parseString(""" + retry-interval = 1s + buffer-size = 1000 + """).withFallback(system.settings.config.getConfig("akka.cluster.sharding")) + val settings = ClusterShardingSettings(cfg) val proxy = system.actorOf(ShardRegion.proxyProps( typeName = "counter", - role = None, + settings, coordinatorPath = "/user/counterCoordinator/singleton/coordinator", - retryInterval = 1.second, - bufferSize = 1000, idExtractor = idExtractor, shardResolver = shardResolver), name = "regionProxy") @@ -491,16 +504,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult val counterRegion: ActorRef = ClusterSharding(system).start( typeName = "Counter", entryProps = Props[Counter], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) //#counter-start ClusterSharding(system).start( typeName = "AnotherCounter", entryProps = Props[Counter], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) } @@ -541,8 +552,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult val counterRegionViaStart: ActorRef = ClusterSharding(system).start( typeName = "ApiTest", entryProps = Props[Counter], - role = None, - rememberEntries = false, + settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index 2db3699c52..c0f862a2ab 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -70,8 +70,9 @@ public class ClusterShardingTest { //#counter-start Option roleOption = Option.none(); + ClusterShardingSettings settings = ClusterShardingSettings.create(system); ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", - Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor); + Props.create(Counter.class), settings, messageExtractor); //#counter-start //#counter-usage diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 7a54fd55c8..f2ec29584f 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -230,8 +230,10 @@ The configuration properties changed name to ``akka.cluster.sharding``. ClusterSharding construction ============================ -Role is defined as parameter to the ``start`` method of the ``ClusterSharding`` extension -instead of in configuration, so that it can be defined per entry type. +Several parameters of the ``start`` method of the ``ClusterSharding`` extension are now defined +in a settings object ``ClusterShardingSettings``. +It can be created from system configuration properties and also amended with API. +These settings can be defined differently per entry type if needed. Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter. diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index f692cf8158..27b79e8d8c 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -260,11 +260,13 @@ Remembering Entries ------------------- The list of entries in each ``Shard`` can be made persistent (durable) by setting -the ``rememberEntries`` flag to true when calling ``ClusterSharding.start``. When configured -to remember entries, whenever a ``Shard`` is rebalanced onto another node or recovers after a -crash it will recreate all the entries which were previously running in that ``Shard``. To -permanently stop entries, a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the -entry will be automatically restarted after the entry restart backoff specified in the configuration. +the ``rememberEntries`` flag to true in ``ClusterShardingSettings`` when calling +``ClusterSharding.start``. When configured to remember entries, whenever a ``Shard`` +is rebalanced onto another node or recovers after a crash it will recreate all the +entries which were previously running in that ``Shard``. To permanently stop entries, +a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the +entry will be automatically restarted after the entry restart backoff specified in +the configuration. When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries after a rebalance or recovering from a crash. Entries will only be started once the first message