From 0eedd714e8562966cf06c20a736eb1f31fc1ff0c Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 9 Jan 2018 13:51:35 +0100
Subject: [PATCH] reply to known shard locations immediately when
 waitingForUpdate, #24064

---
 .../cluster/sharding/ShardCoordinator.scala   |  74 ++++++----
 ...sterShardingSingleShardPerEntitySpec.scala | 138 ++++++++++++++++++
 2 files changed, 184 insertions(+), 28 deletions(-)
 create mode 100644 akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 7ab646960d..866b1d1904 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -478,34 +478,23 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       }

     case GetShardHome(shard) ⇒
-      if (rebalanceInProgress.contains(shard)) {
-        log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard)
-      } else if (!hasAllRegionsRegistered()) {
-        log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
-      } else {
-        state.shards.get(shard) match {
-          case Some(ref) ⇒
-            if (regionTerminationInProgress(ref))
-              log.debug("GetShardHome [{}] request ignored, due to region [{}] termination in progress.", shard, ref)
-            else
-              sender() ! ShardHome(shard, ref)
-          case None ⇒
-            val activeRegions = state.regions -- gracefulShutdownInProgress
-            if (activeRegions.nonEmpty) {
-              val getShardHomeSender = sender()
-              val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
-              regionFuture.value match {
-                case Some(Success(region)) ⇒
-                  continueGetShardHome(shard, region, getShardHomeSender)
-                case _ ⇒
-                  // continue when future is completed
-                  regionFuture.map { region ⇒
-                    AllocateShardResult(shard, Some(region), getShardHomeSender)
-                  }.recover {
-                    case _ ⇒ AllocateShardResult(shard, None, getShardHomeSender)
-                  }.pipeTo(self)
-              }
-            }
+      if (!handleGetShardHome(shard)) {
+        // location not known yet
+        val activeRegions = state.regions -- gracefulShutdownInProgress
+        if (activeRegions.nonEmpty) {
+          val getShardHomeSender = sender()
+          val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
+          regionFuture.value match {
+            case Some(Success(region)) ⇒
+              continueGetShardHome(shard, region, getShardHomeSender)
+            case _ ⇒
+              // continue when future is completed
+              regionFuture.map { region ⇒
+                AllocateShardResult(shard, Some(region), getShardHomeSender)
+              }.recover {
+                case _ ⇒ AllocateShardResult(shard, None, getShardHomeSender)
+              }.pipeTo(self)
+          }
         }
       }

@@ -610,6 +599,31 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti

   }: Receive).orElse[Any, Unit](receiveTerminated)

+  /**
+   * @return `true` if the message could be handled without state update, i.e.
+   *         the shard location was known or the request was ignored
+   */
+  def handleGetShardHome(shard: String): Boolean = {
+    if (rebalanceInProgress.contains(shard)) {
+      log.debug("GetShardHome [{}] request ignored, because rebalance is in progress for this shard.", shard)
+      true
+    } else if (!hasAllRegionsRegistered()) {
+      log.debug("GetShardHome [{}] request ignored, because not all regions have registered yet.", shard)
+      true
+    } else {
+      state.shards.get(shard) match {
+        case Some(ref) ⇒
+          if (regionTerminationInProgress(ref))
+            log.debug("GetShardHome [{}] request ignored, due to region [{}] termination in progress.", shard, ref)
+          else
+            sender() ! ShardHome(shard, ref)
+          true
+        case None ⇒
+          false // location not known yet, caller will handle allocation
+      }
+    }
+  }
+
   def receiveTerminated: Receive = {
     case t @ Terminated(ref) ⇒
       if (state.regions.contains(ref)) {
@@ -1010,6 +1024,10 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
         key, error, evt)
       throw cause

+    case GetShardHome(shard) ⇒
+      if (!handleGetShardHome(shard))
+        stash() // must wait for update that is in progress
+
     case _ ⇒ stash()
   }

diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala
new file mode 100644
index 0000000000..1efe7875f3
--- /dev/null
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSingleShardPerEntitySpec.scala
@@ -0,0 +1,138 @@
+/**
+ * Copyright (C) 2009-2017 Lightbend Inc.
+ */
+package akka.cluster.sharding
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.MultiNodeClusterSpec
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+/**
+ * A one-to-one mapping between shards and entities is not efficient, but some use it anyway
+ */
+object ClusterShardingSingleShardPerEntitySpec {
+  class Entity extends Actor {
+    def receive = {
+      case id: Int ⇒ sender() ! id
+    }
+  }
+
+  val extractEntityId: ShardRegion.ExtractEntityId = {
+    case id: Int ⇒ (id.toString, id)
+  }
+
+  val extractShardId: ShardRegion.ExtractShardId = {
+    case id: Int ⇒ id.toString
+  }
+
+}
+
+object ClusterShardingSingleShardPerEntitySpecConfig extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+  val fifth = role("fifth")
+
+  commonConfig(ConfigFactory.parseString(s"""
+    akka.loglevel = INFO
+    akka.actor.provider = "cluster"
+    akka.cluster.sharding.state-store-mode = ddata
+    akka.cluster.sharding.updating-state-timeout = 1s
+    """).withFallback(MultiNodeClusterSpec.clusterConfig))
+
+  testTransport(on = true)
+}
+
+class ClusterShardingSingleShardPerEntitySpecMultiJvmNode1 extends ClusterShardingSingleShardPerEntitySpec
+class ClusterShardingSingleShardPerEntitySpecMultiJvmNode2 extends ClusterShardingSingleShardPerEntitySpec
+class ClusterShardingSingleShardPerEntitySpecMultiJvmNode3 extends ClusterShardingSingleShardPerEntitySpec
+class ClusterShardingSingleShardPerEntitySpecMultiJvmNode4 extends ClusterShardingSingleShardPerEntitySpec
+class ClusterShardingSingleShardPerEntitySpecMultiJvmNode5 extends ClusterShardingSingleShardPerEntitySpec
+
+abstract class ClusterShardingSingleShardPerEntitySpec extends MultiNodeSpec(ClusterShardingSingleShardPerEntitySpecConfig)
+  with STMultiNodeSpec with ImplicitSender {
+  import ClusterShardingSingleShardPerEntitySpec._
+  import ClusterShardingSingleShardPerEntitySpecConfig._
+
+  override def initialParticipants = roles.size
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      Cluster(system) join node(to).address
+      startSharding()
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  def startSharding(): Unit = {
+    ClusterSharding(system).start(
+      typeName = "Entity",
+      entityProps = Props[Entity],
+      settings = ClusterShardingSettings(system),
+      extractEntityId = extractEntityId,
+      extractShardId = extractShardId)
+  }
+
+  lazy val region = ClusterSharding(system).shardRegion("Entity")
+
+  def joinAndAllocate(node: RoleName, entityId: Int): Unit = {
+    within(10.seconds) {
+      join(node, first)
+      runOn(node) {
+        region ! entityId
+        expectMsg(entityId)
+        lastSender.path should be(region.path / s"$entityId" / s"$entityId")
+      }
+    }
+    enterBarrier(s"started-$entityId")
+  }
+
+  s"Cluster sharding with single shard per entity" must {
+
+    "use specified region" in {
+      joinAndAllocate(first, 1)
+      joinAndAllocate(second, 2)
+      joinAndAllocate(third, 3)
+      joinAndAllocate(fourth, 4)
+      joinAndAllocate(fifth, 5)
+
+      runOn(first) {
+        // coordinator is on 'first', blackholing the 3 other nodes means that it can't update with WriteMajority
+        testConductor.blackhole(first, third, Direction.Both).await
+        testConductor.blackhole(first, fourth, Direction.Both).await
+        testConductor.blackhole(first, fifth, Direction.Both).await
+
+        // shard 6 is not allocated yet, and due to the blackhole the allocation will not complete
+        region ! 6
+
+        // shard 1 location is known by the 'first' region, not involving the coordinator
+        region ! 1
+        expectMsg(1)
+
+        // shard 2 location is not known at the 'first' region yet, but the coordinator is on 'first' and should
+        // be able to answer GetShardHome even though the previous request for shard 6 has not completed yet
+        region ! 2
+        expectMsg(2)
+        lastSender.path should be(node(second) / "system" / "sharding" / "Entity" / "2" / "2")

+        testConductor.passThrough(first, third, Direction.Both).await
+        testConductor.passThrough(first, fourth, Direction.Both).await
+        testConductor.passThrough(first, fifth, Direction.Both).await
+        expectMsg(10.seconds, 6)
+      }
+
+      enterBarrier("after-1")
+    }
+
+  }
+}
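
For context, the coordinator path fixed by this patch is reached through the ordinary ClusterSharding API. Below is a minimal sketch, not part of the patch, of the kind of ddata-backed setup the spec above exercises: the Entity actor and the extractors are copied from the spec, while the object name SingleShardPerEntityExample and the assumption of a running, cluster-joined ActorSystem are illustrative only.

import akka.actor._
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

object SingleShardPerEntityExample {
  // One shard per entity, as in the spec above.
  class Entity extends Actor {
    def receive = {
      case id: Int ⇒ sender() ! id
    }
  }

  val extractEntityId: ShardRegion.ExtractEntityId = { case id: Int ⇒ (id.toString, id) }
  val extractShardId: ShardRegion.ExtractShardId = { case id: Int ⇒ id.toString }

  // Assumes `system` has joined the cluster and is configured with
  // akka.cluster.sharding.state-store-mode = ddata, matching the spec's commonConfig.
  def startRegion(system: ActorSystem): ActorRef =
    ClusterSharding(system).start(
      typeName = "Entity",
      entityProps = Props[Entity],
      settings = ClusterShardingSettings(system),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}

With this change, when the returned region asks the DDataShardCoordinator for the home of a shard that is already allocated, the coordinator replies immediately even while a ddata update for another shard is still in flight, instead of stashing the request until that update completes.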