diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 50dca24f8e..7a477c8dfe 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -13,77 +13,89 @@ akka.cluster.sharding { # The extension creates a top level actor with this name in top level system scope, # e.g. '/system/sharding' guardian-name = sharding - + # Specifies that entities runs on cluster nodes with a specific role. # If the role is not specified (or empty) all nodes in the cluster are used. role = "" - + # When this is set to 'on' the active entity actors will automatically be restarted # upon Shard restart. i.e. if the Shard is started on a different ShardRegion # due to rebalance or crash. remember-entities = off - + # If the coordinator can't store state changes it will be stopped # and started again after this duration, with an exponential back-off # of up to 5 times this duration. coordinator-failure-backoff = 5 s - + # The ShardRegion retries registration and shard location requests to the # ShardCoordinator with this interval if it does not reply. retry-interval = 2 s - + # Maximum number of messages that are buffered by a ShardRegion actor. buffer-size = 100000 - + # Timeout of the shard rebalancing process. handoff-timeout = 60 s - + # Time given to a region to acknowledge it's hosting a shard. shard-start-timeout = 10 s - + # If the shard is remembering entities and can't store state changes # will be stopped and then started again after this duration. Any messages # sent to an affected entity may be lost in this process. shard-failure-backoff = 10 s - + # If the shard is remembering entities and an entity stops itself without # using passivate. The entity will be restarted after this duration or when # the next message for it is received, which ever occurs first. entity-restart-backoff = 10 s - + # Rebalance check is performed periodically with this interval. rebalance-interval = 10 s - + # Absolute path to the journal plugin configuration entity that is to be # used for the internal persistence of ClusterSharding. If not defined - # the default journal plugin is used. Note that this is not related to + # the default journal plugin is used. Note that this is not related to # persistence used by the entity actors. journal-plugin-id = "" - + # Absolute path to the snapshot plugin configuration entity that is to be # used for the internal persistence of ClusterSharding. If not defined - # the default snapshot plugin is used. Note that this is not related to + # the default snapshot plugin is used. Note that this is not related to # persistence used by the entity actors. snapshot-plugin-id = "" - - # The coordinator saves persistent snapshots after this number of persistent - # events. Snapshots are used to reduce recovery times. + + # Parameter which determines how the coordinator will be store a state + # valid values either "persistence" or "ddata" + state-store-mode = "persistence" + + # The shard saves persistent snapshots after this number of persistent + # events. Snapshots are used to reduce recovery times. snapshot-after = 1000 - + # Setting for the default shard allocation strategy least-shard-allocation-strategy { # Threshold of how large the difference between most and least number of # allocated shards must be to begin the rebalancing. rebalance-threshold = 10 - + # The number of ongoing rebalancing processes is limited to this number. max-simultaneous-rebalance = 3 } - - # Settings for the coordinator singleton. Same layout as akka.cluster.singleton. + + # Timeout of waiting the initial distributed state (an initial state will be queried again if the timeout happened) + # works only for state-store-mode = "ddata" + waiting-for-state-timeout = 5 s + + # Timeout of waiting for update the distributed state (update will be retried if the timeout happened) + # works only for state-store-mode = "ddata" + updating-state-timeout = 5 s + + # Settings for the coordinator singleton. Same layout as akka.cluster.singleton. coordinator-singleton = ${akka.cluster.singleton} - - # The id of the dispatcher to use for ClusterSharding actors. + + # The id of the dispatcher to use for ClusterSharding actors. # If not specified default dispatcher is used. # If specified you need to define the settings of the actual dispatcher. # This dispatcher for the entity actors is defined by the user provided diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 68b403b75d..700320c6b2 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -19,6 +19,7 @@ import akka.actor.NoSerializationVerificationNeeded import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData import akka.cluster.singleton.ClusterSingletonManager import akka.pattern.BackoffSupervisor import akka.util.ByteString @@ -414,6 +415,7 @@ private[akka] class ClusterShardingGuardian extends Actor { val cluster = Cluster(context.system) val sharding = ClusterSharding(context.system) + lazy val replicator = DistributedData(context.system).replicator private def coordinatorSingletonManagerName(encName: String): String = encName + "Coordinator" @@ -425,12 +427,17 @@ private[akka] class ClusterShardingGuardian extends Actor { case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) ⇒ import settings.role import settings.tuningParameters.coordinatorFailureBackoff + val encName = URLEncoder.encode(typeName, ByteString.UTF_8) val cName = coordinatorSingletonManagerName(encName) val cPath = coordinatorPath(encName) val shardRegion = context.child(encName).getOrElse { if (context.child(cName).isEmpty) { - val coordinatorProps = ShardCoordinator.props(typeName, settings, allocationStrategy) + val coordinatorProps = + if (settings.stateStoreMode == "persistence") + ShardCoordinator.props(typeName, settings, allocationStrategy) + else + ShardCoordinator.props(typeName, settings, allocationStrategy, replicator) val singletonProps = BackoffSupervisor.props( childProps = coordinatorProps, childName = "coordinator", diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 636e06caa1..11bcd91e83 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -36,7 +36,9 @@ object ClusterShardingSettings { leastShardAllocationRebalanceThreshold = config.getInt("least-shard-allocation-strategy.rebalance-threshold"), leastShardAllocationMaxSimultaneousRebalance = - config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance")) + config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance"), + waitingForStateTimeout = config.getDuration("waiting-for-state-timeout", MILLISECONDS).millis, + updatingStateTimeout = config.getDuration("updating-state-timeout", MILLISECONDS).millis) val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) @@ -45,6 +47,7 @@ object ClusterShardingSettings { rememberEntities = config.getBoolean("remember-entities"), journalPluginId = config.getString("journal-plugin-id"), snapshotPluginId = config.getString("snapshot-plugin-id"), + stateStoreMode = config.getString("state-store-mode"), tuningParameters, coordinatorSingletonSettings) } @@ -78,7 +81,9 @@ object ClusterShardingSettings { val rebalanceInterval: FiniteDuration, val snapshotAfter: Int, val leastShardAllocationRebalanceThreshold: Int, - val leastShardAllocationMaxSimultaneousRebalance: Int) + val leastShardAllocationMaxSimultaneousRebalance: Int, + val waitingForStateTimeout: FiniteDuration, + val updatingStateTimeout: FiniteDuration) } /** @@ -101,6 +106,7 @@ final class ClusterShardingSettings( val rememberEntities: Boolean, val journalPluginId: String, val snapshotPluginId: String, + val stateStoreMode: String, val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { @@ -127,6 +133,7 @@ final class ClusterShardingSettings( rememberEntities: Boolean = rememberEntities, journalPluginId: String = journalPluginId, snapshotPluginId: String = snapshotPluginId, + stateStoreMode: String = stateStoreMode, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = new ClusterShardingSettings( @@ -134,6 +141,7 @@ final class ClusterShardingSettings( rememberEntities, journalPluginId, snapshotPluginId, + stateStoreMode, tuningParameters, coordinatorSingletonSettings) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index d9b163340a..ff9a27387a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -98,7 +98,7 @@ private[akka] class Shard( extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Actor with ActorLogging { - import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate } + import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized } import ShardCoordinator.Internal.{ HandOff, ShardStopped } import Shard.{ State, RestartEntity, EntityStopped, EntityStarted } import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage @@ -113,6 +113,10 @@ private[akka] class Shard( var handOffStopper: Option[ActorRef] = None + initialized() + + def initialized(): Unit = context.parent ! ShardInitialized(shardId) + def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size } def processChange[A](event: A)(handler: A ⇒ Unit): Unit = @@ -297,6 +301,9 @@ private[akka] class PersistentShard( var persistCount = 0 + // would be initialized after recovery completed + override def initialized(): Unit = {} + override def receive = receiveCommand override def processChange[A](event: A)(handler: A ⇒ Unit): Unit = { @@ -316,7 +323,10 @@ private[akka] class PersistentShard( case EntityStarted(id) ⇒ state = state.copy(state.entities + id) case EntityStopped(id) ⇒ state = state.copy(state.entities - id) case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot - case RecoveryCompleted ⇒ state.entities foreach getEntity + case RecoveryCompleted ⇒ + state.entities foreach getEntity + super.initialized() + log.debug("Shard recovery completed {}", shardId) } override def entityTerminated(ref: ActorRef): Unit = { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 427eb638ba..d4a9f927c4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -3,6 +3,8 @@ */ package akka.cluster.sharding +import akka.persistence._ + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ @@ -15,16 +17,15 @@ import akka.actor.Deploy import akka.actor.NoSerializationVerificationNeeded import akka.actor.Props import akka.actor.ReceiveTimeout +import akka.actor.Stash import akka.actor.Terminated import akka.cluster.Cluster import akka.cluster.ClusterEvent.ClusterShuttingDown import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ddata.LWWRegisterKey +import akka.cluster.ddata.LWWRegister +import akka.cluster.ddata.Replicator._ import akka.dispatch.ExecutionContexts -import akka.persistence.PersistentActor -import akka.persistence.RecoveryCompleted -import akka.persistence.SaveSnapshotFailure -import akka.persistence.SaveSnapshotSuccess -import akka.persistence.SnapshotOffer import akka.pattern.pipe /** @@ -40,7 +41,16 @@ object ShardCoordinator { */ private[akka] def props(typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardAllocationStrategy): Props = - Props(new ShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local) + Props(new PersistentShardCoordinator(typeName: String, settings, allocationStrategy)).withDeploy(Deploy.local) + + /** + * INTERNAL API + * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata. + */ + private[akka] def props(typeName: String, settings: ClusterShardingSettings, + allocationStrategy: ShardAllocationStrategy, + replicator: ActorRef): Props = + Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator)).withDeploy(Deploy.local) /** * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]]. @@ -51,12 +61,12 @@ object ShardCoordinator { /** * Invoked when the location of a new shard is to be decided. * @param requester actor reference to the [[ShardRegion]] that requested the location of the - * shard, can be returned if preference should be given to the node where the shard was first accessed + * shard, can be returned if preference should be given to the node where the shard was first accessed * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, - * in the order they were allocated + * in the order they were allocated * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of - * the references included in the `currentShardAllocations` parameter + * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: ShardId, currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] @@ -64,9 +74,9 @@ object ShardCoordinator { /** * Invoked periodically to decide which shards to rebalance to another location. * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, - * in the order they were allocated + * in the order they were allocated * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e. - * you should not include these in the returned set + * you should not include these in the returned set * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round */ def rebalance(currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], @@ -95,12 +105,12 @@ object ShardCoordinator { /** * Invoked when the location of a new shard is to be decided. * @param requester actor reference to the [[ShardRegion]] that requested the location of the - * shard, can be returned if preference should be given to the node where the shard was first accessed + * shard, can be returned if preference should be given to the node where the shard was first accessed * @param shardId the id of the shard to allocate * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, - * in the order they were allocated + * in the order they were allocated * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of - * the references included in the `currentShardAllocations` parameter + * the references included in the `currentShardAllocations` parameter */ def allocateShard(requester: ActorRef, shardId: String, currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]]): Future[ActorRef] @@ -108,9 +118,9 @@ object ShardCoordinator { /** * Invoked periodically to decide which shards to rebalance to another location. * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards, - * in the order they were allocated + * in the order they were allocated * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e. - * you should not include these in the returned set + * you should not include these in the returned set * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round */ def rebalance(currentShardAllocations: java.util.Map[ActorRef, immutable.IndexedSeq[String]], @@ -358,29 +368,22 @@ object ShardCoordinator { * * @see [[ClusterSharding$ ClusterSharding extension]] */ -class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, - allocationStrategy: ShardCoordinator.ShardAllocationStrategy) - extends PersistentActor with ActorLogging { +abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, + allocationStrategy: ShardCoordinator.ShardAllocationStrategy) + extends Actor with ActorLogging { import ShardCoordinator._ import ShardCoordinator.Internal._ import ShardRegion.ShardId import settings.tuningParameters._ - override def persistenceId = s"/sharding/${typeName}Coordinator" - - override def journalPluginId: String = settings.journalPluginId - - override def snapshotPluginId: String = settings.snapshotPluginId - val removalMargin = Cluster(context.system).settings.DownRemovalMargin - var persistentState = State.empty + var state = State.empty var rebalanceInProgress = Set.empty[ShardId] var unAckedHostShards = Map.empty[ShardId, Cancellable] // regions that have requested handoff, for graceful shutdown var gracefulShutdownInProgress = Set.empty[ActorRef] var aliveRegions = Set.empty[ActorRef] - var persistCount = 0 import context.dispatcher val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick) @@ -393,62 +396,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, Cluster(context.system).unsubscribe(self) } - override def receiveRecover: Receive = { - case evt: DomainEvent ⇒ - log.debug("receiveRecover {}", evt) - evt match { - case ShardRegionRegistered(region) ⇒ - persistentState = persistentState.updated(evt) - case ShardRegionProxyRegistered(proxy) ⇒ - persistentState = persistentState.updated(evt) - case ShardRegionTerminated(region) ⇒ - if (persistentState.regions.contains(region)) - persistentState = persistentState.updated(evt) - else { - log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + - " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + - "removed by later watch.", region) - } - case ShardRegionProxyTerminated(proxy) ⇒ - if (persistentState.regionProxies.contains(proxy)) - persistentState = persistentState.updated(evt) - case ShardHomeAllocated(shard, region) ⇒ - persistentState = persistentState.updated(evt) - case _: ShardHomeDeallocated ⇒ - persistentState = persistentState.updated(evt) - } - - case SnapshotOffer(_, state: State) ⇒ - log.debug("receiveRecover SnapshotOffer {}", state) - //Old versions of the state object may not have unallocatedShard set, - // thus it will be null. - if (state.unallocatedShards == null) - persistentState = state.copy(unallocatedShards = Set.empty) - else - persistentState = state - - case RecoveryCompleted ⇒ - persistentState.regionProxies.foreach(context.watch) - persistentState.regions.foreach { case (a, _) ⇒ context.watch(a) } - persistentState.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) } - allocateShardHomes() - } - - override def receiveCommand: Receive = { + def active: Receive = { case Register(region) ⇒ log.debug("ShardRegion registered: [{}]", region) aliveRegions += region - if (persistentState.regions.contains(region)) - sender() ! RegisterAck(self) + if (state.regions.contains(region)) + region ! RegisterAck(self) else { gracefulShutdownInProgress -= region - saveSnapshotWhenNeeded() - persist(ShardRegionRegistered(region)) { evt ⇒ - val firstRegion = persistentState.regions.isEmpty - - persistentState = persistentState.updated(evt) + update(ShardRegionRegistered(region)) { evt ⇒ + val firstRegion = state.regions.isEmpty + state = state.updated(evt) context.watch(region) - sender() ! RegisterAck(self) + region ! RegisterAck(self) if (firstRegion) allocateShardHomes() @@ -457,28 +417,26 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, case RegisterProxy(proxy) ⇒ log.debug("ShardRegion proxy registered: [{}]", proxy) - if (persistentState.regionProxies.contains(proxy)) - sender() ! RegisterAck(self) + if (state.regionProxies.contains(proxy)) + proxy ! RegisterAck(self) else { - saveSnapshotWhenNeeded() - persist(ShardRegionProxyRegistered(proxy)) { evt ⇒ - persistentState = persistentState.updated(evt) + update(ShardRegionProxyRegistered(proxy)) { evt ⇒ + state = state.updated(evt) context.watch(proxy) - sender() ! RegisterAck(self) + proxy ! RegisterAck(self) } } case t @ Terminated(ref) ⇒ - if (persistentState.regions.contains(ref)) { + if (state.regions.contains(ref)) { if (removalMargin != Duration.Zero && t.addressTerminated && aliveRegions(ref)) context.system.scheduler.scheduleOnce(removalMargin, self, DelayedShardRegionTerminated(ref)) else regionTerminated(ref) - } else if (persistentState.regionProxies.contains(ref)) { + } else if (state.regionProxies.contains(ref)) { log.debug("ShardRegion proxy terminated: [{}]", ref) - saveSnapshotWhenNeeded() - persist(ShardRegionProxyTerminated(ref)) { evt ⇒ - persistentState = persistentState.updated(evt) + update(ShardRegionProxyTerminated(ref)) { evt ⇒ + state = state.updated(evt) } } @@ -487,10 +445,10 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, case GetShardHome(shard) ⇒ if (!rebalanceInProgress.contains(shard)) { - persistentState.shards.get(shard) match { + state.shards.get(shard) match { case Some(ref) ⇒ sender() ! ShardHome(shard, ref) case None ⇒ - val activeRegions = persistentState.regions -- gracefulShutdownInProgress + val activeRegions = state.regions -- gracefulShutdownInProgress if (activeRegions.nonEmpty) { val getShardHomeSender = sender() val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions) @@ -524,14 +482,14 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, } case ResendShardHost(shard, region) ⇒ - persistentState.shards.get(shard) match { + state.shards.get(shard) match { case Some(`region`) ⇒ sendHostShardMsg(shard, region) case _ ⇒ //Reallocated to another region } case RebalanceTick ⇒ - if (persistentState.regions.nonEmpty) { - val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress) + if (state.regions.nonEmpty) { + val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress) shardsFuture.value match { case Some(Success(shards)) ⇒ continueRebalance(shards) @@ -551,20 +509,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, rebalanceInProgress -= shard log.debug("Rebalance shard [{}] done [{}]", shard, ok) // The shard could have been removed by ShardRegionTerminated - if (persistentState.shards.contains(shard)) + if (state.shards.contains(shard)) if (ok) { - saveSnapshotWhenNeeded() - persist(ShardHomeDeallocated(shard)) { evt ⇒ - persistentState = persistentState.updated(evt) + update(ShardHomeDeallocated(shard)) { evt ⇒ + state = state.updated(evt) log.debug("Shard [{}] deallocated", evt.shard) allocateShardHomes() } } else // rebalance not completed, graceful shutdown will be retried - gracefulShutdownInProgress -= persistentState.shards(shard) + gracefulShutdownInProgress -= state.shards(shard) case GracefulShutdownReq(region) ⇒ if (!gracefulShutdownInProgress(region)) - persistentState.regions.get(region) match { + state.regions.get(region) match { case Some(shards) ⇒ log.debug("Graceful shutdown of region [{}] with shards [{}]", region, shards) gracefulShutdownInProgress += region @@ -572,12 +529,6 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, case None ⇒ } - case SaveSnapshotSuccess(_) ⇒ - log.debug("Persistent snapshot saved successfully") - - case SaveSnapshotFailure(_, reason) ⇒ - log.warning("Persistent snapshot failure: {}", reason.getMessage) - case ShardHome(_, _) ⇒ //On rebalance, we send ourselves a GetShardHome message to reallocate a // shard. This receive handles the "response" from that message. i.e. ignores it. @@ -589,7 +540,7 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, context.become(shuttingDown) case ShardRegion.GetCurrentRegions ⇒ - val reply = ShardRegion.CurrentRegions(persistentState.regions.keySet.map { ref ⇒ + val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref ⇒ if (ref.path.address.host.isEmpty) Cluster(context.system).selfAddress else ref.path.address }) @@ -598,16 +549,17 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, case _: CurrentClusterState ⇒ } + def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit + def regionTerminated(ref: ActorRef): Unit = - if (persistentState.regions.contains(ref)) { + if (state.regions.contains(ref)) { log.debug("ShardRegion terminated: [{}]", ref) - persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) } + state.regions(ref).foreach { s ⇒ self ! GetShardHome(s) } gracefulShutdownInProgress -= ref - saveSnapshotWhenNeeded() - persist(ShardRegionTerminated(ref)) { evt ⇒ - persistentState = persistentState.updated(evt) + update(ShardRegionTerminated(ref)) { evt ⇒ + state = state.updated(evt) allocateShardHomes() } } @@ -616,31 +568,22 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, case _ ⇒ // ignore all } - def saveSnapshotWhenNeeded(): Unit = { - persistCount += 1 - if (persistCount % snapshotAfter == 0) { - log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) - saveSnapshot(persistentState) - } - } - def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = { region ! HostShard(shard) val cancel = context.system.scheduler.scheduleOnce(shardStartTimeout, self, ResendShardHost(shard, region)) unAckedHostShards = unAckedHostShards.updated(shard, cancel) } - def allocateShardHomes(): Unit = persistentState.unallocatedShards.foreach { self ! GetShardHome(_) } + def allocateShardHomes(): Unit = state.unallocatedShards.foreach { self ! GetShardHome(_) } def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit = if (!rebalanceInProgress.contains(shard)) { - persistentState.shards.get(shard) match { + state.shards.get(shard) match { case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref) case None ⇒ - if (persistentState.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) { - saveSnapshotWhenNeeded() - persist(ShardHomeAllocated(shard, region)) { evt ⇒ - persistentState = persistentState.updated(evt) + if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) { + update(ShardHomeAllocated(shard, region)) { evt ⇒ + state = state.updated(evt) log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) sendHostShardMsg(evt.shard, evt.region) @@ -648,19 +591,19 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, } } else log.debug("Allocated region {} for shard [{}] is not (any longer) one of the registered regions: {}", - region, shard, persistentState) + region, shard, state) } } def continueRebalance(shards: Set[ShardId]): Unit = shards.foreach { shard ⇒ if (!rebalanceInProgress(shard)) { - persistentState.shards.get(shard) match { + state.shards.get(shard) match { case Some(rebalanceFromRegion) ⇒ rebalanceInProgress += shard log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout, - persistentState.regions.keySet ++ persistentState.regionProxies) + state.regions.keySet ++ state.regionProxies) .withDispatcher(context.props.dispatcher)) case None ⇒ log.debug("Rebalance of non-existing shard [{}] is ignored", shard) @@ -670,3 +613,182 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings, } } + +/** + * Singleton coordinator that decides where to allocate shards. + * + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSettings, + allocationStrategy: ShardCoordinator.ShardAllocationStrategy) + extends ShardCoordinator(typeName, settings, allocationStrategy) with PersistentActor { + import ShardCoordinator.Internal._ + import settings.tuningParameters._ + + override def persistenceId = s"/sharding/${typeName}Coordinator" + + override def journalPluginId: String = settings.journalPluginId + + override def snapshotPluginId: String = settings.snapshotPluginId + + var persistCount = 0 + + override def receiveRecover: Receive = { + case evt: DomainEvent ⇒ + log.debug("receiveRecover {}", evt) + evt match { + case ShardRegionRegistered(region) ⇒ + state = state.updated(evt) + case ShardRegionProxyRegistered(proxy) ⇒ + state = state.updated(evt) + case ShardRegionTerminated(region) ⇒ + if (state.regions.contains(region)) + state = state.updated(evt) + else { + log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + + " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + + "removed by later watch.", region) + } + case ShardRegionProxyTerminated(proxy) ⇒ + if (state.regionProxies.contains(proxy)) + state = state.updated(evt) + case ShardHomeAllocated(shard, region) ⇒ + state = state.updated(evt) + case _: ShardHomeDeallocated ⇒ + state = state.updated(evt) + } + + case SnapshotOffer(_, st: State) ⇒ + log.debug("receiveRecover SnapshotOffer {}", st) + //Old versions of the state object may not have unallocatedShard set, + // thus it will be null. + if (st.unallocatedShards == null) + state = st.copy(unallocatedShards = Set.empty) + else + state = st + + case RecoveryCompleted ⇒ + state.regionProxies.foreach(context.watch) + state.regions.foreach { case (a, _) ⇒ context.watch(a) } + state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) } + allocateShardHomes() + } + + override def receiveCommand: Receive = active orElse { + case SaveSnapshotSuccess(_) ⇒ + log.debug("Persistent snapshot saved successfully") + + case SaveSnapshotFailure(_, reason) ⇒ + log.warning("Persistent snapshot failure: {}", reason.getMessage) + } + + def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = { + saveSnapshotWhenNeeded() + persist(evt)(f) + } + + def saveSnapshotWhenNeeded(): Unit = { + persistCount += 1 + if (persistCount % snapshotAfter == 0) { + log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr) + saveSnapshot(state) + } + } +} + +/** + * Singleton coordinator (with state based on ddata) that decides where to allocate shards. + * + * @see [[ClusterSharding$ ClusterSharding extension]] + */ +class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, + allocationStrategy: ShardCoordinator.ShardAllocationStrategy, + replicator: ActorRef) + extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash { + import ShardCoordinator.Internal._ + import akka.cluster.ddata.Replicator.Update + + val waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout + val updatingStateTimeout = settings.tuningParameters.updatingStateTimeout + + Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass) + + implicit val node = Cluster(context.system) + val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState") + + var afterUpdateCallback: DomainEvent ⇒ Unit = _ + + getState() + + override def receive: Receive = waitingForState + + // This state will drop all other messages since they will be retried + def waitingForState: Receive = { + case g @ GetSuccess(CoordinatorStateKey, _) ⇒ + state = g.get(CoordinatorStateKey).value + state.regionProxies.foreach(context.watch) + state.regions.foreach { case (a, _) ⇒ context.watch(a) } + state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) } + allocateShardHomes() + activate() + + case GetFailure(CoordinatorStateKey, _) ⇒ + log.error( + "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout' (was retrying): {} millis", + waitingForStateTimeout.toMillis) + getState() + + case NotFound(CoordinatorStateKey, _) ⇒ activate() + } + + def waitingForUpdate[E <: DomainEvent](evt: E): Receive = { + case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒ + log.debug("The coordinator state was successfully updated with {}", evt) + updateSuccess(evt) + + case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒ + log.error( + "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout'={} millis (was retrying), event={}", + updatingStateTimeout.toMillis, + evt) + sendUpdate(evt) + + case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒ + log.error( + cause, + "The ShardCoordinator was unable to update a distributed state with error {} and event {}.Coordinator will be restarted", + error, + evt) + throw cause + case _ ⇒ stash() + } + + def activate() = { + context.become(active) + log.info("Sharding Coordinator was moved to the active state {}", state) + } + + def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = { + afterUpdateCallback = f.asInstanceOf[DomainEvent ⇒ Unit] + context.become(waitingForUpdate(evt)) + sendUpdate(evt) + } + + def updateSuccess(evt: DomainEvent): Unit = { + afterUpdateCallback(evt) + afterUpdateCallback = null + context.become(active) + unstashAll() + } + + def getState(): Unit = + replicator ! Get(CoordinatorStateKey, ReadMajority(waitingForStateTimeout)) + + def sendUpdate(evt: DomainEvent) = { + val s = state.updated(evt) + replicator ! Update(CoordinatorStateKey, LWWRegister(State.empty), WriteMajority(updatingStateTimeout), Some(evt)) { reg ⇒ + reg.withValue(s) + } + } + +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index ae2e5a0c36..f96279dda4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -154,6 +154,12 @@ object ShardRegion { */ @SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand + /** + * We must be sure that a shard is initialized before to start send messages to it. + * Shard could be terminated during initialization. + */ + final case class ShardInitialized(shardId: ShardId) + /** * Java API: Send this message to the `ShardRegion` actor to handoff all shards that are hosted by * the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch` @@ -302,11 +308,13 @@ class ShardRegion( def receive = { case Terminated(ref) ⇒ receiveTerminated(ref) + case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender()) case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt) case state: CurrentClusterState ⇒ receiveClusterState(state) case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) + case msg: RestartShard ⇒ deliverMessage(msg, sender()) } def receiveClusterState(state: CurrentClusterState): Unit = { @@ -336,7 +344,6 @@ class ShardRegion( //Start the shard, if already started this does nothing getShard(shard) - deliverBufferedMessages(shard) sender() ! ShardStarted(shard) @@ -354,7 +361,10 @@ class ShardRegion( if (ref != self) context.watch(ref) - deliverBufferedMessages(shard) + if (ref == self) + getShard(shard).foreach(deliverBufferedMessages(shard, _)) + else + deliverBufferedMessages(shard, ref) case RegisterAck(coord) ⇒ context.watch(coord) @@ -472,12 +482,44 @@ class ShardRegion( } } - def deliverBufferedMessages(shard: String): Unit = { - shardBuffers.get(shard) match { + def initializeShard(id: ShardId, shard: ActorRef): Unit = { + log.debug("Shard was initialized {}", id) + shards = shards.updated(id, shard) + deliverBufferedMessages(id, shard) + } + + def bufferMessage(shardId: ShardId, msg: Any, snd: ActorRef) = { + val totBufSize = totalBufferSize + if (totBufSize >= bufferSize) { + if (loggedFullBufferWarning) + log.debug("Buffer is full, dropping message for shard [{}]", shardId) + else { + log.warning("Buffer is full, dropping message for shard [{}]", shardId) + loggedFullBufferWarning = true + } + context.system.deadLetters ! msg + } else { + val buf = shardBuffers.getOrElse(shardId, Vector.empty) + shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd))) + + // log some insight to how buffers are filled up every 10% of the buffer capacity + val tot = totBufSize + 1 + if (tot % (bufferSize / 10) == 0) { + val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity." + if (tot <= bufferSize / 2) + log.info(logMsg) + else + log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.") + } + } + } + + def deliverBufferedMessages(shardId: ShardId, receiver: ActorRef): Unit = { + shardBuffers.get(shardId) match { case Some(buf) ⇒ - log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shard) - buf.foreach { case (msg, snd) ⇒ deliverMessage(msg, snd) } - shardBuffers -= shard + log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shardId) + buf.foreach { case (msg, snd) ⇒ receiver.tell(msg, snd) } + shardBuffers -= shardId case None ⇒ } loggedFullBufferWarning = false @@ -505,7 +547,18 @@ class ShardRegion( val shardId = extractShardId(msg) regionByShard.get(shardId) match { case Some(ref) if ref == self ⇒ - getShard(shardId).tell(msg, snd) + getShard(shardId) match { + case Some(shard) ⇒ + shardBuffers.get(shardId) match { + case Some(buf) ⇒ + // Since now messages to a shard is buffered then those messages must be in right order + bufferMessage(shardId, msg, snd) + deliverBufferedMessages(shardId, shard) + case None ⇒ + shard.tell(msg, snd) + } + case None ⇒ bufferMessage(shardId, msg, snd) + } case Some(ref) ⇒ log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref) ref.tell(msg, snd) @@ -517,36 +570,13 @@ class ShardRegion( log.debug("Request shard [{}] home", shardId) coordinator.foreach(_ ! GetShardHome(shardId)) } - val totBufSize = totalBufferSize - if (totBufSize >= bufferSize) { - if (loggedFullBufferWarning) - log.debug("Buffer is full, dropping message for shard [{}]", shardId) - else { - log.warning("Buffer is full, dropping message for shard [{}]", shardId) - loggedFullBufferWarning = true - } - context.system.deadLetters ! msg - } else { - val buf = shardBuffers.getOrElse(shardId, Vector.empty) - shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd))) - - // log some insight to how buffers are filled up every 10% of the buffer capacity - val tot = totBufSize + 1 - if (tot % (bufferSize / 10) == 0) { - val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity." - if (tot <= bufferSize / 2) - log.info(logMsg) - else - log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.") - } - } + bufferMessage(shardId, msg, snd) } } - def getShard(id: ShardId): ActorRef = shards.getOrElse( - id, + def getShard(id: ShardId): Option[ActorRef] = shards.get(id).orElse( entityProps match { - case Some(props) ⇒ + case Some(props) if !shardsByRef.values.exists(_ == id) ⇒ log.debug("Starting shard [{}] in region", id) val name = URLEncoder.encode(id, "utf-8") @@ -560,9 +590,10 @@ class ShardRegion( extractShardId, handOffStopMessage).withDispatcher(context.props.dispatcher), name)) - shards = shards.updated(id, shard) shardsByRef = shardsByRef.updated(shard, id) - shard + None + case Some(props) ⇒ + None case None ⇒ throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") }) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 1c45ad0698..6f822d4ae4 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -26,26 +26,7 @@ import scala.concurrent.Future import akka.util.Timeout import akka.pattern.ask -object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/journal-ClusterShardingCustomShardAllocationSpec" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec" - """)) - +object ClusterShardingCustomShardAllocationSpec { class Entity extends Actor { def receive = { case id: Int ⇒ sender() ! id @@ -98,11 +79,43 @@ object ClusterShardingCustomShardAllocationSpec extends MultiNodeConfig { } -class ClusterShardingCustomShardAllocationMultiJvmNode1 extends ClusterShardingCustomShardAllocationSpec -class ClusterShardingCustomShardAllocationMultiJvmNode2 extends ClusterShardingCustomShardAllocationSpec +abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") -class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShardingCustomShardAllocationSpec) with STMultiNodeSpec with ImplicitSender { + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingCustomShardAllocationSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingCustomShardAllocationSpec" + akka.cluster.sharding.state-store-mode = "$mode" + """)) +} + +object PersistentClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("persistence") +object DDataClusterShardingCustomShardAllocationSpecConfig extends ClusterShardingCustomShardAllocationSpecConfig("ddata") + +class PersistentClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(PersistentClusterShardingCustomShardAllocationSpecConfig) +class DDataClusterShardingCustomShardAllocationSpec extends ClusterShardingCustomShardAllocationSpec(DDataClusterShardingCustomShardAllocationSpecConfig) + +class PersistentClusterShardingCustomShardAllocationMultiJvmNode1 extends PersistentClusterShardingCustomShardAllocationSpec +class PersistentClusterShardingCustomShardAllocationMultiJvmNode2 extends PersistentClusterShardingCustomShardAllocationSpec + +class DDataClusterShardingCustomShardAllocationMultiJvmNode1 extends DDataClusterShardingCustomShardAllocationSpec +class DDataClusterShardingCustomShardAllocationMultiJvmNode2 extends DDataClusterShardingCustomShardAllocationSpec + +abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingCustomShardAllocationSpec._ + import config._ override def initialParticipants = roles.size @@ -146,7 +159,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar lazy val allocator = system.actorOf(Props[Allocator], "allocator") - "Cluster sharding with custom allocation strategy" must { + s"Cluster sharding ($mode) with custom allocation strategy" must { "setup shared journal" in { // start the Persistence extension diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index e07f6b62bb..1cb5809e4b 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -22,34 +22,7 @@ import akka.remote.testkit.STMultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ -object ClusterShardingFailureSpec extends MultiNodeConfig { - val controller = role("controller") - val first = role("first") - val second = role("second") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote.log-remote-lifecycle-events = off - akka.cluster.auto-down-unreachable-after = 0s - akka.cluster.down-removal-margin = 5s - akka.cluster.roles = ["backend"] - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/journal-ClusterShardingFailureSpec" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec" - akka.cluster.sharding.coordinator-failure-backoff = 3s - akka.cluster.sharding.shard-failure-backoff = 3s - """)) - - testTransport(on = true) - +object ClusterShardingFailureSpec { case class Get(id: String) case class Add(id: String, i: Int) case class Value(id: String, n: Int) @@ -75,12 +48,55 @@ object ClusterShardingFailureSpec extends MultiNodeConfig { } -class ClusterShardingFailureMultiJvmNode1 extends ClusterShardingFailureSpec -class ClusterShardingFailureMultiJvmNode2 extends ClusterShardingFailureSpec -class ClusterShardingFailureMultiJvmNode3 extends ClusterShardingFailureSpec +abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") -class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpec) with STMultiNodeSpec with ImplicitSender { + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.down-removal-margin = 5s + akka.cluster.roles = ["backend"] + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingFailureSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingFailureSpec" + akka.cluster.sharding { + coordinator-failure-backoff = 3s + shard-failure-backoff = 3s + state-store-mode = "$mode" + } + """)) + + testTransport(on = true) +} + +object PersistentClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("persistence") +object DDataClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("ddata") + +class PersistentClusterShardingFailureSpec extends ClusterShardingFailureSpec(PersistentClusterShardingFailureSpecConfig) +class DDataClusterShardingFailureSpec extends ClusterShardingFailureSpec(DDataClusterShardingFailureSpecConfig) + +class PersistentClusterShardingFailureMultiJvmNode1 extends PersistentClusterShardingFailureSpec +class PersistentClusterShardingFailureMultiJvmNode2 extends PersistentClusterShardingFailureSpec +class PersistentClusterShardingFailureMultiJvmNode3 extends PersistentClusterShardingFailureSpec + +class DDataClusterShardingFailureMultiJvmNode1 extends DDataClusterShardingFailureSpec +class DDataClusterShardingFailureMultiJvmNode2 extends DDataClusterShardingFailureSpec +class DDataClusterShardingFailureMultiJvmNode3 extends DDataClusterShardingFailureSpec + +abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingFailureSpec._ + import config._ override def initialParticipants = roles.size @@ -120,7 +136,7 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe lazy val region = ClusterSharding(system).shardRegion("Entity") - "Cluster sharding with flaky journal" must { + s"Cluster sharding ($mode) with flaky journal" must { "setup shared journal" in { // start the Persistence extension diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index e83e60f860..b7fb193d1d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -26,26 +26,7 @@ import scala.concurrent.Future import akka.util.Timeout import akka.pattern.ask -object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote.log-remote-lifecycle-events = off - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/journal-ClusterShardingGracefulShutdownSpec" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec" - """)) - +object ClusterShardingGracefulShutdownSpec { case object StopEntity class Entity extends Actor { @@ -84,11 +65,43 @@ object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig { } -class ClusterShardingGracefulShutdownMultiJvmNode1 extends ClusterShardingGracefulShutdownSpec -class ClusterShardingGracefulShutdownMultiJvmNode2 extends ClusterShardingGracefulShutdownSpec +abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") -class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingGracefulShutdownSpec) with STMultiNodeSpec with ImplicitSender { + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingGracefulShutdownSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec" + akka.cluster.sharding.state-store-mode = "$mode" + """)) +} + +object PersistentClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("persistence") +object DDataClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("ddata") + +class PersistentClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(PersistentClusterShardingGracefulShutdownSpecConfig) +class DDataClusterShardingGracefulShutdownSpec extends ClusterShardingGracefulShutdownSpec(DDataClusterShardingGracefulShutdownSpecConfig) + +class PersistentClusterShardingGracefulShutdownMultiJvmNode1 extends PersistentClusterShardingGracefulShutdownSpec +class PersistentClusterShardingGracefulShutdownMultiJvmNode2 extends PersistentClusterShardingGracefulShutdownSpec + +class DDataClusterShardingGracefulShutdownMultiJvmNode1 extends DDataClusterShardingGracefulShutdownSpec +class DDataClusterShardingGracefulShutdownMultiJvmNode2 extends DDataClusterShardingGracefulShutdownSpec + +abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingGracefulShutdownSpec._ + import config._ override def initialParticipants = roles.size @@ -131,7 +144,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG lazy val region = ClusterSharding(system).shardRegion("Entity") - "Cluster sharding" must { + s"Cluster sharding ($mode)" must { "setup shared journal" in { // start the Persistence extension diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index def0d585bf..c9dcc0bf9e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -26,30 +26,7 @@ import akka.testkit._ import com.typesafe.config.ConfigFactory import org.apache.commons.io.FileUtils -object ClusterShardingLeavingSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - val fourth = role("fourth") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote.log-remote-lifecycle-events = off - akka.cluster.auto-down-unreachable-after = 0s - akka.cluster.down-removal-margin = 5s - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared { - timeout = 5s - store { - native = off - dir = "target/journal-ClusterShardingLeavingSpec" - } - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec" - """)) - +object ClusterShardingLeavingSpec { case class Ping(id: String) class Entity extends Actor { @@ -76,16 +53,53 @@ object ClusterShardingLeavingSpec extends MultiNodeConfig { val extractShardId: ShardRegion.ExtractShardId = { case Ping(id: String) ⇒ id.charAt(0).toString } - } -class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec -class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec -class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec -class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec +abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") -class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender { + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.down-removal-margin = 5s + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingLeavingSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec" + akka.cluster.sharding.state-store-mode = "$mode" + """)) +} + +object PersistentClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("persistence") +object DDataClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("ddata") + +class PersistentClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig) +class DDataClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(DDataClusterShardingLeavingSpecConfig) + +class PersistentClusterShardingLeavingMultiJvmNode1 extends PersistentClusterShardingLeavingSpec +class PersistentClusterShardingLeavingMultiJvmNode2 extends PersistentClusterShardingLeavingSpec +class PersistentClusterShardingLeavingMultiJvmNode3 extends PersistentClusterShardingLeavingSpec +class PersistentClusterShardingLeavingMultiJvmNode4 extends PersistentClusterShardingLeavingSpec + +class DDataClusterShardingLeavingMultiJvmNode1 extends DDataClusterShardingLeavingSpec +class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec +class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec +class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec + +abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingLeavingSpec._ + import config._ override def initialParticipants = roles.size @@ -132,7 +146,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe lazy val region = ClusterSharding(system).shardRegion("Entity") - "Cluster sharding with leaving member" must { + s"Cluster sharding ($mode) with leaving member" must { "setup shared journal" in { // start the Persistence extension diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 23e09f3093..9a3aeeb5de 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -3,6 +3,7 @@ */ package akka.cluster.sharding +import akka.cluster.ddata.{ ReplicatorSettings, Replicator } import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff } import akka.cluster.sharding.ShardRegion.Passivate import akka.cluster.sharding.ShardRegion.GetCurrentRegions @@ -11,7 +12,6 @@ import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor._ -import akka.pattern.BackoffSupervisor import akka.cluster.Cluster import akka.persistence.PersistentActor import akka.persistence.Persistence @@ -29,46 +29,7 @@ import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.pattern.BackoffSupervisor -object ClusterShardingSpec extends MultiNodeConfig { - val controller = role("controller") - val first = role("first") - val second = role("second") - val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") - val sixth = role("sixth") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "akka.cluster.ClusterActorRefProvider" - akka.remote.log-remote-lifecycle-events = off - akka.cluster.auto-down-unreachable-after = 0s - akka.cluster.down-removal-margin = 5s - akka.cluster.roles = ["backend"] - akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" - akka.persistence.journal.leveldb-shared.store { - native = off - dir = "target/journal-ClusterShardingSpec" - } - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec" - akka.cluster.sharding { - retry-interval = 1 s - handoff-timeout = 10 s - shard-start-timeout = 5s - entity-restart-backoff = 1s - rebalance-interval = 2 s - least-shard-allocation-strategy { - rebalance-threshold = 2 - max-simultaneous-rebalance = 1 - } - } - """)) - - nodeConfig(sixth) { - ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""") - } - +object ClusterShardingSpec { //#counter-actor case object Increment case object Decrement @@ -136,6 +97,48 @@ object ClusterShardingSpec extends MultiNodeConfig { } +abstract class ClusterShardingSpecConfig(val mode: String) extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + val sixth = role("sixth") + + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + akka.cluster.down-removal-margin = 5s + akka.cluster.roles = ["backend"] + akka.cluster.distributed-data.gossip-interval = 1s + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared.store { + native = off + dir = "target/journal-ClusterShardingSpec" + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingSpec" + akka.cluster.sharding { + retry-interval = 1 s + handoff-timeout = 10 s + shard-start-timeout = 5s + entity-restart-backoff = 1s + rebalance-interval = 2 s + state-store-mode = "$mode" + least-shard-allocation-strategy { + rebalance-threshold = 2 + max-simultaneous-rebalance = 1 + } + } + """)) + nodeConfig(sixth) { + ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""") + } +} + // only used in documentation object ClusterShardingDocCode { import ClusterShardingSpec._ @@ -156,16 +159,31 @@ object ClusterShardingDocCode { } -class ClusterShardingMultiJvmNode1 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode2 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode3 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode4 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode5 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode6 extends ClusterShardingSpec -class ClusterShardingMultiJvmNode7 extends ClusterShardingSpec +object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence") +object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata") -class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMultiNodeSpec with ImplicitSender { +class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig) +class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig) + +class PersistentClusterShardingMultiJvmNode1 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode2 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode3 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode4 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode5 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode6 extends PersistentClusterShardingSpec +class PersistentClusterShardingMultiJvmNode7 extends PersistentClusterShardingSpec + +class DDataClusterShardingMultiJvmNode1 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode2 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode3 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode4 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode5 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode6 extends DDataClusterShardingSpec +class DDataClusterShardingMultiJvmNode7 extends DDataClusterShardingSpec + +abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { import ClusterShardingSpec._ + import config._ override def initialParticipants = roles.size @@ -195,6 +213,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } def createCoordinator(): Unit = { + val replicator = system.actorOf(Replicator.props( + ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator") + def coordinatorProps(typeName: String, rebalanceEnabled: Boolean) = { val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) val cfg = ConfigFactory.parseString(s""" @@ -203,7 +224,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult rebalance-interval = ${if (rebalanceEnabled) "2s" else "3600s"} """).withFallback(system.settings.config.getConfig("akka.cluster.sharding")) val settings = ClusterShardingSettings(cfg) - ShardCoordinator.props(typeName, settings, allocationStrategy) + if (settings.stateStoreMode == "persistence") + ShardCoordinator.props(typeName, settings, allocationStrategy) + else + ShardCoordinator.props(typeName, settings, allocationStrategy, replicator) } List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter", @@ -252,7 +276,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true) lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true) - "Cluster sharding" must { + s"Cluster sharding ($mode)" must { "setup shared journal" in { // start the Persistence extension diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index 7e4282e97d..217d62c879 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -176,6 +176,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards. +Distributed Data Mode +--------------------- + +Instead of using ``akka-persistence`` is possible to use ``akka-distributed-data`` module. In such case +state of the ``ShardCoordinator`` will be replicated inside a cluster by the ``akka-distributed-data`` +module with the ``WriteMajority`` consistency. This mode could be enabled by setting up +``akka.cluster.sharding.state-store-mode`` as ``ddata``. +It make possible to remove ``akka-persistence`` dependency from a project if this dependency +has not using in user code and ``remember-entities`` is ``off``. +Note that option also could lead to the shards duplication in case of a cluster fragmentation +due to a broken replication between nodes. + Proxy Only Mode --------------- diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index fd225f8733..5c2e63eb39 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -179,6 +179,18 @@ unused shards due to the round-trip to the coordinator. Rebalancing of shards ma also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards. +Distributed Data Mode +--------------------- + +Instead of using ``akka-persistence`` is possible to use ``akka-distributed-data`` module. In such case +state of the ``ShardCoordinator`` will be replicated inside a cluster by the ``akka-distributed-data`` +module with the ``WriteMajority`` consistency. This mode could be enabled by setting up +``akka.cluster.sharding.state-store-mode`` as ``ddata``. +It make possible to remove ``akka-persistence`` dependency from a project if this dependency +has not using in user code and ``remember-entities`` is ``off``. +Note that option also could lead to the shards duplication in case of a cluster fragmentation +due to a broken replication between nodes. + Proxy Only Mode --------------- diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 0933ea821c..f8b5cdf7c7 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -52,7 +52,7 @@ object AkkaBuild extends Build { archivesPathFinder.get.map(file => (file -> ("akka/" + file.getName))) } ), - aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, + aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, slf4j, agent, persistence, persistenceQuery, persistenceTck, kernel, osgi, docs, contrib, samples, multiNodeTestkit, benchJmh, typed) ) @@ -62,7 +62,7 @@ object AkkaBuild extends Build { base = file("akka-scala-nightly"), // remove dependencies that we have to build ourselves (Scala STM) // samples don't work with dbuild right now - aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, + aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, clusterMetrics, clusterTools, clusterSharding, distributedData, slf4j, persistence, persistenceQuery, persistenceTck, kernel, osgi, contrib, multiNodeTestkit, benchJmh, typed) ).disablePlugins(ValidatePullRequest) @@ -136,9 +136,9 @@ object AkkaBuild extends Build { id = "akka-cluster-sharding", base = file("akka-cluster-sharding"), dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm", - persistence % "compile;test->provided", clusterTools) + persistence % "compile;test->provided", distributedData % "compile;test->provided", clusterTools) ) configs (MultiJvm) - + lazy val distributedData = Project( id = "akka-distributed-data-experimental", base = file("akka-distributed-data"), @@ -244,7 +244,7 @@ object AkkaBuild extends Build { lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala") lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda") - + lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala") lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java")