diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes new file mode 100644 index 0000000000..460960f77e --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.18.backwards.excludes @@ -0,0 +1,3 @@ +# #23751 warn if handOffStopMessage not handled +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion#HandOffStopper.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.handOffStopperProps") \ No newline at end of file diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 4cf2117278..be9314aea2 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -40,6 +40,8 @@ akka.cluster.sharding { buffer-size = 100000 # Timeout of the shard rebalancing process. + # Additionally, if an entity doesn't handle the stopMessage + # after (handoff-timeout - 5.seconds).max(1.second) it will be stopped forcefully handoff-timeout = 60 s # Time given to a region to acknowledge it's hosting a shard. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index dc7d12830c..65730e4e0a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -16,7 +16,7 @@ import akka.actor.Actor import akka.util.MessageBufferMap import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import akka.cluster.Cluster import akka.cluster.ddata.ORSet import akka.cluster.ddata.ORSetKey @@ -233,8 +233,9 @@ private[akka] class Shard( log.debug("HandOff shard [{}]", shardId) if (state.entities.nonEmpty) { + val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds) handOffStopper = Some(context.watch(context.actorOf( - handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage)))) + handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage, entityHandOffTimeout)))) //During hand off we only care about watching for termination of the hand off stopper context become { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index d7bd8b190c..f67c6a6686 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -329,11 +329,14 @@ object ShardRegion { /** * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of * them have terminated it replies with `ShardStopped`. + * If the entities don't terminate after `handoffTimeout` it will try stopping them forcefully. */ - private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any) - extends Actor { + private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any, handoffTimeout: FiniteDuration) + extends Actor with ActorLogging { import ShardCoordinator.Internal.ShardStopped + context.setReceiveTimeout(handoffTimeout) + entities.foreach { a ⇒ context watch a a ! stopMessage @@ -342,6 +345,15 @@ object ShardRegion { var remaining = entities def receive = { + case ReceiveTimeout ⇒ + log.warning("HandOffStopMessage[{}] is not handled by some of the entities of the `{}` shard, " + + "stopping the remaining entities.", stopMessage.getClass.getName, shard) + + remaining.foreach { + ref ⇒ + context stop ref + } + case Terminated(ref) ⇒ remaining -= ref if (remaining.isEmpty) { @@ -352,8 +364,8 @@ object ShardRegion { } private[akka] def handOffStopperProps( - shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any): Props = - Props(new HandOffStopper(shard, replyTo, entities, stopMessage)).withDeploy(Deploy.local) + shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any, handoffTimeout: FiniteDuration): Props = + Props(new HandOffStopper(shard, replyTo, entities, stopMessage, handoffTimeout)).withDeploy(Deploy.local) } /** diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index 4529830cea..8472af3cb8 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -4,14 +4,23 @@ package akka.cluster.sharding -import akka.actor.{ ExtendedActorSystem, PoisonPill, Props } +import akka.actor.{ Actor, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props } +import akka.cluster.sharding.ShardCoordinator.Internal.ShardStopped import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import akka.testkit.AkkaSpec +import akka.cluster.sharding.ShardRegion.HandOffStopper +import akka.testkit.{ AkkaSpec, TestProbe } import org.mockito.ArgumentMatchers import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar -class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cluster"""") with MockitoSugar { +import scala.concurrent.duration._ + +class ClusterShardingInternalsSpec extends AkkaSpec( + """ + |akka.actor.provider = cluster + |akka.remote.netty.tcp.port = 0 + |akka.remote.artery.canonical.port = 0 + |""".stripMargin) with MockitoSugar { val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem])) @@ -39,5 +48,33 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""akka.actor.provider = "cl ArgumentMatchers.eq(extractEntityId), ArgumentMatchers.eq(extractShardId)) } + + "HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in { + case class HandOffStopMessage() extends NoSerializationVerificationNeeded + class EmptyHandlerActor extends Actor { + override def receive: Receive = { + case _ ⇒ + } + + override def postStop(): Unit = { + super.postStop() + } + } + + val probe = TestProbe() + val shardName = "test" + val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor)) + val handOffStopper = system.actorOf( + Props(new HandOffStopper(shardName, probe.ref, Set(emptyHandlerActor), HandOffStopMessage, 10.millis)) + ) + + watch(emptyHandlerActor) + expectTerminated(emptyHandlerActor, 1.seconds) + + probe.expectMsg(1.seconds, ShardStopped(shardName)) + + watch(handOffStopper) + expectTerminated(handOffStopper, 1.seconds) + } } }