From 6bc4e3b94c50c4a9ae58887eab4b837f9ffe29db Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johan=20Andr=C3=A9n?=
Date: Tue, 10 Mar 2020 18:04:09 +0100
Subject: [PATCH] Sharded Daemon Process #28710

A new cluster tool for running a number of actor instances balanced over
the cluster.
---
 .../src/main/resources/reference.conf         |  16 ++
 .../typed/ShardedDaemonProcessSettings.scala  |  71 ++++++++
 .../internal/ShardedDaemonProcessImpl.scala   | 171 ++++++++++++++++++
 .../typed/javadsl/ShardedDaemonProcess.scala  |  62 +++++++
 .../typed/scaladsl/ShardedDaemonProcess.scala |  66 +++++++
 .../typed/ShardedDaemonProcessSpec.scala      | 109 +++++++++++
 .../ShardedDaemonProcessCompileOnlyTest.java  |  59 ++++++
 .../src/test/resources/logback-test.xml       |   4 -
 .../scaladsl/ShardedDaemonProcessSpec.scala   | 122 +++++++++++++
 .../akka/cluster/sharding/ShardRegion.scala   |   4 +-
 .../src/main/paradox/common/may-change.md     |   1 +
 .../typed/cluster-sharded-daemon-process.md   |  58 ++++++
 .../src/main/paradox/typed/index-cluster.md   |   1 +
 build.sbt                                     |   1 +
 14 files changed, 740 insertions(+), 5 deletions(-)
 create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
 create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala
 create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala
 create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcess.scala
 create mode 100644 akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala
 create mode 100644 akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java
 create mode 100644 akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
 create mode 100644 akka-docs/src/main/paradox/typed/cluster-sharded-daemon-process.md

diff --git a/akka-cluster-sharding-typed/src/main/resources/reference.conf b/akka-cluster-sharding-typed/src/main/resources/reference.conf
index 1c311574de..6307f87dac 100644
--- a/akka-cluster-sharding-typed/src/main/resources/reference.conf
+++ b/akka-cluster-sharding-typed/src/main/resources/reference.conf
@@ -12,6 +12,22 @@ akka.cluster.sharding {
 # //#number-of-shards
 # //#sharding-ext-config
 
+
+akka.cluster.sharded-daemon-process {
+  # The sharded daemon process internally uses sharding, configured with the akka.cluster.sharding defaults.
+  # Some of the settings can be overridden specifically for the sharded daemon process here. For example, the
+  # `role` setting can limit what nodes the daemon processes and the keep alive pingers will run on.
+  # Some settings cannot be changed (remember-entities and related settings, passivation, number-of-shards);
+  # overriding those settings will be ignored.
+  sharding = ${akka.cluster.sharding}
+
+  # Each entity is pinged at this interval from each node in the
+  # cluster to trigger a start if it has stopped, for example during
+  # rebalancing.
+  # Note: How the set of actors is kept alive may change in the future, meaning this setting may go away.
+  keep-alive-interval = 10s
+}
+
 akka.cluster.configuration-compatibility-check.checkers {
   akka-cluster-sharding-hash-extractor = "akka.cluster.sharding.typed.internal.JoinConfigCompatCheckerClusterSharding"
 }
diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
new file mode 100644
index 0000000000..aa98928c5f
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.typed
+
+import java.time.Duration
+
+import akka.actor.typed.ActorSystem
+import akka.annotation.ApiMayChange
+import akka.annotation.InternalApi
+import akka.util.JavaDurationConverters._
+import com.typesafe.config.Config
+
+import scala.concurrent.duration.FiniteDuration
+
+@ApiMayChange
+object ShardedDaemonProcessSettings {
+
+  /** Scala API: Create default settings for system */
+  def apply(system: ActorSystem[_]): ShardedDaemonProcessSettings = {
+    fromConfig(system.settings.config.getConfig("akka.cluster.sharded-daemon-process"))
+  }
+
+  /** Java API: Create default settings for system */
+  def create(system: ActorSystem[_]): ShardedDaemonProcessSettings =
+    apply(system)
+
+  /**
+   * Load settings from a specific config location.
+   */
+  def fromConfig(config: Config): ShardedDaemonProcessSettings = {
+    val keepAliveInterval = config.getDuration("keep-alive-interval").asScala
+
+    new ShardedDaemonProcessSettings(keepAliveInterval, None)
+  }
+
+}
+
+/**
+ * Not for user construction, use factory methods to instantiate.
+ */
+@ApiMayChange
+final class ShardedDaemonProcessSettings @InternalApi private[akka] (
+    val keepAliveInterval: FiniteDuration,
+    val shardingSettings: Option[ClusterShardingSettings]) {
+
+  /**
+   * Scala API: The interval each parent of the sharded set is pinged from each node in the cluster.
+   *
+   * Note: How the sharded set is kept alive may change in the future, meaning this setting may go away.
+   */
+  def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings =
+    new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings)
+
+  /**
+   * Java API: The interval each parent of the sharded set is pinged from each node in the cluster.
+   *
+   * Note: How the sharded set is kept alive may change in the future, meaning this setting may go away.
+   */
+  def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings =
+    new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings)
+
+  /**
+   * Specify sharding settings that should be used for the sharded daemon process instead of loading from config.
+   * Some settings cannot be changed (remember-entities and related settings, passivation, number-of-shards);
+   * changing those settings will be ignored.
+ */ + def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings = + new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings)) +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala new file mode 100644 index 0000000000..9c62b648ad --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.internal + +import java.util.Optional + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.LoggerOps +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardRegion.EntityId +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.ShardingMessageExtractor +import akka.cluster.sharding.typed.scaladsl +import akka.cluster.sharding.typed.javadsl +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.StartEntity +import akka.cluster.typed.Cluster +import akka.japi.function +import akka.util.PrettyDuration + +import scala.compat.java8.OptionConverters._ +import scala.concurrent.duration.Duration +import scala.reflect.ClassTag + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object ShardedDaemonProcessImpl { + + object KeepAlivePinger { + sealed trait Event + case object Tick extends Event + + def apply[T]( + settings: ShardedDaemonProcessSettings, + name: String, + identities: Set[EntityId], + shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] = + Behaviors.setup { context => + Behaviors.withTimers { timers => + def triggerStartAll(): Unit = { + identities.foreach(id => shardingRef ! 
+          }
+
+          context.log.debug2(
+            "Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]",
+            name,
+            PrettyDuration.format(settings.keepAliveInterval))
+          timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
+          triggerStartAll()
+
+          Behaviors.receiveMessage {
+            case Tick =>
+              triggerStartAll()
+              context.log.debug("Periodic ping sent to [{}] processes", identities.size)
+              Behaviors.same
+          }
+        }
+      }
+  }
+
+  final class MessageExtractor[T] extends ShardingMessageExtractor[ShardingEnvelope[T], T] {
+    def entityId(message: ShardingEnvelope[T]): String = message match {
+      case ShardingEnvelope(id, _) => id
+    }
+
+    def shardId(entityId: String): String = entityId
+
+    def unwrapMessage(message: ShardingEnvelope[T]): T = message.message
+  }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
+    extends javadsl.ShardedDaemonProcess
+    with scaladsl.ShardedDaemonProcess {
+
+  import ShardedDaemonProcessImpl._
+
+  def init[T](name: String, numberOfInstances: Int, behaviorFactory: Int => Behavior[T])(
+      implicit classTag: ClassTag[T]): Unit = {
+    init(name, numberOfInstances, behaviorFactory, ShardedDaemonProcessSettings(system), None)(classTag)
+  }
+
+  def init[T](
+      name: String,
+      numberOfInstances: Int,
+      behaviorFactory: Int => Behavior[T],
+      settings: ShardedDaemonProcessSettings,
+      stopMessage: Option[T])(implicit classTag: ClassTag[T]): Unit = {
+
+    val entityTypeKey = EntityTypeKey[T](s"sharded-daemon-process-$name")
+
+    // One shard per actor, identified by the numeric id encoded in the entity id
+    val numberOfShards = numberOfInstances
+    val entityIds = (0 until numberOfInstances).map(_.toString)
+
+    val shardingSettings = {
+      val shardingBaseSettings =
+        settings.shardingSettings match {
+          case None =>
+            // defaults in akka.cluster.sharding but allow overrides specifically for the sharded daemon process
+            ClusterShardingSettings.fromConfig(
+              system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding"))
+          case Some(shardingSettings) => shardingSettings
+        }
+
+      new ClusterShardingSettings(
+        numberOfShards,
+        shardingBaseSettings.role,
+        shardingBaseSettings.dataCenter,
+        false, // remember entities disabled
+        "",
+        "",
+        Duration.Zero, // passivation disabled
+        shardingBaseSettings.shardRegionQueryTimeout,
+        StateStoreModeDData,
+        shardingBaseSettings.tuningParameters,
+        shardingBaseSettings.coordinatorSingletonSettings)
+    }
+
+    val nodeRoles = Cluster(system).selfMember.roles
+    if (shardingSettings.role.forall(nodeRoles)) {
+      val entity = Entity(entityTypeKey)(ctx => behaviorFactory(ctx.entityId.toInt))
+        .withSettings(shardingSettings)
+        .withMessageExtractor(new MessageExtractor)
+
+      val entityWithStop = stopMessage match {
+        case Some(stop) => entity.withStopMessage(stop)
+        case None       => entity
+      }
+
+      val shardingRef = ClusterSharding(system).init(entityWithStop)
+
+      system.systemActorOf(
+        KeepAlivePinger(settings, name, entityIds.toSet, shardingRef),
+        s"ShardedDaemonProcessKeepAlive-$name")
+    }
+  }
+
+  def init[T](
+      messageClass: Class[T],
+      name: String,
+      numberOfInstances: Int,
+      behaviorFactory: function.Function[Integer, Behavior[T]]): Unit =
+    init(name, numberOfInstances, n => behaviorFactory(n))(ClassTag(messageClass))
+
+  def init[T](
+      messageClass: Class[T],
+      name: String,
+      numberOfInstances: Int,
+      behaviorFactory: function.Function[Integer, Behavior[T]],
+      settings: ShardedDaemonProcessSettings,
+      stopMessage: Optional[T]): Unit =
+    init(name, numberOfInstances, n => behaviorFactory(n), settings, stopMessage.asScala)(ClassTag(messageClass))
+}
diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala
new file mode 100644
index 0000000000..fbef1bba26
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcess.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.typed.javadsl
+
+import java.util.Optional
+
+import akka.actor.typed.ActorSystem
+import akka.actor.typed.Behavior
+import akka.annotation.ApiMayChange
+import akka.annotation.DoNotInherit
+import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
+import akka.japi.function
+
+object ShardedDaemonProcess {
+  def get(system: ActorSystem[_]): ShardedDaemonProcess =
+    akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess(system).asJava
+}
+
+/**
+ * This extension runs a pre-set number of actors in a cluster.
+ *
+ * The typical use case is when you have a task that can be divided into a number of workers, each doing a
+ * sharded part of the work, for example consuming the read side events from Akka Persistence through
+ * tagged events where each tag decides which consumer should consume the event.
+ *
+ * Each named set needs to be started on all the nodes of the cluster at startup.
+ *
+ * The processes are spread out across the cluster. When the cluster topology changes, the processes may be stopped
+ * and started anew on a new node to rebalance them.
+ *
+ * Not for user extension.
+ */
+@DoNotInherit @ApiMayChange
+abstract class ShardedDaemonProcess {
+
+  /**
+   * Start a specific number of actors that are then kept alive in the cluster.
+   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
+   */
+  def init[T](
+      messageClass: Class[T],
+      name: String,
+      numberOfInstances: Int,
+      behaviorFactory: function.Function[Integer, Behavior[T]]): Unit
+
+  /**
+   * Start a specific number of actors, each with a unique numeric id in the set, that are then kept alive in the cluster.
+   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
+   * @param stopMessage if defined, sent to the actors when they need to stop because of a rebalance across the nodes of the cluster
+   *                    or cluster shutdown.
+   */
+  def init[T](
+      messageClass: Class[T],
+      name: String,
+      numberOfInstances: Int,
+      behaviorFactory: function.Function[Integer, Behavior[T]],
+      settings: ShardedDaemonProcessSettings,
+      stopMessage: Optional[T]): Unit
+
+}
diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcess.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcess.scala
new file mode 100644
index 0000000000..276f7baf38
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcess.scala
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Extension +import akka.actor.typed.ExtensionId +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.cluster.sharding.typed.internal.ShardedDaemonProcessImpl +import akka.cluster.sharding.typed.javadsl + +import scala.reflect.ClassTag + +object ShardedDaemonProcess extends ExtensionId[ShardedDaemonProcess] { + override def createExtension(system: ActorSystem[_]): ShardedDaemonProcess = new ShardedDaemonProcessImpl(system) +} + +/** + * This extension runs a pre set number of actors in a cluster. + * + * The typical use case is when you have a task that can be divided in a number of workers, each doing a + * sharded part of the work, for example consuming the read side events from Akka Persistence through + * tagged events where each tag decides which consumer that should consume the event. + * + * Each named set needs to be started on all the nodes of the cluster on start up. + * + * The processes are spread out across the cluster, when the cluster topology changes the processes may be stopped + * and started anew on a new node to rebalance them. + * + * Not for user extension. + */ +@DoNotInherit @ApiMayChange +trait ShardedDaemonProcess extends Extension { javadslSelf: javadsl.ShardedDaemonProcess => + + /** + * Start a specific number of actors that is then kept alive in the cluster. + * @param behaviorFactory Given a unique id of `0` until `numberOfInstance` create the behavior for that actor. + */ + def init[T](name: String, numberOfInstances: Int, behaviorFactory: Int => Behavior[T])( + implicit classTag: ClassTag[T]): Unit + + /** + * Start a specific number of actors, each with a unique numeric id in the set, that is then kept alive in the cluster. + * @param behaviorFactory Given a unique id of `0` until `numberOfInstance` create the behavior for that actor. + * @param stopMessage if defined sent to the actors when they need to stop because of a rebalance across the nodes of the cluster + * or cluster shutdown. + */ + def init[T]( + name: String, + numberOfInstances: Int, + behaviorFactory: Int => Behavior[T], + settings: ShardedDaemonProcessSettings, + stopMessage: Option[T])(implicit classTag: ClassTag[T]): Unit + + /** + * INTERNAL API + */ + @InternalApi private[akka] def asJava: javadsl.ShardedDaemonProcess = javadslSelf + +} diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala new file mode 100644 index 0000000000..c8e5ca0da3 --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. 
+ */ + +package akka.cluster.sharding.typed + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.Behavior +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.Routers +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +object ShardedDaemonProcessSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + val SnitchServiceKey = ServiceKey[AnyRef]("snitch") + + case class ProcessActorEvent(id: Int, event: Any) extends CborSerializable + + object ProcessActor { + trait Command + case object Stop extends Command + + def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx => + val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router") + snitchRouter ! ProcessActorEvent(id, "Started") + + Behaviors.receiveMessage { + case Stop => + snitchRouter ! ProcessActorEvent(id, "Stopped") + Behaviors.stopped + } + } + } + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = DEBUG + akka.cluster.sharded-daemon-process { + sharding { + # First is likely to be ignored as shard coordinator not ready + retry-interval = 0.2s + } + # quick ping to make test swift + keep-alive-interval = 1s + } + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ShardedDaemonProcessMultiJvmNode1 extends ShardedDaemonProcessSpec +class ShardedDaemonProcessMultiJvmNode2 extends ShardedDaemonProcessSpec +class ShardedDaemonProcessMultiJvmNode3 extends ShardedDaemonProcessSpec + +abstract class ShardedDaemonProcessSpec + extends MultiNodeSpec(ShardedDaemonProcessSpec) + with MultiNodeTypedClusterSpec + with ScalaFutures { + + import ShardedDaemonProcessSpec._ + + val probe: TestProbe[AnyRef] = TestProbe[AnyRef]() + + "Cluster sharding in multi dc cluster" must { + "form cluster" in { + formCluster(first, second, third) + runOn(first) { + typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, probe.ref, probe.ref) + probe.expectMessageType[Receptionist.Registered] + } + enterBarrier("snitch-registered") + + probe.awaitAssert({ + typedSystem.receptionist ! Receptionist.Find(SnitchServiceKey, probe.ref) + probe.expectMessageType[Receptionist.Listing].serviceInstances(SnitchServiceKey).size should ===(1) + }, 5.seconds) + enterBarrier("snitch-seen") + } + + "init actor set" in { + ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id)) + enterBarrier("actor-set-initialized") + runOn(first) { + val startedIds = (0 to 3).map { _ => + val event = probe.expectMessageType[ProcessActorEvent](5.seconds) + event.event should ===("Started") + event.id + }.toSet + startedIds.size should ===(4) + } + enterBarrier("actor-set-started") + } + + // FIXME test removing one cluster node and verify all are alive (how do we do that?) 
+
+  }
+}
diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java
new file mode 100644
index 0000000000..603d083696
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.typed.javadsl;
+
+import akka.actor.typed.ActorSystem;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+public class ShardedDaemonProcessCompileOnlyTest {
+
+  interface Command {}
+
+  enum Stop implements Command {
+    INSTANCE
+  }
+
+  {
+    ActorSystem<Void> system = null;
+    ShardedDaemonProcess.get(system).init(Command.class, "MyName", 8, id -> Behaviors.empty());
+
+    ShardedDaemonProcess.get(system)
+        .init(
+            Command.class,
+            "MyName",
+            8,
+            id -> Behaviors.empty(),
+            ShardedDaemonProcessSettings.create(system),
+            Optional.of(Stop.INSTANCE));
+
+    // #tag-processing
+    List<String> tags = Arrays.asList("tag-1", "tag-2", "tag-3");
+    ShardedDaemonProcess.get(system)
+        .init(
+            TagProcessor.Command.class,
+            "TagProcessors",
+            tags.size(),
+            id -> TagProcessor.create(tags.get(id)));
+    // #tag-processing
+  }
+
+  static class TagProcessor {
+    interface Command {}
+
+    static Behavior<Command> create(String tag) {
+      return Behaviors.setup(
+          context -> {
+            // ... start the tag processing ...
+            return Behaviors.empty();
+          });
+    }
+  }
+}
diff --git a/akka-cluster-sharding-typed/src/test/resources/logback-test.xml b/akka-cluster-sharding-typed/src/test/resources/logback-test.xml
index 41ea808109..a8653ba3cd 100644
--- a/akka-cluster-sharding-typed/src/test/resources/logback-test.xml
+++ b/akka-cluster-sharding-typed/src/test/resources/logback-test.xml
@@ -4,9 +4,6 @@
-    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>INFO</level>
-    </filter>
     <encoder>
       <pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern>
@@ -31,5 +28,4 @@
-
 </configuration>
diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
new file mode 100644
index 0000000000..e06b610ae0
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.ShardedDaemonProcessSettings +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ + +object ShardedDaemonProcessSpec { + // single node cluster config + def config = ConfigFactory.parseString(""" + akka.actor.provider = cluster + + akka.remote.classic.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + + # ping often/start fast for test + akka.cluster.sharded-daemon-process.keep-alive-interval = 1s + + akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off + """) + + object MyActor { + trait Command + case object Stop extends Command + + case class Started(id: Int, selfRef: ActorRef[Command]) + + def apply(id: Int, probe: ActorRef[Any]): Behavior[Command] = Behaviors.setup { ctx => + probe ! Started(id, ctx.self) + + Behaviors.receiveMessage { + case Stop => + Behaviors.stopped + } + } + + } + +} + +class ShardedDaemonProcessSpec + extends ScalaTestWithActorTestKit(ShardedDaemonProcessSpec.config) + with AnyWordSpecLike + with LogCapturing { + + import ShardedDaemonProcessSpec._ + + "The ShardedDaemonSet" must { + + "have a single node cluster running first" in { + val probe = createTestProbe() + Cluster(system).manager ! Join(Cluster(system).selfMember.address) + probe.awaitAssert({ + Cluster(system).selfMember.status == MemberStatus.Up + }, 3.seconds) + } + + "start N actors with unique ids" in { + val probe = createTestProbe[Any]() + ShardedDaemonProcess(system).init("a", 5, id => MyActor(id, probe.ref)) + + val started = probe.receiveMessages(5) + started.toSet.size should ===(5) + } + + "restart actors if they stop" in { + val probe = createTestProbe[Any]() + ShardedDaemonProcess(system).init("stop", 2, id => MyActor(id, probe.ref)) + + val started = (1 to 2).map(_ => probe.expectMessageType[MyActor.Started]).toSet + started.foreach(_.selfRef ! MyActor.Stop) + + // periodic ping every 1s makes it restart + (1 to 2).map(_ => probe.expectMessageType[MyActor.Started](3.seconds)) + } + + "not run if the role does not match node role" in { + val probe = createTestProbe[Any]() + val settings = + ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers")) + ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None) + + probe.expectNoMessage() + } + + } + + object TagProcessor { + sealed trait Command + def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx => + // start the processing ... 
+      ctx.log.debug("Starting processor for tag {}", tag)
+      Behaviors.empty
+    }
+  }
+
+  def docExample(): Unit = {
+    // #tag-processing
+    val tags = "tag-1" :: "tag-2" :: "tag-3" :: Nil
+    ShardedDaemonProcess(system).init("TagProcessors", tags.size, id => TagProcessor(tags(id)))
+    // #tag-processing
+  }
+
+}
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 00681c80db..8756aa27af 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -441,7 +441,9 @@ object ShardRegion {
    * Sent back when a `ShardRegion.StartEntity` message was received and triggered the entity
    * to start (it does not guarantee the entity successfully started)
    */
-  final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable
+  final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId)
+      extends ClusterShardingSerializable
+      with DeadLetterSuppression
 
   /**
    * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
diff --git a/akka-docs/src/main/paradox/common/may-change.md b/akka-docs/src/main/paradox/common/may-change.md
index 7849ea0b1b..0c39c1b5c0 100644
--- a/akka-docs/src/main/paradox/common/may-change.md
+++ b/akka-docs/src/main/paradox/common/may-change.md
@@ -29,5 +29,6 @@ that the module or API wasn't useful.
 These are the current complete modules marked as **may change**:
 
 * @ref:[Multi Node Testing](../multi-node-testing.md)
+* @ref:[Sharded Daemon Process](../typed/cluster-sharded-daemon-process.md)
 
diff --git a/akka-docs/src/main/paradox/typed/cluster-sharded-daemon-process.md b/akka-docs/src/main/paradox/typed/cluster-sharded-daemon-process.md
new file mode 100644
index 0000000000..9ed36d620f
--- /dev/null
+++ b/akka-docs/src/main/paradox/typed/cluster-sharded-daemon-process.md
@@ -0,0 +1,58 @@
+# Sharded Daemon Process
+
+
+@@@ warning
+
+This module is currently marked as @ref:[may change](../common/may-change.md) because it is a new feature that
+needs feedback from real usage before finalizing the API. This means that API or semantics can change without
+warning or deprecation period. It is also not recommended to use this module in production just yet.
+
+@@@
+
+## Module info
+
+To use Akka Sharded Daemon Process, you must add the following dependency in your project:
+
+@@dependency[sbt,Maven,Gradle] {
+  group=com.typesafe.akka
+  artifact=akka-cluster-sharding-typed_$scala.binary_version$
+  version=$akka.version$
+}
+
+@@project-info{ projectId="akka-cluster-sharding-typed" }
+
+## Introduction
+
+Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0`, that are then kept alive
+and balanced across the cluster. When a rebalance is needed, the actor is stopped and, triggered by a keep alive running on
+all nodes, started on a new node (the keep alive should be seen as an implementation detail and may change in future versions).
+
+The intended use case is splitting a data processing workload across a set number of workers, each working on a subset
+of the data that needs to be processed. This is commonly needed to create projections based on the event streams available
+from all the @ref:[EventSourcedBehaviors](persistence.md) in a CQRS application.
Events are tagged with one out of `N` tags,
+used to split the workload of consuming and updating a projection between `N` workers.
+
+For cases where a single actor needs to be kept alive, see @ref:[Cluster Singleton](cluster-singleton.md).
+
+## Basic example
+
+To set up a set of actors running with Sharded Daemon Process, each node in the cluster needs to run the same initialization
+when starting up:
+
+Scala
+:  @@snip [ShardedDaemonProcessExample.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala) { #tag-processing }
+
+Java
+:  @@snip [ShardedDaemonProcessExample.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java) { #tag-processing }
+
+An additional factory method is provided for further configurability and for providing a graceful stop message for the actor.
+
+## Addressing the actors
+
+In use cases where you need to send messages to the daemon process actors, it is recommended to use the @ref:[system receptionist](actor-discovery.md),
+either with a single `ServiceKey`, which all daemon process actors register themselves with, for broadcasts, or with individual keys
+if more fine-grained messaging is needed.
+
+## Scalability
+
+This cluster tool is intended for a small number of consumers and will not scale well to a large set. In large clusters
+it is recommended to limit the nodes the sharded daemon process will run on by using a role.
\ No newline at end of file
diff --git a/akka-docs/src/main/paradox/typed/index-cluster.md b/akka-docs/src/main/paradox/typed/index-cluster.md
index af402f28c3..c5741a7a96 100644
--- a/akka-docs/src/main/paradox/typed/index-cluster.md
+++ b/akka-docs/src/main/paradox/typed/index-cluster.md
@@ -15,6 +15,7 @@ project.description: Akka Cluster concepts, node membership service, CRDT Distri
 * [cluster-singleton](cluster-singleton.md)
 * [cluster-sharding](cluster-sharding.md)
 * [cluster-sharding-specification](cluster-sharding-concepts.md)
+* [sharded-daemon-process](cluster-sharded-daemon-process.md)
 * [cluster-dc](cluster-dc.md)
 * [distributed-pub-sub](distributed-pub-sub.md)
 * [serialization](../serialization.md)
diff --git a/build.sbt b/build.sbt
index c02f92e853..a887999827 100644
--- a/build.sbt
+++ b/build.sbt
@@ -434,6 +434,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
   .settings(javacOptions += "-parameters") // for Jackson
   .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
   // To be able to import ContainerFormats.proto
+  .settings(Protobuf.settings)
   .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
   .configs(MultiJvm)
   .enablePlugins(MultiNodeScalaTest)
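
For reviewers, a minimal sketch (not part of the patch) of how an application might wire the pieces together: the configurable init variant with a graceful stop message, plus sharding settings restricted to a role, as described in the docs above. `Guardian`, `TagProcessor` and the "workers" role are hypothetical names for illustration; only `ShardedDaemonProcess`, `ShardedDaemonProcessSettings` and `ClusterShardingSettings` come from this patch.

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

object TagProcessor {
  sealed trait Command
  case object Stop extends Command

  def apply(tag: String): Behavior[Command] =
    Behaviors.setup { ctx =>
      ctx.log.info("Starting processor for tag {}", tag)
      Behaviors.receiveMessage {
        case Stop =>
          // graceful stop requested before a rebalance or cluster shutdown:
          // finish any in-flight work, then stop
          Behaviors.stopped
      }
    }
}

object Guardian {
  def apply(): Behavior[Nothing] =
    Behaviors.setup[Nothing] { ctx =>
      val tags = Vector("tag-1", "tag-2", "tag-3")
      // restrict the daemon processes to nodes with the (hypothetical) "workers" role
      val settings = ShardedDaemonProcessSettings(ctx.system)
        .withShardingSettings(ClusterShardingSettings(ctx.system).withRole("workers"))
      // one worker per tag, each kept alive and rebalanced by the extension
      ShardedDaemonProcess(ctx.system).init[TagProcessor.Command](
        "TagProcessors",
        tags.size,
        id => TagProcessor(tags(id)),
        settings,
        Some(TagProcessor.Stop))
      Behaviors.empty
    }
}

Running this guardian on every node gives each node the same init call at startup, which is what the keep-alive pinging assumes; the stop message lets a worker finish in-flight work before sharding moves it to another node.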