convenience for ShardedDaemonProcess role, #29490

This commit is contained in:
Patrik Nordwall 2020-08-17 13:28:38 +02:00
parent 94dc84d5d0
commit cb51646d8d
5 changed files with 28 additions and 12 deletions

View file

@ -0,0 +1,2 @@
# #29490 Add withRole to ShardedDaemonProcessSettings, internal constructor
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.ShardedDaemonProcessSettings.this")

View file

@ -33,7 +33,7 @@ object ShardedDaemonProcessSettings {
def fromConfig(config: Config): ShardedDaemonProcessSettings = {
val keepAliveInterval = config.getDuration("keep-alive-interval").asScala
new ShardedDaemonProcessSettings(keepAliveInterval, None)
new ShardedDaemonProcessSettings(keepAliveInterval, None, None)
}
}
@ -44,7 +44,8 @@ object ShardedDaemonProcessSettings {
@ApiMayChange
final class ShardedDaemonProcessSettings @InternalApi private[akka] (
val keepAliveInterval: FiniteDuration,
val shardingSettings: Option[ClusterShardingSettings]) {
val shardingSettings: Option[ClusterShardingSettings],
val role: Option[String]) {
/**
* Scala API: The interval each parent of the sharded set is pinged from each node in the cluster.
@ -52,7 +53,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
* Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
*/
def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings)
copy(keepAliveInterval = keepAliveInterval)
/**
* Java API: The interval each parent of the sharded set is pinged from each node in the cluster.
@ -60,7 +61,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
* Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
*/
def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings)
copy(keepAliveInterval = keepAliveInterval.asScala)
/**
* Specify sharding settings that should be used for the sharded daemon process instead of loading from config.
@ -68,5 +69,20 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
* changing those settings will be ignored.
*/
def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings))
copy(shardingSettings = Option(shardingSettings))
/**
* Specifies that the ShardedDaemonProcess should run on nodes with a specific role.
* If the role is not specified all nodes in the cluster are used. If the given role does
* not match the role of the current node the the ShardedDaemonProcess will not be started.
*/
def withRole(role: String): ShardedDaemonProcessSettings =
copy(role = Option(role))
private def copy(
keepAliveInterval: FiniteDuration = keepAliveInterval,
shardingSettings: Option[ClusterShardingSettings] = shardingSettings,
role: Option[String] = role): ShardedDaemonProcessSettings =
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role)
}

View file

@ -116,7 +116,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
val shardingBaseSettings =
settings.shardingSettings match {
case None =>
// defaults in akka.cluster.sharding but allow overrides specifically for actor-set
// defaults in akka.cluster.sharding but allow overrides specifically for sharded-daemon-process
ClusterShardingSettings.fromConfig(
system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding"))
case Some(shardingSettings) => shardingSettings
@ -124,7 +124,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
new ClusterShardingSettings(
numberOfShards,
shardingBaseSettings.role,
if (settings.role.isDefined) settings.role else shardingBaseSettings.role,
shardingBaseSettings.dataCenter,
false, // remember entities disabled
"",

View file

@ -92,7 +92,7 @@ abstract class ShardedDaemonProcessSpec
"init actor set" in {
ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id))
enterBarrier("actor-set-initialized")
enterBarrier("sharded-daemon-process-initialized")
runOn(first) {
val startedIds = (0 to 3).map { _ =>
val event = probe.expectMessageType[ProcessActorEvent](5.seconds)
@ -101,7 +101,7 @@ abstract class ShardedDaemonProcessSpec
}.toSet
startedIds.size should ===(4)
}
enterBarrier("actor-set-started")
enterBarrier("sharded-daemon-process-started")
}
// FIXME test removing one cluster node and verify all are alive (how do we do that?)

View file

@ -15,7 +15,6 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
@ -96,8 +95,7 @@ class ShardedDaemonProcessSpec
"not run if the role does not match node role" in {
val probe = createTestProbe[Any]()
val settings =
ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers"))
val settings = ShardedDaemonProcessSettings(system).withRole("workers")
ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None)
probe.expectNoMessage()