Sharded Daemon Process #28710
A new cluster tool for running a number of actor instances balanced across the cluster.
This commit is contained in:
parent
d26453b5e8
commit
6bc4e3b94c
14 changed files with 740 additions and 5 deletions
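From user code the new tool boils down to a single `init` call on the extension. A minimal sketch, not part of this diff, using an illustrative worker protocol:

```scala
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

object Example {
  // illustrative daemon process protocol, not part of the commit
  sealed trait Command

  def worker(id: Int): Behavior[Command] =
    Behaviors.ignore // real processing elided

  def start(system: ActorSystem[_]): Unit =
    // keep four numbered worker actors alive, balanced across the cluster
    ShardedDaemonProcess(system).init("workers", 4, id => worker(id))
}
```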
@@ -12,6 +12,22 @@ akka.cluster.sharding {
# //#number-of-shards

# //#sharding-ext-config

akka.cluster.sharded-daemon-process {
  # Settings for the sharded daemon process internal usage of sharding are using the akka.cluster.sharding defaults.
  # Some of the settings can be overridden specifically for the sharded daemon process here. For example the
  # `role` setting can limit what nodes the daemon processes and the keep alive pingers will run on.
  # Some settings cannot be changed (remember-entities and related settings, passivation, number-of-shards),
  # overriding those settings will be ignored.
  sharding = ${akka.cluster.sharding}

  # Each entity is pinged at this interval from each node in the
  # cluster to trigger a start if it has stopped, for example during
  # rebalancing.
  # Note: How the set of actors is kept alive may change in the future meaning this setting may go away.
  keep-alive-interval = 10s
}

akka.cluster.configuration-compatibility-check.checkers {
  akka-cluster-sharding-hash-extractor = "akka.cluster.sharding.typed.internal.JoinConfigCompatCheckerClusterSharding"
}
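The `sharding` override block above is what the implementation later in this diff reads via `ClusterShardingSettings.fromConfig`. A minimal sketch, assuming a typed `ActorSystem`, showing how a `role` configured there can be inspected:

```scala
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ClusterShardingSettings

// Mirrors how the implementation loads the daemon process sharding overrides.
// Returns e.g. Some("workers") when application.conf contains
// akka.cluster.sharded-daemon-process.sharding.role = "workers"
def daemonProcessShardingRole(system: ActorSystem[_]): Option[String] =
  ClusterShardingSettings
    .fromConfig(system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding"))
    .role
```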
@@ -0,0 +1,71 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed

import java.time.Duration

import akka.actor.typed.ActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.concurrent.duration.FiniteDuration

@ApiMayChange
object ShardedDaemonProcessSettings {

  /** Scala API: Create default settings for system */
  def apply(system: ActorSystem[_]): ShardedDaemonProcessSettings = {
    fromConfig(system.settings.config.getConfig("akka.cluster.sharded-daemon-process"))
  }

  /** Java API: Create default settings for system */
  def create(system: ActorSystem[_]): ShardedDaemonProcessSettings =
    apply(system)

  /**
   * Load settings from a specific config location.
   */
  def fromConfig(config: Config): ShardedDaemonProcessSettings = {
    val keepAliveInterval = config.getDuration("keep-alive-interval").asScala

    new ShardedDaemonProcessSettings(keepAliveInterval, None)
  }

}

/**
 * Not for user construction, use factory methods to instantiate.
 */
@ApiMayChange
final class ShardedDaemonProcessSettings @InternalApi private[akka] (
    val keepAliveInterval: FiniteDuration,
    val shardingSettings: Option[ClusterShardingSettings]) {

  /**
   * Scala API: The interval each parent of the sharded set is pinged from each node in the cluster.
   *
   * Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
   */
  def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings =
    new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings)

  /**
   * Java API: The interval each parent of the sharded set is pinged from each node in the cluster.
   *
   * Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
   */
  def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings =
    new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings)

  /**
   * Specify sharding settings that should be used for the sharded daemon process instead of loading from config.
   * Some settings cannot be changed (remember-entities and related settings, passivation, number-of-shards),
   * changing those settings will be ignored.
   */
  def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings =
    new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings))
}
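A short usage sketch, not part of the commit, combining the factory and `with*` methods above (assuming a typed `ActorSystem`; `withRole` on `ClusterShardingSettings` is the same call the unit test below uses):

```scala
import scala.concurrent.duration._

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings

def customSettings(system: ActorSystem[_]): ShardedDaemonProcessSettings =
  ShardedDaemonProcessSettings(system)
    .withKeepAliveInterval(5.seconds) // ping more often than the 10s default
    .withShardingSettings(ClusterShardingSettings(system).withRole("workers")) // only run on "workers" nodes
```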
@@ -0,0 +1,171 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed.internal

import java.util.Optional

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardRegion.EntityId
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ClusterShardingSettings.StateStoreModeDData
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.scaladsl
import akka.cluster.sharding.typed.javadsl
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.StartEntity
import akka.cluster.typed.Cluster
import akka.japi.function
import akka.util.PrettyDuration

import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

/**
 * INTERNAL API
 */
@InternalApi
private[akka] object ShardedDaemonProcessImpl {

  object KeepAlivePinger {
    sealed trait Event
    case object Tick extends Event

    def apply[T](
        settings: ShardedDaemonProcessSettings,
        name: String,
        identities: Set[EntityId],
        shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] =
      Behaviors.setup { context =>
        Behaviors.withTimers { timers =>
          def triggerStartAll(): Unit = {
            identities.foreach(id => shardingRef ! StartEntity(id))
          }

          context.log.debug2(
            "Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]",
            name,
            PrettyDuration.format(settings.keepAliveInterval))
          timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
          triggerStartAll()

          Behaviors.receiveMessage {
            case Tick =>
              triggerStartAll()
              context.log.debug("Periodic ping sent to [{}] processes", identities.size)
              Behaviors.same
          }
        }
      }
  }

  final class MessageExtractor[T] extends ShardingMessageExtractor[ShardingEnvelope[T], T] {
    def entityId(message: ShardingEnvelope[T]): String = message match {
      case ShardingEnvelope(id, _) => id
    }

    def shardId(entityId: String): String = entityId

    def unwrapMessage(message: ShardingEnvelope[T]): T = message.message
  }

}

/**
 * INTERNAL API
 */
@InternalApi
private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
    extends javadsl.ShardedDaemonProcess
    with scaladsl.ShardedDaemonProcess {

  import ShardedDaemonProcessImpl._

  def init[T](name: String, numberOfInstances: Int, behaviorFactory: Int => Behavior[T])(
      implicit classTag: ClassTag[T]): Unit = {
    init(name, numberOfInstances, behaviorFactory, ShardedDaemonProcessSettings(system), None)(classTag)
  }

  def init[T](
      name: String,
      numberOfInstances: Int,
      behaviorFactory: Int => Behavior[T],
      settings: ShardedDaemonProcessSettings,
      stopMessage: Option[T])(implicit classTag: ClassTag[T]): Unit = {

    val entityTypeKey = EntityTypeKey[T](s"sharded-daemon-process-$name")

    // One shard per actor identified by the numeric id encoded in the entity id
    val numberOfShards = numberOfInstances
    val entityIds = (0 until numberOfInstances).map(_.toString)

    val shardingSettings = {
      val shardingBaseSettings =
        settings.shardingSettings match {
          case None =>
            // defaults in akka.cluster.sharding but allow overrides specifically for actor-set
            ClusterShardingSettings.fromConfig(
              system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding"))
          case Some(shardingSettings) => shardingSettings
        }

      new ClusterShardingSettings(
        numberOfShards,
        shardingBaseSettings.role,
        shardingBaseSettings.dataCenter,
        false, // remember entities disabled
        "",
        "",
        Duration.Zero, // passivation disabled
        shardingBaseSettings.shardRegionQueryTimeout,
        StateStoreModeDData,
        shardingBaseSettings.tuningParameters,
        shardingBaseSettings.coordinatorSingletonSettings)
    }

    val nodeRoles = Cluster(system).selfMember.roles
    if (shardingSettings.role.forall(nodeRoles)) {
      val entity = Entity(entityTypeKey)(ctx => behaviorFactory(ctx.entityId.toInt))
        .withSettings(shardingSettings)
        .withMessageExtractor(new MessageExtractor)

      val entityWithStop = stopMessage match {
        case Some(stop) => entity.withStopMessage(stop)
        case None       => entity
      }

      val shardingRef = ClusterSharding(system).init(entityWithStop)

      system.systemActorOf(
        KeepAlivePinger(settings, name, entityIds.toSet, shardingRef),
        s"ShardedDaemonProcessKeepAlive-$name")
    }
  }

  def init[T](
      messageClass: Class[T],
      name: String,
      numberOfInstances: Int,
      behaviorFactory: function.Function[Integer, Behavior[T]]): Unit =
    init(name, numberOfInstances, n => behaviorFactory(n))(ClassTag(messageClass))

  def init[T](
      messageClass: Class[T],
      name: String,
      numberOfInstances: Int,
      behaviorFactory: function.Function[Integer, Behavior[T]],
      settings: ShardedDaemonProcessSettings,
      stopMessage: Optional[T]): Unit =
    init(name, numberOfInstances, n => behaviorFactory(n), settings, stopMessage.asScala)(ClassTag(messageClass))
}
@@ -0,0 +1,62 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed.javadsl

import java.util.Optional

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.japi.function

object ShardedDaemonProcess {
  def get(system: ActorSystem[_]): ShardedDaemonProcess =
    akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess(system).asJava
}

/**
 * This extension runs a pre-set number of actors in a cluster.
 *
 * The typical use case is when you have a task that can be divided into a number of workers, each doing a
 * sharded part of the work, for example consuming the read side events from Akka Persistence through
 * tagged events where each tag decides which consumer should consume the event.
 *
 * Each named set needs to be started on all the nodes of the cluster on start up.
 *
 * The processes are spread out across the cluster. When the cluster topology changes the processes may be stopped
 * and started anew on a new node to rebalance them.
 *
 * Not for user extension.
 */
@DoNotInherit @ApiMayChange
abstract class ShardedDaemonProcess {

  /**
   * Start a specific number of actors that are then kept alive in the cluster.
   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
   */
  def init[T](
      messageClass: Class[T],
      name: String,
      numberOfInstances: Int,
      behaviorFactory: function.Function[Integer, Behavior[T]]): Unit

  /**
   * Start a specific number of actors, each with a unique numeric id in the set, that are then kept alive in the cluster.
   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
   * @param stopMessage If defined, sent to the actors when they need to stop because of a rebalance across the nodes of the cluster
   *                    or cluster shutdown.
   */
  def init[T](
      messageClass: Class[T],
      name: String,
      numberOfInstances: Int,
      behaviorFactory: function.Function[Integer, Behavior[T]],
      settings: ShardedDaemonProcessSettings,
      stopMessage: Optional[T]): Unit

}
@@ -0,0 +1,66 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed.scaladsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.internal.ShardedDaemonProcessImpl
import akka.cluster.sharding.typed.javadsl

import scala.reflect.ClassTag

object ShardedDaemonProcess extends ExtensionId[ShardedDaemonProcess] {
  override def createExtension(system: ActorSystem[_]): ShardedDaemonProcess = new ShardedDaemonProcessImpl(system)
}

/**
 * This extension runs a pre-set number of actors in a cluster.
 *
 * The typical use case is when you have a task that can be divided into a number of workers, each doing a
 * sharded part of the work, for example consuming the read side events from Akka Persistence through
 * tagged events where each tag decides which consumer should consume the event.
 *
 * Each named set needs to be started on all the nodes of the cluster on start up.
 *
 * The processes are spread out across the cluster. When the cluster topology changes the processes may be stopped
 * and started anew on a new node to rebalance them.
 *
 * Not for user extension.
 */
@DoNotInherit @ApiMayChange
trait ShardedDaemonProcess extends Extension { javadslSelf: javadsl.ShardedDaemonProcess =>

  /**
   * Start a specific number of actors that are then kept alive in the cluster.
   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
   */
  def init[T](name: String, numberOfInstances: Int, behaviorFactory: Int => Behavior[T])(
      implicit classTag: ClassTag[T]): Unit

  /**
   * Start a specific number of actors, each with a unique numeric id in the set, that are then kept alive in the cluster.
   * @param behaviorFactory Given a unique id of `0` until `numberOfInstances`, create the behavior for that actor.
   * @param stopMessage If defined, sent to the actors when they need to stop because of a rebalance across the nodes of the cluster
   *                    or cluster shutdown.
   */
  def init[T](
      name: String,
      numberOfInstances: Int,
      behaviorFactory: Int => Behavior[T],
      settings: ShardedDaemonProcessSettings,
      stopMessage: Option[T])(implicit classTag: ClassTag[T]): Unit

  /**
   * INTERNAL API
   */
  @InternalApi private[akka] def asJava: javadsl.ShardedDaemonProcess = javadslSelf

}
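A usage sketch of the richer overload, not part of the commit, with an illustrative protocol (the implicit `ClassTag` is derived automatically for a concrete message type):

```scala
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

object InitExample {
  // illustrative protocol with a graceful stop message
  sealed trait Command
  case object Stop extends Command

  def worker(id: Int): Behavior[Command] =
    Behaviors.receiveMessage {
      case Stop => Behaviors.stopped
    }

  def start(system: ActorSystem[_]): Unit =
    ShardedDaemonProcess(system).init(
      "workers",
      8,
      id => worker(id),
      ShardedDaemonProcessSettings(system),
      Some(Stop)) // sent before the actor is stopped for rebalance or shutdown
}
```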
@@ -0,0 +1,109 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed

import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.duration._

object ShardedDaemonProcessSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  val SnitchServiceKey = ServiceKey[AnyRef]("snitch")

  case class ProcessActorEvent(id: Int, event: Any) extends CborSerializable

  object ProcessActor {
    trait Command
    case object Stop extends Command

    def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx =>
      val snitchRouter = ctx.spawn(Routers.group(SnitchServiceKey), "router")
      snitchRouter ! ProcessActorEvent(id, "Started")

      Behaviors.receiveMessage {
        case Stop =>
          snitchRouter ! ProcessActorEvent(id, "Stopped")
          Behaviors.stopped
      }
    }
  }

  commonConfig(ConfigFactory.parseString("""
      akka.loglevel = DEBUG
      akka.cluster.sharded-daemon-process {
        sharding {
          # First is likely to be ignored as shard coordinator not ready
          retry-interval = 0.2s
        }
        # quick ping to make test swift
        keep-alive-interval = 1s
      }
    """).withFallback(MultiNodeClusterSpec.clusterConfig))

}

class ShardedDaemonProcessMultiJvmNode1 extends ShardedDaemonProcessSpec
class ShardedDaemonProcessMultiJvmNode2 extends ShardedDaemonProcessSpec
class ShardedDaemonProcessMultiJvmNode3 extends ShardedDaemonProcessSpec

abstract class ShardedDaemonProcessSpec
    extends MultiNodeSpec(ShardedDaemonProcessSpec)
    with MultiNodeTypedClusterSpec
    with ScalaFutures {

  import ShardedDaemonProcessSpec._

  val probe: TestProbe[AnyRef] = TestProbe[AnyRef]()

  "Sharded daemon process in a multi node cluster" must {
    "form cluster" in {
      formCluster(first, second, third)
      runOn(first) {
        typedSystem.receptionist ! Receptionist.Register(SnitchServiceKey, probe.ref, probe.ref)
        probe.expectMessageType[Receptionist.Registered]
      }
      enterBarrier("snitch-registered")

      probe.awaitAssert({
        typedSystem.receptionist ! Receptionist.Find(SnitchServiceKey, probe.ref)
        probe.expectMessageType[Receptionist.Listing].serviceInstances(SnitchServiceKey).size should ===(1)
      }, 5.seconds)
      enterBarrier("snitch-seen")
    }

    "init actor set" in {
      ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id))
      enterBarrier("actor-set-initialized")
      runOn(first) {
        val startedIds = (0 to 3).map { _ =>
          val event = probe.expectMessageType[ProcessActorEvent](5.seconds)
          event.event should ===("Started")
          event.id
        }.toSet
        startedIds.size should ===(4)
      }
      enterBarrier("actor-set-started")
    }

    // FIXME test removing one cluster node and verify all are alive (how do we do that?)

  }
}
@@ -0,0 +1,59 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed.javadsl;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ShardedDaemonProcessCompileOnlyTest {

  interface Command {}

  enum Stop implements Command {
    INSTANCE
  }

  {
    ActorSystem<Void> system = null;
    ShardedDaemonProcess.get(system).init(Command.class, "MyName", 8, id -> Behaviors.empty());

    ShardedDaemonProcess.get(system)
        .init(
            Command.class,
            "MyName",
            8,
            id -> Behaviors.empty(),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(Stop.INSTANCE));

    // #tag-processing
    List<String> tags = Arrays.asList("tag-1", "tag-2", "tag-3");
    ShardedDaemonProcess.get(system)
        .init(
            TagProcessor.Command.class,
            "TagProcessors",
            tags.size(),
            id -> TagProcessor.create(tags.get(id)));
    // #tag-processing
  }

  static class TagProcessor {
    interface Command {}

    static Behavior<Command> create(String tag) {
      return Behaviors.setup(
          context -> {
            // ... start the tag processing ...
            return Behaviors.empty();
          });
    }
  }
}
@@ -4,9 +4,6 @@
    <statusListener class="ch.qos.logback.core.status.NopStatusListener" />

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <encoder>
            <pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern>
        </encoder>
@@ -31,5 +28,4 @@
    <root level="DEBUG">
        <appender-ref ref="CapturingAppender"/>
    </root>

</configuration>
@@ -0,0 +1,122 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.cluster.sharding.typed.scaladsl

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

object ShardedDaemonProcessSpec {
  // single node cluster config
  def config = ConfigFactory.parseString("""
      akka.actor.provider = cluster

      akka.remote.classic.netty.tcp.port = 0
      akka.remote.artery.canonical.port = 0
      akka.remote.artery.canonical.hostname = 127.0.0.1

      akka.cluster.jmx.multi-mbeans-in-same-jvm = on

      # ping often/start fast for test
      akka.cluster.sharded-daemon-process.keep-alive-interval = 1s

      akka.coordinated-shutdown.terminate-actor-system = off
      akka.coordinated-shutdown.run-by-actor-system-terminate = off
    """)

  object MyActor {
    trait Command
    case object Stop extends Command

    case class Started(id: Int, selfRef: ActorRef[Command])

    def apply(id: Int, probe: ActorRef[Any]): Behavior[Command] = Behaviors.setup { ctx =>
      probe ! Started(id, ctx.self)

      Behaviors.receiveMessage {
        case Stop =>
          Behaviors.stopped
      }
    }

  }

}

class ShardedDaemonProcessSpec
    extends ScalaTestWithActorTestKit(ShardedDaemonProcessSpec.config)
    with AnyWordSpecLike
    with LogCapturing {

  import ShardedDaemonProcessSpec._

  "The ShardedDaemonProcess" must {

    "have a single node cluster running first" in {
      val probe = createTestProbe()
      Cluster(system).manager ! Join(Cluster(system).selfMember.address)
      probe.awaitAssert({
        Cluster(system).selfMember.status should ===(MemberStatus.Up)
      }, 3.seconds)
    }

    "start N actors with unique ids" in {
      val probe = createTestProbe[Any]()
      ShardedDaemonProcess(system).init("a", 5, id => MyActor(id, probe.ref))

      val started = probe.receiveMessages(5)
      started.toSet.size should ===(5)
    }

    "restart actors if they stop" in {
      val probe = createTestProbe[Any]()
      ShardedDaemonProcess(system).init("stop", 2, id => MyActor(id, probe.ref))

      val started = (1 to 2).map(_ => probe.expectMessageType[MyActor.Started]).toSet
      started.foreach(_.selfRef ! MyActor.Stop)

      // periodic ping every 1s makes it restart
      (1 to 2).map(_ => probe.expectMessageType[MyActor.Started](3.seconds))
    }

    "not run if the role does not match node role" in {
      val probe = createTestProbe[Any]()
      val settings =
        ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers"))
      ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None)

      probe.expectNoMessage()
    }

  }

  object TagProcessor {
    sealed trait Command
    def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx =>
      // start the processing ...
      ctx.log.debug("Starting processor for tag {}", tag)
      Behaviors.empty
    }
  }

  def docExample(): Unit = {
    // #tag-processing
    val tags = "tag-1" :: "tag-2" :: "tag-3" :: Nil
    ShardedDaemonProcess(system).init("TagProcessors", tags.size, id => TagProcessor(tags(id)))
    // #tag-processing
  }

}
@@ -441,7 +441,9 @@ object ShardRegion {
   * Sent back when a `ShardRegion.StartEntity` message was received and triggered the entity
   * to start (it does not guarantee the entity successfully started)
   */
  final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable
  final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId)
      extends ClusterShardingSerializable
      with DeadLetterSuppression

  /**
   * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
@@ -29,5 +29,6 @@ that the module or API wasn't useful.
These are the current complete modules marked as **may change**:

* @ref:[Multi Node Testing](../multi-node-testing.md)
* @ref:[Sharded Daemon Process](../typed/cluster-sharded-daemon-process.md)
@@ -0,0 +1,58 @@
# Sharded Daemon Process

@@@ warning

This module is currently marked as @ref:[may change](../common/may-change.md) because it is a new feature that
needs feedback from real usage before finalizing the API. This means that API or semantics can change without
warning or deprecation period. It is also not recommended to use this module in production just yet.

@@@

## Module info

To use Akka Sharded Daemon Process, you must add the following dependency in your project:

@@dependency[sbt,Maven,Gradle] {
  group=com.typesafe.akka
  artifact=akka-cluster-sharding-typed_$scala.binary_version$
  version=$akka.version$
}

@@project-info{ projectId="akka-cluster-sharding-typed" }
## Introduction

Sharded Daemon Process provides a way to run `N` actors, each given a numeric id starting from `0`, that are then kept alive
and balanced across the cluster. When a rebalance is needed the actor is stopped and, triggered by a keep alive running on
all nodes, started on a new node (the keep alive should be seen as an implementation detail and may change in future versions).

The intended use case is splitting a data processing workload across a set number of workers that each get to work on a subset
of the data that needs to be processed. This is commonly needed to create projections based on the event streams available
from all the @ref:[EventSourcedBehaviors](persistence.md) in a CQRS application. Events are tagged with one out of `N` tags
used to split the workload of consuming and updating a projection between `N` workers.

For cases where a single actor needs to be kept alive, see @ref:[Cluster Singleton](cluster-singleton.md).
## Basic example

To set up a set of actors running with Sharded Daemon Process, each node in the cluster needs to run the same initialization
when starting up:

Scala
: @@snip [ShardedDaemonProcessExample.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala) { #tag-processing }

Java
: @@snip [ShardedDaemonProcessExample.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardedDaemonProcessCompileOnlyTest.java) { #tag-processing }

An additional factory method is provided for further configurability and for providing a graceful stop message for the actor, as sketched below.
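A minimal sketch of that variant, assuming the `TagProcessor` from the snippet above were extended with a hypothetical `Stop` command:

```scala
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess

val tags = "tag-1" :: "tag-2" :: "tag-3" :: Nil
ShardedDaemonProcess(system).init(
  "TagProcessors",
  tags.size,
  id => TagProcessor(tags(id)),
  ShardedDaemonProcessSettings(system),
  Some(TagProcessor.Stop)) // hypothetical graceful stop message, sent on rebalance or shutdown
```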
## Addressing the actors

In use cases where you need to send messages to the daemon process actors it is recommended to use the @ref:[system receptionist](actor-discovery.md),
either with a single `ServiceKey`, which all daemon process actors register themselves to for broadcasts, or with individual keys if more fine-grained messaging is needed, as in the sketch below.
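A minimal sketch of the single-`ServiceKey` approach (names are illustrative and not part of this commit):

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors

object Worker {
  sealed trait Command
  // shared key that every daemon process actor registers itself with
  val WorkerKey: ServiceKey[Command] = ServiceKey[Command]("daemon-process-workers")

  def apply(id: Int): Behavior[Command] = Behaviors.setup { ctx =>
    ctx.system.receptionist ! Receptionist.Register(WorkerKey, ctx.self)
    Behaviors.ignore // message handling elided
  }
}
```

Messages can then be sent to the registered workers through a receptionist listing or a `Routers.group(Worker.WorkerKey)` group router; the multi-node test in this commit uses the same pattern with its `snitch` key.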
## Scalability

This cluster tool is intended for small numbers of consumers and will not scale well to a large set. In large clusters
it is recommended to limit the nodes the sharded daemon process will run on using a role.
@@ -15,6 +15,7 @@ project.description: Akka Cluster concepts, node membership service, CRDT Distri
* [cluster-singleton](cluster-singleton.md)
* [cluster-sharding](cluster-sharding.md)
* [cluster-sharding-specification](cluster-sharding-concepts.md)
* [sharded-daemon-process](cluster-sharded-daemon-process.md)
* [cluster-dc](cluster-dc.md)
* [distributed-pub-sub](distributed-pub-sub.md)
* [serialization](../serialization.md)
@@ -434,6 +434,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
  .settings(javacOptions += "-parameters") // for Jackson
  .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
  // To be able to import ContainerFormats.proto
  .settings(Protobuf.settings)
  .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
  .configs(MultiJvm)
  .enablePlugins(MultiNodeScalaTest)