Typed docs for sharding and singleton (#24364)
This commit is contained in:
parent
f32cd63dc1
commit
719f0fb672
9 changed files with 287 additions and 11 deletions
|
|
@ -283,7 +283,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
|
||||||
}
|
}
|
||||||
|
|
||||||
@DoNotInherit
|
@DoNotInherit
|
||||||
sealed trait ClusterSharding extends Extension {
|
sealed abstract class ClusterSharding extends Extension {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
|
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,82 @@
|
||||||
|
package jdoc.akka.cluster.sharding.typed;
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorRef;
|
||||||
|
import akka.actor.typed.ActorSystem;
|
||||||
|
import akka.actor.typed.Behavior;
|
||||||
|
import akka.actor.typed.Props;
|
||||||
|
import akka.actor.typed.javadsl.Behaviors;
|
||||||
|
import akka.cluster.sharding.typed.ClusterSharding;
|
||||||
|
import akka.cluster.sharding.typed.*;
|
||||||
|
import akka.cluster.typed.ClusterSingleton;
|
||||||
|
import akka.cluster.typed.ClusterSingletonSettings;
|
||||||
|
|
||||||
|
public class ShardingCompileOnlyTest {
|
||||||
|
|
||||||
|
//#counter
|
||||||
|
interface CounterCommand {}
|
||||||
|
public static class Increment implements CounterCommand { }
|
||||||
|
public static class GoodByeCounter implements CounterCommand { }
|
||||||
|
|
||||||
|
public static class GetValue implements CounterCommand {
|
||||||
|
private final ActorRef<Integer> replyTo;
|
||||||
|
public GetValue(ActorRef<Integer> replyTo) {
|
||||||
|
this.replyTo = replyTo;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Behavior<CounterCommand> counter(Integer value) {
|
||||||
|
return Behaviors.immutable(CounterCommand.class)
|
||||||
|
.onMessage(Increment.class, (ctx, msg) -> {
|
||||||
|
return counter(value + 1);
|
||||||
|
})
|
||||||
|
.onMessage(GetValue.class, (ctx, msg) -> {
|
||||||
|
msg.replyTo.tell(value);
|
||||||
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
//#counter
|
||||||
|
|
||||||
|
public static void example() {
|
||||||
|
|
||||||
|
ActorSystem system = ActorSystem.create(
|
||||||
|
Behaviors.empty(), "ShardingExample"
|
||||||
|
);
|
||||||
|
|
||||||
|
//#sharding-extension
|
||||||
|
ClusterSharding sharding = ClusterSharding.get(system);
|
||||||
|
//#sharding-extension
|
||||||
|
|
||||||
|
//#spawn
|
||||||
|
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
|
||||||
|
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
|
||||||
|
counter(0),
|
||||||
|
Props.empty(),
|
||||||
|
typeKey,
|
||||||
|
ClusterShardingSettings.create(system),
|
||||||
|
10,
|
||||||
|
new GoodByeCounter());
|
||||||
|
//#spawn
|
||||||
|
|
||||||
|
//#send
|
||||||
|
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-`");
|
||||||
|
counterOne.tell(new Increment());
|
||||||
|
|
||||||
|
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
|
||||||
|
//#send
|
||||||
|
|
||||||
|
//#singleton
|
||||||
|
ClusterSingleton singleton = ClusterSingleton.get(system);
|
||||||
|
// Start if needed and provide a proxy to a named singleton
|
||||||
|
ActorRef<CounterCommand> proxy = singleton.spawn(
|
||||||
|
counter(0),
|
||||||
|
"GlobalCounter",
|
||||||
|
Props.empty(),
|
||||||
|
ClusterSingletonSettings.create(system),
|
||||||
|
new GoodByeCounter()
|
||||||
|
);
|
||||||
|
|
||||||
|
proxy.tell(new Increment());
|
||||||
|
//#singleton
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,81 @@
|
||||||
|
package doc.akka.cluster.sharding.typed
|
||||||
|
|
||||||
|
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
|
import akka.cluster.typed.{ ClusterSingleton, ClusterSingletonSettings }
|
||||||
|
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec
|
||||||
|
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost }
|
||||||
|
|
||||||
|
object ShardingCompileOnlySpec {
|
||||||
|
|
||||||
|
val system = ActorSystem(Behaviors.empty, "Sharding")
|
||||||
|
|
||||||
|
//#sharding-extension
|
||||||
|
import akka.cluster.sharding.typed._
|
||||||
|
val sharding = ClusterSharding(system)
|
||||||
|
//#sharding-extension
|
||||||
|
|
||||||
|
//#counter
|
||||||
|
trait CounterCommand
|
||||||
|
case object Increment extends CounterCommand
|
||||||
|
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
|
||||||
|
case object GoodByeCounter extends CounterCommand
|
||||||
|
|
||||||
|
def counter(value: Int): Behavior[CounterCommand] = Behaviors.immutable[CounterCommand] {
|
||||||
|
case (ctx, Increment) ⇒
|
||||||
|
counter(value + 1)
|
||||||
|
case (ctx, GetValue(replyTo)) ⇒
|
||||||
|
replyTo ! value
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
//#counter
|
||||||
|
|
||||||
|
//#spawn
|
||||||
|
val TypeKey = EntityTypeKey[CounterCommand]("Counter")
|
||||||
|
// if a extractor is defined then the type would be ActorRef[BasicCommand]
|
||||||
|
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn[CounterCommand](
|
||||||
|
behavior = counter(0),
|
||||||
|
props = Props.empty,
|
||||||
|
typeKey = TypeKey,
|
||||||
|
settings = ClusterShardingSettings(system),
|
||||||
|
maxNumberOfShards = 10,
|
||||||
|
handOffStopMessage = GoodByeCounter)
|
||||||
|
//#spawn
|
||||||
|
|
||||||
|
//#send
|
||||||
|
// With an EntityRef
|
||||||
|
val counterOne: EntityRef[CounterCommand] = sharding.entityRefFor(TypeKey, "counter-1")
|
||||||
|
counterOne ! Increment
|
||||||
|
|
||||||
|
// Entity id is specified via an `ShardingEnvelope`
|
||||||
|
shardRegion ! ShardingEnvelope("counter-1", Increment)
|
||||||
|
//#send
|
||||||
|
|
||||||
|
//#persistence
|
||||||
|
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
|
||||||
|
ClusterSharding(system).spawn[BlogCommand](
|
||||||
|
behavior = InDepthPersistentBehaviorSpec.behavior,
|
||||||
|
props = Props.empty,
|
||||||
|
typeKey = ShardingTypeName,
|
||||||
|
settings = ClusterShardingSettings(system),
|
||||||
|
maxNumberOfShards = 10,
|
||||||
|
handOffStopMessage = PassivatePost)
|
||||||
|
//#persistence
|
||||||
|
|
||||||
|
// as a singleton
|
||||||
|
|
||||||
|
//#singleton
|
||||||
|
val singletonManager = ClusterSingleton(system)
|
||||||
|
// Start if needed and provide a proxy to a named singleton
|
||||||
|
val proxy: ActorRef[CounterCommand] = singletonManager.spawn(
|
||||||
|
behavior = counter(0),
|
||||||
|
"GlobalCounter",
|
||||||
|
Props.empty,
|
||||||
|
ClusterSingletonSettings(system),
|
||||||
|
terminationMessage = GoodByeCounter
|
||||||
|
)
|
||||||
|
|
||||||
|
proxy ! Increment
|
||||||
|
//#singleton
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -8,7 +8,6 @@ import akka.annotation.{ DoNotInherit, InternalApi }
|
||||||
import akka.cluster.ClusterSettings.DataCenter
|
import akka.cluster.ClusterSettings.DataCenter
|
||||||
import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings }
|
import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings }
|
||||||
import akka.cluster.typed.internal.AdaptedClusterSingletonImpl
|
import akka.cluster.typed.internal.AdaptedClusterSingletonImpl
|
||||||
import akka.actor.typed.internal.adapter.ActorSystemAdapter
|
|
||||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
|
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
@ -20,6 +19,11 @@ object ClusterSingletonSettings {
|
||||||
system: ActorSystem[_]
|
system: ActorSystem[_]
|
||||||
): ClusterSingletonSettings = fromConfig(system.settings.config.getConfig("akka.cluster"))
|
): ClusterSingletonSettings = fromConfig(system.settings.config.getConfig("akka.cluster"))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API
|
||||||
|
*/
|
||||||
|
def create(system: ActorSystem[_]): ClusterSingletonSettings = apply(system)
|
||||||
|
|
||||||
def fromConfig(
|
def fromConfig(
|
||||||
config: Config
|
config: Config
|
||||||
): ClusterSingletonSettings = {
|
): ClusterSingletonSettings = {
|
||||||
|
|
@ -115,7 +119,7 @@ private[akka] object ClusterSingletonImpl {
|
||||||
* Not intended for user extension.
|
* Not intended for user extension.
|
||||||
*/
|
*/
|
||||||
@DoNotInherit
|
@DoNotInherit
|
||||||
trait ClusterSingleton extends Extension {
|
abstract class ClusterSingleton extends Extension {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start if needed and provide a proxy to a named singleton
|
* Start if needed and provide a proxy to a named singleton
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
# Sharding
|
# Cluster Sharding
|
||||||
|
|
||||||
@@@ warning
|
@@@ warning
|
||||||
|
|
||||||
|
|
@ -9,7 +9,7 @@ This module is currently marked as @ref:[may change](common/may-change.md) in th
|
||||||
|
|
||||||
@@@
|
@@@
|
||||||
|
|
||||||
To use the testkit add the following dependency:
|
To use cluster sharding add the following dependency:
|
||||||
|
|
||||||
@@dependency [sbt,Maven,Gradle] {
|
@@dependency [sbt,Maven,Gradle] {
|
||||||
group=com.typesafe.akka
|
group=com.typesafe.akka
|
||||||
|
|
@ -17,5 +17,60 @@ To use the testkit add the following dependency:
|
||||||
version=$akka.version$
|
version=$akka.version$
|
||||||
}
|
}
|
||||||
|
|
||||||
For an introduction to Akka Cluster concepts see [Cluster Specification]. This documentation shows how to use the typed
|
For an introduction to Sharding concepts see @ref:[Cluster Sharding](cluster-sharding.md). This documentation shows how to use the typed
|
||||||
Cluster API.
|
Cluster Sharding API.
|
||||||
|
|
||||||
|
## Basic example
|
||||||
|
|
||||||
|
Sharding is accessed via the `ClusterSharding` extension
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension }
|
||||||
|
|
||||||
|
It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter }
|
||||||
|
|
||||||
|
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn }
|
||||||
|
|
||||||
|
Messages to a specific entity are then sent via an EntityRef.
|
||||||
|
It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send }
|
||||||
|
|
||||||
|
## Persistence example
|
||||||
|
|
||||||
|
When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of
|
||||||
|
an actor after it has moved. Currently Akka typed only has a Scala API for persistence, you can track the progress of the
|
||||||
|
Java API [here](https://github.com/akka/akka/issues/24193).
|
||||||
|
|
||||||
|
Taking the larger example from the @ref:[persistence documentation](persistence-typed.md#larger-example) and making it into
|
||||||
|
a sharded entity is the same as for a non persistent behavior. The behavior:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior }
|
||||||
|
|
||||||
|
To create the entity:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence }
|
||||||
|
|
||||||
|
Sending messages to entities is the same as the example above. The only difference is ow when an entity is moved the state will be restored.
|
||||||
|
See @ref:[persistence](persistence-typed.md) for more details.
|
||||||
|
|
|
||||||
55
akka-docs/src/main/paradox/cluster-singleton-typed.md
Normal file
55
akka-docs/src/main/paradox/cluster-singleton-typed.md
Normal file
|
|
@ -0,0 +1,55 @@
|
||||||
|
# Cluster Singleton
|
||||||
|
|
||||||
|
@@@ warning
|
||||||
|
|
||||||
|
This module is currently marked as @ref:[may change](common/may-change.md) in the sense
|
||||||
|
of being the subject of active research. This means that API or semantics can
|
||||||
|
change without warning or deprecation period and it is not recommended to use
|
||||||
|
this module in production just yet—you have been warned.
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
To use the cluster singletons add the following dependency:
|
||||||
|
|
||||||
|
@@dependency [sbt,Maven,Gradle] {
|
||||||
|
group=com.typesafe.akka
|
||||||
|
artifact=akka-cluster-typed_2.12
|
||||||
|
version=$akka.version$
|
||||||
|
}
|
||||||
|
|
||||||
|
For some use cases it is convenient and sometimes also mandatory to ensure that
|
||||||
|
you have exactly one actor of a certain type running somewhere in the cluster.
|
||||||
|
|
||||||
|
Some examples:
|
||||||
|
|
||||||
|
* single point of responsibility for certain cluster-wide consistent decisions, or
|
||||||
|
coordination of actions across the cluster system
|
||||||
|
* single entry point to an external system
|
||||||
|
* single master, many workers
|
||||||
|
* centralized naming service, or routing logic
|
||||||
|
|
||||||
|
Using a singleton should not be the first design choice. It has several drawbacks,
|
||||||
|
such as single-point of bottleneck. Single-point of failure is also a relevant concern,
|
||||||
|
but for some cases this feature takes care of that by making sure that another singleton
|
||||||
|
instance will eventually be started.
|
||||||
|
|
||||||
|
# Example
|
||||||
|
|
||||||
|
Any `Behavior` can be run as a singleton. E.g. a basic counter:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #sharding-extension }
|
||||||
|
|
||||||
|
Then on every node in the cluster, or every node with a given role, use the `ClusterSingleton` extension
|
||||||
|
to spawn the singleton. Only a single instance will run in the cluster:
|
||||||
|
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [ShardingCompileOnlySpec.scala]($akka$/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #singleton }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [ShardingCompileOnlyTest.java]($akka$/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #singleton }
|
||||||
|
|
||||||
|
|
@ -11,6 +11,7 @@
|
||||||
* [fault-tolerance](fault-tolerance-typed.md)
|
* [fault-tolerance](fault-tolerance-typed.md)
|
||||||
* [actor-discovery](actor-discovery-typed.md)
|
* [actor-discovery](actor-discovery-typed.md)
|
||||||
* [cluster](cluster-typed.md)
|
* [cluster](cluster-typed.md)
|
||||||
|
* [cluster-singleton](cluster-singleton-typed.md)
|
||||||
* [cluster-sharding](cluster-sharding-typed.md)
|
* [cluster-sharding](cluster-sharding-typed.md)
|
||||||
* [persistence](persistence-typed.md)
|
* [persistence](persistence-typed.md)
|
||||||
* [testing](testing-typed.md)
|
* [testing](testing-typed.md)
|
||||||
|
|
|
||||||
|
|
@ -125,6 +125,3 @@ object InDepthPersistentBehaviorSpec {
|
||||||
//#behavior
|
//#behavior
|
||||||
}
|
}
|
||||||
|
|
||||||
class InDepthPersistentBehaviorSpec {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -415,7 +415,8 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
|
||||||
clusterSharding,
|
clusterSharding,
|
||||||
testkit % "test->test",
|
testkit % "test->test",
|
||||||
typedTestkit % "test->test",
|
typedTestkit % "test->test",
|
||||||
actorTypedTests % "test->test"
|
actorTypedTests % "test->test",
|
||||||
|
persistenceTyped % "test->test"
|
||||||
)
|
)
|
||||||
.settings(AkkaBuild.mayChangeSettings)
|
.settings(AkkaBuild.mayChangeSettings)
|
||||||
.settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
|
.settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue