Passivate for Typed Sharding, #24478

* also moved singleton doc sample to own file
* and mv /doc/ to /docs/ as in other places
This commit is contained in:
Patrik Nordwall 2018-06-11 11:53:02 +02:00
parent c9ea0309c9
commit 0f56a6d1ed
12 changed files with 432 additions and 117 deletions

View file

@ -5,25 +5,30 @@
package akka.cluster.sharding.typed package akka.cluster.sharding.typed
package internal package internal
import java.net.URLEncoder
import java.util.Optional import java.util.Optional
import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._ import scala.compat.java8.FutureConverters._
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor.ExtendedActorSystem
import akka.actor.{ InternalActorRef, Scheduler } import akka.actor.{ InternalActorRef, Scheduler }
import akka.actor.typed.ActorContext
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.internal.adapter.ActorRefAdapter import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity } import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.sharding.typed.javadsl.EntityIdToBehavior import akka.cluster.sharding.typed.javadsl.EntityFactory
import akka.cluster.typed.Cluster import akka.cluster.typed.Cluster
import akka.event.Logging import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
@ -31,6 +36,7 @@ import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef import akka.pattern.PromiseActorRef
import akka.util.Timeout import akka.util.Timeout
import akka.japi.function.{ Function JFunction } import akka.japi.function.{ Function JFunction }
import akka.util.ByteString
/** /**
* INTERNAL API * INTERNAL API
@ -91,9 +97,10 @@ import akka.japi.function.{ Function ⇒ JFunction }
// typeKey.name to messageClassName // typeKey.name to messageClassName
private val regions: ConcurrentHashMap[String, String] = new ConcurrentHashMap private val regions: ConcurrentHashMap[String, String] = new ConcurrentHashMap
private val proxies: ConcurrentHashMap[String, String] = new ConcurrentHashMap private val proxies: ConcurrentHashMap[String, String] = new ConcurrentHashMap
private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap
override def spawn[A]( override def spawn[A](
behavior: String Behavior[A], behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[A],
entityProps: Props, entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A], typeKey: scaladsl.EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
@ -104,7 +111,7 @@ import akka.japi.function.{ Function ⇒ JFunction }
} }
override def spawn[A]( override def spawn[A](
behavior: EntityIdToBehavior[A], behavior: EntityFactory[A],
entityProps: Props, entityProps: Props,
typeKey: javadsl.EntityTypeKey[A], typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
@ -115,8 +122,8 @@ import akka.japi.function.{ Function ⇒ JFunction }
Optional.of(defaultShardAllocationStrategy(settings))) Optional.of(defaultShardAllocationStrategy(settings)))
} }
override def spawnWithMessageExtractor[E, A]( def spawnWithMessageExtractor[E, A](
behavior: String Behavior[A], behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[A],
entityProps: Props, entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A], typeKey: scaladsl.EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
@ -140,8 +147,20 @@ import akka.japi.function.{ Function ⇒ JFunction }
if (settings.shouldHostShard(cluster)) { if (settings.shouldHostShard(cluster)) {
log.info("Starting Shard Region [{}]...", typeKey.name) log.info("Starting Shard Region [{}]...", typeKey.name)
val shardCommandDelegator =
shardCommandActors.computeIfAbsent(
typeKey.name,
new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] {
override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = {
// using untyped.systemActorOf to avoid the Future[ActorRef]
system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf(
PropsAdapter(ShardCommandActor.behavior(extractor.handOffStopMessage)),
URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator")
}
})
val untypedEntityPropsFactory: String akka.actor.Props = { entityId val untypedEntityPropsFactory: String akka.actor.Props = { entityId
PropsAdapter(behavior(entityId), entityProps) PropsAdapter(behavior(shardCommandDelegator, entityId), entityProps)
} }
untypedSharding.internalStart( untypedSharding.internalStart(
@ -178,13 +197,13 @@ import akka.japi.function.{ Function ⇒ JFunction }
} }
override def spawnWithMessageExtractor[E, A]( override def spawnWithMessageExtractor[E, A](
behavior: EntityIdToBehavior[A], behavior: EntityFactory[A],
entityProps: Props, entityProps: Props,
typeKey: javadsl.EntityTypeKey[A], typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A], extractor: ShardingMessageExtractor[E, A],
allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = { allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = {
spawnWithMessageExtractor(entityId behavior.apply(entityId), entityProps, typeKey.asScala, spawnWithMessageExtractor((shard, entityId) behavior.apply(shard, entityId), entityProps, typeKey.asScala,
settings, extractor, allocationStrategy.asScala) settings, extractor, allocationStrategy.asScala)
} }
@ -255,3 +274,32 @@ import akka.japi.function.{ Function ⇒ JFunction }
} }
} }
/**
* INTERNAL API
*/
@InternalApi private[akka] object ShardCommandActor {
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.sharding.ShardRegion.{ Passivate UntypedPassivate }
def behavior(stopMessage: Any): Behavior[scaladsl.ClusterSharding.ShardCommand] = {
def sendUntypedPassivate(entity: ActorRef[_], ctx: ActorContext[_]): Unit = {
val pathToShard = entity.toUntyped.path.elements.take(4).mkString("/")
ctx.asScala.system.toUntyped.actorSelection(pathToShard).tell(UntypedPassivate(stopMessage), entity.toUntyped)
}
Behaviors.receive { (ctx, msg)
msg match {
case scaladsl.ClusterSharding.Passivate(entity)
sendUntypedPassivate(entity, ctx)
Behaviors.same
case javadsl.ClusterSharding.Passivate(entity)
sendUntypedPassivate(entity, ctx)
Behaviors.same
case _
Behaviors.unhandled
}
}
}
}

