Don't allocate to terminated regions, #29131

* when using down-removal-margin, the coordinator could allocate shards to an
  already terminated region
* the watch fix in PR #29092 solves the underlying problem
* this change is an "optimization" that avoids picking regions that have
  already been terminated
Patrik Nordwall 2020-05-27 11:07:10 +02:00
parent 7c77617d18
commit 95111955c4
3 changed files with 199 additions and 4 deletions
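
The commit message above boils down to one change in how the coordinator picks a home for a new shard: regions whose termination is already in progress are excluded from the candidate set, alongside regions that are shutting down gracefully. Below is a minimal, self-contained sketch of that filtering; the string region labels and the placeholder "strategy" are illustrative stand-ins, not the actual ShardCoordinator internals.

// Simplified sketch of the candidate filtering added in this commit.
// Regions are plain string labels here; in the coordinator they are ActorRefs.
object AllocationFilterSketch extends App {
  val regions = Set("regionA", "regionB", "regionC")
  val gracefulShutdownInProgress = Set("regionB")   // leaving gracefully
  val regionTerminationInProgress = Set("regionC")  // termination already observed

  // same shape as the patched line: drop both kinds of unusable regions
  val activeRegions = (regions -- gracefulShutdownInProgress) -- regionTerminationInProgress

  if (activeRegions.nonEmpty) {
    // the real coordinator asks the allocation strategy asynchronously; here we just pick one
    val chosen = activeRegions.head
    println(s"allocate new shard to $chosen") // regionA
  } else {
    println("no active region, allocation deferred")
  }
}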

ShardCoordinator.scala

@@ -614,7 +614,7 @@ abstract class ShardCoordinator(
     case GetShardHome(shard) =>
       if (!handleGetShardHome(shard)) {
         // location not know, yet
-        val activeRegions = state.regions -- gracefulShutdownInProgress
+        val activeRegions = (state.regions -- gracefulShutdownInProgress) -- regionTerminationInProgress
         if (activeRegions.nonEmpty) {
           val getShardHomeSender = sender()
           val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
@@ -923,7 +923,8 @@ abstract class ShardCoordinator(
       state.shards.get(shard) match {
         case Some(ref) => getShardHomeSender ! ShardHome(shard, ref)
         case None =>
-          if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
+          if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region) && !regionTerminationInProgress
+              .contains(region)) {
             update(ShardHomeAllocated(shard, region)) { evt =>
               state = state.updated(evt)
               log.debug(
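
The second hunk applies the same rule at the point where the allocation strategy's asynchronous decision is committed: by the time the chosen region comes back from the strategy, it may already be terminating, so the coordinator re-checks the region before recording ShardHomeAllocated. A rough sketch of that re-validation, again with made-up string labels in place of the coordinator's actual state:

// Sketch of the post-decision guard (hypothetical names, not the real coordinator code).
object ContinuationGuardSketch extends App {
  val regions = Set("regionA", "regionB")
  val gracefulShutdownInProgress = Set.empty[String]
  var regionTerminationInProgress = Set.empty[String]

  val chosen = "regionB"                    // region picked earlier by the "strategy"
  regionTerminationInProgress += "regionB"  // ...but it started terminating in the meantime

  // mirrors the patched condition: known region, not shutting down, not terminating
  val stillUsable = regions.contains(chosen) &&
    !gracefulShutdownInProgress.contains(chosen) &&
    !regionTerminationInProgress.contains(chosen)

  if (stillUsable) println(s"record shard home at $chosen")
  else println(s"skip: $chosen is no longer an active region") // this branch runs
}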

ClusterShardCoordinatorDowning2Spec.scala (new file)

@@ -0,0 +1,181 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding

import scala.concurrent.duration._

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.MemberStatus
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.serialization.jackson.CborSerializable
import akka.testkit._
import akka.util.ccompat._

@ccompatUsedUntil213
object ClusterShardCoordinatorDowning2Spec {
  case class Ping(id: String) extends CborSerializable

  class Entity extends Actor {
    def receive = {
      case Ping(_) => sender() ! self
    }
  }

  case object GetLocations extends CborSerializable
  case class Locations(locations: Map[String, ActorRef]) extends CborSerializable

  class ShardLocations extends Actor {
    var locations: Locations = _
    def receive = {
      case GetLocations => sender() ! locations
      case l: Locations => locations = l
    }
  }

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case m @ Ping(id) => (id, m)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case Ping(id: String) => id.charAt(0).toString
  }
}
abstract class ClusterShardCoordinatorDowning2SpecConfig(mode: String)
    extends MultiNodeClusterShardingConfig(
      mode,
      loglevel = "INFO",
      additionalConfig = """
      akka.cluster.sharding.rebalance-interval = 120 s
      # setting down-removal-margin, for testing of issue #29131
      akka.cluster.down-removal-margin = 3 s
      akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
      """) {
  val first = role("first")
  val second = role("second")

  testTransport(on = true)
}

object PersistentClusterShardCoordinatorDowning2SpecConfig
    extends ClusterShardCoordinatorDowning2SpecConfig(ClusterShardingSettings.StateStoreModePersistence)
object DDataClusterShardCoordinatorDowning2SpecConfig
    extends ClusterShardCoordinatorDowning2SpecConfig(ClusterShardingSettings.StateStoreModeDData)

class PersistentClusterShardCoordinatorDowning2Spec
    extends ClusterShardCoordinatorDowning2Spec(PersistentClusterShardCoordinatorDowning2SpecConfig)
class DDataClusterShardCoordinatorDowning2Spec
    extends ClusterShardCoordinatorDowning2Spec(DDataClusterShardCoordinatorDowning2SpecConfig)

class PersistentClusterShardCoordinatorDowning2MultiJvmNode1 extends PersistentClusterShardCoordinatorDowning2Spec
class PersistentClusterShardCoordinatorDowning2MultiJvmNode2 extends PersistentClusterShardCoordinatorDowning2Spec

class DDataClusterShardCoordinatorDowning2MultiJvmNode1 extends DDataClusterShardCoordinatorDowning2Spec
class DDataClusterShardCoordinatorDowning2MultiJvmNode2 extends DDataClusterShardCoordinatorDowning2Spec
abstract class ClusterShardCoordinatorDowning2Spec(multiNodeConfig: ClusterShardCoordinatorDowning2SpecConfig)
    extends MultiNodeClusterShardingSpec(multiNodeConfig)
    with ImplicitSender {
  import multiNodeConfig._
  import ClusterShardCoordinatorDowning2Spec._

  def startSharding(): Unit = {
    startSharding(
      system,
      typeName = "Entity",
      entityProps = Props[Entity](),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
  }

  lazy val region = ClusterSharding(system).shardRegion("Entity")

  s"Cluster sharding ($mode) with down member, scenario 2" must {

    "join cluster" in within(20.seconds) {
      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))

      join(first, first, onJoinedRunOnFrom = startSharding())
      join(second, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false)

      // all Up, everywhere before continuing
      runOn(first, second) {
        awaitAssert {
          cluster.state.members.size should ===(2)
          cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
        }
      }

      enterBarrier("after-2")
    }

    "initialize shards" in {
      runOn(first) {
        val shardLocations = system.actorOf(Props[ShardLocations](), "shardLocations")
        val locations = (for (n <- 1 to 4) yield {
          val id = n.toString
          region ! Ping(id)
          id -> expectMsgType[ActorRef]
        }).toMap
        shardLocations ! Locations(locations)
        system.log.debug("Original locations: {}", locations)
      }
      enterBarrier("after-3")
    }

    "recover after downing other node (not coordinator)" in within(20.seconds) {
      val secondAddress = address(second)

      runOn(first) {
        testConductor.blackhole(first, second, Direction.Both).await
      }

      Thread.sleep(3000)

      runOn(first) {
        cluster.down(second)
        awaitAssert {
          cluster.state.members.size should ===(1)
        }

        // start a few more new shards, could be allocated to second but should notice that it's terminated
        val additionalLocations =
          awaitAssert {
            val probe = TestProbe()
            (for (n <- 5 to 8) yield {
              val id = n.toString
              region.tell(Ping(id), probe.ref)
              id -> probe.expectMsgType[ActorRef](1.second)
            }).toMap
          }
        system.log.debug("Additional locations: {}", additionalLocations)

        system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
        val Locations(originalLocations) = expectMsgType[Locations]

        awaitAssert {
          val probe = TestProbe()
          (originalLocations ++ additionalLocations).foreach {
            case (id, ref) =>
              region.tell(Ping(id), probe.ref)
              if (ref.path.address == secondAddress) {
                val newRef = probe.expectMsgType[ActorRef](1.second)
                newRef should not be (ref)
                system.log.debug("Moved [{}] from [{}] to [{}]", id, ref, newRef)
              } else
                probe.expectMsg(1.second, ref) // should not move
          }
        }
      }

      enterBarrier("after-4")
    }
  }
}
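
The spec above triggers the problem through the akka.cluster.down-removal-margin setting, which the commit message names as the condition under which a shard could be allocated to an already terminated region. If you want to inspect those values outside the multi-node harness, the snippet below simply parses the same config string with the Typesafe Config library; it is an illustration of what the durations mean, not part of MultiNodeClusterShardingConfig.

// Standalone sketch: parse the settings the spec passes via additionalConfig
// and read them back as durations (Typesafe Config API).
import com.typesafe.config.ConfigFactory

object DowningTestConfigSketch extends App {
  val config = ConfigFactory.parseString("""
    akka.cluster.sharding.rebalance-interval = 120 s
    akka.cluster.down-removal-margin = 3 s
    akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3 s
  """)

  // getDuration returns java.time.Duration
  println(config.getDuration("akka.cluster.down-removal-margin"))         // PT3S
  println(config.getDuration("akka.cluster.sharding.rebalance-interval")) // PT2M
}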

ClusterShardCoordinatorDowningSpec.scala

@@ -51,6 +51,7 @@ abstract class ClusterShardCoordinatorDowningSpecConfig(mode: String)
       loglevel = "INFO",
       additionalConfig = """
       akka.cluster.sharding.rebalance-interval = 120 s
+      # setting down-removal-margin, for testing of issue #29131
       akka.cluster.down-removal-margin = 3 s
       akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
       """) {
@@ -98,7 +99,7 @@ abstract class ClusterShardCoordinatorDowningSpec(multiNodeConfig: ClusterShardC
   lazy val region = ClusterSharding(system).shardRegion("Entity")

-  s"Cluster sharding ($mode) with leaving member" must {
+  s"Cluster sharding ($mode) with down member, scenario 1" must {

     "join cluster" in within(20.seconds) {
       startPersistenceIfNotDdataMode(startOn = controller, setStoreOn = Seq(first, second))
@@ -148,9 +149,21 @@ abstract class ClusterShardCoordinatorDowningSpec(multiNodeConfig: ClusterShardC
           cluster.state.members.size should ===(1)
         }

+        // start a few more new shards, could be allocated to first but should notice that it's terminated
+        val additionalLocations =
           awaitAssert {
             val probe = TestProbe()
-            originalLocations.foreach {
+            (for (n <- 5 to 8) yield {
+              val id = n.toString
+              region.tell(Ping(id), probe.ref)
+              id -> probe.expectMsgType[ActorRef](1.second)
+            }).toMap
+          }
+        system.log.debug("Additional locations: {}", additionalLocations)
+
+        awaitAssert {
+          val probe = TestProbe()
+          (originalLocations ++ additionalLocations).foreach {
             case (id, ref) =>
               region.tell(Ping(id), probe.ref)
               if (ref.path.address == firstAddress) {