Always retry sendGracefulShutdownToCoordinator, #26214

* I could reproduce the issue locally with debug logging and it's clear that it's a
  timing issue. The GracefulShutdownReq message goes to deadletters and it's not
  retried because the coordinator variable was unset.
* cluster-sharding-shutdown-region phase of CoordinatedShutdown timed out
This commit is contained in:
Patrik Nordwall 2019-01-14 13:31:34 +01:00
parent 6f0ea1257e
commit 5c2ffce0df
3 changed files with 19 additions and 14 deletions

View file

@ -590,16 +590,18 @@ private[akka] class ShardRegion(
def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
case Retry
sendGracefulShutdownToCoordinator()
if (shardBuffers.nonEmpty)
retryCount += 1
if (coordinator.isEmpty)
register()
else {
sendGracefulShutdownToCoordinator()
requestShardBufferHomes()
tryCompleteGracefulShutdown()
}
tryCompleteGracefulShutdown()
case GracefulShutdown
log.debug("Starting graceful shutdown of region and all its shards")
gracefulShutdownInProgress = true

View file

@ -15,12 +15,26 @@ import org.scalatest.mockito.MockitoSugar
import scala.concurrent.duration._
object ClusterShardingInternalsSpec {
case class HandOffStopMessage() extends NoSerializationVerificationNeeded
class EmptyHandlerActor extends Actor {
override def receive: Receive = {
case _
}
override def postStop(): Unit = {
super.postStop()
}
}
}
class ClusterShardingInternalsSpec extends AkkaSpec(
"""
|akka.actor.provider = cluster
|akka.remote.netty.tcp.port = 0
|akka.remote.artery.canonical.port = 0
|""".stripMargin) with MockitoSugar {
import ClusterShardingInternalsSpec._
val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]))
@ -50,17 +64,6 @@ class ClusterShardingInternalsSpec extends AkkaSpec(
}
"HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in {
case class HandOffStopMessage() extends NoSerializationVerificationNeeded
class EmptyHandlerActor extends Actor {
override def receive: Receive = {
case _
}
override def postStop(): Unit = {
super.postStop()
}
}
val probe = TestProbe()
val shardName = "test"
val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor))

View file

@ -84,7 +84,7 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
val p3 = TestProbe()(sys2)
region2.tell(3, p3.ref)
p3.expectMsg(1.seconds, 3)
}, 20.seconds)
}, 10.seconds)
}
"Sharding and CoordinatedShutdown" must {