Simplify signature of ClusterSharding.spawn, #25480

This commit is contained in:
Patrik Nordwall 2018-08-28 11:41:34 +02:00
parent 2a5f254f16
commit 0804daf1a5
16 changed files with 465 additions and 298 deletions

View file

@ -1,3 +1,15 @@
akka.cluster.sharding {
# Number of shards used by the default HashCodeMessageExtractor
# when no other message extractor is defined. This value must be
# the same for all nodes in the cluster and that is verified by
# configuration check when joining. Changing the value requires
# stopping all nodes in the cluster.
number-of-shards = 1000
}
# FIXME JoinConfigCompatChecker for number-of-shards
akka.actor {
serializers {
typed-sharding = "akka.cluster.sharding.typed.internal.ShardingSerializer"

View file

@ -25,7 +25,8 @@ object ClusterShardingSettings {
def fromConfig(config: Config): ClusterShardingSettings = {
val untypedSettings = UntypedShardingSettings(config)
fromUntypedSettings(untypedSettings)
val numberOfShards = config.getInt("number-of-shards")
fromUntypedSettings(numberOfShards, untypedSettings)
}
/** Java API: Creates new cluster sharding settings object */
@ -33,8 +34,9 @@ object ClusterShardingSettings {
apply(system)
/** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */
private[akka] def fromUntypedSettings(untypedSettings: UntypedShardingSettings): ClusterShardingSettings = {
private[akka] def fromUntypedSettings(numberOfShards: Int, untypedSettings: UntypedShardingSettings): ClusterShardingSettings = {
new ClusterShardingSettings(
numberOfShards,
role = untypedSettings.role,
dataCenter = None,
rememberEntities = untypedSettings.rememberEntities,
@ -217,6 +219,7 @@ object ClusterShardingSettings {
}
/**
* @param numberOfShards number of shards used by the default [[HashCodeMessageExtractor]]
* @param role Specifies that this entity type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used. If the given role does
* not match the role of the current node the `ShardRegion` will be started in proxy mode.
@ -237,6 +240,7 @@ object ClusterShardingSettings {
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
*/
final class ClusterShardingSettings(
val numberOfShards: Int,
val role: Option[String],
val dataCenter: Option[DataCenter],
val rememberEntities: Boolean,
@ -263,6 +267,9 @@ final class ClusterShardingSettings(
role.forall(cluster.selfMember.roles.contains) &&
dataCenter.forall(cluster.selfMember.dataCenter.contains)
// no withNumberOfShards because it should be defined in configuration to be able to verify same
// value on all nodes with `JoinConfigCompatChecker`
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.option(role))
def withDataCenter(dataCenter: DataCenter): ClusterShardingSettings =
@ -300,6 +307,7 @@ final class ClusterShardingSettings(
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
new ClusterShardingSettings(
numberOfShards,
role,
dataCenter,
rememberEntities,

View file

@ -14,18 +14,18 @@ object ShardingMessageExtractor {
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
def apply[M](numberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[M], M] =
new HashCodeMessageExtractor[M](numberOfShards)
/**
* Scala API: Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
handOffStopMessage: A)(
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) {
def entityId(message: A) = extractEntityId(message)
def noEnvelope[M](
numberOfShards: Int,
stopMessage: M)(
extractEntityId: M String): ShardingMessageExtractor[M, M] =
new HashCodeNoEnvelopeMessageExtractor[M](numberOfShards) {
def entityId(message: M) = extractEntityId(message)
}
}
@ -34,11 +34,11 @@ object ShardingMessageExtractor {
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or
* [[HashCodeNoEnvelopeMessageExtractor]]if possible.
*
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `M` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
* @tparam M The type of message accepted by the entity actor
*/
abstract class ShardingMessageExtractor[E, A] {
abstract class ShardingMessageExtractor[E, M] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
@ -58,13 +58,8 @@ abstract class ShardingMessageExtractor[E, A] {
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*/
def unwrapMessage(message: E): A
def unwrapMessage(message: E): M
/**
* Message sent to an entity to tell it to stop, e.g. when rebalanced.
* The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`.
*/
def handOffStopMessage: A
}
/**
@ -73,16 +68,15 @@ abstract class ShardingMessageExtractor[E, A] {
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
* @tparam M The type of message accepted by the entity actor
*/
final class HashCodeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
final class HashCodeMessageExtractor[M](
val numberOfShards: Int)
extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
override def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId
override def shardId(entityId: String): String = math.abs(entityId.hashCode % maxNumberOfShards).toString
override def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
override def shardId(entityId: String): String = math.abs(entityId.hashCode % numberOfShards).toString
override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
}
/**
@ -91,17 +85,16 @@ final class HashCodeMessageExtractor[A](
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
* @tparam M The type of message accepted by the entity actor
*/
abstract class HashCodeNoEnvelopeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[A, A] {
abstract class HashCodeNoEnvelopeMessageExtractor[M](
val numberOfShards: Int)
extends ShardingMessageExtractor[M, M] {
override def shardId(entityId: String): String = math.abs(entityId.hashCode % maxNumberOfShards).toString
override final def unwrapMessage(message: A): A = message
override def shardId(entityId: String): String = math.abs(entityId.hashCode % numberOfShards).toString
override final def unwrapMessage(message: M): M = message
override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)"
override def toString = s"HashCodeNoEnvelopeMessageExtractor($numberOfShards)"
}
/**
@ -114,5 +107,5 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A](
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselves carry identifiers.
*/
final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class
final case class ShardingEnvelope[M](entityId: String, message: M) // TODO think if should remain a case class

View file

@ -6,15 +6,15 @@ package akka.cluster.sharding.typed
package internal
import java.net.URLEncoder
import java.util.Optional
import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import scala.concurrent.Future
import akka.actor.ExtendedActorSystem
import akka.actor.{ InternalActorRef, Scheduler }
import akka.actor.InternalActorRef
import akka.actor.Scheduler
import akka.actor.typed.ActorContext
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@ -28,23 +28,22 @@ import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.cluster.sharding.typed.javadsl.EntityFactory
import akka.cluster.typed.Cluster
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.japi.function.{ Function JFunction }
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.util.Timeout
import akka.japi.function.{ Function JFunction }
import akka.util.ByteString
import akka.util.Timeout
/**
* INTERNAL API
* Extracts entityId and unwraps ShardingEnvelope and StartEntity messages.
* Other messages are delegated to the given `ShardingMessageExtractor`.
*/
@InternalApi private[akka] class ExtractorAdapter[E, A](delegate: ShardingMessageExtractor[E, A])
extends ShardingMessageExtractor[Any, A] {
@InternalApi private[akka] class ExtractorAdapter[E, M](delegate: ShardingMessageExtractor[E, M])
extends ShardingMessageExtractor[Any, M] {
override def entityId(message: Any): String = {
message match {
case ShardingEnvelope(entityId, _) entityId //also covers UntypedStartEntity in ShardingEnvelope
@ -55,21 +54,19 @@ import akka.util.ByteString
override def shardId(entityId: String): String = delegate.shardId(entityId)
override def unwrapMessage(message: Any): A = {
override def unwrapMessage(message: Any): M = {
message match {
case ShardingEnvelope(_, msg: A @unchecked)
case ShardingEnvelope(_, msg: M @unchecked)
//also covers UntypedStartEntity in ShardingEnvelope
msg
case msg: UntypedStartEntity
// not really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
msg.asInstanceOf[A]
// not really of type M, but erased and StartEntity is only handled internally, not delivered to the entity
msg.asInstanceOf[M]
case msg: E @unchecked
delegate.unwrapMessage(msg)
}
}
override def handOffStopMessage: A = delegate.handOffStopMessage
override def toString: String = delegate.toString
}
@ -99,35 +96,43 @@ import akka.util.ByteString
private val proxies: ConcurrentHashMap[String, String] = new ConcurrentHashMap
private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap
override def spawn[A](
behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[A],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor, Some(defaultShardAllocationStrategy(settings)))
// scaladsl impl
override def start[M, E](shardedEntity: scaladsl.ShardedEntity[M, E]): ActorRef[E] = {
val settings = shardedEntity.settings match {
case None ClusterShardingSettings(system)
case Some(s) s
}
val extractor = (shardedEntity.messageExtractor match {
case None new HashCodeMessageExtractor[M](settings.numberOfShards)
case Some(e) e
}).asInstanceOf[ShardingMessageExtractor[E, M]]
internalStart(shardedEntity.create, shardedEntity.entityProps, shardedEntity.typeKey,
shardedEntity.stopMessage, settings, extractor, shardedEntity.allocationStrategy)
}
override def spawn[A](
behavior: EntityFactory[A],
entityProps: Props,
typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawnWithMessageExtractor(behavior, entityProps, typeKey, settings, extractor,
Optional.of(defaultShardAllocationStrategy(settings)))
// javadsl impl
override def start[M, E](shardedEntity: javadsl.ShardedEntity[M, E]): ActorRef[E] = {
import scala.compat.java8.OptionConverters._
start(new scaladsl.ShardedEntity(
create = (shard, entitityId) shardedEntity.createBehavior.apply(shard, entitityId),
typeKey = shardedEntity.typeKey.asScala,
stopMessage = shardedEntity.stopMessage,
entityProps = shardedEntity.entityProps,
settings = shardedEntity.settings.asScala,
messageExtractor = shardedEntity.messageExtractor.asScala,
allocationStrategy = shardedEntity.allocationStrategy.asScala
))
}
def spawnWithMessageExtractor[E, A](
behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[A],
private def internalStart[M, E](
behavior: (ActorRef[scaladsl.ClusterSharding.ShardCommand], String) Behavior[M],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[A],
typeKey: scaladsl.EntityTypeKey[M],
stopMessage: M,
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
extractor: ShardingMessageExtractor[E, M],
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E] = {
val extractorAdapter = new ExtractorAdapter(extractor)
@ -154,7 +159,7 @@ import akka.util.ByteString
override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = {
// using untyped.systemActorOf to avoid the Future[ActorRef]
system.toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf(
PropsAdapter(ShardCommandActor.behavior(extractor.handOffStopMessage)),
PropsAdapter(ShardCommandActor.behavior(stopMessage)),
URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator")
}
})
@ -170,7 +175,7 @@ import akka.util.ByteString
extractEntityId,
extractShardId,
allocationStrategy.getOrElse(defaultShardAllocationStrategy(settings)),
extractor.handOffStopMessage)
stopMessage)
} else {
log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node) " +
"for role [{}] and dataCenter [{}] ...", typeKey.name, settings.role, settings.dataCenter)
@ -183,7 +188,7 @@ import akka.util.ByteString
extractShardId)
}
val messageClassName = typeKey.asInstanceOf[EntityTypeKeyImpl[A]].messageClassName
val messageClassName = typeKey.asInstanceOf[EntityTypeKeyImpl[M]].messageClassName
val typeNames = if (settings.shouldHostShard(cluster)) regions else proxies
@ -196,23 +201,12 @@ import akka.util.ByteString
ActorRefAdapter(ref)
}
override def spawnWithMessageExtractor[E, A](
behavior: EntityFactory[A],
entityProps: Props,
typeKey: javadsl.EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E] = {
spawnWithMessageExtractor((shard, entityId) behavior.apply(shard, entityId), entityProps, typeKey.asScala,
settings, extractor, allocationStrategy.asScala)
override def entityRefFor[M](typeKey: scaladsl.EntityTypeKey[M], entityId: String): scaladsl.EntityRef[M] = {
new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler)
}
override def entityRefFor[A](typeKey: scaladsl.EntityTypeKey[A], entityId: String): scaladsl.EntityRef[A] = {
new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler)
}
override def entityRefFor[A](typeKey: javadsl.EntityTypeKey[A], entityId: String): javadsl.EntityRef[A] = {
new EntityRefImpl[A](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler)
override def entityRefFor[M](typeKey: javadsl.EntityTypeKey[M], entityId: String): javadsl.EntityRef[M] = {
new EntityRefImpl[M](untypedSharding.shardRegion(typeKey.name), entityId, system.scheduler)
}
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
@ -226,14 +220,14 @@ import akka.util.ByteString
/**
* INTERNAL API
*/
@InternalApi private[akka] final class EntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String,
@InternalApi private[akka] final class EntityRefImpl[M](shardRegion: akka.actor.ActorRef, entityId: String,
scheduler: Scheduler)
extends javadsl.EntityRef[A] with scaladsl.EntityRef[A] {
extends javadsl.EntityRef[M] with scaladsl.EntityRef[M] {
override def tell(msg: A): Unit =
override def tell(msg: M): Unit =
shardRegion ! ShardingEnvelope(entityId, msg)
override def ask[U](message: (ActorRef[U]) A)(implicit timeout: Timeout): Future[U] = {
override def ask[U](message: ActorRef[U] M)(implicit timeout: Timeout): Future[U] = {
val replyTo = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val m = message(replyTo.ref)
if (replyTo.promiseRef ne null) replyTo.promiseRef.messageClassName = m.getClass.getName
@ -241,7 +235,7 @@ import akka.util.ByteString
replyTo.future
}
def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout): CompletionStage[U] =
def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U] =
ask[U](replyTo message.apply(replyTo))(timeout).toJava
/** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */

View file

@ -7,6 +7,7 @@ package javadsl
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@ -20,8 +21,8 @@ import akka.japi.function.{ Function ⇒ JFunction }
import akka.util.Timeout
@FunctionalInterface
trait EntityFactory[A] {
def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[A]
trait EntityFactory[M] {
def apply(shardRegion: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M]
}
object ClusterSharding {
@ -32,7 +33,7 @@ object ClusterSharding {
* When an entity is created an `ActorRef[ShardCommand]` is passed to the
* factory method. The entity can request passivation by sending the [[Passivate]]
* message to this ref. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop itself.
* `stopMessage` message to the entity, which is then supposed to stop itself.
*
* Not for user extension.
*/
@ -42,10 +43,10 @@ object ClusterSharding {
* The entity can request passivation by sending the [[Passivate]] message
* to the `ActorRef[ShardCommand]` that was passed in to the factory method
* when creating the entity. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop
* `stopMessage` message to the entity, which is then supposed to stop
* itself.
*/
final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand
final case class Passivate[M](entity: ActorRef[M]) extends ShardCommand
}
/**
@ -81,7 +82,7 @@ object ClusterSharding {
* to extract the entity identifier and the shard identifier from incoming messages.
* A shard is a group of entities that will be managed together. For the first message in a
* specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
* the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* the [[akka.cluster.sharding.ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* owns the shard. The `ShardRegion` receives the decided home of the shard
* and if that is the `ShardRegion` instance itself it will create a local child
* actor representing the entity and direct all messages for that entity to it.
@ -115,7 +116,7 @@ object ClusterSharding {
* location.
*
* The logic that decides which shards to rebalance is defined in a plugable shard
* allocation strategy. The default implementation [[LeastShardAllocationStrategy]]
* allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]]
* picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
@ -148,9 +149,9 @@ object ClusterSharding {
* the entity actors for example by defining receive timeout (`context.setReceiveTimeout`).
* If a message is already enqueued to the entity when it stops itself the enqueued message
* in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]`
* messages the entity actor can send [[ClusterSharding#Passivate]] to the `ActorRef[ShardCommand]`
* that was passed in to the factory method when creating the entity..
* The specified `handOffStopMessage` message will be sent back to the entity, which is
* The specified `stopMessage` message will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity.
@ -163,47 +164,15 @@ object ClusterSharding {
abstract class ClusterSharding {
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has
* Initialize sharding for the given `shardedEntity` factory settings.
*
* It will start a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: EntityFactory[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if `Optional.empty`
* @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnWithMessageExtractor[E, A](
behavior: EntityFactory[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: Optional[ShardAllocationStrategy]): ActorRef[E]
def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
@ -214,12 +183,108 @@ abstract class ClusterSharding {
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}
object ShardedEntity {
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
*
* @param createBehavior Create the behavior for an entity given an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
*
* @tparam M The type of message the entity accepts
*/
def create[M](
createBehavior: JFunction[String, Behavior[M]],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] = {
create(new BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] {
override def apply(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[M] =
createBehavior.apply(entityId)
}, typeKey, stopMessage)
}
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
*
* @param createBehavior Create the behavior for an entity given `ShardCommand` ref and an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts
*/
def create[M](
createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] =
new ShardedEntity(createBehavior, typeKey, stopMessage, Props.empty, Optional.empty(), Optional.empty(), Optional.empty())
}
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]].
*/
final class ShardedEntity[M, E] private[akka] (
val createBehavior: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]],
val typeKey: EntityTypeKey[M],
val stopMessage: M,
val entityProps: Props,
val settings: Optional[ClusterShardingSettings],
val messageExtractor: Optional[ShardingMessageExtractor[E, M]],
val allocationStrategy: Optional[ShardAllocationStrategy]) {
/**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
*/
def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] =
copy(entityProps = newEntityProps)
/**
* Additional settings, typically loaded from configuration.
*/
def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] =
copy(settings = Optional.ofNullable(newSettings))
/**
*
* If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
* them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope
* is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of
* shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
* is configured with `akka.cluster.sharding.number-of-shards`.
*/
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] =
new ShardedEntity(createBehavior, typeKey, stopMessage, entityProps, settings, Optional.ofNullable(newExtractor), allocationStrategy)
/**
* Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
*/
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] =
copy(allocationStrategy = Optional.ofNullable(newAllocationStrategy))
private def copy(
create: BiFunction[ActorRef[ClusterSharding.ShardCommand], String, Behavior[M]] = createBehavior,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
entityProps: Props = entityProps,
settings: Optional[ClusterShardingSettings] = settings,
allocationStrategy: Optional[ShardAllocationStrategy] = allocationStrategy
): ShardedEntity[M, E] = {
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
}
}
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
@ -227,8 +292,8 @@ object StartEntity {
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] =
scaladsl.StartEntity[A](entityId)
def create[M](msgClass: Class[M], entityId: String): ShardingEnvelope[M] =
scaladsl.StartEntity[M](entityId)
}
/**
@ -268,23 +333,23 @@ object EntityTypeKey {
*
* Not for user extension.
*/
@DoNotInherit abstract class EntityRef[A] { scaladslSelf: scaladsl.EntityRef[A]
@DoNotInherit abstract class EntityRef[M] { scaladslSelf: scaladsl.EntityRef[M]
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
def tell(msg: M): Unit
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.javadsl.AskPattern]] for a complete write-up of this pattern
*/
def ask[U](message: JFunction[ActorRef[U], A], timeout: Timeout): CompletionStage[U]
def ask[U](message: JFunction[ActorRef[U], M], timeout: Timeout): CompletionStage[U]
/**
* INTERNAL API
*/
@InternalApi private[akka] def asScala: scaladsl.EntityRef[A] = scaladslSelf
@InternalApi private[akka] def asScala: scaladsl.EntityRef[M] = scaladslSelf
}

View file

@ -33,7 +33,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* When an entity is created an `ActorRef[ShardCommand]` is passed to the
* factory method. The entity can request passivation by sending the [[Passivate]]
* message to this ref. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop itself.
* `stopMessage` message to the entity, which is then supposed to stop itself.
*
* Not for user extension.
*/
@ -43,10 +43,10 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* The entity can request passivation by sending the [[Passivate]] message
* to the `ActorRef[ShardCommand]` that was passed in to the factory method
* when creating the entity. Sharding will then send back the specified
* `handOffStopMessage` message to the entity, which is then supposed to stop
* `stopMessage` message to the entity, which is then supposed to stop
* itself.
*/
final case class Passivate[A](entity: ActorRef[A]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand
final case class Passivate[M](entity: ActorRef[M]) extends ShardCommand with javadsl.ClusterSharding.ShardCommand
}
@ -83,7 +83,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* to extract the entity identifier and the shard identifier from incoming messages.
* A shard is a group of entities that will be managed together. For the first message in a
* specific shard the `ShardRegion` requests the location of the shard from a central coordinator,
* the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* the [[akka.cluster.sharding.ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion`
* owns the shard. The `ShardRegion` receives the decided home of the shard
* and if that is the `ShardRegion` instance itself it will create a local child
* actor representing the entity and direct all messages for that entity to it.
@ -117,7 +117,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* location.
*
* The logic that decides which shards to rebalance is defined in a plugable shard
* allocation strategy. The default implementation [[LeastShardAllocationStrategy]]
* allocation strategy. The default implementation [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]]
* picks shards for handoff from the `ShardRegion` with most number of previously allocated shards.
* They will then be allocated to the `ShardRegion` with least number of previously allocated shards,
* i.e. new members in the cluster. There is a configurable threshold of how large the difference
@ -152,7 +152,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* in the mailbox will be dropped. To support graceful passivation without losing such
* messages the entity actor can send [[ClusterSharding.Passivate]] to the `ActorRef[ShardCommand]`
* that was passed in to the factory method when creating the entity..
* The specified `handOffStopMessage` message will be sent back to the entity, which is
* The specified `stopMessage` message will be sent back to the entity, which is
* then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion`
* between reception of `Passivate` and termination of the entity. Such buffered messages
* are thereafter delivered to a new incarnation of the entity.
@ -163,50 +163,17 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
*/
@DoNotInherit
trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
import ClusterSharding.ShardCommand
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has
* Initialize sharding for the given `shardedEntity` factory settings.
*
* It will start a shard region or a proxy depending on if the settings require role and if this node has
* such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: (ActorRef[ShardCommand], String) Behavior[A],
props: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node
* has such a role.
*
* @param behavior Create the behavior for an entity given a entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if `None`
* @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawnWithMessageExtractor[E, A](
behavior: (ActorRef[ShardCommand], String) Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: Option[ShardAllocationStrategy]): ActorRef[E]
def start[M, E](shardedEntity: ShardedEntity[M, E]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
@ -217,9 +184,12 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeKey: EntityTypeKey[A], entityId: String): EntityRef[A]
def entityRefFor[M](typeKey: EntityTypeKey[M], entityId: String): EntityRef[M]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
/**
@ -228,6 +198,95 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
@InternalApi private[akka] def asJava: javadsl.ClusterSharding = javadslSelf
}
object ShardedEntity {
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
*
* @param create Create the behavior for an entity given an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
*
* @tparam M The type of message the entity accepts
*/
def apply[M](
create: String Behavior[M],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] =
apply((_, entityId) create(entityId), typeKey, stopMessage)
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* settings can be defined using the `with` methods of the returned [[ShardedEntity]].
*
* @param create Create the behavior for an entity given `ShardCommand` ref and an entityId
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param stopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
* @tparam M The type of message the entity accepts
*/
def apply[M](
create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M],
typeKey: EntityTypeKey[M],
stopMessage: M): ShardedEntity[M, ShardingEnvelope[M]] =
new ShardedEntity(create, typeKey, stopMessage, Props.empty, None, None, None)
}
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]].
*/
final class ShardedEntity[M, E] private[akka] (
val create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M],
val typeKey: EntityTypeKey[M],
val stopMessage: M,
val entityProps: Props,
val settings: Option[ClusterShardingSettings],
val messageExtractor: Option[ShardingMessageExtractor[E, M]],
val allocationStrategy: Option[ShardAllocationStrategy]) {
/**
* [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
*/
def withEntityProps(newEntityProps: Props): ShardedEntity[M, E] =
copy(entityProps = newEntityProps)
/**
* Additional settings, typically loaded from configuration.
*/
def withSettings(newSettings: ClusterShardingSettings): ShardedEntity[M, E] =
copy(settings = Option(newSettings))
/**
*
* If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
* them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope
* is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of
* shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
* is configured with `akka.cluster.sharding.number-of-shards`.
*/
def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): ShardedEntity[M, Envelope] =
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, Option(newExtractor), allocationStrategy)
/**
* Allocation strategy which decides on which nodes to allocate new shards,
* [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
*/
def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): ShardedEntity[M, E] =
copy(allocationStrategy = Option(newAllocationStrategy))
private def copy(
create: (ActorRef[ClusterSharding.ShardCommand], String) Behavior[M] = create,
typeKey: EntityTypeKey[M] = typeKey,
stopMessage: M = stopMessage,
entityProps: Props = entityProps,
settings: Option[ClusterShardingSettings] = settings,
allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy
): ShardedEntity[M, E] = {
new ShardedEntity(create, typeKey, stopMessage, entityProps, settings, messageExtractor, allocationStrategy)
}
}
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
@ -235,9 +294,9 @@ object StartEntity {
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def apply[A](entityId: String): ShardingEnvelope[A] = {
// StartEntity isn't really of type A, but erased and StartEntity is only handled internally, not delivered to the entity
new ShardingEnvelope[A](entityId, UntypedStartEntity(entityId).asInstanceOf[A])
def apply[M](entityId: String): ShardingEnvelope[M] = {
// StartEntity isn't really of type M, but erased and StartEntity is only handled internally, not delivered to the entity
new ShardingEnvelope[M](entityId, UntypedStartEntity(entityId).asInstanceOf[M])
}
}
@ -271,7 +330,7 @@ object EntityTypeKey {
* [[ActorRef]] and watch it in case such notification is desired.
* Not for user extension.
*/
@DoNotInherit trait EntityRef[A] {
@DoNotInherit trait EntityRef[M] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
@ -283,7 +342,7 @@ object EntityTypeKey {
* target.tell("Hello")
* }}}
*/
def tell(msg: A): Unit
def tell(msg: M): Unit
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
@ -295,7 +354,7 @@ object EntityTypeKey {
* target ! "Hello"
* }}}
*/
def !(msg: A): Unit = this.tell(msg)
def !(msg: M): Unit = this.tell(msg)
/**
* Allows to "ask" the [[EntityRef]] for a reply.
@ -313,7 +372,7 @@ object EntityTypeKey {
*
* Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
*/
def ask[U](f: ActorRef[U] A)(implicit timeout: Timeout): Future[U]
def ask[U](f: ActorRef[U] M)(implicit timeout: Timeout): Future[U]
/**
* Allows to "ask" the [[EntityRef]] for a reply.
@ -331,7 +390,7 @@ object EntityTypeKey {
*
* Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
*/
def ?[U](message: ActorRef[U] A)(implicit timeout: Timeout): Future[U] =
def ?[U](message: ActorRef[U] M)(implicit timeout: Timeout): Future[U] =
this.ask(message)(timeout)
}

