Shard coordinator should wait until min-members regions have registered, #21194

Patrik Nordwall 2016-10-02 15:44:22 +02:00
parent 1ff1f5edee
commit 141318e60a
8 changed files with 255 additions and 12 deletions

@@ -125,9 +125,8 @@ object ClusterShardingSettings {
waitingForStateTimeout,
updatingStateTimeout,
"all",
- 100 milliseconds,
- 5
- )
+ 100.milliseconds,
+ 5)
}
}
}

@@ -398,6 +398,13 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
val cluster = Cluster(context.system)
val removalMargin = cluster.downingProvider.downRemovalMargin
+ val minMembers = settings.role match {
+   case None ⇒
+     cluster.settings.MinNrOfMembers
+   case Some(r) ⇒
+     cluster.settings.MinNrOfMembersOfRole.getOrElse(r, cluster.settings.MinNrOfMembers)
+ }
+ var allRegionsRegistered = false
var state = State.empty.withRememberEntities(settings.rememberEntities)
var rebalanceInProgress = Set.empty[ShardId]
@@ -434,13 +441,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
else {
gracefulShutdownInProgress -= region
update(ShardRegionRegistered(region)) { evt ⇒
- val firstRegion = state.regions.isEmpty
state = state.updated(evt)
context.watch(region)
region ! RegisterAck(self)
- if (firstRegion)
-   allocateShardHomes()
+ allocateShardHomesForRememberEntities()
}
}
} else {
@@ -460,7 +465,11 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
}
case GetShardHome(shard) ⇒
- if (!rebalanceInProgress.contains(shard)) {
+ if (rebalanceInProgress.contains(shard)) {
+   log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard)
+ } else if (!hasAllRegionsRegistered()) {
+   log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
+ } else {
state.shards.get(shard) match {
case Some(ref) ⇒
if (regionTerminationInProgress(ref))
@@ -534,7 +543,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
update(ShardHomeDeallocated(shard)) { evt ⇒
state = state.updated(evt)
log.debug("Shard [{}] deallocated", evt.shard)
- allocateShardHomes()
+ allocateShardHomesForRememberEntities()
}
} else // rebalance not completed, graceful shutdown will be retried
gracefulShutdownInProgress -= state.shards(shard)
@@ -638,7 +647,16 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
def stateInitialized(): Unit = {
state.shards.foreach { case (a, r) ⇒ sendHostShardMsg(a, r) }
- allocateShardHomes()
+ allocateShardHomesForRememberEntities()
}
+ def hasAllRegionsRegistered(): Boolean = {
+   // the check is only for startup, i.e. once all have registered we don't check more
+   if (allRegionsRegistered) true
+   else {
+     allRegionsRegistered = aliveRegions.size >= minMembers
+     allRegionsRegistered
+   }
+ }
def regionTerminated(ref: ActorRef): Unit =
@@ -651,7 +669,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
gracefulShutdownInProgress -= ref
regionTerminationInProgress -= ref
aliveRegions -= ref
- allocateShardHomes()
+ allocateShardHomesForRememberEntities()
}
}
@@ -673,7 +691,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
unAckedHostShards = unAckedHostShards.updated(shard, cancel)
}
- def allocateShardHomes(): Unit = {
+ def allocateShardHomesForRememberEntities(): Unit = {
if (settings.rememberEntities)
state.unallocatedShards.foreach { self ! GetShardHome(_) }
}

@@ -0,0 +1,196 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import java.io.File
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.sharding.ShardRegion.GracefulShutdown
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import akka.cluster.sharding.ShardRegion.GetClusterShardingStats
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.MemberStatus
object ClusterShardingMinMembersSpec {
case object StopEntity
val extractEntityId: ShardRegion.ExtractEntityId = {
case id: Int ⇒ (id.toString, id)
}
val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match {
case id: Int ⇒ id.toString
}
}
abstract class ClusterShardingMinMembersSpecConfig(val mode: String) extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingMinMembersSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingMinMembersSpec"
akka.cluster.sharding.state-store-mode = "$mode"
akka.cluster.sharding.rebalance-interval = 120s #disable rebalance
akka.cluster.min-nr-of-members = 3
"""))
}
object PersistentClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("persistence")
object DDataClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("ddata")
class PersistentClusterShardingMinMembersSpec extends ClusterShardingMinMembersSpec(PersistentClusterShardingMinMembersSpecConfig)
class DDataClusterShardingMinMembersSpec extends ClusterShardingMinMembersSpec(DDataClusterShardingMinMembersSpecConfig)
class PersistentClusterShardingMinMembersMultiJvmNode1 extends PersistentClusterShardingMinMembersSpec
class PersistentClusterShardingMinMembersMultiJvmNode2 extends PersistentClusterShardingMinMembersSpec
class PersistentClusterShardingMinMembersMultiJvmNode3 extends PersistentClusterShardingMinMembersSpec
class DDataClusterShardingMinMembersMultiJvmNode1 extends DDataClusterShardingMinMembersSpec
class DDataClusterShardingMinMembersMultiJvmNode2 extends DDataClusterShardingMinMembersSpec
class DDataClusterShardingMinMembersMultiJvmNode3 extends DDataClusterShardingMinMembersSpec
abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingMinMembersSpec._
import config._
override def initialParticipants = roles.size
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
override protected def afterTermination() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
}
enterBarrier(from.name + "-joined")
}
val cluster = Cluster(system)
def startSharding(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
ClusterSharding(system).start(
typeName = "Entity",
entityProps = TestActors.echoActorProps,
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy,
handOffStopMessage = StopEntity)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
s"Cluster with min-nr-of-members using sharding ($mode)" must {
"setup shared journal" in {
// start the Persistence extension
Persistence(system)
runOn(first) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("peristence-started")
runOn(first, second, third) {
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
SharedLeveldbJournal.setStore(sharedStore, system)
}
enterBarrier("after-1")
}
"use all nodes" in within(30.seconds) {
join(first, first)
runOn(first) {
startSharding()
}
join(second, first)
runOn(second) {
startSharding()
}
join(third, first)
// wait with starting sharding on third
within(remaining) {
awaitAssert {
cluster.state.members.size should ===(3)
cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up))
}
}
enterBarrier("all-up")
runOn(first) {
region ! 1
// not allocated because third has not registered yet
expectNoMsg(2.second)
}
enterBarrier("verified")
runOn(third) {
startSharding()
}
runOn(first) {
// the 1 was sent above
expectMsg(1)
region ! 2
expectMsg(2)
region ! 3
expectMsg(3)
}
enterBarrier("shards-allocated")
region ! GetClusterShardingStats(remaining)
val stats = expectMsgType[ClusterShardingStats]
val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address
withClue(stats) {
stats.regions.keySet should ===(Set(firstAddress, secondAddress, thirdAddress))
stats.regions(firstAddress).stats.valuesIterator.sum should ===(1)
}
enterBarrier("after-2")
}
}
}

@@ -216,6 +216,17 @@ no matter how ``State Store Mode`` is set.
The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
it depends on the experimental Distributed Data module.
Startup after minimum number of members
---------------------------------------
It is recommended to use Cluster Sharding together with the Cluster setting ``akka.cluster.min-nr-of-members`` or
``akka.cluster.role.<role-name>.min-nr-of-members``. That setting defers the allocation of shards
until at least that number of regions have been started and registered with the coordinator. This
prevents many shards from being allocated to the first region that registers, only to be
rebalanced to other nodes later.
See :ref:`min-members_java` for more information about ``min-nr-of-members``.
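For example, a minimal configuration sketch (the value ``3`` is only illustrative) that defers
shard allocation until three regions have registered with the coordinator::

  akka.cluster.min-nr-of-members = 3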
Proxy Only Mode
---------------

@@ -325,6 +325,8 @@ and it is typically defined in the start script as a system property or environm
The roles of the nodes are part of the membership information in ``MemberEvent`` that you can subscribe to.
.. _min-members_java:
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -219,6 +219,17 @@ no matter how ``State Store Mode`` is set.
The ``ddata`` mode is considered as **“experimental”** as of its introduction in Akka 2.4.0, since
it depends on the experimental Distributed Data module.
Startup after minimum number of members
---------------------------------------
It is recommended to use Cluster Sharding together with the Cluster setting ``akka.cluster.min-nr-of-members`` or
``akka.cluster.role.<role-name>.min-nr-of-members``. That setting defers the allocation of shards
until at least that number of regions have been started and registered with the coordinator. This
prevents many shards from being allocated to the first region that registers, only to be
rebalanced to other nodes later.
See :ref:`min-members_scala` for more information about ``min-nr-of-members``.
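If the shard regions are started only on nodes with a specific role, the role-scoped setting applies
instead. A minimal sketch, assuming a node role named ``shard`` (the role name and the value ``3``
are examples only)::

  akka.cluster.role.shard.min-nr-of-members = 3
  akka.cluster.sharding.role = "shard"

With ``akka.cluster.sharding.role`` set, the coordinator waits until ``min-nr-of-members`` regions
of that role have registered before allocating shards.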
Proxy Only Mode
---------------

@@ -321,6 +321,8 @@ and it is typically defined in the start script as a system property or environm
The roles of the nodes are part of the membership information in ``MemberEvent`` that you can subscribe to.
.. _min-members_scala:
How To Startup when Cluster Size Reached
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -1005,7 +1005,11 @@ object MiMa extends AutoPlugin {
// #21727 moved all of Unfold.scala in package akka.stream.impl
ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.UnfoldAsync"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.Unfold")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.scaladsl.Unfold"),
// #21194 renamed internal actor method
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.allocateShardHomes")
)
)
}