diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes new file mode 100644 index 0000000000..d3932216bb --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes @@ -0,0 +1,2 @@ +# #29490 Add withRole to ShardedDaemonProcessSettings, internal constructor +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.ShardedDaemonProcessSettings.this") diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala index cb69c9189b..4ff792b716 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala @@ -33,7 +33,7 @@ object ShardedDaemonProcessSettings { def fromConfig(config: Config): ShardedDaemonProcessSettings = { val keepAliveInterval = config.getDuration("keep-alive-interval").asScala - new ShardedDaemonProcessSettings(keepAliveInterval, None) + new ShardedDaemonProcessSettings(keepAliveInterval, None, None) } } @@ -44,7 +44,8 @@ object ShardedDaemonProcessSettings { @ApiMayChange final class ShardedDaemonProcessSettings @InternalApi private[akka] ( val keepAliveInterval: FiniteDuration, - val shardingSettings: Option[ClusterShardingSettings]) { + val shardingSettings: Option[ClusterShardingSettings], + val role: Option[String]) { /** * Scala API: The interval each parent of the sharded set is pinged from each node in the cluster. @@ -52,7 +53,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * Note: How the sharded set is kept alive may change in the future meaning this setting may go away. */ def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings) + copy(keepAliveInterval = keepAliveInterval) /** * Java API: The interval each parent of the sharded set is pinged from each node in the cluster. @@ -60,7 +61,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * Note: How the sharded set is kept alive may change in the future meaning this setting may go away. */ def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings) + copy(keepAliveInterval = keepAliveInterval.asScala) /** * Specify sharding settings that should be used for the sharded daemon process instead of loading from config. @@ -68,5 +69,20 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * changing those settings will be ignored. */ def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings)) + copy(shardingSettings = Option(shardingSettings)) + + /** + * Specifies that the ShardedDaemonProcess should run on nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. If the given role does + * not match the role of the current node the the ShardedDaemonProcess will not be started. + */ + def withRole(role: String): ShardedDaemonProcessSettings = + copy(role = Option(role)) + + private def copy( + keepAliveInterval: FiniteDuration = keepAliveInterval, + shardingSettings: Option[ClusterShardingSettings] = shardingSettings, + role: Option[String] = role): ShardedDaemonProcessSettings = + new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role) + } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala index a4341c678d..fa6df2ab0b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -116,7 +116,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) val shardingBaseSettings = settings.shardingSettings match { case None => - // defaults in akka.cluster.sharding but allow overrides specifically for actor-set + // defaults in akka.cluster.sharding but allow overrides specifically for sharded-daemon-process ClusterShardingSettings.fromConfig( system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding")) case Some(shardingSettings) => shardingSettings @@ -124,7 +124,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) new ClusterShardingSettings( numberOfShards, - shardingBaseSettings.role, + if (settings.role.isDefined) settings.role else shardingBaseSettings.role, shardingBaseSettings.dataCenter, false, // remember entities disabled "", diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala index 5f1181b509..b81e92048c 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala @@ -92,7 +92,7 @@ abstract class ShardedDaemonProcessSpec "init actor set" in { ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id)) - enterBarrier("actor-set-initialized") + enterBarrier("sharded-daemon-process-initialized") runOn(first) { val startedIds = (0 to 3).map { _ => val event = probe.expectMessageType[ProcessActorEvent](5.seconds) @@ -101,7 +101,7 @@ abstract class ShardedDaemonProcessSpec }.toSet startedIds.size should ===(4) } - enterBarrier("actor-set-started") + enterBarrier("sharded-daemon-process-started") } // FIXME test removing one cluster node and verify all are alive (how do we do that?) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala index 519c0a2953..98c67cc78c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala @@ -15,7 +15,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.ShardedDaemonProcessSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join @@ -96,8 +95,7 @@ class ShardedDaemonProcessSpec "not run if the role does not match node role" in { val probe = createTestProbe[Any]() - val settings = - ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers")) + val settings = ShardedDaemonProcessSettings(system).withRole("workers") ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None) probe.expectNoMessage()