diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 0d85354919..30fd16695d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -590,16 +590,18 @@ private[akka] class ShardRegion( def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match { case Retry ⇒ + sendGracefulShutdownToCoordinator() + if (shardBuffers.nonEmpty) retryCount += 1 if (coordinator.isEmpty) register() else { - sendGracefulShutdownToCoordinator() requestShardBufferHomes() - tryCompleteGracefulShutdown() } + tryCompleteGracefulShutdown() + case GracefulShutdown ⇒ log.debug("Starting graceful shutdown of region and all its shards") gracefulShutdownInProgress = true diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index 12d4dd43ca..f263901410 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -15,12 +15,26 @@ import org.scalatest.mockito.MockitoSugar import scala.concurrent.duration._ +object ClusterShardingInternalsSpec { + case class HandOffStopMessage() extends NoSerializationVerificationNeeded + class EmptyHandlerActor extends Actor { + override def receive: Receive = { + case _ ⇒ + } + + override def postStop(): Unit = { + super.postStop() + } + } +} + class ClusterShardingInternalsSpec extends AkkaSpec( """ |akka.actor.provider = cluster |akka.remote.netty.tcp.port = 0 |akka.remote.artery.canonical.port = 0 |""".stripMargin) with MockitoSugar { + import ClusterShardingInternalsSpec._ val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem])) @@ -50,17 +64,6 @@ class ClusterShardingInternalsSpec extends AkkaSpec( } "HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in { - case class HandOffStopMessage() extends NoSerializationVerificationNeeded - class EmptyHandlerActor extends Actor { - override def receive: Receive = { - case _ ⇒ - } - - override def postStop(): Unit = { - super.postStop() - } - } - val probe = TestProbe() val shardName = "test" val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor)) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala index 94cc39d5ac..344de1f588 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -84,7 +84,7 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi val p3 = TestProbe()(sys2) region2.tell(3, p3.ref) p3.expectMsg(1.seconds, 3) - }, 20.seconds) + }, 10.seconds) } "Sharding and CoordinatedShutdown" must {