Warn if handOffStopMessage not handled (#25648)

* Stops entities of shard forcefully if they don't handle stopMessage #23751

* Prints a warning log while stopping the entities

* fix version of backward exclude file and checks for shard stopped

* adds documentation for handoff timeout
This commit is contained in:
Saleh Khazaei 2018-12-04 20:50:58 +03:30 committed by Patrik Nordwall
parent af38a1eefd
commit c383f4483b
5 changed files with 64 additions and 9 deletions

View file

@ -0,0 +1,3 @@
# #23751 warn if handOffStopMessage not handled
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion#HandOffStopper.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.handOffStopperProps")

View file

@ -40,6 +40,8 @@ akka.cluster.sharding {
buffer-size = 100000
# Timeout of the shard rebalancing process.
# Additionally, if an entity doesn't handle the stopMessage
# after (handoff-timeout - 5.seconds).max(1.second) it will be stopped forcefully
handoff-timeout = 60 s
# Time given to a region to acknowledge it's hosting a shard.

View file

@ -16,7 +16,7 @@ import akka.actor.Actor
import akka.util.MessageBufferMap
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
@ -233,8 +233,9 @@ private[akka] class Shard(
log.debug("HandOff shard [{}]", shardId)
if (state.entities.nonEmpty) {
val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds)
handOffStopper = Some(context.watch(context.actorOf(
handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage))))
handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage, entityHandOffTimeout))))
//During hand off we only care about watching for termination of the hand off stopper
context become {

View file

@ -329,11 +329,14 @@ object ShardRegion {
/**
* INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
* them have terminated it replies with `ShardStopped`.
* If the entities don't terminate after `handoffTimeout` it will try stopping them forcefully.
*/
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any)
extends Actor {
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any, handoffTimeout: FiniteDuration)
extends Actor with ActorLogging {
import ShardCoordinator.Internal.ShardStopped
context.setReceiveTimeout(handoffTimeout)
entities.foreach { a
context watch a
a ! stopMessage
@ -342,6 +345,15 @@ object ShardRegion {
var remaining = entities
def receive = {
case ReceiveTimeout
log.warning("HandOffStopMessage[{}] is not handled by some of the entities of the `{}` shard, " +
"stopping the remaining entities.", stopMessage.getClass.getName, shard)
remaining.foreach {
ref
context stop ref
}
case Terminated(ref)
remaining -= ref
if (remaining.isEmpty) {
@ -352,8 +364,8 @@ object ShardRegion {
}
private[akka] def handOffStopperProps(
shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any): Props =
Props(new HandOffStopper(shard, replyTo, entities, stopMessage)).withDeploy(Deploy.local)
shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any, handoffTimeout: FiniteDuration): Props =
Props(new HandOffStopper(shard, replyTo, entities, stopMessage, handoffTimeout)).withDeploy(Deploy.local)
}
/**

View file

@ -4,14 +4,23 @@
package akka.cluster.sharding
import akka.actor.{ ExtendedActorSystem, PoisonPill, Props }
import akka.actor.{ Actor, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props }
import akka.cluster.sharding.ShardCoordinator.Internal.ShardStopped
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.testkit.AkkaSpec
import akka.cluster.sharding.ShardRegion.HandOffStopper
import akka.testkit.{ AkkaSpec, TestProbe }
import org.mockito.ArgumentMatchers
import org.mockito.Mockito._
import org.scalatest.mockito.MockitoSugar
class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cluster"""") with MockitoSugar {
import scala.concurrent.duration._
class ClusterShardingInternalsSpec extends AkkaSpec(
"""
|akka.actor.provider = cluster
|akka.remote.netty.tcp.port = 0
|akka.remote.artery.canonical.port = 0
|""".stripMargin) with MockitoSugar {
val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]))
@ -39,5 +48,33 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cl
ArgumentMatchers.eq(extractEntityId),
ArgumentMatchers.eq(extractShardId))
}
"HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in {
case class HandOffStopMessage() extends NoSerializationVerificationNeeded
class EmptyHandlerActor extends Actor {
override def receive: Receive = {
case _
}
override def postStop(): Unit = {
super.postStop()
}
}
val probe = TestProbe()
val shardName = "test"
val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor))
val handOffStopper = system.actorOf(
Props(new HandOffStopper(shardName, probe.ref, Set(emptyHandlerActor), HandOffStopMessage, 10.millis))
)
watch(emptyHandlerActor)
expectTerminated(emptyHandlerActor, 1.seconds)
probe.expectMsg(1.seconds, ShardStopped(shardName))
watch(handOffStopper)
expectTerminated(handOffStopper, 1.seconds)
}
}
}