add entity id to sharding props (#24053)
This commit is contained in:
parent
c17a11ddd2
commit
a8e5f48f36
6 changed files with 246 additions and 59 deletions
|
|
@ -0,0 +1,2 @@
|
||||||
|
# 24058 - Add ClusterSharding.start overloads
|
||||||
|
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardRegion.props")
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.cluster.sharding
|
||||||
import java.net.URLEncoder
|
import java.net.URLEncoder
|
||||||
import java.util.Optional
|
import java.util.Optional
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.function.{ Function ⇒ JFunction }
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
|
@ -20,7 +21,6 @@ import akka.actor.NoSerializationVerificationNeeded
|
||||||
import akka.actor.PoisonPill
|
import akka.actor.PoisonPill
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.cluster.Cluster
|
import akka.cluster.Cluster
|
||||||
import akka.cluster.ddata.DistributedData
|
|
||||||
import akka.cluster.singleton.ClusterSingletonManager
|
import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.pattern.BackoffSupervisor
|
import akka.pattern.BackoffSupervisor
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
@ -33,7 +33,6 @@ import scala.util.control.NonFatal
|
||||||
import akka.actor.Status
|
import akka.actor.Status
|
||||||
import akka.cluster.ClusterSettings
|
import akka.cluster.ClusterSettings
|
||||||
import akka.cluster.ClusterSettings.DataCenter
|
import akka.cluster.ClusterSettings.DataCenter
|
||||||
import akka.stream.{ Inlet, Outlet }
|
|
||||||
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
@ -221,7 +220,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
requireClusterRole(settings.role)
|
requireClusterRole(settings.role)
|
||||||
implicit val timeout = system.settings.CreationTimeout
|
implicit val timeout = system.settings.CreationTimeout
|
||||||
val startMsg = Start(typeName, entityProps, settings,
|
val startMsg = Start(typeName, _ ⇒ entityProps, settings,
|
||||||
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
|
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
|
||||||
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
|
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
|
||||||
regions.put(typeName, shardRegion)
|
regions.put(typeName, shardRegion)
|
||||||
|
|
@ -260,7 +259,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
||||||
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
|
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
|
||||||
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
|
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
|
||||||
|
|
||||||
start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
|
start(typeName, _ ⇒ entityProps, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -292,7 +291,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
||||||
|
|
||||||
start(
|
start(
|
||||||
typeName,
|
typeName,
|
||||||
entityProps,
|
_ ⇒ entityProps,
|
||||||
settings,
|
settings,
|
||||||
extractEntityId = {
|
extractEntityId = {
|
||||||
case msg if messageExtractor.entityId(msg) ne null ⇒
|
case msg if messageExtractor.entityId(msg) ne null ⇒
|
||||||
|
|
@ -335,6 +334,154 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
||||||
start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill)
|
start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
|
||||||
|
* the entity actor and functions to extract entity and shard identifier from messages. The
|
||||||
|
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
|
||||||
|
*
|
||||||
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
||||||
|
* of the `reference.conf`.
|
||||||
|
*
|
||||||
|
* @param typeName the name of the entity type
|
||||||
|
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
|
||||||
|
* that will be created by the `ShardRegion`
|
||||||
|
* @param settings configuration settings, see [[ClusterShardingSettings]]
|
||||||
|
* @param extractEntityId partial function to extract the entity id and the message to send to the
|
||||||
|
* entity from the incoming message, if the partial function does not match the message will
|
||||||
|
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
|
||||||
|
* @param extractShardId function to determine the shard id for an incoming message, only messages
|
||||||
|
* that passed the `extractEntityId` will be used
|
||||||
|
* @param allocationStrategy possibility to use a custom shard allocation and
|
||||||
|
* rebalancing logic
|
||||||
|
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
|
||||||
|
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
|
||||||
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
||||||
|
*/
|
||||||
|
def start(
|
||||||
|
typeName: String,
|
||||||
|
entityPropsFactory: String ⇒ Props,
|
||||||
|
settings: ClusterShardingSettings,
|
||||||
|
extractEntityId: ShardRegion.ExtractEntityId,
|
||||||
|
extractShardId: ShardRegion.ExtractShardId,
|
||||||
|
allocationStrategy: ShardAllocationStrategy,
|
||||||
|
handOffStopMessage: Any): ActorRef = {
|
||||||
|
|
||||||
|
requireClusterRole(settings.role)
|
||||||
|
implicit val timeout = system.settings.CreationTimeout
|
||||||
|
val startMsg = Start(typeName, entityPropsFactory, settings,
|
||||||
|
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
|
||||||
|
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
|
||||||
|
regions.put(typeName, shardRegion)
|
||||||
|
shardRegion
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
|
||||||
|
* the entity actor and functions to extract entity and shard identifier from messages. The
|
||||||
|
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
|
||||||
|
*
|
||||||
|
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
|
||||||
|
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
|
||||||
|
*
|
||||||
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
||||||
|
* of the `reference.conf`.
|
||||||
|
*
|
||||||
|
* @param typeName the name of the entity type
|
||||||
|
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
|
||||||
|
* that will be created by the `ShardRegion`
|
||||||
|
* @param settings configuration settings, see [[ClusterShardingSettings]]
|
||||||
|
* @param extractEntityId partial function to extract the entity id and the message to send to the
|
||||||
|
* entity from the incoming message, if the partial function does not match the message will
|
||||||
|
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
|
||||||
|
* @param extractShardId function to determine the shard id for an incoming message, only messages
|
||||||
|
* that passed the `extractEntityId` will be used
|
||||||
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
||||||
|
*/
|
||||||
|
def start(
|
||||||
|
typeName: String,
|
||||||
|
entityPropsFactory: String ⇒ Props,
|
||||||
|
settings: ClusterShardingSettings,
|
||||||
|
extractEntityId: ShardRegion.ExtractEntityId,
|
||||||
|
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
|
||||||
|
|
||||||
|
val allocationStrategy = new LeastShardAllocationStrategy(
|
||||||
|
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
|
||||||
|
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
|
||||||
|
|
||||||
|
start(typeName, entityPropsFactory, settings, extractEntityId, extractShardId, allocationStrategy, PoisonPill)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
|
||||||
|
* the entity actor and functions to extract entity and shard identifier from messages. The
|
||||||
|
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
|
||||||
|
*
|
||||||
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
||||||
|
* of the `reference.conf`.
|
||||||
|
*
|
||||||
|
* @param typeName the name of the entity type
|
||||||
|
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
|
||||||
|
* that will be created by the `ShardRegion`
|
||||||
|
* @param settings configuration settings, see [[ClusterShardingSettings]]
|
||||||
|
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
|
||||||
|
* entity from the incoming message, see [[ShardRegion.MessageExtractor]]
|
||||||
|
* @param allocationStrategy possibility to use a custom shard allocation and
|
||||||
|
* rebalancing logic
|
||||||
|
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
|
||||||
|
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
|
||||||
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
||||||
|
*/
|
||||||
|
def start(
|
||||||
|
typeName: String,
|
||||||
|
entityPropsFactory: JFunction[String, Props],
|
||||||
|
settings: ClusterShardingSettings,
|
||||||
|
messageExtractor: ShardRegion.MessageExtractor,
|
||||||
|
allocationStrategy: ShardAllocationStrategy,
|
||||||
|
handOffStopMessage: Any): ActorRef = {
|
||||||
|
|
||||||
|
start(
|
||||||
|
typeName, entityPropsFactory.apply _, settings,
|
||||||
|
extractEntityId = {
|
||||||
|
case msg if messageExtractor.entityId(msg) ne null ⇒
|
||||||
|
(messageExtractor.entityId(msg), messageExtractor.entityMessage(msg))
|
||||||
|
}: ShardRegion.ExtractEntityId,
|
||||||
|
extractShardId = msg ⇒ messageExtractor.shardId(msg),
|
||||||
|
allocationStrategy = allocationStrategy,
|
||||||
|
handOffStopMessage = handOffStopMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java/Scala API: Register a named entity type by defining a factory for the [[akka.actor.Props]] of
|
||||||
|
* the entity actor and functions to extract entity and shard identifier from messages. The
|
||||||
|
* [[ShardRegion]] actor for this type can later be retrieved with the [[#shardRegion]] method.
|
||||||
|
*
|
||||||
|
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
|
||||||
|
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
|
||||||
|
*
|
||||||
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
||||||
|
* of the `reference.conf`.
|
||||||
|
*
|
||||||
|
* @param typeName the name of the entity type
|
||||||
|
* @param entityPropsFactory function that, given an entity id, returns the `Props` of the entity actors
|
||||||
|
* that will be created by the `ShardRegion`
|
||||||
|
* @param settings configuration settings, see [[ClusterShardingSettings]]
|
||||||
|
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
|
||||||
|
* entity from the incoming message
|
||||||
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
||||||
|
*/
|
||||||
|
def start(
|
||||||
|
typeName: String,
|
||||||
|
entityPropsFactory: JFunction[String, Props],
|
||||||
|
settings: ClusterShardingSettings,
|
||||||
|
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
|
||||||
|
|
||||||
|
val allocationStrategy = new LeastShardAllocationStrategy(
|
||||||
|
settings.tuningParameters.leastShardAllocationRebalanceThreshold,
|
||||||
|
settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)
|
||||||
|
|
||||||
|
start(typeName, entityPropsFactory, settings, messageExtractor, allocationStrategy, PoisonPill)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
|
* Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
|
||||||
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
||||||
|
|
@ -517,7 +664,7 @@ private[akka] object ClusterShardingGuardian {
|
||||||
import ShardCoordinator.ShardAllocationStrategy
|
import ShardCoordinator.ShardAllocationStrategy
|
||||||
final case class Start(
|
final case class Start(
|
||||||
typeName: String,
|
typeName: String,
|
||||||
entityProps: Props,
|
entityPropsFactory: String ⇒ Props,
|
||||||
settings: ClusterShardingSettings,
|
settings: ClusterShardingSettings,
|
||||||
extractEntityId: ShardRegion.ExtractEntityId,
|
extractEntityId: ShardRegion.ExtractEntityId,
|
||||||
extractShardId: ShardRegion.ExtractShardId,
|
extractShardId: ShardRegion.ExtractShardId,
|
||||||
|
|
@ -582,7 +729,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case Start(typeName,
|
case Start(typeName,
|
||||||
entityProps,
|
entityPropsFactory,
|
||||||
settings,
|
settings,
|
||||||
extractEntityId,
|
extractEntityId,
|
||||||
extractShardId,
|
extractShardId,
|
||||||
|
|
@ -622,7 +769,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
||||||
context.actorOf(
|
context.actorOf(
|
||||||
ShardRegion.props(
|
ShardRegion.props(
|
||||||
typeName = typeName,
|
typeName = typeName,
|
||||||
entityProps = entityProps,
|
entityPropsFactory = entityPropsFactory,
|
||||||
settings = settings,
|
settings = settings,
|
||||||
coordinatorPath = cPath,
|
coordinatorPath = cPath,
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ object ShardRegion {
|
||||||
*/
|
*/
|
||||||
private[akka] def props(
|
private[akka] def props(
|
||||||
typeName: String,
|
typeName: String,
|
||||||
entityProps: Props,
|
entityPropsFactory: String ⇒ Props,
|
||||||
settings: ClusterShardingSettings,
|
settings: ClusterShardingSettings,
|
||||||
coordinatorPath: String,
|
coordinatorPath: String,
|
||||||
extractEntityId: ShardRegion.ExtractEntityId,
|
extractEntityId: ShardRegion.ExtractEntityId,
|
||||||
|
|
@ -41,7 +41,7 @@ object ShardRegion {
|
||||||
handOffStopMessage: Any,
|
handOffStopMessage: Any,
|
||||||
replicator: ActorRef,
|
replicator: ActorRef,
|
||||||
majorityMinCap: Int): Props =
|
majorityMinCap: Int): Props =
|
||||||
Props(new ShardRegion(typeName, Some(entityProps), dataCenter = None, settings, coordinatorPath, extractEntityId,
|
Props(new ShardRegion(typeName, Some(entityPropsFactory), dataCenter = None, settings, coordinatorPath, extractEntityId,
|
||||||
extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
|
extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -366,7 +366,7 @@ object ShardRegion {
|
||||||
*/
|
*/
|
||||||
private[akka] class ShardRegion(
|
private[akka] class ShardRegion(
|
||||||
typeName: String,
|
typeName: String,
|
||||||
entityProps: Option[Props],
|
entityPropsFactory: Option[String ⇒ Props],
|
||||||
dataCenter: Option[DataCenter],
|
dataCenter: Option[DataCenter],
|
||||||
settings: ClusterShardingSettings,
|
settings: ClusterShardingSettings,
|
||||||
coordinatorPath: String,
|
coordinatorPath: String,
|
||||||
|
|
@ -678,7 +678,7 @@ private[akka] class ShardRegion(
|
||||||
}
|
}
|
||||||
|
|
||||||
def registrationMessage: Any =
|
def registrationMessage: Any =
|
||||||
if (entityProps.isDefined) Register(self) else RegisterProxy(self)
|
if (entityPropsFactory.isDefined) Register(self) else RegisterProxy(self)
|
||||||
|
|
||||||
def requestShardBufferHomes(): Unit = {
|
def requestShardBufferHomes(): Unit = {
|
||||||
shardBuffers.foreach {
|
shardBuffers.foreach {
|
||||||
|
|
@ -795,7 +795,7 @@ private[akka] class ShardRegion(
|
||||||
None
|
None
|
||||||
else {
|
else {
|
||||||
shards.get(id).orElse(
|
shards.get(id).orElse(
|
||||||
entityProps match {
|
entityPropsFactory match {
|
||||||
case Some(props) if !shardsByRef.values.exists(_ == id) ⇒
|
case Some(props) if !shardsByRef.values.exists(_ == id) ⇒
|
||||||
log.debug("Starting shard [{}] in region", id)
|
log.debug("Starting shard [{}] in region", id)
|
||||||
|
|
||||||
|
|
@ -804,7 +804,7 @@ private[akka] class ShardRegion(
|
||||||
Shard.props(
|
Shard.props(
|
||||||
typeName,
|
typeName,
|
||||||
id,
|
id,
|
||||||
props,
|
props(id),
|
||||||
settings,
|
settings,
|
||||||
extractEntityId,
|
extractEntityId,
|
||||||
extractShardId,
|
extractShardId,
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,10 @@
|
||||||
*/
|
*/
|
||||||
package akka.cluster.sharding
|
package akka.cluster.sharding
|
||||||
|
|
||||||
import akka.cluster.ddata.{ ReplicatorSettings, Replicator }
|
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
|
||||||
import akka.cluster.sharding.ShardCoordinator.Internal.{ ShardStopped, HandOff }
|
import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped }
|
||||||
import akka.cluster.sharding.ShardRegion.Passivate
|
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate }
|
||||||
import akka.cluster.sharding.ShardRegion.GetCurrentRegions
|
|
||||||
import akka.cluster.sharding.ShardRegion.CurrentRegions
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
@ -24,6 +23,7 @@ import akka.remote.testkit.STMultiNodeSpec
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import akka.testkit.TestEvent.Mute
|
import akka.testkit.TestEvent.Mute
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
import akka.cluster.singleton.ClusterSingletonManager
|
import akka.cluster.singleton.ClusterSingletonManager
|
||||||
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
import akka.cluster.singleton.ClusterSingletonManagerSettings
|
||||||
|
|
@ -39,13 +39,17 @@ object ClusterShardingSpec {
|
||||||
case object Stop
|
case object Stop
|
||||||
final case class CounterChanged(delta: Int)
|
final case class CounterChanged(delta: Int)
|
||||||
|
|
||||||
class Counter extends PersistentActor {
|
object Counter {
|
||||||
|
val ShardingTypeName: String = "Counter"
|
||||||
|
def props(id: String): Props = Props(new Counter(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
class Counter(id: String) extends PersistentActor {
|
||||||
import ShardRegion.Passivate
|
import ShardRegion.Passivate
|
||||||
|
|
||||||
context.setReceiveTimeout(120.seconds)
|
context.setReceiveTimeout(120.seconds)
|
||||||
|
|
||||||
// self.path.name is the entity identifier (utf-8 URL-encoded)
|
override def persistenceId: String = s"${Counter.ShardingTypeName}-$id"
|
||||||
override def persistenceId: String = "Counter-" + self.path.name
|
|
||||||
|
|
||||||
var count = 0
|
var count = 0
|
||||||
//#counter-actor
|
//#counter-actor
|
||||||
|
|
@ -87,18 +91,30 @@ object ClusterShardingSpec {
|
||||||
case ShardRegion.StartEntity(id) ⇒ (id.toLong % numberOfShards).toString
|
case ShardRegion.StartEntity(id) ⇒ (id.toLong % numberOfShards).toString
|
||||||
}
|
}
|
||||||
|
|
||||||
def qualifiedCounterProps(typeName: String): Props =
|
object QualifiedCounter {
|
||||||
Props(new QualifiedCounter(typeName))
|
val ShardingTypeName: String = "QualifiedCounter"
|
||||||
|
def props(typeName: String, id: String): Props = Props(new QualifiedCounter(typeName, id))
|
||||||
class QualifiedCounter(typeName: String) extends Counter {
|
|
||||||
override def persistenceId: String = typeName + "-" + self.path.name
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class AnotherCounter extends QualifiedCounter("AnotherCounter")
|
class QualifiedCounter(typeName: String, id: String) extends Counter(id) {
|
||||||
|
override def persistenceId: String = s"$typeName-$id"
|
||||||
|
}
|
||||||
|
|
||||||
|
object AnotherCounter {
|
||||||
|
val ShardingTypeName: String = "AnotherCounter"
|
||||||
|
def props(id: String): Props = Props(new AnotherCounter(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
class AnotherCounter(id: String) extends QualifiedCounter("AnotherCounter", id)
|
||||||
|
|
||||||
//#supervisor
|
//#supervisor
|
||||||
class CounterSupervisor extends Actor {
|
object CounterSupervisor {
|
||||||
val counter = context.actorOf(Props[Counter], "theCounter")
|
val ShardingTypeName: String = "CounterSupervisor"
|
||||||
|
def props(id: String): Props = Props(new CounterSupervisor(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
class CounterSupervisor(entityId: String) extends Actor {
|
||||||
|
val counter = context.actorOf(Counter.props(entityId), "theCounter")
|
||||||
|
|
||||||
override val supervisorStrategy = OneForOneStrategy() {
|
override val supervisorStrategy = OneForOneStrategy() {
|
||||||
case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume
|
case _: IllegalArgumentException ⇒ SupervisorStrategy.Resume
|
||||||
|
|
@ -329,7 +345,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
system.actorOf(
|
system.actorOf(
|
||||||
ShardRegion.props(
|
ShardRegion.props(
|
||||||
typeName = typeName,
|
typeName = typeName,
|
||||||
entityProps = qualifiedCounterProps(typeName),
|
entityPropsFactory = entityId ⇒ QualifiedCounter.props(typeName, entityId),
|
||||||
settings = settings,
|
settings = settings,
|
||||||
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
|
coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
|
|
@ -459,7 +475,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
val settings = ClusterShardingSettings(cfg)
|
val settings = ClusterShardingSettings(cfg)
|
||||||
val proxy = system.actorOf(
|
val proxy = system.actorOf(
|
||||||
ShardRegion.proxyProps(
|
ShardRegion.proxyProps(
|
||||||
typeName = "counter",
|
typeName = Counter.ShardingTypeName,
|
||||||
dataCenter = None,
|
dataCenter = None,
|
||||||
settings,
|
settings,
|
||||||
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
|
coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
|
||||||
|
|
@ -628,23 +644,23 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
runOn(third, fourth, fifth, sixth) {
|
runOn(third, fourth, fifth, sixth) {
|
||||||
//#counter-start
|
//#counter-start
|
||||||
val counterRegion: ActorRef = ClusterSharding(system).start(
|
val counterRegion: ActorRef = ClusterSharding(system).start(
|
||||||
typeName = "Counter",
|
typeName = Counter.ShardingTypeName,
|
||||||
entityProps = Props[Counter],
|
entityPropsFactory = entityId ⇒ Counter.props(entityId),
|
||||||
settings = ClusterShardingSettings(system),
|
settings = ClusterShardingSettings(system),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
extractShardId = extractShardId)
|
extractShardId = extractShardId)
|
||||||
//#counter-start
|
//#counter-start
|
||||||
ClusterSharding(system).start(
|
ClusterSharding(system).start(
|
||||||
typeName = "AnotherCounter",
|
typeName = AnotherCounter.ShardingTypeName,
|
||||||
entityProps = Props[AnotherCounter],
|
entityPropsFactory = entityId ⇒ AnotherCounter.props(entityId),
|
||||||
settings = ClusterShardingSettings(system),
|
settings = ClusterShardingSettings(system),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
extractShardId = extractShardId)
|
extractShardId = extractShardId)
|
||||||
|
|
||||||
//#counter-supervisor-start
|
//#counter-supervisor-start
|
||||||
ClusterSharding(system).start(
|
ClusterSharding(system).start(
|
||||||
typeName = "SupervisedCounter",
|
typeName = CounterSupervisor.ShardingTypeName,
|
||||||
entityProps = Props[CounterSupervisor],
|
entityPropsFactory = entityId ⇒ CounterSupervisor.props(entityId),
|
||||||
settings = ClusterShardingSettings(system),
|
settings = ClusterShardingSettings(system),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
extractShardId = extractShardId)
|
extractShardId = extractShardId)
|
||||||
|
|
@ -653,7 +669,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
enterBarrier("extension-started")
|
enterBarrier("extension-started")
|
||||||
runOn(fifth) {
|
runOn(fifth) {
|
||||||
//#counter-usage
|
//#counter-usage
|
||||||
val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
|
val counterRegion: ActorRef = ClusterSharding(system).shardRegion(Counter.ShardingTypeName)
|
||||||
counterRegion ! Get(123)
|
counterRegion ! Get(123)
|
||||||
expectMsg(0)
|
expectMsg(0)
|
||||||
|
|
||||||
|
|
@ -662,8 +678,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
expectMsg(1)
|
expectMsg(1)
|
||||||
//#counter-usage
|
//#counter-usage
|
||||||
|
|
||||||
ClusterSharding(system).shardRegion("AnotherCounter") ! EntityEnvelope(123, Decrement)
|
ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! EntityEnvelope(123, Decrement)
|
||||||
ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123)
|
ClusterSharding(system).shardRegion(AnotherCounter.ShardingTypeName) ! Get(123)
|
||||||
expectMsg(-1)
|
expectMsg(-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -672,8 +688,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
// sixth is a frontend node, i.e. proxy only
|
// sixth is a frontend node, i.e. proxy only
|
||||||
runOn(sixth) {
|
runOn(sixth) {
|
||||||
for (n ← 1000 to 1010) {
|
for (n ← 1000 to 1010) {
|
||||||
ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(n, Increment)
|
ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! EntityEnvelope(n, Increment)
|
||||||
ClusterSharding(system).shardRegion("Counter") ! Get(n)
|
ClusterSharding(system).shardRegion(Counter.ShardingTypeName) ! Get(n)
|
||||||
expectMsg(1)
|
expectMsg(1)
|
||||||
lastSender.path.address should not be (Cluster(system).selfAddress)
|
lastSender.path.address should not be (Cluster(system).selfAddress)
|
||||||
}
|
}
|
||||||
|
|
@ -686,7 +702,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
|
val counterRegionViaStart: ActorRef = ClusterSharding(system).start(
|
||||||
typeName = "ApiTest",
|
typeName = "ApiTest",
|
||||||
entityProps = Props[Counter],
|
entityPropsFactory = Counter.props,
|
||||||
settings = ClusterShardingSettings(system),
|
settings = ClusterShardingSettings(system),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
extractShardId = extractShardId)
|
extractShardId = extractShardId)
|
||||||
|
|
@ -703,7 +719,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extends Mu
|
||||||
runOn(sixth) {
|
runOn(sixth) {
|
||||||
// #proxy-dc
|
// #proxy-dc
|
||||||
val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy(
|
val counterProxyDcB: ActorRef = ClusterSharding(system).startProxy(
|
||||||
typeName = "Counter",
|
typeName = Counter.ShardingTypeName,
|
||||||
role = None,
|
role = None,
|
||||||
dataCenter = Some("B"),
|
dataCenter = Some("B"),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
|
|
|
||||||
|
|
@ -45,8 +45,8 @@ The above actor uses event sourcing and the support provided in @scala[`Persiste
|
||||||
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
||||||
its state if it is valuable.
|
its state if it is valuable.
|
||||||
|
|
||||||
Note how the `persistenceId` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
|
Note how the `persistenceId` is defined - it must be unique to the entity, so using the entity identifier is advised.
|
||||||
You may define it another way, but it must be unique.
|
You may define it in other ways, but it must be unique.
|
||||||
|
|
||||||
When using the sharding extension you are first, typically at system startup on each node
|
When using the sharding extension you are first, typically at system startup on each node
|
||||||
in the cluster, supposed to register the supported entity types with the `ClusterSharding.start`
|
in the cluster, supposed to register the supported entity types with the `ClusterSharding.start`
|
||||||
|
|
@ -58,6 +58,10 @@ Scala
|
||||||
Java
|
Java
|
||||||
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start }
|
: @@snip [ClusterShardingTest.java]($code$/java/jdocs/sharding/ClusterShardingTest.java) { #counter-start }
|
||||||
|
|
||||||
|
In some cases, the actor may need to know the `entityId` associated with it. This can be achieved using the `entityPropsFactory`
|
||||||
|
parameter to `ClusterSharding.start`. The entity ID will be passed to the factory as a parameter, which can then be used in
|
||||||
|
the creation of the actor (like the above example).
|
||||||
|
|
||||||
The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity
|
The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity
|
||||||
identifier and the shard identifier from incoming messages.
|
identifier and the shard identifier from incoming messages.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,8 @@ package jdocs.sharding;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
|
|
||||||
import akka.actor.AbstractActor;
|
import akka.actor.AbstractActor;
|
||||||
|
|
@ -17,7 +19,6 @@ import akka.actor.OneForOneStrategy;
|
||||||
import akka.actor.PoisonPill;
|
import akka.actor.PoisonPill;
|
||||||
import akka.actor.Props;
|
import akka.actor.Props;
|
||||||
import akka.actor.SupervisorStrategy;
|
import akka.actor.SupervisorStrategy;
|
||||||
import akka.actor.Terminated;
|
|
||||||
import akka.actor.ReceiveTimeout;
|
import akka.actor.ReceiveTimeout;
|
||||||
//#counter-extractor
|
//#counter-extractor
|
||||||
import akka.cluster.sharding.ShardRegion;
|
import akka.cluster.sharding.ShardRegion;
|
||||||
|
|
@ -31,7 +32,6 @@ import akka.cluster.sharding.ClusterShardingSettings;
|
||||||
|
|
||||||
//#counter-start
|
//#counter-start
|
||||||
import akka.persistence.AbstractPersistentActor;
|
import akka.persistence.AbstractPersistentActor;
|
||||||
import akka.cluster.Cluster;
|
|
||||||
import akka.japi.pf.DeciderBuilder;
|
import akka.japi.pf.DeciderBuilder;
|
||||||
|
|
||||||
// Doc code, compile only
|
// Doc code, compile only
|
||||||
|
|
@ -85,12 +85,12 @@ public class ClusterShardingTest {
|
||||||
//#counter-start
|
//#counter-start
|
||||||
Option<String> roleOption = Option.none();
|
Option<String> roleOption = Option.none();
|
||||||
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
|
ClusterShardingSettings settings = ClusterShardingSettings.create(system);
|
||||||
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
|
ActorRef startedCounterRegion = ClusterSharding.get(system).start(Counter.ShardingTypeName,
|
||||||
Props.create(Counter.class), settings, messageExtractor);
|
entityId -> Counter.props(entityId), settings, messageExtractor);
|
||||||
//#counter-start
|
//#counter-start
|
||||||
|
|
||||||
//#counter-usage
|
//#counter-usage
|
||||||
ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
|
ActorRef counterRegion = ClusterSharding.get(system).shardRegion(Counter.ShardingTypeName);
|
||||||
counterRegion.tell(new Counter.Get(123), getSelf());
|
counterRegion.tell(new Counter.Get(123), getSelf());
|
||||||
|
|
||||||
counterRegion.tell(new Counter.EntityEnvelope(123,
|
counterRegion.tell(new Counter.EntityEnvelope(123,
|
||||||
|
|
@ -99,14 +99,14 @@ public class ClusterShardingTest {
|
||||||
//#counter-usage
|
//#counter-usage
|
||||||
|
|
||||||
//#counter-supervisor-start
|
//#counter-supervisor-start
|
||||||
ClusterSharding.get(system).start("SupervisedCounter",
|
ClusterSharding.get(system).start(CounterSupervisor.ShardingTypeName,
|
||||||
Props.create(CounterSupervisor.class), settings, messageExtractor);
|
entityId -> CounterSupervisor.props(entityId), settings, messageExtractor);
|
||||||
//#counter-supervisor-start
|
//#counter-supervisor-start
|
||||||
|
|
||||||
//#proxy-dc
|
//#proxy-dc
|
||||||
ActorRef counterProxyDcB =
|
ActorRef counterProxyDcB =
|
||||||
ClusterSharding.get(system).startProxy(
|
ClusterSharding.get(system).startProxy(
|
||||||
"Counter",
|
Counter.ShardingTypeName,
|
||||||
Optional.empty(),
|
Optional.empty(),
|
||||||
Optional.of("B"), // data center name
|
Optional.of("B"), // data center name
|
||||||
messageExtractor);
|
messageExtractor);
|
||||||
|
|
@ -189,12 +189,22 @@ public class ClusterShardingTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final String ShardingTypeName = "Counter";
|
||||||
|
|
||||||
|
public static Props props(String id) {
|
||||||
|
return Props.create(() -> new Counter(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
final String entityId;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
||||||
// getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
|
public Counter(String entityId) {
|
||||||
|
this.entityId = entityId;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String persistenceId() {
|
public String persistenceId() {
|
||||||
return "Counter-" + getSelf().path().name();
|
return ShardingTypeName + "-" + entityId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -247,9 +257,17 @@ public class ClusterShardingTest {
|
||||||
|
|
||||||
static//#supervisor
|
static//#supervisor
|
||||||
public class CounterSupervisor extends AbstractActor {
|
public class CounterSupervisor extends AbstractActor {
|
||||||
|
public static final String ShardingTypeName = "CounterSupervisor";
|
||||||
|
|
||||||
private final ActorRef counter = getContext().actorOf(
|
public static Props props(String entityId) {
|
||||||
Props.create(Counter.class), "theCounter");
|
return Props.create(() -> new CounterSupervisor(entityId));
|
||||||
|
}
|
||||||
|
|
||||||
|
private final ActorRef counter;
|
||||||
|
|
||||||
|
public CounterSupervisor(String entityId) {
|
||||||
|
counter = getContext().actorOf(Counter.props(entityId), "theCounter");
|
||||||
|
}
|
||||||
|
|
||||||
private static final SupervisorStrategy strategy =
|
private static final SupervisorStrategy strategy =
|
||||||
new OneForOneStrategy(DeciderBuilder.
|
new OneForOneStrategy(DeciderBuilder.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue