From cd7eae28f604f54efcc57730d959eb3684c45276 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Dec 2017 13:17:04 +0100 Subject: [PATCH] fix entityPropsFactory id param, #21809 --- .../mima-filters/2.5.7.backwards.excludes | 6 ++- .../scala/akka/cluster/sharding/Shard.scala | 19 +++++----- .../akka/cluster/sharding/ShardRegion.scala | 4 +- .../sharding/ClusterShardingSpec.scala | 37 ++++++++++--------- 4 files changed, 37 insertions(+), 29 deletions(-) diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes index 18deda290a..554c5a9582 100644 --- a/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.7.backwards.excludes @@ -1,2 +1,6 @@ # 24058 - Add ClusterSharding.start overloads -ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props") \ No newline at end of file +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.props") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.DDataShard.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.PersistentShard.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.Shard.this") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 61eb8eb2a1..e773d17336 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -107,7 +107,7 @@ private[akka] object Shard { def props( typeName: String, shardId: ShardRegion.ShardId, - entityProps: Props, + entityPropsFactory: String ⇒ Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, @@ -115,13 +115,13 @@ private[akka] object Shard { replicator: ActorRef, majorityMinCap: Int): Props = { if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) { - Props(new DDataShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, + Props(new DDataShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) - Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) + Props(new PersistentShard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) else - Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) + Props(new Shard(typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) } } @@ -137,7 +137,7 @@ private[akka] object Shard { private[akka] class Shard( typeName: String, shardId: ShardRegion.ShardId, - entityProps: Props, + entityPropsFactory: String ⇒ Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, @@ -332,6 +332,7 @@ private[akka] class Shard( context.child(name).getOrElse { log.debug("Starting entity [{}] in shard [{}]", id, shardId) + val entityProps = entityPropsFactory(id) val a = context.watch(context.actorOf(entityProps, name)) idByRef = idByRef.updated(a, id) refById = refById.updated(id, a) @@ -475,12 +476,12 @@ private[akka] trait RememberingShard { selfType: Shard ⇒ private[akka] class PersistentShard( typeName: String, shardId: ShardRegion.ShardId, - entityProps: Props, + entityPropsFactory: String ⇒ Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Shard( - typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) with RememberingShard with PersistentActor with ActorLogging { import ShardRegion.{ EntityId, Msg } @@ -569,14 +570,14 @@ private[akka] class PersistentShard( private[akka] class DDataShard( typeName: String, shardId: ShardRegion.ShardId, - entityProps: Props, + entityPropsFactory: String ⇒ Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int) extends Shard( - typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + typeName, shardId, entityPropsFactory, settings, extractEntityId, extractShardId, handOffStopMessage) with RememberingShard with Stash with ActorLogging { import ShardRegion.{ EntityId, Msg } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index eeaec7bac8..6ed29a2958 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -800,7 +800,7 @@ private[akka] class ShardRegion( else { shards.get(id).orElse( entityPropsFactory match { - case Some(props) if !shardsByRef.values.exists(_ == id) ⇒ + case Some(factory) if !shardsByRef.values.exists(_ == id) ⇒ log.debug("Starting shard [{}] in region", id) val name = URLEncoder.encode(id, "utf-8") @@ -808,7 +808,7 @@ private[akka] class ShardRegion( Shard.props( typeName, id, - props(id), + factory, settings, extractEntityId, extractShardId, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 0084b91059..22c12d98d8 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -92,7 +92,6 @@ object ClusterShardingSpec { } object QualifiedCounter { - val ShardingTypeName: String = "QualifiedCounter" def props(typeName: String, id: String): Props = Props(new QualifiedCounter(typeName, id)) } @@ -105,7 +104,7 @@ object ClusterShardingSpec { def props(id: String): Props = Props(new AnotherCounter(id)) } - class AnotherCounter(id: String) extends QualifiedCounter("AnotherCounter", id) + class AnotherCounter(id: String) extends QualifiedCounter(AnotherCounter.ShardingTypeName, id) //#supervisor object CounterSupervisor { @@ -129,6 +128,9 @@ object ClusterShardingSpec { } //#supervisor + // must use different unique name for some tests than the one used in API tests + val TestCounterShardingTypeName = s"Test${Counter.ShardingTypeName}" + } abstract class ClusterShardingSpecConfig( @@ -314,7 +316,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu ShardCoordinator.props(typeName, settings, allocationStrategy, replicator, majorityMinCap) } - List("counter", "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", + List(TestCounterShardingTypeName, "rebalancingCounter", "RememberCounterEntities", "AnotherRememberCounter", "RememberCounter", "RebalancingRememberCounter", "AutoMigrateRememberRegionTest").foreach { typeName ⇒ val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") val rememberEnabled = typeName.toLowerCase.contains("remember") @@ -356,7 +358,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu name = typeName + "Region") } - lazy val region = createRegion("counter", rememberEntities = false) + lazy val region = createRegion(TestCounterShardingTypeName, rememberEntities = false) lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false) lazy val persistentEntitiesRegion = createRegion("RememberCounterEntities", rememberEntities = true) @@ -428,7 +430,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu region ! EntityEnvelope(2, Increment) region ! Get(2) expectMsg(3) - lastSender.path should ===(node(second) / "user" / "counterRegion" / "2" / "2") + lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "2" / "2") region ! Get(11) expectMsg(1) @@ -436,7 +438,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu lastSender.path should ===(region.path / "11" / "11") region ! Get(12) expectMsg(1) - lastSender.path should ===(node(second) / "user" / "counterRegion" / "0" / "12") + lastSender.path should ===(node(second) / "user" / s"${TestCounterShardingTypeName}Region" / "0" / "12") } enterBarrier("first-update") @@ -475,10 +477,10 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu val settings = ClusterShardingSettings(cfg) val proxy = system.actorOf( ShardRegion.proxyProps( - typeName = Counter.ShardingTypeName, + typeName = TestCounterShardingTypeName, dataCenter = None, settings, - coordinatorPath = "/user/counterCoordinator/singleton/coordinator", + coordinatorPath = s"/user/${TestCounterShardingTypeName}Coordinator/singleton/coordinator", extractEntityId = extractEntityId, extractShardId = extractShardId, system.deadLetters, @@ -553,12 +555,12 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu region ! EntityEnvelope(3, Increment) region ! Get(3) expectMsg(11) - lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3") + lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") region ! EntityEnvelope(4, Increment) region ! Get(4) expectMsg(21) - lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4") + lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") } enterBarrier("first-update") @@ -591,7 +593,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu within(1.second) { region.tell(Get(3), probe3.ref) probe3.expectMsg(11) - probe3.lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3") + probe3.lastSender.path should ===(node(third) / "user" / s"${TestCounterShardingTypeName}Region" / "3" / "3") } } val probe4 = TestProbe() @@ -599,7 +601,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu within(1.second) { region.tell(Get(4), probe4.ref) probe4.expectMsg(21) - probe4.lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4") + probe4.lastSender.path should ===(node(fourth) / "user" / s"${TestCounterShardingTypeName}Region" / "4" / "4") } } @@ -670,16 +672,17 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu runOn(fifth) { //#counter-usage val counterRegion: ActorRef = ClusterSharding(system).shardRegion(Counter.ShardingTypeName) - counterRegion ! Get(123) + val entityId = 999 + counterRegion ! Get(entityId) expectMsg(0) - counterRegion ! EntityEnvelope(123, Increment) - counterRegion ! Get(123) + counterRegion ! EntityEnvelope(entityId, Increment) + counterRegion ! Get(entityId) expectMsg(1) //#counter-usage - ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! EntityEnvelope(123, Decrement) - ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! Get(123) + ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! EntityEnvelope(entityId, Decrement) + ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! Get(entityId) expectMsg(-1) }