diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 230cfc4a5e..9058100ea5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -125,9 +125,8 @@ object ClusterShardingSettings { waitingForStateTimeout, updatingStateTimeout, "all", - 100 milliseconds, - 5 - ) + 100.milliseconds, + 5) } } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 3ba933ee7d..0e6b191884 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -398,6 +398,13 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti val cluster = Cluster(context.system) val removalMargin = cluster.downingProvider.downRemovalMargin + val minMembers = settings.role match { + case None ⇒ + cluster.settings.MinNrOfMembers + case Some(r) ⇒ + cluster.settings.MinNrOfMembersOfRole.getOrElse(r, cluster.settings.MinNrOfMembers) + } + var allRegionsRegistered = false var state = State.empty.withRememberEntities(settings.rememberEntities) var rebalanceInProgress = Set.empty[ShardId] @@ -434,13 +441,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti else { gracefulShutdownInProgress -= region update(ShardRegionRegistered(region)) { evt ⇒ - val firstRegion = state.regions.isEmpty state = state.updated(evt) context.watch(region) region ! RegisterAck(self) - if (firstRegion) - allocateShardHomes() + allocateShardHomesForRememberEntities() } } } else { @@ -460,7 +465,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti } case GetShardHome(shard) ⇒ - if (!rebalanceInProgress.contains(shard)) { + if (rebalanceInProgress.contains(shard)) { + log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard) + } else if (!hasAllRegionsRegistered()) { + log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard) + } else { state.shards.get(shard) match { case Some(ref) ⇒ if (regionTerminationInProgress(ref)) @@ -534,7 +543,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti update(ShardHomeDeallocated(shard)) { evt ⇒ state = state.updated(evt) log.debug("Shard [{}] deallocated", evt.shard) - allocateShardHomes() + allocateShardHomesForRememberEntities() } } else // rebalance not completed, graceful shutdown will be retried gracefulShutdownInProgress -= state.shards(shard) @@ -638,7 +647,16 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti def stateInitialized(): Unit = { state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) } - allocateShardHomes() + allocateShardHomesForRememberEntities() + } + + def hasAllRegionsRegistered(): Boolean = { + // the check is only for startup, i.e. 
once all have registered we don't check more + if (allRegionsRegistered) true + else { + allRegionsRegistered = aliveRegions.size >= minMembers + allRegionsRegistered + } } def regionTerminated(ref: ActorRef): Unit = @@ -651,7 +669,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti gracefulShutdownInProgress -= ref regionTerminationInProgress -= ref aliveRegions -= ref - allocateShardHomes() + allocateShardHomesForRememberEntities() } } @@ -673,7 +691,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti unAckedHostShards = unAckedHostShards.updated(shard, cancel) } - def allocateShardHomes(): Unit = { + def allocateShardHomesForRememberEntities(): Unit = { if (settings.rememberEntities) state.unallocatedShards.foreach { self ! GetShardHome(_) } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala new file mode 100644 index 0000000000..52ce684d72 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.duration._ +import java.io.File + +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.sharding.ShardRegion.GracefulShutdown +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import org.apache.commons.io.FileUtils + +import scala.concurrent.duration._ +import akka.cluster.sharding.ShardRegion.GetClusterShardingStats +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.MemberStatus + +object ClusterShardingMinMembersSpec { + case object StopEntity + + val extractEntityId: ShardRegion.ExtractEntityId = { + case id: Int ⇒ (id.toString, id) + } + + val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match { + case id: Int ⇒ id.toString + } + +} + +abstract class ClusterShardingMinMembersSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingMinMembersSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingMinMembersSpec" + akka.cluster.sharding.state-store-mode = "$mode" + akka.cluster.sharding.rebalance-interval = 120s #disable rebalance + akka.cluster.min-nr-of-members = 3 + """)) +} + +object PersistentClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("persistence") +object DDataClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("ddata") + +class PersistentClusterShardingMinMembersSpec extends 
ClusterShardingMinMembersSpec(PersistentClusterShardingMinMembersSpecConfig) +class DDataClusterShardingMinMembersSpec extends ClusterShardingMinMembersSpec(DDataClusterShardingMinMembersSpecConfig) + +class PersistentClusterShardingMinMembersMultiJvmNode1 extends PersistentClusterShardingMinMembersSpec +class PersistentClusterShardingMinMembersMultiJvmNode2 extends PersistentClusterShardingMinMembersSpec +class PersistentClusterShardingMinMembersMultiJvmNode3 extends PersistentClusterShardingMinMembersSpec + +class DDataClusterShardingMinMembersMultiJvmNode1 extends DDataClusterShardingMinMembersSpec +class DDataClusterShardingMinMembersMultiJvmNode2 extends DDataClusterShardingMinMembersSpec +class DDataClusterShardingMinMembersMultiJvmNode3 extends DDataClusterShardingMinMembersSpec + +abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingMinMembersSpec._ + import config._ + + override def initialParticipants = roles.size + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + override protected def afterTermination() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + } + enterBarrier(from.name + "-joined") + } + + val cluster = Cluster(system) + + def startSharding(): Unit = { + val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + ClusterSharding(system).start( + typeName = "Entity", + entityProps = TestActors.echoActorProps, + settings = ClusterShardingSettings(system), + extractEntityId = extractEntityId, + extractShardId = extractShardId, + allocationStrategy, + handOffStopMessage = StopEntity) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + s"Cluster with min-nr-of-members using sharding ($mode)" must { + + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(first, second, third) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + + "use all nodes" in within(30.seconds) { + join(first, first) + runOn(first) { + startSharding() + } + join(second, first) + runOn(second) { + startSharding() + } + join(third, first) + // wait with starting sharding on third + within(remaining) { + awaitAssert { + cluster.state.members.size should ===(3) + cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + enterBarrier("all-up") + + runOn(first) { + region ! 1 + // not allocated because third has not registered yet + expectNoMsg(2.second) + } + enterBarrier("verified") + + runOn(third) { + startSharding() + } + + runOn(first) { + // the 1 was sent above + expectMsg(1) + region ! 2 + expectMsg(2) + region ! 
3
+        expectMsg(3)
+      }
+      enterBarrier("shards-allocated")
+
+      region ! GetClusterShardingStats(remaining)
+      val stats = expectMsgType[ClusterShardingStats]
+      val firstAddress = node(first).address
+      val secondAddress = node(second).address
+      val thirdAddress = node(third).address
+      withClue(stats) {
+        stats.regions.keySet should ===(Set(firstAddress, secondAddress, thirdAddress))
+        stats.regions(firstAddress).stats.valuesIterator.sum should ===(1)
+      }
+      enterBarrier("after-2")
+    }
+
+  }
+}
+
diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst
index f272fed48a..0d6ac5bca6 100644
--- a/akka-docs/rst/java/cluster-sharding.rst
+++ b/akka-docs/rst/java/cluster-sharding.rst
@@ -216,6 +216,17 @@ no matter how ``State Store Mode`` is set.
 The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
 it depends on the experimental Distributed Data module.
 
+Startup after minimum number of members
+---------------------------------------
+
+It is recommended to use Cluster Sharding together with the Cluster setting ``akka.cluster.min-nr-of-members`` or
+``akka.cluster.role.<role-name>.min-nr-of-members``. That will defer the allocation of the shards
+until at least that number of regions have been started and registered with the coordinator. This
+avoids the situation where many shards are allocated to the first region that registers, only to be
+rebalanced to other nodes later.
+
+See :ref:`min-members_java` for more information about ``min-nr-of-members``.
+
 Proxy Only Mode
 ---------------
 
diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst
index b012ec35d5..ff7b2c21a4 100644
--- a/akka-docs/rst/java/cluster-usage.rst
+++ b/akka-docs/rst/java/cluster-usage.rst
@@ -325,6 +325,8 @@ and it is typically defined in the start script as a system property or environm
 The roles of the nodes is part of the membership information in ``MemberEvent`` that you can
 subscribe to.
 
+.. _min-members_java:
+
 How To Startup when Cluster Size Reached
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst
index f9e7a2b0c6..b52755417d 100644
--- a/akka-docs/rst/scala/cluster-sharding.rst
+++ b/akka-docs/rst/scala/cluster-sharding.rst
@@ -219,6 +219,17 @@ no matter how ``State Store Mode`` is set.
 The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
 it depends on the experimental Distributed Data module.
 
+Startup after minimum number of members
+---------------------------------------
+
+It is recommended to use Cluster Sharding together with the Cluster setting ``akka.cluster.min-nr-of-members`` or
+``akka.cluster.role.<role-name>.min-nr-of-members``. That will defer the allocation of the shards
+until at least that number of regions have been started and registered with the coordinator. This
+avoids the situation where many shards are allocated to the first region that registers, only to be
+rebalanced to other nodes later.
+
+See :ref:`min-members_scala` for more information about ``min-nr-of-members``.
+
 Proxy Only Mode
 ---------------
 
diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst
index 2ad52bfac9..3b13734a3a 100644
--- a/akka-docs/rst/scala/cluster-usage.rst
+++ b/akka-docs/rst/scala/cluster-usage.rst
@@ -321,6 +321,8 @@ and it is typically defined in the start script as a system property or environm
 The roles of the nodes is part of the membership information in ``MemberEvent`` that you can
 subscribe to.
 
+.. _min-members_scala:
+
 How To Startup when Cluster Size Reached
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 4a1409e38b..caf028d2af 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -1005,7 +1005,11 @@ object MiMa extends AutoPlugin {
 
       // #21727 moved all of Unfold.scala in package akka.stream.impl
       ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.UnfoldAsync"),
-      ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.Unfold")
+      ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.Unfold"),
+
+      // #21194 renamed internal actor method
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.allocateShardHomes")
+
     )
   )
}
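
The documentation added by this patch describes the deferred allocation only in prose, so here is a minimal, self-contained sketch (not part of the patch) of how the setting is typically combined with Cluster Sharding. The Envelope message type, the "Entity" type name, the shard count of 10 and the commented-out "worker" role are illustrative assumptions; only the akka.cluster.min-nr-of-members / akka.cluster.role.<role-name>.min-nr-of-members settings and the existing ClusterSharding(system).start API come from Akka itself.

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import com.typesafe.config.ConfigFactory

object MinMembersShardingSketch extends App {
  // Illustrative configuration: the coordinator defers shard allocation until
  // three regions have registered (or, with the commented-out alternative,
  // until two nodes carrying the "worker" role are up).
  // Seed nodes and remoting settings are omitted from this sketch.
  val config = ConfigFactory.parseString("""
    akka.actor.provider = "cluster"
    akka.cluster.min-nr-of-members = 3
    # akka.cluster.role.worker.min-nr-of-members = 2
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)

  // Hypothetical envelope type used only for this sketch.
  final case class Envelope(entityId: String, payload: Any)

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(id, payload) ⇒ (id, payload)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(id, _) ⇒ (math.abs(id.hashCode) % 10).toString
  }

  // The region can be started immediately on every node; messages sent to it
  // are buffered until the coordinator hands out shard homes, which with this
  // patch only happens once min-nr-of-members regions have registered.
  val region: ActorRef = ClusterSharding(system).start(
    typeName = "Entity",
    entityProps = Props.empty, // stand-in for the real entity Props
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)
}

On a three-node cluster this behaves like the ClusterShardingMinMembersSpec added above: messages sent before the last required region has registered stay buffered in the region and are delivered once the coordinator starts allocating shard homes.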