View file

@ -7,6 +7,7 @@ package akka.cluster.sharding.typed
import akka.actor.typed.{ ActorRef, Props }
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.ShardedEntity
import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec }
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.actor.testkit.typed.scaladsl.TestProbe
@ -26,6 +27,7 @@ object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
"""
akka.loglevel = DEBUG
akka.cluster.sharding {
number-of-shards = 10
# First is likely to be ignored as shard coordinator not ready
retry-interval = 0.2s
}
@ -66,14 +68,11 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"start sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn(
(_, _) multiDcPinger,
Props.empty,
typeKey = typeKey,
ClusterShardingSettings(typedSystem),
10,
NoMore
)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start(
ShardedEntity(
_ multiDcPinger,
typeKey,
NoMore))
val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(Pong(cluster.selfMember.dataCenter))
@ -99,14 +98,12 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"be able to message cross dc via proxy" in {
runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn(
(_, _) multiDcPinger,
Props.empty,
typeKey = typeKey,
ClusterShardingSettings(typedSystem).withDataCenter("dc2"),
10,
NoMore
)
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start(
ShardedEntity(
_ multiDcPinger,
typeKey,
NoMore)
.withSettings(ClusterShardingSettings(typedSystem).withDataCenter("dc2")))
val probe = TestProbe[Pong]
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(remainingOrDefault, Pong("dc2"))

View file

@ -18,9 +18,14 @@ import akka.cluster.sharding.typed.ShardingEnvelope;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.ShardedEntity;
//#import
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogCommand;
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogBehavior;
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.PassivatePost;
public class ShardingCompileOnlyTest {
//#counter-messages
@ -82,13 +87,31 @@ public class ShardingCompileOnlyTest {
return Behaviors.same();
})
.onMessage(GoodByeCounter.class, (ctx, msg) -> {
// the handOffStopMessage, used for rebalance and passivate
// the stopMessage, used for rebalance and passivate
return Behaviors.stopped();
})
.build();
}
//#counter-passivate
public static void startPassivateExample() {
ActorSystem system = ActorSystem.create(
Behaviors.empty(), "ShardingExample"
);
ClusterSharding sharding = ClusterSharding.get(system);
//#counter-passivate-start
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
sharding.start(
ShardedEntity.create(
(shard, entityId) -> counter2(shard, entityId),
typeKey,
new GoodByeCounter()));
//#counter-passivate-start
}
public static void example() {
ActorSystem system = ActorSystem.create(
@ -99,16 +122,15 @@ public class ShardingCompileOnlyTest {
ClusterSharding sharding = ClusterSharding.get(system);
//#sharding-extension
//#spawn
//#start
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.spawn(
(shard, entityId) -> counter(entityId,0),
Props.empty(),
typeKey,
ClusterShardingSettings.create(system),
10,
new GoodByeCounter());
//#spawn
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.start(
ShardedEntity.create(
entityId -> counter(entityId,0),
typeKey,
new GoodByeCounter()));
//#start
//#send
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-`");
@ -117,4 +139,21 @@ public class ShardingCompileOnlyTest {
shardRegion.tell(new ShardingEnvelope<>("counter-1", new Increment()));
//#send
}
public static void persistenceExample() {
ActorSystem system = ActorSystem.create(
Behaviors.empty(), "ShardingExample"
);
ClusterSharding sharding = ClusterSharding.get(system);
//#persistence
EntityTypeKey<BlogCommand> blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost");
sharding.start(
ShardedEntity.create(
BlogBehavior::behavior,
blogTypeKey,
new PassivatePost()));
//#persistence
}
}

View file

@ -70,13 +70,11 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in {
ClusterSharding(system).spawn[Command](
(_, entityId) persistentActor(entityId),
Props.empty,
ClusterSharding(system).start(ShardedEntity(
entityId persistentActor(entityId),
typeKey,
ClusterShardingSettings(system),
maxNumberOfShards = 100,
handOffStopMessage = StopPlz)
StopPlz
))
val p = TestProbe[String]()

View file

@ -44,6 +44,8 @@ object ClusterShardingSpec {
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.cluster.sharding.number-of-shards = 10
akka.coordinated-shutdown.terminate-actor-system = off
akka.actor {
@ -176,45 +178,38 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Behaviors.same
}
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn(
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity(
(shard, _) behavior(shard),
Props.empty,
typeKey,
ClusterShardingSettings(system),
10,
StopPlz())
StopPlz()))
private val shardingRef2 = sharding2.spawn(
private val shardingRef2 = sharding2.start(ShardedEntity(
(shard, _) behavior(shard),
Props.empty,
typeKey,
ClusterShardingSettings(system2),
10,
StopPlz())
StopPlz()))
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.spawnWithMessageExtractor(
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(ShardedEntity(
(shard, _) behaviorWithId(shard),
Props.empty,
typeKey2,
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
IdStopPlz())
.withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
},
None)
})
)
private val shardingRef4 = sharding2.spawnWithMessageExtractor(
private val shardingRef4 = sharding2.start(ShardedEntity(
(shard, _) behaviorWithId(shard),
Props.empty,
typeKey2,
ClusterShardingSettings(system2),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
},
None)
IdStopPlz())
.withMessageExtractor(
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
)
def totalEntityCount1(): Int = {
import akka.pattern.ask
@ -263,13 +258,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
val p = TestProbe[String]()
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.spawn(
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(ShardedEntity(
(shard, _) behavior(shard, Some(stopProbe.ref)),
Props.empty,
typeKey3,
ClusterShardingSettings(system),
10,
StopPlz())
StopPlz()))
shardingRef3 ! ShardingEnvelope(s"test1", ReplyPlz(p.ref))
p.expectMessage("Hello!")
@ -284,13 +276,10 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
"fail if starting sharding for already used typeName, but with a different type" in {
// sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] {
sharding.spawn(
sharding.start(ShardedEntity(
(shard, _) behaviorWithId(shard),
Props.empty,
EntityTypeKey[IdTestProtocol]("envelope-shard"),
ClusterShardingSettings(system),
10,
IdStopPlz())
IdStopPlz()))
}
ex.getMessage should include("already spawned")
@ -350,7 +339,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
}
}
"use the handOffStopMessage for leaving/rebalance" in {
"use the stopMessage for leaving/rebalance" in {
var replies1 = Set.empty[String]
(1 to 10).foreach { n
val p = TestProbe[String]()

View file

@ -5,8 +5,10 @@
package docs.akka.cluster.sharding.typed
import scala.concurrent.duration._
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ShardedEntity
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost }
@ -45,17 +47,14 @@ object ShardingCompileOnlySpec {
}
//#counter
//#spawn
//#start
val TypeKey = EntityTypeKey[CounterCommand]("Counter")
// if a extractor is defined then the type would be ActorRef[BasicCommand]
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.spawn(
behavior = (shard, entityId) counter(entityId, 0),
props = Props.empty,
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(ShardedEntity(
create = entityId counter(entityId, 0),
typeKey = TypeKey,
settings = ClusterShardingSettings(system),
maxNumberOfShards = 10,
handOffStopMessage = GoodByeCounter)
//#spawn
stopMessage = GoodByeCounter))
//#start
//#send
// With an EntityRef
@ -68,14 +67,12 @@ object ShardingCompileOnlySpec {
import InDepthPersistentBehaviorSpec.behavior
//#persistence
val ShardingTypeName = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).spawn[BlogCommand](
behavior = (shard, entityId) behavior(entityId),
props = Props.empty,
typeKey = ShardingTypeName,
settings = ClusterShardingSettings(system),
maxNumberOfShards = 100,
handOffStopMessage = PassivatePost)
val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).start(ShardedEntity(
create = entityId behavior(entityId),
typeKey = BlogTypeKey,
stopMessage = PassivatePost))
//#persistence
//#counter-passivate
@ -97,7 +94,7 @@ object ShardingCompileOnlySpec {
shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
case GoodByeCounter
// the handOffStopMessage, used for rebalance and passivate
// the stopMessage, used for rebalance and passivate
Behaviors.stopped
}
@ -105,6 +102,11 @@ object ShardingCompileOnlySpec {
become(0)
}
}
sharding.start(ShardedEntity(
create = (shard, entityId) counter2(shard, entityId),
typeKey = TypeKey,
stopMessage = GoodByeCounter))
//#counter-passivate
}

View file

@ -539,6 +539,10 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
}
}
/**
* The default is currently [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] with the
* given `settings`. This could be changed in the future.
*/
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance

View file

@ -132,6 +132,8 @@ private[akka] object ClusterSingletonImpl {
@DoNotInherit
abstract class ClusterSingleton extends Extension {
// FIXME align with ClusterSharding API, issue #25480
/**
* Start if needed and provide a proxy to a named singleton
*

View file

@ -209,7 +209,7 @@ That means they will start buffering incoming messages for that shard, in the sa
shard location is unknown. During the rebalance process the coordinator will not answer any
requests for the location of shards that are being rebalanced, i.e. local buffering will
continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard
will stop all entities in that shard by sending the specified `handOffStopMessage`
will stop all entities in that shard by sending the specified `stopMessage`
(default `PoisonPill`) to them. When all entities have been terminated the `ShardRegion`
owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of

View file

@ -45,10 +45,10 @@ Java
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #spawn }
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #start }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #spawn }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #start }
Messages to a specific entity are then sent via an EntityRef.
It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region.
@ -62,8 +62,7 @@ Java
## Persistence example
When using sharding entities can be moved to different nodes in the cluster. Persistence can be used to recover the state of
an actor after it has moved. Currently Akka typed only has a Scala API for persistence, you can track the progress of the
Java API [here](https://github.com/akka/akka/issues/24193).
an actor after it has moved.
Taking the larger example from the @ref:[persistence documentation](persistence.md#larger-example) and making it into
a sharded entity is the same as for a non persistent behavior. The behavior:
@ -71,11 +70,17 @@ a sharded entity is the same as for a non persistent behavior. The behavior:
Scala
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior }
Java
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior }
To create the entity:
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #persistence }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #persistence }
Sending messages to entities is the same as the example above. The only difference is when an entity is moved the state will be restored.
See @ref:[persistence](persistence.md) for more details.
@ -97,4 +102,5 @@ Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start }

View file

@ -133,7 +133,6 @@ public class InDepthPersistentBehaviorTest {
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;