View file

@ -8,7 +8,6 @@ package javadsl
import java.util.Optional import java.util.Optional
import java.util.concurrent.CompletionStage import java.util.concurrent.CompletionStage
import akka.actor.Scheduler
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
@ -21,13 +20,32 @@ import akka.japi.function.{ Function ⇒ JFunction }
import akka.util.Timeout import akka.util.Timeout
@FunctionalInterface @FunctionalInterface
trait EntityIdToBehavior[A] { trait EntityFactory[A] {
def apply(entityId: String): Behavior[A] def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[A]
} }
object ClusterSharding { object ClusterSharding {
def get(system: ActorSystem[_]): ClusterSharding = def get(system: ActorSystem[_]): ClusterSharding =
scaladsl.ClusterSharding(system).asJava scaladsl.ClusterSharding(system).asJava
/**
* When an entity is created an `ActorRef[ShardCommand]` is passed to the
* factory method. The entity can request passivation by sending the [[Passivate]]
* message to this ref. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop itself.
*
* Not for user extension.
*/
@DoNotInherit trait ShardCommand extends scaladsl.ClusterSharding.ShardCommand
/**
* The entity can request passivation by sending the [[Passivate]] message
* to the `ActorRef[ShardCommand]` that was passed in to the factory method
* when creating the entity. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop
* itself.
*/
final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand
} }
/** /**
@ -130,8 +148,9 @@ object ClusterSharding {
* the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
* If a message is already enqueued to the entity when it stops itself the enqueued message * If a message is already enqueued to the entity when it stops itself the enqueued message
* in the mailbox will be dropped. To support graceful passivation without losing such * in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]`
* The specified wrapped message in `Passivate` will be sent back to the entity, which is * that was passed in to the factory method when creating the entity..
* The specified `handOffStopMessage` message will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages * between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity. * are thereafter delivered to a new incarnation of the entity.
@ -158,7 +177,7 @@ abstract class ClusterSharding {
* @tparam A The type of command the entity accepts * @tparam A The type of command the entity accepts
*/ */
def spawn[A]( def spawn[A](
behavior: EntityIdToBehavior[A], behavior: EntityFactory[A],
props: Props, props: Props,
typeKey: EntityTypeKey[A], typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
@ -179,7 +198,7 @@ abstract class ClusterSharding {
* @tparam A The type of command the entity accepts * @tparam A The type of command the entity accepts
*/ */
def spawnWithMessageExtractor[E, A]( def spawnWithMessageExtractor[E, A](
behavior: EntityIdToBehavior[A], behavior: EntityFactory[A],
entityProps: Props, entityProps: Props,
typeKey: EntityTypeKey[A], typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,

View file

@ -7,7 +7,6 @@ package scaladsl
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor.Scheduler
import akka.util.Timeout import akka.util.Timeout
import scala.reflect.ClassTag import scala.reflect.ClassTag
@ -30,6 +29,25 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
override def createExtension(system: ActorSystem[_]): ClusterSharding = override def createExtension(system: ActorSystem[_]): ClusterSharding =
new ClusterShardingImpl(system) new ClusterShardingImpl(system)
/**
* When an entity is created an `ActorRef[ShardCommand]` is passed to the
* factory method. The entity can request passivation by sending the [[Passivate]]
* message to this ref. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop itself.
*
* Not for user extension.
*/
@DoNotInherit trait ShardCommand
/**
* The entity can request passivation by sending the [[Passivate]] message
* to the `ActorRef[ShardCommand]` that was passed in to the factory method
* when creating the entity. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop
* itself.
*/
final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand
} }
/** /**
@ -132,8 +150,9 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
* If a message is already enqueued to the entity when it stops itself the enqueued message * If a message is already enqueued to the entity when it stops itself the enqueued message
* in the mailbox will be dropped. To support graceful passivation without losing such * in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. * messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]`
* The specified wrapped message in `Passivate` will be sent back to the entity, which is * that was passed in to the factory method when creating the entity..
* The specified `handOffStopMessage` message will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages * between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity. * are thereafter delivered to a new incarnation of the entity.
@ -144,6 +163,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
*/ */
@DoNotInherit @DoNotInherit
trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
import ClusterSharding.ShardCommand
/** /**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has * Spawn a shard region or a proxy depending on if the settings require role and if this node has
@ -160,7 +180,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
* @tparam A The type of command the entity accepts * @tparam A The type of command the entity accepts
*/ */
def spawn[A]( def spawn[A](
behavior: String Behavior[A], behavior: (ActorRef[ShardCommand], String) Behavior[A],
props: Props, props: Props,
typeKey: EntityTypeKey[A], typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
@ -181,7 +201,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
* @tparam A The type of command the entity accepts * @tparam A The type of command the entity accepts
*/ */
def spawnWithMessageExtractor[E, A]( def spawnWithMessageExtractor[E, A](
behavior: String Behavior[A], behavior: (ActorRef[ShardCommand], String) Behavior[A],
entityProps: Props, entityProps: Props,
typeKey: EntityTypeKey[A], typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,

View file

@ -67,7 +67,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"start sharding" in { "start sharding" in {
val sharding = ClusterSharding(typedSystem) val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn( val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn(
_ multiDcPinger, (_, _) multiDcPinger,
Props.empty, Props.empty,
typeKey = typeKey, typeKey = typeKey,
ClusterShardingSettings(typedSystem), ClusterShardingSettings(typedSystem),
@ -100,7 +100,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"be able to message cross dc via proxy" in { "be able to message cross dc via proxy" in {
runOn(first, second) { runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn( val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn(
_ multiDcPinger, (_, _) multiDcPinger,
Props.empty, Props.empty,
typeKey = typeKey, typeKey = typeKey,
ClusterShardingSettings(typedSystem).withDataCenter("dc2"), ClusterShardingSettings(typedSystem).withDataCenter("dc2"),

View file

@ -0,0 +1,120 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import java.time.Duration;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.Props;
import akka.actor.typed.javadsl.Behaviors;
//#import
import akka.cluster.sharding.typed.ClusterShardingSettings;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
//#import
public class ShardingCompileOnlyTest {
//#counter-messages
interface CounterCommand {}
public static class Increment implements CounterCommand { }
public static class GoodByeCounter implements CounterCommand { }
public static class GetValue implements CounterCommand {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
//#counter-messages
//#counter
public static Behavior<CounterCommand> counter(String entityId, Integer value) {
return Behaviors.receive(CounterCommand.class)
.onMessage(Increment.class, (ctx, msg) -> {
return counter(entityId,value + 1);
})
.onMessage(GetValue.class, (ctx, msg) -> {
msg.replyTo.tell(value);
return Behaviors.same();
})
.onMessage(GoodByeCounter.class, (ctx, msg) -> {
return Behaviors.stopped();
})
.build();
}
//#counter
//#counter-passivate
public static class Idle implements CounterCommand { }
public static Behavior<CounterCommand> counter2(ActorRef<ClusterSharding.ShardCommand> shard, String entityId) {
return Behaviors.setup(ctx -> {
ctx.setReceiveTimeout(Duration.ofSeconds(30), new Idle());
return counter2(shard, entityId, 0);
});
}
private static Behavior<CounterCommand> counter2(
ActorRef<ClusterSharding.ShardCommand> shard,
String entityId,
Integer value) {
return Behaviors.receive(CounterCommand.class)
.onMessage(Increment.class, (ctx, msg) -> {
return counter(entityId,value + 1);
})
.onMessage(GetValue.class, (ctx, msg) -> {
msg.replyTo.tell(value);
return Behaviors.same();
})
.onMessage(Idle.class, (ctx, msg) -> {
// after receive timeout
shard.tell(new ClusterSharding.Passivate<>(ctx.getSelf()));
return Behaviors.same();
})
.onMessage(GoodByeCounter.class, (ctx, msg) -> {
// the handOffStopMessage, used for rebalance and passivate
return Behaviors.stopped();
})
.build();
}
//#counter-passivate
public static void example() {
ActorSystem system = ActorSystem.create(
Behaviors.empty(), "ShardingExample"
);
//#sharding-extension
ClusterSharding sharding = ClusterSharding.get(system);
//#sharding-extension
//#spawn
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
(shard, entityId) -> counter(entityId,0),
Props.empty(),
typeKey,
ClusterShardingSettings.create(system),
10,
new GoodByeCounter());
//#spawn
//#send
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-`");
counterOne.tell(new Increment());
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
//#send
}
}

View file

@ -70,8 +70,13 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
Cluster(system).manager ! Join(Cluster(system).selfMember.address) Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in { "start persistent actor" in {
ClusterSharding(system).spawn[Command](persistentActor, Props.empty, typeKey, ClusterSharding(system).spawn[Command](
ClusterShardingSettings(system), maxNumberOfShards = 100, handOffStopMessage = StopPlz) (_, entityId) persistentActor(entityId),
Props.empty,
typeKey,
ClusterShardingSettings(system),
maxNumberOfShards = 100,
handOffStopMessage = StopPlz)
val p = TestProbe[String]() val p = TestProbe[String]()

View file

@ -7,6 +7,8 @@ package akka.cluster.sharding.typed.scaladsl
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.Done
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -63,6 +65,7 @@ object ClusterShardingSpec {
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol final case class StopPlz() extends TestProtocol
final case class PassivatePlz() extends TestProtocol
sealed trait IdTestProtocol extends java.io.Serializable sealed trait IdTestProtocol extends java.io.Serializable
final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol
@ -72,12 +75,13 @@ object ClusterShardingSpec {
class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 48 def identifier: Int = 48
def manifest(o: AnyRef): String = o match { def manifest(o: AnyRef): String = o match {
case _: ReplyPlz "a" case _: ReplyPlz "a"
case _: WhoAreYou "b" case _: WhoAreYou "b"
case _: StopPlz "c" case _: StopPlz "c"
case _: IdReplyPlz "A" case _: PassivatePlz "d"
case _: IdWhoAreYou "B" case _: IdReplyPlz "A"
case _: IdStopPlz "C" case _: IdWhoAreYou "B"
case _: IdStopPlz "C"
} }
private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] = private def actorRefToBinary(ref: ActorRef[_]): Array[Byte] =
@ -94,6 +98,7 @@ object ClusterShardingSpec {
case ReplyPlz(ref) actorRefToBinary(ref) case ReplyPlz(ref) actorRefToBinary(ref)
case WhoAreYou(ref) actorRefToBinary(ref) case WhoAreYou(ref) actorRefToBinary(ref)
case _: StopPlz Array.emptyByteArray case _: StopPlz Array.emptyByteArray
case _: PassivatePlz Array.emptyByteArray
case IdReplyPlz(id, ref) idAndRefToBinary(id, ref) case IdReplyPlz(id, ref) idAndRefToBinary(id, ref)
case IdWhoAreYou(id, ref) idAndRefToBinary(id, ref) case IdWhoAreYou(id, ref) idAndRefToBinary(id, ref)
case _: IdStopPlz Array.emptyByteArray case _: IdStopPlz Array.emptyByteArray
@ -113,6 +118,7 @@ object ClusterShardingSpec {
case "a" ReplyPlz(actorRefFromBinary(bytes)) case "a" ReplyPlz(actorRefFromBinary(bytes))
case "b" WhoAreYou(actorRefFromBinary(bytes)) case "b" WhoAreYou(actorRefFromBinary(bytes))
case "c" StopPlz() case "c" StopPlz()
case "d" PassivatePlz()
case "A" IdReplyPlz.tupled(idAndRefFromBinary(bytes)) case "A" IdReplyPlz.tupled(idAndRefFromBinary(bytes))
case "B" IdWhoAreYou.tupled(idAndRefFromBinary(bytes)) case "B" IdWhoAreYou.tupled(idAndRefFromBinary(bytes))
case "C" IdStopPlz() case "C" IdStopPlz()
@ -135,22 +141,28 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
} }
private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard") private val typeKey = EntityTypeKey[TestProtocol]("envelope-shard")
private val behavior = Behaviors.receive[TestProtocol] { private def behavior(shard: ActorRef[ClusterSharding.ShardCommand], stopProbe: Option[ActorRef[Done]] = None) =
case (_, StopPlz()) Behaviors.receive[TestProtocol] {
Behaviors.stopped case (ctx, PassivatePlz())
shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
case (ctx, WhoAreYou(replyTo)) case (_, StopPlz())
val address = Cluster(ctx.system).selfMember.address stopProbe.foreach(_ ! Done)
replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" Behaviors.stopped
Behaviors.same
case (_, ReplyPlz(toMe)) case (ctx, WhoAreYou(replyTo))
toMe ! "Hello!" val address = Cluster(ctx.system).selfMember.address
Behaviors.same replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}"
} Behaviors.same
case (_, ReplyPlz(toMe))
toMe ! "Hello!"
Behaviors.same
}
private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") private val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
private val behaviorWithId = Behaviors.receive[IdTestProtocol] { private def behaviorWithId(shard: ActorRef[ClusterSharding.ShardCommand]) = Behaviors.receive[IdTestProtocol] {
case (_, IdStopPlz()) case (_, IdStopPlz())
Behaviors.stopped Behaviors.stopped
@ -165,7 +177,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
} }
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn( private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn(
_ behavior, (shard, _) behavior(shard),
Props.empty, Props.empty,
typeKey, typeKey,
ClusterShardingSettings(system), ClusterShardingSettings(system),
@ -173,7 +185,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
StopPlz()) StopPlz())
private val shardingRef2 = sharding2.spawn( private val shardingRef2 = sharding2.spawn(
_ behavior, (shard, _) behavior(shard),
Props.empty, Props.empty,
typeKey, typeKey,
ClusterShardingSettings(system2), ClusterShardingSettings(system2),
@ -181,7 +193,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
StopPlz()) StopPlz())
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor( private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor(
_ behaviorWithId, (shard, _) behaviorWithId(shard),
Props.empty, Props.empty,
typeKey2, typeKey2,
ClusterShardingSettings(system), ClusterShardingSettings(system),
@ -193,7 +205,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
None) None)
private val shardingRef4 = sharding2.spawnWithMessageExtractor( private val shardingRef4 = sharding2.spawnWithMessageExtractor(
_ behaviorWithId, (shard, _) behaviorWithId(shard),
Props.empty, Props.empty,
typeKey2, typeKey2,
ClusterShardingSettings(system2), ClusterShardingSettings(system2),
@ -246,11 +258,34 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
} }
} }
"be able to passivate" in {
val stopProbe = TestProbe[Done]()
val p = TestProbe[String]()
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn(
(shard, _) behavior(shard, Some(stopProbe.ref)),
Props.empty,
typeKey3,
ClusterShardingSettings(system),
10,
StopPlz())
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
shardingRef3 ! ShardingEnvelope(s"test1", PassivatePlz())
stopProbe.expectMessage(Done)
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
}
"fail if starting sharding for already used typeName, but with a different type" in { "fail if starting sharding for already used typeName, but with a different type" in {
// sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard") // sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] { val ex = intercept[Exception] {
sharding.spawn( sharding.spawn(
_ behaviorWithId, (shard, _) behaviorWithId(shard),
Props.empty, Props.empty,
EntityTypeKey[IdTestProtocol]("envelope-shard"), EntityTypeKey[IdTestProtocol]("envelope-shard"),
ClusterShardingSettings(system), ClusterShardingSettings(system),

View file

@ -2,11 +2,11 @@
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/ */
package doc.akka.cluster.sharding.typed package docs.akka.cluster.sharding.typed
import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props } import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{ ClusterSingleton, ClusterSingletonSettings }
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost } import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost }
@ -24,26 +24,32 @@ object ShardingCompileOnlySpec {
val sharding = ClusterSharding(system) val sharding = ClusterSharding(system)
//#sharding-extension //#sharding-extension
//#counter //#counter-messages
trait CounterCommand trait CounterCommand
case object Increment extends CounterCommand case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand case object GoodByeCounter extends CounterCommand
//#counter-messages
def counter(entityId: String, value: Int): Behavior[CounterCommand] = Behaviors.receiveMessage[CounterCommand] { //#counter
case Increment
counter(entityId, value + 1) def counter(entityId: String, value: Int): Behavior[CounterCommand] =
case GetValue(replyTo) Behaviors.receiveMessage[CounterCommand] {
replyTo ! value case Increment
Behaviors.same counter(entityId, value + 1)
} case GetValue(replyTo)
replyTo ! value
Behaviors.same
case GoodByeCounter
Behaviors.stopped
}
//#counter //#counter
//#spawn //#spawn
val TypeKey = EntityTypeKey[CounterCommand]("Counter") val TypeKey = EntityTypeKey[CounterCommand]("Counter")
// if a extractor is defined then the type would be ActorRef[BasicCommand] // if a extractor is defined then the type would be ActorRef[BasicCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn( val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn(
behavior = entityId counter(entityId, 0), behavior = (shard, entityId) counter(entityId, 0),
props = Props.empty, props = Props.empty,
typeKey = TypeKey, typeKey = TypeKey,
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
@ -60,10 +66,11 @@ object ShardingCompileOnlySpec {
shardRegion ! ShardingEnvelope("counter-1", Increment) shardRegion ! ShardingEnvelope("counter-1", Increment)
//#send //#send
import InDepthPersistentBehaviorSpec.behavior
//#persistence //#persistence
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost") val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).spawn[BlogCommand]( ClusterSharding(system).spawn[BlogCommand](
behavior = entityId InDepthPersistentBehaviorSpec.behavior(entityId), behavior = (shard, entityId) behavior(entityId),
props = Props.empty, props = Props.empty,
typeKey = ShardingTypeName, typeKey = ShardingTypeName,
settings = ClusterShardingSettings(system), settings = ClusterShardingSettings(system),
@ -71,20 +78,33 @@ object ShardingCompileOnlySpec {
handOffStopMessage = PassivatePost) handOffStopMessage = PassivatePost)
//#persistence //#persistence
// as a singleton //#counter-passivate
//#singleton case object Idle extends CounterCommand
val singletonManager = ClusterSingleton(system)
// Start if needed and provide a proxy to a named singleton
val proxy: ActorRef[CounterCommand] = singletonManager.spawn(
behavior = counter("TheCounter", 0),
"GlobalCounter",
Props.empty,
ClusterSingletonSettings(system),
terminationMessage = GoodByeCounter
)
proxy ! Increment def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
//#singleton Behaviors.setup { ctx
def become(value: Int): Behavior[CounterCommand] =
Behaviors.receiveMessage[CounterCommand] {
case Increment
become(value + 1)
case GetValue(replyTo)
replyTo ! value
Behaviors.same
case Idle
// after receive timeout
shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
case GoodByeCounter
// the handOffStopMessage, used for rebalance and passivate
Behaviors.stopped
}
ctx.setReceiveTimeout(30.seconds, Idle)
become(0)
}
}
//#counter-passivate
} }

View file

@ -2,26 +2,21 @@
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com> * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/ */
package jdoc.akka.cluster.sharding.typed; package jdocs.akka.cluster.typed;
import akka.actor.typed.ActorRef; import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem; import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.actor.typed.Props; import akka.actor.typed.Props;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
//#import
import akka.cluster.typed.ClusterSingleton; import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.ClusterSingletonSettings; import akka.cluster.typed.ClusterSingletonSettings;
//#import //#import
import akka.cluster.sharding.typed.ClusterShardingSettings;
import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
//#import public class SingletonCompileOnlyTest {
public class ShardingCompileOnlyTest {
//#counter //#counter
interface CounterCommand {} interface CounterCommand {}
@ -44,6 +39,9 @@ public class ShardingCompileOnlyTest {
msg.replyTo.tell(value); msg.replyTo.tell(value);
return Behaviors.same(); return Behaviors.same();
}) })
.onMessage(GoodByeCounter.class, (ctx, msg) -> {
return Behaviors.stopped();
})
.build(); .build();
} }
//#counter //#counter
@ -51,31 +49,9 @@ public class ShardingCompileOnlyTest {
public static void example() { public static void example() {
ActorSystem system = ActorSystem.create( ActorSystem system = ActorSystem.create(
Behaviors.empty(), "ShardingExample" Behaviors.empty(), "SingletonExample"
); );
//#sharding-extension
ClusterSharding sharding = ClusterSharding.get(system);
//#sharding-extension
//#spawn
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
entityId -> counter(entityId,0),
Props.empty(),
typeKey,
ClusterShardingSettings.create(system),
10,
new GoodByeCounter());
//#spawn
//#send
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-`");
counterOne.tell(new Increment());
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
//#send
//#singleton //#singleton
ClusterSingleton singleton = ClusterSingleton.get(system); ClusterSingleton singleton = ClusterSingleton.get(system);
// Start if needed and provide a proxy to a named singleton // Start if needed and provide a proxy to a named singleton

View file

@ -0,0 +1,52 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.scaladsl.Behaviors
object SingletonCompileOnlySpec {
val system = ActorSystem(Behaviors.empty, "Singleton")
//#counter
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand
def counter(entityId: String, value: Int): Behavior[CounterCommand] =
Behaviors.receiveMessage[CounterCommand] {
case Increment
counter(entityId, value + 1)
case GetValue(replyTo)
replyTo ! value
Behaviors.same
case GoodByeCounter
Behaviors.stopped
}
//#counter
//#singleton
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.ClusterSingletonSettings
val singletonManager = ClusterSingleton(system)
// Start if needed and provide a proxy to a named singleton
val proxy: ActorRef[CounterCommand] = singletonManager.spawn(
behavior = counter("TheCounter", 0),
"GlobalCounter",
Props.empty,
ClusterSingletonSettings(system),
terminationMessage = GoodByeCounter
)
proxy ! Increment
//#singleton
}

View file

@ -29,35 +29,35 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in
Sharding is accessed via the `ClusterSharding` extension Sharding is accessed via the `ClusterSharding` extension
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension } : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #sharding-extension }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #import #sharding-extension }
It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter: It is common for sharding to be used with persistence however any Behavior can be used with sharding e.g. a basic counter:
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter }
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier. Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn } : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn }
Messages to a specific entity are then sent via an EntityRef. Messages to a specific entity are then sent via an EntityRef.
It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region. It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region.
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send } : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #send }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send } : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #send }
## Persistence example ## Persistence example
@ -74,7 +74,27 @@ Scala
To create the entity: To create the entity:
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence } : @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence }
Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored. Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored.
See @ref:[persistence](persistence.md) for more details. See @ref:[persistence](persistence.md) for more details.
## Passivation
If the state of the entities are persistent you may stop entities that are not used to
reduce memory consumption. This is done by the application specific implementation of
the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
If a message is already enqueued to the entity when it stops itself the enqueued message
in the mailbox will be dropped. To support graceful passivation without losing such
messages the entity actor can send `ClusterSharding.Passivate` to to the
@scala:[`ActorRef[ShardCommand]`]@java:[`ActorRef<ShardCommand>`] that was passed in to
the factory method when creating the entity. The specified `handOffStopMessage` message
will be sent back to the entity, which is then supposed to stop itself. Incoming messages
will be buffered by the `Shard` between reception of `Passivate` and termination of the
entity. Such buffered messages are thereafter delivered to a new incarnation of the entity.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate }

View file

@ -42,20 +42,20 @@ instance will eventually be started.
Any `Behavior` can be run as a singleton. E.g. a basic counter: Any `Behavior` can be run as a singleton. E.g. a basic counter:
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter } : @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #counter }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter } : @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #counter }
Then on every node in the cluster, or every node with a given role, use the `ClusterSingleton` extension Then on every node in the cluster, or every node with a given role, use the `ClusterSingleton` extension
to spawn the singleton. An instance will per data centre of the cluster: to spawn the singleton. An instance will per data centre of the cluster:
Scala Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/doc/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #singleton } : @@snip [SingletonCompileOnlySpec.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/SingletonCompileOnlySpec.scala) { #singleton }
Java Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdoc/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #singleton } : @@snip [SingletonCompileOnlyTest.java](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/SingletonCompileOnlyTest.java) { #import #singleton }
## Accessing singleton of another data centre ## Accessing singleton of another data centre