|
|
|
|
@ -43,6 +43,7 @@ import scala.concurrent.Future
|
|
|
|
|
import akka.dispatch.ExecutionContexts
|
|
|
|
|
import akka.pattern.pipe
|
|
|
|
|
import scala.util.Success
|
|
|
|
|
import akka.util.ByteString
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This extension provides sharding functionality of actors in a cluster.
|
|
|
|
|
@ -175,18 +176,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
import ShardCoordinator.LeastShardAllocationStrategy
|
|
|
|
|
|
|
|
|
|
private val cluster = Cluster(system)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] object Settings {
|
|
|
|
|
val config = system.settings.config.getConfig("akka.cluster.sharding")
|
|
|
|
|
|
|
|
|
|
val Role: Option[String] = config.getString("role") match {
|
|
|
|
|
case "" ⇒ None
|
|
|
|
|
case r ⇒ Some(r)
|
|
|
|
|
}
|
|
|
|
|
def hasNecessaryClusterRole(role: Option[String]): Boolean = role.forall(cluster.selfRoles.contains)
|
|
|
|
|
|
|
|
|
|
val GuardianName: String = config.getString("guardian-name")
|
|
|
|
|
val CoordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis
|
|
|
|
|
val RetryInterval: FiniteDuration = config.getDuration("retry-interval", MILLISECONDS).millis
|
|
|
|
|
@ -202,10 +198,15 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
val LeastShardAllocationMaxSimultaneousRebalance: Int =
|
|
|
|
|
config.getInt("least-shard-allocation-strategy.max-simultaneous-rebalance")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
import Settings._
|
|
|
|
|
private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap
|
|
|
|
|
private lazy val guardian = system.actorOf(Props[ClusterShardingGuardian], Settings.GuardianName)
|
|
|
|
|
|
|
|
|
|
private[akka] def requireClusterRole(role: Option[String]): Unit =
|
|
|
|
|
require(role.forall(cluster.selfRoles.contains),
|
|
|
|
|
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor
|
|
|
|
|
* and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor
|
|
|
|
|
@ -215,12 +216,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`,
|
|
|
|
|
* if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
|
|
|
|
|
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself
|
|
|
|
|
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
|
|
|
|
|
* @param role specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
|
|
|
|
|
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
|
|
|
|
|
* @param idExtractor partial function to extract the entry id and the message to send to the
|
|
|
|
|
@ -236,18 +234,17 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
*/
|
|
|
|
|
def start(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Option[Props],
|
|
|
|
|
roleOverride: Option[String],
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver: ShardRegion.ShardResolver,
|
|
|
|
|
allocationStrategy: ShardAllocationStrategy,
|
|
|
|
|
handOffStopMessage: Any): ActorRef = {
|
|
|
|
|
|
|
|
|
|
val resolvedRole = if (roleOverride == None) Role else roleOverride
|
|
|
|
|
|
|
|
|
|
requireClusterRole(role)
|
|
|
|
|
implicit val timeout = system.settings.CreationTimeout
|
|
|
|
|
val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries,
|
|
|
|
|
val startMsg = Start(typeName, entryProps, role, rememberEntries,
|
|
|
|
|
idExtractor, shardResolver, allocationStrategy, handOffStopMessage)
|
|
|
|
|
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
|
|
|
|
|
regions.put(typeName, shardRegion)
|
|
|
|
|
@ -266,12 +263,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`,
|
|
|
|
|
* if not defined (None) the `ShardRegion` on this node will run in proxy only mode, i.e.
|
|
|
|
|
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself
|
|
|
|
|
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
|
|
|
|
|
* @param role specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
|
|
|
|
|
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
|
|
|
|
|
* @param idExtractor partial function to extract the entry id and the message to send to the
|
|
|
|
|
@ -283,13 +277,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
*/
|
|
|
|
|
def start(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Option[Props],
|
|
|
|
|
roleOverride: Option[String],
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver: ShardRegion.ShardResolver): ActorRef = {
|
|
|
|
|
|
|
|
|
|
start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver,
|
|
|
|
|
start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver,
|
|
|
|
|
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
|
|
|
|
|
PoisonPill)
|
|
|
|
|
}
|
|
|
|
|
@ -303,12 +297,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`,
|
|
|
|
|
* if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
|
|
|
|
|
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself
|
|
|
|
|
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
|
|
|
|
|
* @param role specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
|
|
|
|
|
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
|
|
|
|
|
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
|
|
|
|
|
@ -322,13 +313,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
def start(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
roleOverride: Option[String],
|
|
|
|
|
role: Option[String],
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
messageExtractor: ShardRegion.MessageExtractor,
|
|
|
|
|
allocationStrategy: ShardAllocationStrategy,
|
|
|
|
|
handOffStopMessage: Any): ActorRef = {
|
|
|
|
|
|
|
|
|
|
start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries,
|
|
|
|
|
start(typeName, entryProps, role, rememberEntries,
|
|
|
|
|
idExtractor = {
|
|
|
|
|
case msg if messageExtractor.entryId(msg) ne null ⇒
|
|
|
|
|
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
|
|
|
|
|
@ -350,12 +341,9 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`,
|
|
|
|
|
* if not defined (null) the `ShardRegion` on this node will run in proxy only mode, i.e.
|
|
|
|
|
* it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself
|
|
|
|
|
* @param roleOverride specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* if not defined (None), then defaults to standard behavior of using Role (if any) from configuration
|
|
|
|
|
* @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion`
|
|
|
|
|
* @param role specifies that this entry type requires cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param rememberEntries true if entry actors shall created be automatically restarted upon `Shard`
|
|
|
|
|
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
|
|
|
|
|
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
|
|
|
|
|
@ -365,15 +353,77 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
def start(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
roleOverride: Option[String],
|
|
|
|
|
role: Option[String],
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
|
|
|
|
|
|
|
|
|
|
start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor,
|
|
|
|
|
start(typeName, entryProps, role, rememberEntries, messageExtractor,
|
|
|
|
|
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
|
|
|
|
|
PoisonPill)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode,
|
|
|
|
|
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
|
|
|
|
|
* [[#shardRegion]] method.
|
|
|
|
|
*
|
|
|
|
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param role specifies that this entry type is located on cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param idExtractor partial function to extract the entry id and the message to send to the
|
|
|
|
|
* entry from the incoming message, if the partial function does not match the message will
|
|
|
|
|
* be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
|
|
|
|
|
* @param shardResolver function to determine the shard id for an incoming message, only messages
|
|
|
|
|
* that passed the `idExtractor` will be used
|
|
|
|
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
|
|
|
|
*/
|
|
|
|
|
def startProxy(
|
|
|
|
|
typeName: String,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver: ShardRegion.ShardResolver): ActorRef = {
|
|
|
|
|
|
|
|
|
|
implicit val timeout = system.settings.CreationTimeout
|
|
|
|
|
val startMsg = StartProxy(typeName, role, idExtractor, shardResolver)
|
|
|
|
|
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
|
|
|
|
|
regions.put(typeName, shardRegion)
|
|
|
|
|
shardRegion
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode,
|
|
|
|
|
* i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
|
|
|
|
|
* entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
|
|
|
|
|
* [[#shardRegion]] method.
|
|
|
|
|
*
|
|
|
|
|
* Some settings can be configured as described in the `akka.cluster.sharding` section
|
|
|
|
|
* of the `reference.conf`.
|
|
|
|
|
*
|
|
|
|
|
* @param typeName the name of the entry type
|
|
|
|
|
* @param role specifies that this entry type is located on cluster nodes with a specific role.
|
|
|
|
|
* If the role is not specified all nodes in the cluster are used.
|
|
|
|
|
* @param messageExtractor functions to extract the entry id, shard id, and the message to send to the
|
|
|
|
|
* entry from the incoming message
|
|
|
|
|
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
|
|
|
|
|
*/
|
|
|
|
|
def startProxy(
|
|
|
|
|
typeName: String,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
|
|
|
|
|
|
|
|
|
|
startProxy(typeName, role,
|
|
|
|
|
idExtractor = {
|
|
|
|
|
case msg if messageExtractor.entryId(msg) ne null ⇒
|
|
|
|
|
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
|
|
|
|
|
},
|
|
|
|
|
shardResolver = msg ⇒ messageExtractor.shardId(msg))
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entry type.
|
|
|
|
|
* The entry type must be registered with the [[#start]] method before it can be used here.
|
|
|
|
|
@ -391,10 +441,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|
|
|
|
*/
|
|
|
|
|
private[akka] object ClusterShardingGuardian {
|
|
|
|
|
import ShardCoordinator.ShardAllocationStrategy
|
|
|
|
|
final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean,
|
|
|
|
|
final case class Start(typeName: String, entryProps: Props, role: Option[String], rememberEntries: Boolean,
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver,
|
|
|
|
|
allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any)
|
|
|
|
|
extends NoSerializationVerificationNeeded
|
|
|
|
|
final case class StartProxy(typeName: String, role: Option[String],
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver)
|
|
|
|
|
extends NoSerializationVerificationNeeded
|
|
|
|
|
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -409,14 +462,19 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|
|
|
|
val sharding = ClusterSharding(context.system)
|
|
|
|
|
import sharding.Settings._
|
|
|
|
|
|
|
|
|
|
private def coordinatorSingletonManagerName(encName: String): String =
|
|
|
|
|
encName + "Coordinator"
|
|
|
|
|
|
|
|
|
|
private def coordinatorPath(encName: String): String =
|
|
|
|
|
(self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress
|
|
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒
|
|
|
|
|
val encName = URLEncoder.encode(typeName, "utf-8")
|
|
|
|
|
val coordinatorSingletonManagerName = encName + "Coordinator"
|
|
|
|
|
val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
|
|
|
|
|
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
|
|
|
|
|
val cName = coordinatorSingletonManagerName(encName)
|
|
|
|
|
val cPath = coordinatorPath(encName)
|
|
|
|
|
val shardRegion = context.child(encName).getOrElse {
|
|
|
|
|
val hasRequiredRole = hasNecessaryClusterRole(role)
|
|
|
|
|
if (hasRequiredRole && context.child(coordinatorSingletonManagerName).isEmpty) {
|
|
|
|
|
if (context.child(cName).isEmpty) {
|
|
|
|
|
val coordinatorProps = ShardCoordinator.props(handOffTimeout = HandOffTimeout, shardStartTimeout = ShardStartTimeout,
|
|
|
|
|
rebalanceInterval = RebalanceInterval, snapshotInterval = SnapshotInterval, allocationStrategy)
|
|
|
|
|
val singletonProps = ShardCoordinatorSupervisor.props(CoordinatorFailureBackoff, coordinatorProps)
|
|
|
|
|
@ -426,14 +484,14 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|
|
|
|
singletonProps,
|
|
|
|
|
terminationMessage = PoisonPill,
|
|
|
|
|
singletonSettings),
|
|
|
|
|
name = coordinatorSingletonManagerName)
|
|
|
|
|
name = cName)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
context.actorOf(ShardRegion.props(
|
|
|
|
|
typeName = typeName,
|
|
|
|
|
entryProps = if (hasRequiredRole) entryProps else None,
|
|
|
|
|
entryProps = entryProps,
|
|
|
|
|
role = role,
|
|
|
|
|
coordinatorPath = coordinatorPath,
|
|
|
|
|
coordinatorPath = cPath,
|
|
|
|
|
retryInterval = RetryInterval,
|
|
|
|
|
snapshotInterval = SnapshotInterval,
|
|
|
|
|
shardFailureBackoff = ShardFailureBackoff,
|
|
|
|
|
@ -447,6 +505,23 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|
|
|
|
}
|
|
|
|
|
sender() ! Started(shardRegion)
|
|
|
|
|
|
|
|
|
|
case StartProxy(typeName, role, idExtractor, shardResolver) ⇒
|
|
|
|
|
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
|
|
|
|
|
val cName = coordinatorSingletonManagerName(encName)
|
|
|
|
|
val cPath = coordinatorPath(encName)
|
|
|
|
|
val shardRegion = context.child(encName).getOrElse {
|
|
|
|
|
context.actorOf(ShardRegion.proxyProps(
|
|
|
|
|
typeName = typeName,
|
|
|
|
|
role = role,
|
|
|
|
|
coordinatorPath = cPath,
|
|
|
|
|
retryInterval = RetryInterval,
|
|
|
|
|
bufferSize = BufferSize,
|
|
|
|
|
idExtractor = idExtractor,
|
|
|
|
|
shardResolver = shardResolver),
|
|
|
|
|
name = encName)
|
|
|
|
|
}
|
|
|
|
|
sender() ! Started(shardRegion)
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
@ -457,11 +532,12 @@ private[akka] class ClusterShardingGuardian extends Actor {
|
|
|
|
|
object ShardRegion {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
* Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
|
|
|
|
|
*/
|
|
|
|
|
def props(
|
|
|
|
|
private[akka] def props(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Option[Props],
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
coordinatorPath: String,
|
|
|
|
|
retryInterval: FiniteDuration,
|
|
|
|
|
@ -473,61 +549,15 @@ object ShardRegion {
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver: ShardRegion.ShardResolver,
|
|
|
|
|
handOffStopMessage: Any): Props =
|
|
|
|
|
Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
|
|
|
|
|
Props(new ShardRegion(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
|
|
|
|
|
snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
|
|
|
|
|
*/
|
|
|
|
|
def props(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
coordinatorPath: String,
|
|
|
|
|
retryInterval: FiniteDuration,
|
|
|
|
|
shardFailureBackoff: FiniteDuration,
|
|
|
|
|
entryRestartBackoff: FiniteDuration,
|
|
|
|
|
snapshotInterval: FiniteDuration,
|
|
|
|
|
bufferSize: Int,
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
idExtractor: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver: ShardRegion.ShardResolver,
|
|
|
|
|
handOffStopMessage: Any): Props =
|
|
|
|
|
props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
|
|
|
|
|
snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
|
|
|
|
|
*/
|
|
|
|
|
def props(
|
|
|
|
|
typeName: String,
|
|
|
|
|
entryProps: Props,
|
|
|
|
|
role: String,
|
|
|
|
|
coordinatorPath: String,
|
|
|
|
|
retryInterval: FiniteDuration,
|
|
|
|
|
shardFailureBackoff: FiniteDuration,
|
|
|
|
|
entryRestartBackoff: FiniteDuration,
|
|
|
|
|
snapshotInterval: FiniteDuration,
|
|
|
|
|
bufferSize: Int,
|
|
|
|
|
rememberEntries: Boolean,
|
|
|
|
|
messageExtractor: ShardRegion.MessageExtractor,
|
|
|
|
|
handOffStopMessage: Any): Props = {
|
|
|
|
|
|
|
|
|
|
props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff,
|
|
|
|
|
entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries,
|
|
|
|
|
idExtractor = {
|
|
|
|
|
case msg if messageExtractor.entryId(msg) ne null ⇒
|
|
|
|
|
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
|
|
|
|
|
}: ShardRegion.IdExtractor,
|
|
|
|
|
shardResolver = msg ⇒ messageExtractor.shardId(msg),
|
|
|
|
|
handOffStopMessage = handOffStopMessage)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
* Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor
|
|
|
|
|
* when using it in proxy only mode.
|
|
|
|
|
*/
|
|
|
|
|
def proxyProps(
|
|
|
|
|
private[akka] def proxyProps(
|
|
|
|
|
typeName: String,
|
|
|
|
|
role: Option[String],
|
|
|
|
|
coordinatorPath: String,
|
|
|
|
|
@ -538,25 +568,6 @@ object ShardRegion {
|
|
|
|
|
Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero,
|
|
|
|
|
Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Java API: : Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor
|
|
|
|
|
* when using it in proxy only mode.
|
|
|
|
|
*/
|
|
|
|
|
def proxyProps(
|
|
|
|
|
typeName: String,
|
|
|
|
|
role: String,
|
|
|
|
|
coordinatorPath: String,
|
|
|
|
|
retryInterval: FiniteDuration,
|
|
|
|
|
bufferSize: Int,
|
|
|
|
|
messageExtractor: ShardRegion.MessageExtractor): Props = {
|
|
|
|
|
|
|
|
|
|
proxyProps(typeName, roleOption(role), coordinatorPath, retryInterval, bufferSize, idExtractor = {
|
|
|
|
|
case msg if messageExtractor.entryId(msg) ne null ⇒
|
|
|
|
|
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
|
|
|
|
|
},
|
|
|
|
|
shardResolver = msg ⇒ messageExtractor.shardId(msg))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Marker type of entry identifier (`String`).
|
|
|
|
|
*/
|
|
|
|
|
@ -1289,9 +1300,10 @@ private[akka] class Shard(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
* @see [[ClusterSharding$ ClusterSharding extension]]
|
|
|
|
|
*/
|
|
|
|
|
object ShardCoordinatorSupervisor {
|
|
|
|
|
private[akka] object ShardCoordinatorSupervisor {
|
|
|
|
|
/**
|
|
|
|
|
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor.
|
|
|
|
|
*/
|
|
|
|
|
@ -1304,7 +1316,10 @@ object ShardCoordinatorSupervisor {
|
|
|
|
|
private case object StartCoordinator
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor {
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[akka] class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor {
|
|
|
|
|
import ShardCoordinatorSupervisor._
|
|
|
|
|
|
|
|
|
|
def startCoordinator(): Unit = {
|
|
|
|
|
@ -1330,10 +1345,11 @@ object ShardCoordinator {
|
|
|
|
|
import ShardRegion.ShardId
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor.
|
|
|
|
|
*/
|
|
|
|
|
def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration,
|
|
|
|
|
allocationStrategy: ShardAllocationStrategy): Props =
|
|
|
|
|
private[akka] def props(handOffTimeout: FiniteDuration, shardStartTimeout: FiniteDuration, rebalanceInterval: FiniteDuration, snapshotInterval: FiniteDuration,
|
|
|
|
|
allocationStrategy: ShardAllocationStrategy): Props =
|
|
|
|
|
Props(new ShardCoordinator(handOffTimeout, shardStartTimeout, rebalanceInterval, snapshotInterval, allocationStrategy))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|