diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala index 31ecf4164f..8d32e45cbc 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala @@ -283,7 +283,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh } @DoNotInherit -sealed trait ClusterSharding extends Extension { +sealed abstract class ClusterSharding extends Extension { /** * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. diff --git a/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java new file mode 100644 index 0000000000..ff8d386504 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -0,0 +1,82 @@ +package jdoc.akka.cluster.sharding.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.Props; +import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.ClusterSharding; +import akka.cluster.sharding.typed.*; +import akka.cluster.typed.ClusterSingleton; +import akka.cluster.typed.ClusterSingletonSettings; + +public class ShardingCompileOnlyTest { + + //#counter + interface CounterCommand {} + public static class Increment implements CounterCommand { } + public static class GoodByeCounter implements CounterCommand { } + + public static class GetValue implements CounterCommand { + private final ActorRef replyTo; + public GetValue(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static Behavior counter(Integer value) { + return Behaviors.immutable(CounterCommand.class) + .onMessage(Increment.class, (ctx, msg) -> { + return counter(value + 1); + }) + .onMessage(GetValue.class, (ctx, msg) -> { + msg.replyTo.tell(value); + return Behaviors.same(); + }) + .build(); + } + //#counter + + public static void example() { + + ActorSystem system = ActorSystem.create( + Behaviors.empty(), "ShardingExample" + ); + + //#sharding-extension + ClusterSharding sharding = ClusterSharding.get(system); + //#sharding-extension + + //#spawn + EntityTypeKey typeKey = EntityTypeKey.create(CounterCommand.class, "Counter"); + ActorRef> shardRegion = sharding.spawn( + counter(0), + Props.empty(), + typeKey, + ClusterShardingSettings.create(system), + 10, + new GoodByeCounter()); + //#spawn + + //#send + EntityRef counterOne = sharding.entityRefFor(typeKey, "counter-`"); + counterOne.tell(new Increment()); + + shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment())); + //#send + + //#singleton + ClusterSingleton singleton = ClusterSingleton.get(system); + // Start if needed and provide a proxy to a named singleton + ActorRef proxy = singleton.spawn( + counter(0), + "GlobalCounter", + Props.empty(), + ClusterSingletonSettings.create(system), + new GoodByeCounter() + ); + + proxy.tell(new Increment()); + //#singleton + } +} diff --git a/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala new file mode 100644 index 0000000000..7989dcd748 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -0,0 +1,81 @@ +package doc.akka.cluster.sharding.typed + +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.typed.{ ClusterSingleton, ClusterSingletonSettings } +import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec +import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost } + +object ShardingCompileOnlySpec { + + val system = ActorSystem(Behaviors.empty, "Sharding") + + //#sharding-extension + import akka.cluster.sharding.typed._ + val sharding = ClusterSharding(system) + //#sharding-extension + + //#counter + trait CounterCommand + case object Increment extends CounterCommand + final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand + case object GoodByeCounter extends CounterCommand + + def counter(value: Int): Behavior[CounterCommand] = Behaviors.immutable[CounterCommand] { + case (ctx, Increment) ⇒ + counter(value + 1) + case (ctx, GetValue(replyTo)) ⇒ + replyTo ! value + Behaviors.same + } + //#counter + + //#spawn + val TypeKey = EntityTypeKey[CounterCommand]("Counter") + // if a extractor is defined then the type would be ActorRef[BasicCommand] + val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn[CounterCommand]( + behavior = counter(0), + props = Props.empty, + typeKey = TypeKey, + settings = ClusterShardingSettings(system), + maxNumberOfShards = 10, + handOffStopMessage = GoodByeCounter) + //#spawn + + //#send + // With an EntityRef + val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1") + counterOne ! Increment + + // Entity id is specified via an `ShardingEnvelope` + shardRegion ! ShardingEnvelope("counter-1", Increment) + //#send + + //#persistence + val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost") + ClusterSharding(system).spawn[BlogCommand]( + behavior = InDepthPersistentBehaviorSpec.behavior, + props = Props.empty, + typeKey = ShardingTypeName, + settings = ClusterShardingSettings(system), + maxNumberOfShards = 10, + handOffStopMessage = PassivatePost) + //#persistence + + // as a singleton + + //#singleton + val singletonManager = ClusterSingleton(system) + // Start if needed and provide a proxy to a named singleton + val proxy: ActorRef[CounterCommand] = singletonManager.spawn( + behavior = counter(0), + "GlobalCounter", + Props.empty, + ClusterSingletonSettings(system), + terminationMessage = GoodByeCounter + ) + + proxy ! Increment + //#singleton + +} diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala index 0cedffbd12..bc6e15c521 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/ClusterSingleton.scala @@ -8,7 +8,6 @@ import akka.annotation.{ DoNotInherit, InternalApi } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings } import akka.cluster.typed.internal.AdaptedClusterSingletonImpl -import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } import com.typesafe.config.Config import scala.concurrent.duration._ @@ -20,6 +19,11 @@ object ClusterSingletonSettings { system: ActorSystem[_] ): ClusterSingletonSettings = fromConfig(system.settings.config.getConfig("akka.cluster")) + /** + * Java API + */ + def create(system: ActorSystem[_]): ClusterSingletonSettings = apply(system) + def fromConfig( config: Config ): ClusterSingletonSettings = { @@ -115,7 +119,7 @@ private[akka] object ClusterSingletonImpl { * Not intended for user extension. */ @DoNotInherit -trait ClusterSingleton extends Extension { +abstract class ClusterSingleton extends Extension { /** * Start if needed and provide a proxy to a named singleton diff --git a/akka-docs/src/main/paradox/cluster-sharding-typed.md b/akka-docs/src/main/paradox/cluster-sharding-typed.md index b8f7e11a23..15797932af 100644 --- a/akka-docs/src/main/paradox/cluster-sharding-typed.md +++ b/akka-docs/src/main/paradox/cluster-sharding-typed.md @@ -1,4 +1,4 @@ -# Sharding +# Cluster Sharding @@@ warning @@ -9,7 +9,7 @@ This module is currently marked as @ref:[may change](common/may-change.md) in th @@@ -To use the testkit add the following dependency: +To use cluster sharding add the following dependency: @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka @@ -17,5 +17,60 @@ To use the testkit add the following dependency: version=$akka.version$ } -For an introduction to Akka Cluster concepts see [Cluster Specification]. This documentation shows how to use the typed -Cluster API. \ No newline at end of file +For an introduction to Sharding concepts see @ref:[Cluster Sharding](cluster-sharding.md). This documentation shows how to use the typed +Cluster Sharding API. + +## Basic example + +Sharding is accessed via the `ClusterSharding` extension + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension } + +It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } + +Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier. + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn } + +Messages to a specific entity are then sent via an EntityRef. +It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region. + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send } + +## Persistence example + +When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of +an actor after it has moved. Currently Akka typed only has a Scala API for persistence, you can track the progress of the +Java API [here](https://github.com/akka/akka/issues/24193). + +Taking the larger example from the @ref:[persistence documentation](persistence-typed.md#larger-example) and making it into +a sharded entity is the same as for a non persistent behavior. The behavior: + +Scala +: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior } + +To create the entity: + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } + +Sending messages to entities is the same as the example above. The only difference is ow when an entity is moved the state will be restored. +See @ref:[persistence](persistence-typed.md) for more details. diff --git a/akka-docs/src/main/paradox/cluster-singleton-typed.md b/akka-docs/src/main/paradox/cluster-singleton-typed.md new file mode 100644 index 0000000000..be3a248ef0 --- /dev/null +++ b/akka-docs/src/main/paradox/cluster-singleton-typed.md @@ -0,0 +1,55 @@ +# Cluster Singleton + +@@@ warning + +This module is currently marked as @ref:[may change](common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + +@@@ + +To use the cluster singletons add the following dependency: + +@@dependency [sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-cluster-typed_2.12 + version=$akka.version$ +} + +For some use cases it is convenient and sometimes also mandatory to ensure that +you have exactly one actor of a certain type running somewhere in the cluster. + +Some examples: + + * single point of responsibility for certain cluster-wide consistent decisions, or +coordination of actions across the cluster system + * single entry point to an external system + * single master, many workers + * centralized naming service, or routing logic + +Using a singleton should not be the first design choice. It has several drawbacks, +such as single-point of bottleneck. Single-point of failure is also a relevant concern, +but for some cases this feature takes care of that by making sure that another singleton +instance will eventually be started. + +# Example + +Any `Behavior` can be run as a singleton. E.g. a basic counter: + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension } + +Then on every node in the cluster, or every node with a given role, use the `ClusterSingleton` extension +to spawn the singleton. Only a single instance will run in the cluster: + + +Scala +: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #singleton } + +Java +: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #singleton } + diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index d7d343fc6f..022d90986a 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -11,6 +11,7 @@ * [fault-tolerance](fault-tolerance-typed.md) * [actor-discovery](actor-discovery-typed.md) * [cluster](cluster-typed.md) +* [cluster-singleton](cluster-singleton-typed.md) * [cluster-sharding](cluster-sharding-typed.md) * [persistence](persistence-typed.md) * [testing](testing-typed.md) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala index 9f241f20bb..63efa5985c 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala @@ -125,6 +125,3 @@ object InDepthPersistentBehaviorSpec { //#behavior } -class InDepthPersistentBehaviorSpec { - -} diff --git a/build.sbt b/build.sbt index 7f17a0373a..477af127f6 100644 --- a/build.sbt +++ b/build.sbt @@ -415,7 +415,8 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") clusterSharding, testkit % "test->test", typedTestkit % "test->test", - actorTypedTests % "test->test" + actorTypedTests % "test->test", + persistenceTyped % "test->test" ) .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))