From 70024298ac7bd2879a71b3752367c4d46feaa294 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 9 Jun 2015 12:25:58 +0200 Subject: [PATCH] !cls #16422 Rename Entry to Entity in sharding --- .../src/main/resources/reference.conf | 22 +- .../cluster/sharding/ClusterSharding.scala | 370 +++++++++--------- .../sharding/ClusterShardingSettings.scala | 28 +- ...terShardingCustomShardAllocationSpec.scala | 2 +- .../sharding/ClusterShardingFailureSpec.scala | 12 +- .../ClusterShardingGracefulShutdownSpec.scala | 2 +- .../sharding/ClusterShardingLeavingSpec.scala | 2 +- .../sharding/ClusterShardingSpec.scala | 134 +++---- .../cluster/sharding/ClusterShardingTest.java | 24 +- .../project/migration-guide-2.3.x-2.4.x.rst | 3 + akka-docs/rst/scala/cluster-sharding.rst | 148 +++---- 11 files changed, 375 insertions(+), 372 deletions(-) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 4e06d13693..e2f38377f7 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -14,14 +14,14 @@ akka.cluster.sharding { # e.g. '/user/sharding' guardian-name = sharding - # Specifies that entries runs on cluster nodes with a specific role. + # Specifies that entities runs on cluster nodes with a specific role. # If the role is not specified (or empty) all nodes in the cluster are used. role = "" - # When this is set to 'on' the active entry actors will automatically be restarted + # When this is set to 'on' the active entity actors will automatically be restarted # upon Shard restart. i.e. if the Shard is started on a different ShardRegion # due to rebalance or crash. - remember-entries = off + remember-entities = off # If the coordinator can't store state changes it will be stopped # and started again after this duration. @@ -41,28 +41,28 @@ akka.cluster.sharding { shard-start-timeout = 10 s # If the shard can't store state changes it will retry the action - # again after this duration. Any messages sent to an affected entry + # again after this duration. Any messages sent to an affected entity # will be buffered until the state change is processed shard-failure-backoff = 10 s - # If the shard is remembering entries and an entry stops itself without - # using passivate. The entry will be restarted after this duration or when + # If the shard is remembering entities and an entity stops itself without + # using passivate. The entity will be restarted after this duration or when # the next message for it is received, which ever occurs first. - entry-restart-backoff = 10 s + entity-restart-backoff = 10 s # Rebalance check is performed periodically with this interval. rebalance-interval = 10 s - # Absolute path to the journal plugin configuration entry that is to be + # Absolute path to the journal plugin configuration entity that is to be # used for the internal persistence of ClusterSharding. If not defined # the default journal plugin is used. Note that this is not related to - # persistence used by the entry actors. + # persistence used by the entity actors. journal-plugin-id = "" - # Absolute path to the snapshot plugin configuration entry that is to be + # Absolute path to the snapshot plugin configuration entity that is to be # used for the internal persistence of ClusterSharding. If not defined # the default snapshot plugin is used. Note that this is not related to - # persistence used by the entry actors. + # persistence used by the entity actors. 
snapshot-plugin-id = "" # The coordinator saves persistent snapshots after this number of persistent diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index bf09a1c0a7..4919747bea 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -54,40 +54,40 @@ import java.util.Optional * several nodes in the cluster and you want to be able to interact with them using their * logical identifier, but without having to care about their physical location in the cluster, * which might also change over time. It could for example be actors representing Aggregate Roots in - * Domain-Driven Design terminology. Here we call these actors "entries". These actors + * Domain-Driven Design terminology. Here we call these actors "entities". These actors * typically have persistent (durable) state, but this feature is not limited to * actors with persistent state. * - * In this context sharding means that actors with an identifier, so called entries, - * can be automatically distributed across multiple nodes in the cluster. Each entry - * actor runs only at one place, and messages can be sent to the entry without requiring + * In this context sharding means that actors with an identifier, so called entities, + * can be automatically distributed across multiple nodes in the cluster. Each entity + * actor runs only at one place, and messages can be sent to the entity without requiring * the sender to know the location of the destination actor. This is achieved by sending * the messages via a [[ShardRegion]] actor provided by this extension, which knows how - * to route the message with the entry id to the final destination. + * to route the message with the entity id to the final destination. * * This extension is supposed to be used by first, typically at system startup on each node - * in the cluster, registering the supported entry types with the [[ClusterSharding#start]] - * method and then the `ShardRegion` actor for a named entry type can be retrieved with - * [[ClusterSharding#shardRegion]]. Messages to the entries are always sent via the local + * in the cluster, registering the supported entity types with the [[ClusterSharding#start]] + * method and then the `ShardRegion` actor for a named entity type can be retrieved with + * [[ClusterSharding#shardRegion]]. Messages to the entities are always sent via the local * `ShardRegion`. Some settings can be configured as described in the `akka.cluster.sharding` * section of the `reference.conf`. * * The `ShardRegion` actor is started on each node in the cluster, or group of nodes * tagged with a specific role. The `ShardRegion` is created with two application specific - * functions to extract the entry identifier and the shard identifier from incoming messages. - * A shard is a group of entries that will be managed together. For the first message in a + * functions to extract the entity identifier and the shard identifier from incoming messages. + * A shard is a group of entities that will be managed together. For the first message in a * specific shard the `ShardRegion` request the location of the shard from a central coordinator, * the [[ShardCoordinator]]. The `ShardCoordinator` decides which `ShardRegion` that * owns the shard. 
The `ShardRegion` receives the decided home of the shard * and if that is the `ShardRegion` instance itself it will create a local child - * actor representing the entry and direct all messages for that entry to it. + * actor representing the entity and direct all messages for that entity to it. * If the shard home is another `ShardRegion` instance messages will be forwarded * to that `ShardRegion` instance instead. While resolving the location of a * shard incoming messages for that shard are buffered and later delivered when the * shard home is known. Subsequent messages to the resolved shard can be delivered * to the target destination immediately without involving the `ShardCoordinator`. * - * To make sure that at most one instance of a specific entry actor is running somewhere + * To make sure that at most one instance of a specific entity actor is running somewhere * in the cluster it is important that all nodes have the same view of where the shards * are located. Therefore the shard allocation decisions are taken by the central * `ShardCoordinator`, which is running as a cluster singleton, i.e. one instance on @@ -100,18 +100,18 @@ import java.util.Optional * This strategy can be replaced by an application specific implementation. * * To be able to use newly added members in the cluster the coordinator facilitates rebalancing - * of shards, i.e. migrate entries from one node to another. In the rebalance process the + * of shards, i.e. migrate entities from one node to another. In the rebalance process the * coordinator first notifies all `ShardRegion` actors that a handoff for a shard has started. * That means they will start buffering incoming messages for that shard, in the same way as if the * shard location is unknown. During the rebalance process the coordinator will not answer any * requests for the location of shards that are being rebalanced, i.e. local buffering will * continue until the handoff is completed. The `ShardRegion` responsible for the rebalanced shard - * will stop all entries in that shard by sending `PoisonPill` to them. When all entries have - * been terminated the `ShardRegion` owning the entries will acknowledge the handoff as completed + * will stop all entities in that shard by sending `PoisonPill` to them. When all entities have + * been terminated the `ShardRegion` owning the entities will acknowledge the handoff as completed * to the coordinator. Thereafter the coordinator will reply to requests for the location of * the shard and thereby allocate a new home for the shard and then buffered messages in the - * `ShardRegion` actors are delivered to the new location. This means that the state of the entries - * are not transferred or migrated. If the state of the entries are of importance it should be + * `ShardRegion` actors are delivered to the new location. This means that the state of the entities + * are not transferred or migrated. If the state of the entities are of importance it should be * persistent (durable), e.g. with `akka-persistence`, so that it can be recovered at the new * location. * @@ -131,7 +131,7 @@ import java.util.Optional * with known location are still available, while messages for new (unknown) shards * are buffered until the new `ShardCoordinator` becomes available. * - * As long as a sender uses the same `ShardRegion` actor to deliver messages to an entry + * As long as a sender uses the same `ShardRegion` actor to deliver messages to an entity * actor the order of the messages is preserved. 
As long as the buffer limit is not reached * messages are delivered on a best effort basis, with at-most once delivery semantics, * in the same way as ordinary message sending. Reliable end-to-end messaging, with @@ -143,21 +143,21 @@ import java.util.Optional * shard resolution, e.g. to avoid too fine grained shards. * * The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not - * host any entries itself, but knows how to delegate messages to the right location. + * host any entities itself, but knows how to delegate messages to the right location. * A `ShardRegion` starts in proxy only mode if the roles of the node does not include * the node role specified in `akka.cluster.sharding.role` config property - * or if the specified `entryProps` is `None`/`null`. + * or if the specified `entityProps` is `None`/`null`. * - * If the state of the entries are persistent you may stop entries that are not used to + * If the state of the entities are persistent you may stop entities that are not used to * reduce memory consumption. This is done by the application specific implementation of - * the entry actors for example by defining receive timeout (`context.setReceiveTimeout`). - * If a message is already enqueued to the entry when it stops itself the enqueued message + * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). + * If a message is already enqueued to the entity when it stops itself the enqueued message * in the mailbox will be dropped. To support graceful passivation without loosing such - * messages the entry actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. - * The specified wrapped message in `Passivate` will be sent back to the entry, which is + * messages the entity actor can send [[ShardRegion.Passivate]] to its parent `ShardRegion`. + * The specified wrapped message in `Passivate` will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` - * between reception of `Passivate` and termination of the entry. Such buffered messages - * are thereafter delivered to a new incarnation of the entry. + * between reception of `Passivate` and termination of the entity. Such buffered messages + * are thereafter delivered to a new incarnation of the entity. * */ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider { @@ -191,29 +191,29 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") /** - * Scala API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor - * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor + * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor + * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` - * @param role specifies that this entry type requires cluster nodes with a specific role. 
+ * @param typeName the name of the entity type + * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` + * @param role specifies that this entity type requires cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. * @param settings configuration settings, see [[ClusterShardingSettings]] * @param shardResolver function to determine the shard id for an incoming message, only messages * that passed the `idExtractor` will be used * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic - * @param handOffStopMessage the message that will be sent to entries when they are to be stopped + * @param handOffStopMessage the message that will be sent to entities when they are to be stopped * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, @@ -222,7 +222,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { requireClusterRole(settings.role) implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entryProps, settings, + val startMsg = Start(typeName, entityProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) @@ -230,8 +230,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } /** - * Register a named entry type by defining the [[akka.actor.Props]] of the entry actor and - * functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor + * Register a named entity type by defining the [[akka.actor.Props]] of the entity actor and + * functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] @@ -240,11 +240,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param typeName the name of the entity type + * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param idExtractor partial function to extract the entry id and the message to send to the - * entry from the incoming message, if the partial function does not match the message will + * @param idExtractor partial function to extract the entity id and the message to send to the + * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. 
posted as `Unhandled` messages on the event stream * @param shardResolver function to determine the shard id for an incoming message, only messages * that passed the `idExtractor` will be used @@ -252,7 +252,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ def start( typeName: String, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): ActorRef = { @@ -261,40 +261,40 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { settings.tuningParameters.leastShardAllocationRebalanceThreshold, settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, PoisonPill) + start(typeName, entityProps, settings, idExtractor, shardResolver, allocationStrategy, PoisonPill) } /** - * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor - * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor + * Java API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor + * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param typeName the name of the entity type + * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the - * entry from the incoming message + * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the + * entity from the incoming message * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic - * @param handOffStopMessage the message that will be sent to entries when they are to be stopped + * @param handOffStopMessage the message that will be sent to entities when they are to be stopped * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. 
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, messageExtractor: ShardRegion.MessageExtractor, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any): ActorRef = { - start(typeName, entryProps, settings, + start(typeName, entityProps, settings, idExtractor = { - case msg if messageExtractor.entryId(msg) ne null ⇒ - (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) + case msg if messageExtractor.entityId(msg) ne null ⇒ + (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) }, shardResolver = msg ⇒ messageExtractor.shardId(msg), allocationStrategy = allocationStrategy, @@ -302,8 +302,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } /** - * Java API: Register a named entry type by defining the [[akka.actor.Props]] of the entry actor - * and functions to extract entry and shard identifier from messages. The [[ShardRegion]] actor + * Java API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor + * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor * for this type can later be retrieved with the [[#shardRegion]] method. * * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] @@ -312,16 +312,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param entryProps the `Props` of the entry actors that will be created by the `ShardRegion` + * @param typeName the name of the entity type + * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] - * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the - * entry from the incoming message + * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the + * entity from the incoming message * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, messageExtractor: ShardRegion.MessageExtractor): ActorRef = { @@ -329,23 +329,23 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { settings.tuningParameters.leastShardAllocationRebalanceThreshold, settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance) - start(typeName, entryProps, settings, messageExtractor, allocationStrategy, PoisonPill) + start(typeName, entityProps, settings, messageExtractor, allocationStrategy, PoisonPill) } /** - * Scala API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode, + * Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the * [[#shardRegion]] method. 
* * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param role specifies that this entry type is located on cluster nodes with a specific role. + * @param typeName the name of the entity type + * @param role specifies that this entity type is located on cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param idExtractor partial function to extract the entry id and the message to send to the - * entry from the incoming message, if the partial function does not match the message will + * @param idExtractor partial function to extract the entity id and the message to send to the + * entity from the incoming message, if the partial function does not match the message will * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream * @param shardResolver function to determine the shard id for an incoming message, only messages * that passed the `idExtractor` will be used @@ -366,19 +366,19 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { } /** - * Java API: Register a named entry type `ShardRegion` on this node that will run in proxy only mode, + * Java API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode, * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any - * entry actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the + * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the * [[#shardRegion]] method. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. * - * @param typeName the name of the entry type - * @param role specifies that this entry type is located on cluster nodes with a specific role. + * @param typeName the name of the entity type + * @param role specifies that this entity type is located on cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the - * entry from the incoming message + * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the + * entity from the incoming message * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def startProxy( @@ -388,17 +388,17 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { startProxy(typeName, Option(role.orElse(null)), idExtractor = { - case msg if messageExtractor.entryId(msg) ne null ⇒ - (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) + case msg if messageExtractor.entityId(msg) ne null ⇒ + (messageExtractor.entityId(msg), messageExtractor.entityMessage(msg)) }, shardResolver = msg ⇒ messageExtractor.shardId(msg)) } /** - * Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entry type. - * The entry type must be registered with the [[#start]] method before it can be used here. - * Messages to the entry is always sent via the `ShardRegion`. + * Retrieve the actor reference of the [[ShardRegion]] actor responsible for the named entity type. + * The entity type must be registered with the [[#start]] method before it can be used here. + * Messages to the entity is always sent via the `ShardRegion`. 
*/ def shardRegion(typeName: String): ActorRef = regions.get(typeName) match { case null ⇒ throw new IllegalArgumentException(s"Shard type [$typeName] must be started first") @@ -412,7 +412,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { */ private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy - final case class Start(typeName: String, entryProps: Props, settings: ClusterShardingSettings, + final case class Start(typeName: String, entityProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded @@ -439,7 +439,7 @@ private[akka] class ClusterShardingGuardian extends Actor { (self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress def receive = { - case Start(typeName, entryProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ + case Start(typeName, entityProps, settings, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ import settings.role import settings.tuningParameters.coordinatorFailureBackoff val encName = URLEncoder.encode(typeName, ByteString.UTF_8) @@ -460,7 +460,7 @@ private[akka] class ClusterShardingGuardian extends Actor { context.actorOf(ShardRegion.props( typeName = typeName, - entryProps = entryProps, + entityProps = entityProps, settings = settings, coordinatorPath = cPath, idExtractor = idExtractor, @@ -500,13 +500,13 @@ object ShardRegion { */ private[akka] def props( typeName: String, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, coordinatorPath: String, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any): Props = - Props(new ShardRegion(typeName, Some(entryProps), settings, coordinatorPath, idExtractor, + Props(new ShardRegion(typeName, Some(entityProps), settings, coordinatorPath, idExtractor, shardResolver, handOffStopMessage)).withDeploy(Deploy.local) /** @@ -524,9 +524,9 @@ object ShardRegion { .withDeploy(Deploy.local) /** - * Marker type of entry identifier (`String`). + * Marker type of entity identifier (`String`). */ - type EntryId = String + type EntityId = String /** * Marker type of shard identifier (`String`). */ @@ -537,15 +537,15 @@ object ShardRegion { type Msg = Any /** * Interface of the partial function used by the [[ShardRegion]] to - * extract the entry id and the message to send to the entry from an + * extract the entity id and the message to send to the entity from an * incoming message. The implementation is application specific. * If the partial function does not match the message will be * `unhandled`, i.e. posted as `Unhandled` messages on the event stream. * Note that the extracted message does not have to be the same as the incoming * message to support wrapping in message envelope that is unwrapped before - * sending to the entry actor. + * sending to the entity actor. */ - type IdExtractor = PartialFunction[Msg, (EntryId, Msg)] + type IdExtractor = PartialFunction[Msg, (EntityId, Msg)] /** * Interface of the function used by the [[ShardRegion]] to * extract the shard id from an incoming message. 
@@ -555,25 +555,25 @@ object ShardRegion { type ShardResolver = Msg ⇒ ShardId /** - * Java API: Interface of functions to extract entry id, - * shard id, and the message to send to the entry from an + * Java API: Interface of functions to extract entity id, + * shard id, and the message to send to the entity from an * incoming message. */ trait MessageExtractor { /** - * Extract the entry id from an incoming `message`. If `null` is returned + * Extract the entity id from an incoming `message`. If `null` is returned * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream */ - def entryId(message: Any): String + def entityId(message: Any): String /** - * Extract the message to send to the entry from an incoming `message`. + * Extract the message to send to the entity from an incoming `message`. * Note that the extracted message does not have to be the same as the incoming * message to support wrapping in message envelope that is unwrapped before - * sending to the entry actor. + * sending to the entity actor. */ - def entryMessage(message: Any): Any + def entityMessage(message: Any): Any /** - * Extract the entry id from an incoming `message`. Only messages that passed the [[#entryId]] + * Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]] * function will be used as input to this function. */ def shardId(message: Any): String @@ -582,16 +582,16 @@ object ShardRegion { sealed trait ShardRegionCommand /** - * If the state of the entries are persistent you may stop entries that are not used to + * If the state of the entities are persistent you may stop entities that are not used to * reduce memory consumption. This is done by the application specific implementation of - * the entry actors for example by defining receive timeout (`context.setReceiveTimeout`). - * If a message is already enqueued to the entry when it stops itself the enqueued message + * the entity actors for example by defining receive timeout (`context.setReceiveTimeout`). + * If a message is already enqueued to the entity when it stops itself the enqueued message * in the mailbox will be dropped. To support graceful passivation without loosing such - * messages the entry actor can send this `Passivate` message to its parent `ShardRegion`. - * The specified wrapped `stopMessage` will be sent back to the entry, which is + * messages the entity actor can send this `Passivate` message to its parent `ShardRegion`. + * The specified wrapped `stopMessage` will be sent back to the entity, which is * then supposed to stop itself. Incoming messages will be buffered by the `ShardRegion` - * between reception of `Passivate` and termination of the entry. Such buffered messages - * are thereafter delivered to a new incarnation of the entry. + * between reception of `Passivate` and termination of the entity. Such buffered messages + * are thereafter delivered to a new incarnation of the entity. * * [[akka.actor.PoisonPill]] is a perfectly fine `stopMessage`. */ @@ -640,19 +640,19 @@ object ShardRegion { if (role == "") None else Option(role) /** - * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entries and when all of + * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of * them have terminated it replies with `ShardStopped`. 
*/ - private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any) + private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any) extends Actor { import ShardCoordinator.Internal.ShardStopped - entries.foreach { a ⇒ + entities.foreach { a ⇒ context watch a a ! stopMessage } - var remaining = entries + var remaining = entities def receive = { case Terminated(ref) ⇒ @@ -665,12 +665,12 @@ object ShardRegion { } private[akka] def handOffStopperProps( - shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any): Props = - Props(new HandOffStopper(shard, replyTo, entries, stopMessage)).withDeploy(Deploy.local) + shard: String, replyTo: ActorRef, entities: Set[ActorRef], stopMessage: Any): Props = + Props(new HandOffStopper(shard, replyTo, entities, stopMessage)).withDeploy(Deploy.local) } /** - * This actor creates children entry actors on demand for the shards that it is told to be + * This actor creates children entity actors on demand for the shards that it is told to be * responsible for. It delegates messages targeted to other shards to the responsible * `ShardRegion` actor on other nodes. * @@ -678,7 +678,7 @@ object ShardRegion { */ class ShardRegion( typeName: String, - entryProps: Option[Props], + entityProps: Option[Props], settings: ClusterShardingSettings, coordinatorPath: String, idExtractor: ShardRegion.IdExtractor, @@ -704,7 +704,7 @@ class ShardRegion( var handingOff = Set.empty[ActorRef] var gracefulShutdownInProgress = false - def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size } + def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size } import context.dispatcher val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry) @@ -888,7 +888,7 @@ class ShardRegion( } def registrationMessage: Any = - if (entryProps.isDefined) Register(self) else RegisterProxy(self) + if (entityProps.isDefined) Register(self) else RegisterProxy(self) def requestShardBufferHomes(): Unit = { shardBuffers.foreach { @@ -936,7 +936,7 @@ class ShardRegion( def getShard(id: ShardId): ActorRef = shards.getOrElse( id, - entryProps match { + entityProps match { case Some(props) ⇒ log.debug("Starting shard [{}] in region", id) @@ -968,7 +968,7 @@ class ShardRegion( * @see [[ClusterSharding$ ClusterSharding extension]] */ private[akka] object Shard { - import ShardRegion.EntryId + import ShardRegion.EntityId object State { val Empty = State() @@ -986,50 +986,50 @@ private[akka] object Shard { final case class RetryPersistence(payload: StateChange) extends ShardCommand /** - * When an remembering entries and the entry stops without issuing a `Passivate`, we + * When an remembering entities and the entity stops without issuing a `Passivate`, we * restart it after a back off using this message. 
*/ - final case class RestartEntry(entry: EntryId) extends ShardCommand + final case class RestartEntity(entity: EntityId) extends ShardCommand /** * A case class which represents a state change for the Shard */ - sealed trait StateChange { val entryId: EntryId } + sealed trait StateChange { val entityId: EntityId } /** - * `State` change for starting an entry in this `Shard` + * `State` change for starting an entity in this `Shard` */ - @SerialVersionUID(1L) final case class EntryStarted(entryId: EntryId) extends StateChange + @SerialVersionUID(1L) final case class EntityStarted(entityId: EntityId) extends StateChange /** - * `State` change for an entry which has terminated. + * `State` change for an entity which has terminated. */ - @SerialVersionUID(1L) final case class EntryStopped(entryId: EntryId) extends StateChange + @SerialVersionUID(1L) final case class EntityStopped(entityId: EntityId) extends StateChange /** * Persistent state of the Shard. */ @SerialVersionUID(1L) final case class State private ( - entries: Set[EntryId] = Set.empty) + entities: Set[EntityId] = Set.empty) /** * Factory method for the [[akka.actor.Props]] of the [[Shard]] actor. */ def props(typeName: String, shardId: ShardRegion.ShardId, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any): Props = - Props(new Shard(typeName, shardId, entryProps, settings, idExtractor, shardResolver, handOffStopMessage)) + Props(new Shard(typeName, shardId, entityProps, settings, idExtractor, shardResolver, handOffStopMessage)) .withDeploy(Deploy.local) } /** * INTERNAL API * - * This actor creates children entry actors on demand that it is told to be + * This actor creates children entity actors on demand that it is told to be * responsible for. 
* * @see [[ClusterSharding$ ClusterSharding extension]] @@ -1037,20 +1037,20 @@ private[akka] object Shard { private[akka] class Shard( typeName: String, shardId: ShardRegion.ShardId, - entryProps: Props, + entityProps: Props, settings: ClusterShardingSettings, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, handOffStopMessage: Any) extends PersistentActor with ActorLogging { - import ShardRegion.{ handOffStopperProps, EntryId, Msg, Passivate } + import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate } import ShardCoordinator.Internal.{ HandOff, ShardStopped } - import Shard.{ State, RetryPersistence, RestartEntry, EntryStopped, EntryStarted } + import Shard.{ State, RetryPersistence, RestartEntity, EntityStopped, EntityStarted } import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage import akka.cluster.sharding.ShardRegion.ShardRegionCommand import akka.persistence.RecoveryCompleted - import settings.rememberEntries + import settings.rememberEntities import settings.tuningParameters._ override def persistenceId = s"/sharding/${typeName}Shard/${shardId}" @@ -1060,18 +1060,18 @@ private[akka] class Shard( override def snapshotPluginId: String = settings.snapshotPluginId var state = State.Empty - var idByRef = Map.empty[ActorRef, EntryId] - var refById = Map.empty[EntryId, ActorRef] + var idByRef = Map.empty[ActorRef, EntityId] + var refById = Map.empty[EntityId, ActorRef] var passivating = Set.empty[ActorRef] - var messageBuffers = Map.empty[EntryId, Vector[(Msg, ActorRef)]] + var messageBuffers = Map.empty[EntityId, Vector[(Msg, ActorRef)]] var persistCount = 0 var handOffStopper: Option[ActorRef] = None - def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size } + def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size } def processChange[A](event: A)(handler: A ⇒ Unit): Unit = - if (rememberEntries) { + if (rememberEntities) { saveSnapshotWhenNeeded() persist(event)(handler) } else handler(event) @@ -1085,10 +1085,10 @@ private[akka] class Shard( } override def receiveRecover: Receive = { - case EntryStarted(id) if rememberEntries ⇒ state = state.copy(state.entries + id) - case EntryStopped(id) if rememberEntries ⇒ state = state.copy(state.entries - id) - case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot - case RecoveryCompleted ⇒ state.entries foreach getEntry + case EntityStarted(id) if rememberEntities ⇒ state = state.copy(state.entities + id) + case EntityStopped(id) if rememberEntities ⇒ state = state.copy(state.entities - id) + case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot + case RecoveryCompleted ⇒ state.entities foreach getEntity } override def receiveCommand: Receive = { @@ -1102,7 +1102,7 @@ private[akka] class Shard( def receiveShardCommand(msg: ShardCommand): Unit = msg match { case RetryPersistence(payload) ⇒ retryPersistence(payload) - case RestartEntry(id) ⇒ getEntry(id) + case RestartEntity(id) ⇒ getEntity(id) } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -1118,8 +1118,8 @@ private[akka] class Shard( def persistenceFailure(payload: StateChange): Unit = { log.debug("Persistence of [{}] failed, will backoff and retry", payload) - if (!messageBuffers.isDefinedAt(payload.entryId)) { - messageBuffers = messageBuffers.updated(payload.entryId, Vector.empty) + if (!messageBuffers.isDefinedAt(payload.entityId)) { + messageBuffers = messageBuffers.updated(payload.entityId, Vector.empty) } import 
context.dispatcher @@ -1130,8 +1130,8 @@ private[akka] class Shard( log.debug("Retrying Persistence of [{}]", payload) persist(payload) { _ ⇒ payload match { - case msg: EntryStarted ⇒ sendMsgBuffer(msg) - case msg: EntryStopped ⇒ passivateCompleted(msg) + case msg: EntityStarted ⇒ sendMsgBuffer(msg) + case msg: EntityStopped ⇒ passivateCompleted(msg) } } } @@ -1141,7 +1141,7 @@ private[akka] class Shard( case None ⇒ log.debug("HandOff shard [{}]", shardId) - if (state.entries.nonEmpty) { + if (state.entities.nonEmpty) { handOffStopper = Some(context.watch(context.actorOf( handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage)))) @@ -1161,56 +1161,56 @@ private[akka] class Shard( } else if (idByRef.contains(ref) && handOffStopper.isEmpty) { val id = idByRef(ref) if (messageBuffers.getOrElse(id, Vector.empty).nonEmpty) { - //Note; because we're not persisting the EntryStopped, we don't need - // to persist the EntryStarted either. - log.debug("Starting entry [{}] again, there are buffered messages for it", id) - sendMsgBuffer(EntryStarted(id)) + //Note; because we're not persisting the EntityStopped, we don't need + // to persist the EntityStarted either. + log.debug("Starting entity [{}] again, there are buffered messages for it", id) + sendMsgBuffer(EntityStarted(id)) } else { - if (rememberEntries && !passivating.contains(ref)) { - log.debug("Entry [{}] stopped without passivating, will restart after backoff", id) + if (rememberEntities && !passivating.contains(ref)) { + log.debug("Entity [{}] stopped without passivating, will restart after backoff", id) import context.dispatcher - context.system.scheduler.scheduleOnce(entryRestartBackoff, self, RestartEntry(id)) - } else processChange(EntryStopped(id))(passivateCompleted) + context.system.scheduler.scheduleOnce(entityRestartBackoff, self, RestartEntity(id)) + } else processChange(EntityStopped(id))(passivateCompleted) } passivating = passivating - ref } } - def passivate(entry: ActorRef, stopMessage: Any): Unit = { - idByRef.get(entry) match { + def passivate(entity: ActorRef, stopMessage: Any): Unit = { + idByRef.get(entity) match { case Some(id) if !messageBuffers.contains(id) ⇒ - log.debug("Passivating started on entry {}", id) + log.debug("Passivating started on entity {}", id) - passivating = passivating + entry + passivating = passivating + entity messageBuffers = messageBuffers.updated(id, Vector.empty) - entry ! stopMessage + entity ! 
stopMessage case _ ⇒ //ignored } } - // EntryStopped persistence handler - def passivateCompleted(event: EntryStopped): Unit = { - log.debug("Entry stopped [{}]", event.entryId) + // EntityStopped persistence handler + def passivateCompleted(event: EntityStopped): Unit = { + log.debug("Entity stopped [{}]", event.entityId) - val ref = refById(event.entryId) + val ref = refById(event.entityId) idByRef -= ref - refById -= event.entryId + refById -= event.entityId - state = state.copy(state.entries - event.entryId) - messageBuffers = messageBuffers - event.entryId + state = state.copy(state.entities - event.entityId) + messageBuffers = messageBuffers - event.entityId } - // EntryStarted persistence handler - def sendMsgBuffer(event: EntryStarted): Unit = { + // EntityStarted persistence handler + def sendMsgBuffer(event: EntityStarted): Unit = { //Get the buffered messages and remove the buffer - val messages = messageBuffers.getOrElse(event.entryId, Vector.empty) - messageBuffers = messageBuffers - event.entryId + val messages = messageBuffers.getOrElse(event.entityId, Vector.empty) + messageBuffers = messageBuffers - event.entityId if (messages.nonEmpty) { - log.debug("Sending message buffer for entry [{}] ([{}] messages)", event.entryId, messages.size) - getEntry(event.entryId) + log.debug("Sending message buffer for entity [{}] ([{}] messages)", event.entityId, messages.size) + getEntity(event.entityId) //Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded @@ -1230,42 +1230,42 @@ private[akka] class Shard( case None ⇒ deliverTo(id, msg, payload, snd) case Some(buf) if totalBufferSize >= bufferSize ⇒ - log.debug("Buffer is full, dropping message for entry [{}]", id) + log.debug("Buffer is full, dropping message for entity [{}]", id) context.system.deadLetters ! msg case Some(buf) ⇒ - log.debug("Message for entry [{}] buffered", id) + log.debug("Message for entity [{}] buffered", id) messageBuffers = messageBuffers.updated(id, buf :+ ((msg, snd))) } } } - def deliverTo(id: EntryId, msg: Any, payload: Msg, snd: ActorRef): Unit = { + def deliverTo(id: EntityId, msg: Any, payload: Msg, snd: ActorRef): Unit = { val name = URLEncoder.encode(id, "utf-8") context.child(name) match { case Some(actor) ⇒ actor.tell(payload, snd) - case None if rememberEntries ⇒ + case None if rememberEntities ⇒ //Note; we only do this if remembering, otherwise the buffer is an overhead messageBuffers = messageBuffers.updated(id, Vector((msg, snd))) saveSnapshotWhenNeeded() - persist(EntryStarted(id))(sendMsgBuffer) + persist(EntityStarted(id))(sendMsgBuffer) case None ⇒ - getEntry(id).tell(payload, snd) + getEntity(id).tell(payload, snd) } } - def getEntry(id: EntryId): ActorRef = { + def getEntity(id: EntityId): ActorRef = { val name = URLEncoder.encode(id, "utf-8") context.child(name).getOrElse { - log.debug("Starting entry [{}] in shard [{}]", id, shardId) + log.debug("Starting entity [{}] in shard [{}]", id, shardId) - val a = context.watch(context.actorOf(entryProps, name)) + val a = context.watch(context.actorOf(entityProps, name)) idByRef = idByRef.updated(a, id) refById = refById.updated(id, a) - state = state.copy(state.entries + id) + state = state.copy(state.entities + id) a } } @@ -1492,12 +1492,12 @@ object ShardCoordinator { /** * When all `ShardRegion` actors have acknoledged the `BeginHandOff` the * `ShardCoordinator` sends this message to the `ShardRegion` responsible for the - * shard. 
The `ShardRegion` is supposed to stop all entries in that shard and when - * all entries have terminated reply with `ShardStopped` to the `ShardCoordinator`. + * shard. The `ShardRegion` is supposed to stop all entities in that shard and when + * all entities have terminated reply with `ShardStopped` to the `ShardCoordinator`. */ @SerialVersionUID(1L) final case class HandOff(shard: ShardId) extends CoordinatorMessage /** - * Reply to `HandOff` when all entries in the shard have been terminated. + * Reply to `HandOff` when all entities in the shard have been terminated. */ @SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 4295628f86..636e06caa1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -30,7 +30,7 @@ object ClusterShardingSettings { handOffTimeout = config.getDuration("handoff-timeout", MILLISECONDS).millis, shardStartTimeout = config.getDuration("shard-start-timeout", MILLISECONDS).millis, shardFailureBackoff = config.getDuration("shard-failure-backoff", MILLISECONDS).millis, - entryRestartBackoff = config.getDuration("entry-restart-backoff", MILLISECONDS).millis, + entityRestartBackoff = config.getDuration("entity-restart-backoff", MILLISECONDS).millis, rebalanceInterval = config.getDuration("rebalance-interval", MILLISECONDS).millis, snapshotAfter = config.getInt("snapshot-after"), leastShardAllocationRebalanceThreshold = @@ -42,7 +42,7 @@ object ClusterShardingSettings { new ClusterShardingSettings( role = roleOption(config.getString("role")), - rememberEntries = config.getBoolean("remember-entries"), + rememberEntities = config.getBoolean("remember-entities"), journalPluginId = config.getString("journal-plugin-id"), snapshotPluginId = config.getString("snapshot-plugin-id"), tuningParameters, @@ -74,7 +74,7 @@ object ClusterShardingSettings { val handOffTimeout: FiniteDuration, val shardStartTimeout: FiniteDuration, val shardFailureBackoff: FiniteDuration, - val entryRestartBackoff: FiniteDuration, + val entityRestartBackoff: FiniteDuration, val rebalanceInterval: FiniteDuration, val snapshotAfter: Int, val leastShardAllocationRebalanceThreshold: Int, @@ -82,23 +82,23 @@ object ClusterShardingSettings { } /** - * @param role specifies that this entry type requires cluster nodes with a specific role. + * @param role specifies that this entity type requires cluster nodes with a specific role. * If the role is not specified all nodes in the cluster are used. - * @param rememberEntries true if active entry actors shall be automatically restarted upon `Shard` + * @param rememberEntities true if active entity actors shall be automatically restarted upon `Shard` * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. - * @param journalPluginId Absolute path to the journal plugin configuration entry that is to + * @param journalPluginId Absolute path to the journal plugin configuration entity that is to * be used for the internal persistence of ClusterSharding. If not defined the default - * journal plugin is used. Note that this is not related to persistence used by the entry + * journal plugin is used. 
Note that this is not related to persistence used by the entity * actors. - * @param snapshotPluginId Absolute path to the snapshot plugin configuration entry that is to + * @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to * be used for the internal persistence of ClusterSharding. If not defined the default - * snapshot plugin is used. Note that this is not related to persistence used by the entry + * snapshot plugin is used. Note that this is not related to persistence used by the entity * actors. * @param tuningParameters additional tuning parameters, see descriptions in reference.conf */ final class ClusterShardingSettings( val role: Option[String], - val rememberEntries: Boolean, + val rememberEntities: Boolean, val journalPluginId: String, val snapshotPluginId: String, val tuningParameters: ClusterShardingSettings.TuningParameters, @@ -108,8 +108,8 @@ final class ClusterShardingSettings( def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role) - def withRememberEntries(rememberEntries: Boolean): ClusterShardingSettings = - copy(rememberEntries = rememberEntries) + def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings = + copy(rememberEntities = rememberEntities) def withJournalPluginId(journalPluginId: String): ClusterShardingSettings = copy(journalPluginId = journalPluginId) @@ -124,14 +124,14 @@ final class ClusterShardingSettings( copy(coordinatorSingletonSettings = coordinatorSingletonSettings) private def copy(role: Option[String] = role, - rememberEntries: Boolean = rememberEntries, + rememberEntities: Boolean = rememberEntities, journalPluginId: String = journalPluginId, snapshotPluginId: String = snapshotPluginId, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = new ClusterShardingSettings( role, - rememberEntries, + rememberEntities, journalPluginId, snapshotPluginId, tuningParameters, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index 36ee134ebb..37b5ce3935 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -134,7 +134,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar def startSharding(): Unit = { ClusterSharding(system).start( typeName = "Entity", - entryProps = Props[Entity], + entityProps = Props[Entity], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index edaeaf6170..063083b652 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -111,8 +111,8 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe def startSharding(): Unit = { ClusterSharding(system).start( typeName = 
"Entity", - entryProps = Props[Entity], - settings = ClusterShardingSettings(system).withRememberEntries(true), + entityProps = Props[Entity], + settings = ClusterShardingSettings(system).withRememberEntities(true), idExtractor = idExtractor, shardResolver = shardResolver) } @@ -183,17 +183,17 @@ class ClusterShardingFailureSpec extends MultiNodeSpec(ClusterShardingFailureSpe runOn(first) { region ! Get("21") expectMsg(Value("21", 3)) - val entry21 = lastSender - val shard2 = system.actorSelection(entry21.path.parent) + val entity21 = lastSender + val shard2 = system.actorSelection(entity21.path.parent) //Test the ShardCoordinator allocating shards during a journal failure region ! Add("30", 3) - //Test the Shard starting entries and persisting during a journal failure + //Test the Shard starting entities and persisting during a journal failure region ! Add("11", 1) //Test the Shard passivate works during a journal failure - shard2.tell(Passivate(PoisonPill), entry21) + shard2.tell(Passivate(PoisonPill), entity21) region ! Add("21", 1) region ! Get("21") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index b6e3043687..afb9ca8ea2 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -121,7 +121,7 @@ class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingG val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) ClusterSharding(system).start( typeName = "Entity", - entryProps = Props[Entity], + entityProps = Props[Entity], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index fdb5f69be6..78f5a675e0 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -123,7 +123,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe def startSharding(): Unit = { ClusterSharding(system).start( typeName = "Entity", - entryProps = Props[Entity], + entityProps = Props[Entity], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 6634bd991f..a19c0008be 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -53,7 +53,7 @@ object ClusterShardingSpec extends MultiNodeConfig { retry-interval = 1 s handoff-timeout = 10 s shard-start-timeout = 5s - entry-restart-backoff = 1s + entity-restart-backoff = 1s rebalance-interval = 2 s least-shard-allocation-strategy { rebalance-threshold = 2 @@ -70,7 +70,7 @@ object ClusterShardingSpec 
extends MultiNodeConfig { case object Increment case object Decrement final case class Get(counterId: Long) - final case class EntryEnvelope(id: Long, payload: Any) + final case class EntityEnvelope(id: Long, payload: Any) case object Stop final case class CounterChanged(delta: Int) @@ -81,7 +81,7 @@ object ClusterShardingSpec extends MultiNodeConfig { context.setReceiveTimeout(120.seconds) // self.path.parent.parent.name is the type name (utf-8 URL-encoded) - // self.path.name is the entry identifier (utf-8 URL-encoded) + // self.path.name is the entity identifier (utf-8 URL-encoded) override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name var count = 0 @@ -112,15 +112,15 @@ object ClusterShardingSpec extends MultiNodeConfig { //#counter-actor val idExtractor: ShardRegion.IdExtractor = { - case EntryEnvelope(id, payload) ⇒ (id.toString, payload) - case msg @ Get(id) ⇒ (id.toString, msg) + case EntityEnvelope(id, payload) ⇒ (id.toString, payload) + case msg @ Get(id) ⇒ (id.toString, msg) } val numberOfShards = 12 val shardResolver: ShardRegion.ShardResolver = { - case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString - case Get(id) ⇒ (id % numberOfShards).toString + case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + case Get(id) ⇒ (id % numberOfShards).toString } } @@ -131,15 +131,15 @@ object ClusterShardingDocCode { //#counter-extractor val idExtractor: ShardRegion.IdExtractor = { - case EntryEnvelope(id, payload) ⇒ (id.toString, payload) - case msg @ Get(id) ⇒ (id.toString, msg) + case EntityEnvelope(id, payload) ⇒ (id.toString, payload) + case msg @ Get(id) ⇒ (id.toString, msg) } val numberOfShards = 100 val shardResolver: ShardRegion.ShardResolver = { - case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString - case Get(id) ⇒ (id % numberOfShards).toString + case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + case Get(id) ⇒ (id % numberOfShards).toString } //#counter-extractor @@ -195,7 +195,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult ShardCoordinator.props(typeName, settings, allocationStrategy) } - List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter", + List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter", "PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { typeName ⇒ val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") system.actorOf(ClusterSingletonManager.props( @@ -207,18 +207,18 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } } - def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = { + def createRegion(typeName: String, rememberEntities: Boolean): ActorRef = { val cfg = ConfigFactory.parseString(""" retry-interval = 1s shard-failure-backoff = 1s - entry-restart-backoff = 1s + entity-restart-backoff = 1s buffer-size = 1000 """).withFallback(system.settings.config.getConfig("akka.cluster.sharding")) val settings = ClusterShardingSettings(cfg) - .withRememberEntries(rememberEntries) + .withRememberEntities(rememberEntities) system.actorOf(ShardRegion.props( typeName = typeName, - entryProps = Props[Counter], + entityProps = Props[Counter], settings = settings, coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator", idExtractor = idExtractor, @@ -227,14 +227,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult name = typeName + "Region") } 
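  // A minimal sketch (not taken from the spec itself) of starting an equivalent region
  // through the ClusterSharding extension instead of ShardRegion.props, using the
  // renamed entityProps parameter and the withRememberEntities setting. It assumes the
  // Counter actor, idExtractor and shardResolver defined above in this spec; the helper
  // name startCounterRegion is hypothetical.
  def startCounterRegion(typeName: String, rememberEntities: Boolean): ActorRef =
    ClusterSharding(system).start(
      typeName = typeName,
      entityProps = Props[Counter],
      settings = ClusterShardingSettings(system).withRememberEntities(rememberEntities),
      idExtractor = idExtractor,
      shardResolver = shardResolver)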
- lazy val region = createRegion("counter", rememberEntries = false) - lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false) + lazy val region = createRegion("counter", rememberEntities = false) + lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntities = false) - lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true) - lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true) - lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true) - lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntries = true) - lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true) + lazy val persistentEntitiesRegion = createRegion("PersistentCounterEntities", rememberEntities = true) + lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntities = true) + lazy val persistentRegion = createRegion("PersistentCounter", rememberEntities = true) + lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntities = true) + lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntities = true) "Cluster sharding" must { @@ -259,10 +259,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult join(first, first) runOn(first) { - region ! EntryEnvelope(1, Increment) - region ! EntryEnvelope(1, Increment) - region ! EntryEnvelope(1, Increment) - region ! EntryEnvelope(1, Decrement) + region ! EntityEnvelope(1, Increment) + region ! EntityEnvelope(1, Increment) + region ! EntityEnvelope(1, Increment) + region ! EntityEnvelope(1, Decrement) region ! Get(1) expectMsg(2) @@ -277,15 +277,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult join(second, first) runOn(second) { - region ! EntryEnvelope(2, Increment) - region ! EntryEnvelope(2, Increment) - region ! EntryEnvelope(2, Increment) - region ! EntryEnvelope(2, Decrement) + region ! EntityEnvelope(2, Increment) + region ! EntityEnvelope(2, Increment) + region ! EntityEnvelope(2, Increment) + region ! EntityEnvelope(2, Decrement) region ! Get(2) expectMsg(2) - region ! EntryEnvelope(11, Increment) - region ! EntryEnvelope(12, Increment) + region ! EntityEnvelope(11, Increment) + region ! EntityEnvelope(12, Increment) region ! Get(11) expectMsg(1) region ! Get(12) @@ -293,7 +293,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } enterBarrier("second-update") runOn(first) { - region ! EntryEnvelope(2, Increment) + region ! EntityEnvelope(2, Increment) region ! Get(2) expectMsg(3) lastSender.path should ===(node(second) / "user" / "counterRegion" / "2" / "2") @@ -320,14 +320,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("after-3") } - "support passivation and activation of entries" in { + "support passivation and activation of entities" in { runOn(second) { region ! Get(2) expectMsg(3) - region ! EntryEnvelope(2, ReceiveTimeout) + region ! EntityEnvelope(2, ReceiveTimeout) // let the Passivate-Stop roundtrip begin to trigger buffering of subsequent messages Thread.sleep(200) - region ! EntryEnvelope(2, Increment) + region ! EntityEnvelope(2, Increment) region ! 
Get(2) expectMsg(4) } @@ -396,7 +396,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(third) { for (_ ← 1 to 10) - region ! EntryEnvelope(3, Increment) + region ! EntityEnvelope(3, Increment) region ! Get(3) expectMsg(10) lastSender.path should ===(region.path / "3" / "3") // local @@ -405,7 +405,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(fourth) { for (_ ← 1 to 20) - region ! EntryEnvelope(4, Increment) + region ! EntityEnvelope(4, Increment) region ! Get(4) expectMsg(20) lastSender.path should ===(region.path / "4" / "4") // local @@ -413,12 +413,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("fourth-update") runOn(first) { - region ! EntryEnvelope(3, Increment) + region ! EntityEnvelope(3, Increment) region ! Get(3) expectMsg(11) lastSender.path should ===(node(third) / "user" / "counterRegion" / "3" / "3") - region ! EntryEnvelope(4, Increment) + region ! EntityEnvelope(4, Increment) region ! Get(4) expectMsg(21) lastSender.path should ===(node(fourth) / "user" / "counterRegion" / "4" / "4") @@ -475,7 +475,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(fourth) { for (n ← 1 to 10) { - rebalancingRegion ! EntryEnvelope(n, Increment) + rebalancingRegion ! EntityEnvelope(n, Increment) rebalancingRegion ! Get(n) expectMsg(1) } @@ -510,14 +510,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //#counter-start val counterRegion: ActorRef = ClusterSharding(system).start( typeName = "Counter", - entryProps = Props[Counter], + entityProps = Props[Counter], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) //#counter-start ClusterSharding(system).start( typeName = "AnotherCounter", - entryProps = Props[Counter], + entityProps = Props[Counter], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) @@ -529,12 +529,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult counterRegion ! Get(123) expectMsg(0) - counterRegion ! EntryEnvelope(123, Increment) + counterRegion ! EntityEnvelope(123, Increment) counterRegion ! Get(123) expectMsg(1) //#counter-usage - ClusterSharding(system).shardRegion("AnotherCounter") ! EntryEnvelope(123, Decrement) + ClusterSharding(system).shardRegion("AnotherCounter") ! EntityEnvelope(123, Decrement) ClusterSharding(system).shardRegion("AnotherCounter") ! Get(123) expectMsg(-1) } @@ -544,7 +544,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult // sixth is a frontend node, i.e. proxy only runOn(sixth) { for (n ← 1000 to 1010) { - ClusterSharding(system).shardRegion("Counter") ! EntryEnvelope(n, Increment) + ClusterSharding(system).shardRegion("Counter") ! EntityEnvelope(n, Increment) ClusterSharding(system).shardRegion("Counter") ! 
Get(n) expectMsg(1) lastSender.path.address should not be (Cluster(system).selfAddress) @@ -558,7 +558,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(first) { val counterRegionViaStart: ActorRef = ClusterSharding(system).start( typeName = "ApiTest", - entryProps = Props[Counter], + entityProps = Props[Counter], settings = ClusterShardingSettings(system), idExtractor = idExtractor, shardResolver = shardResolver) @@ -572,17 +572,17 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } "Persistent Cluster Shards" must { - "recover entries upon restart" in within(50.seconds) { + "recover entities upon restart" in within(50.seconds) { runOn(third, fourth, fifth) { - persistentEntriesRegion + persistentEntitiesRegion anotherPersistentRegion } enterBarrier("persistent-started") runOn(third) { //Create an increment counter 1 - persistentEntriesRegion ! EntryEnvelope(1, Increment) - persistentEntriesRegion ! Get(1) + persistentEntitiesRegion ! EntityEnvelope(1, Increment) + persistentEntitiesRegion ! Get(1) expectMsg(1) //Shut down the shard and confirm it's dead @@ -600,7 +600,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult }, 5 seconds, 500 millis) //Get the path to where the shard now resides - persistentEntriesRegion ! Get(13) + persistentEntitiesRegion ! Get(13) expectMsg(0) //Check that counter 1 is now alive again, even though we have @@ -619,7 +619,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Check a second region does not share the same persistent shards //Create a separate 13 counter - anotherPersistentRegion ! EntryEnvelope(13, Increment) + anotherPersistentRegion ! EntityEnvelope(13, Increment) anotherPersistentRegion ! Get(13) expectMsg(1) @@ -632,7 +632,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("after-12") } - "permanently stop entries which passivate" in within(15.seconds) { + "permanently stop entities which passivate" in within(15.seconds) { runOn(third, fourth, fifth) { persistentRegion } @@ -640,7 +640,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(third) { //Create and increment counter 1 - persistentRegion ! EntryEnvelope(1, Increment) + persistentRegion ! EntityEnvelope(1, Increment) persistentRegion ! Get(1) expectMsg(1) @@ -649,7 +649,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult val region = system.actorSelection(counter1.path.parent.parent) //Create and increment counter 13 - persistentRegion ! EntryEnvelope(13, Increment) + persistentRegion ! EntityEnvelope(13, Increment) persistentRegion ! 
Get(13) expectMsg(1) @@ -668,7 +668,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult awaitAssert({ //Check counter 1 is dead counter1.tell(Identify(1), probe1.ref) - probe1.expectMsg(1 second, "Entry 1 was still around", ActorIdentity(1, None)) + probe1.expectMsg(1 second, "Entity 1 was still around", ActorIdentity(1, None)) }, 5 second, 500 millis) //Stop the shard cleanly @@ -703,7 +703,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult enterBarrier("after-13") } - "restart entries which stop without passivating" in within(50.seconds) { + "restart entities which stop without passivating" in within(50.seconds) { runOn(third, fourth) { persistentRegion } @@ -711,7 +711,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult runOn(third) { //Create and increment counter 1 - persistentRegion ! EntryEnvelope(1, Increment) + persistentRegion ! EntityEnvelope(1, Increment) persistentRegion ! Get(1) expectMsg(2) @@ -731,9 +731,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "be migrated to new regions upon region failure" in within(15.seconds) { - //Start only one region, and force an entry onto that region + //Start only one region, and force an entity onto that region runOn(third) { - autoMigrateRegion ! EntryEnvelope(1, Increment) + autoMigrateRegion ! EntityEnvelope(1, Increment) autoMigrateRegion ! Get(1) expectMsg(1) } @@ -741,7 +741,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Start another region and test it talks to node 3 runOn(fourth) { - autoMigrateRegion ! EntryEnvelope(1, Increment) + autoMigrateRegion ! EntityEnvelope(1, Increment) autoMigrateRegion ! Get(1) expectMsg(2) @@ -772,7 +772,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "ensure rebalance restarts shards" in within(50.seconds) { runOn(fourth) { for (i ← 2 to 12) { - rebalancingPersistentRegion ! EntryEnvelope(i, Increment) + rebalancingPersistentRegion ! EntityEnvelope(i, Increment) } for (i ← 2 to 12) { @@ -780,7 +780,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult expectMsg(1) } } - enterBarrier("entries-started") + enterBarrier("entities-started") runOn(fifth) { rebalancingPersistentRegion @@ -791,8 +791,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult awaitAssert { var count = 0 for (n ← 2 to 12) { - val entry = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString) - entry ! Identify(n) + val entity = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString) + entity ! 
Identify(n) receiveOne(3 seconds) match { case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1 case ActorIdentity(id, None) ⇒ //Not on the fifth shard diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index c0f862a2ab..d9fc4b07e3 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -34,9 +34,9 @@ public class ClusterShardingTest { ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() { @Override - public String entryId(Object message) { - if (message instanceof Counter.EntryEnvelope) - return String.valueOf(((Counter.EntryEnvelope) message).id); + public String entityId(Object message) { + if (message instanceof Counter.EntityEnvelope) + return String.valueOf(((Counter.EntityEnvelope) message).id); else if (message instanceof Counter.Get) return String.valueOf(((Counter.Get) message).counterId); else @@ -44,9 +44,9 @@ public class ClusterShardingTest { } @Override - public Object entryMessage(Object message) { - if (message instanceof Counter.EntryEnvelope) - return ((Counter.EntryEnvelope) message).payload; + public Object entityMessage(Object message) { + if (message instanceof Counter.EntityEnvelope) + return ((Counter.EntityEnvelope) message).payload; else return message; } @@ -54,8 +54,8 @@ public class ClusterShardingTest { @Override public String shardId(Object message) { int numberOfShards = 100; - if (message instanceof Counter.EntryEnvelope) { - long id = ((Counter.EntryEnvelope) message).id; + if (message instanceof Counter.EntityEnvelope) { + long id = ((Counter.EntityEnvelope) message).id; return String.valueOf(id % numberOfShards); } else if (message instanceof Counter.Get) { long id = ((Counter.Get) message).counterId; @@ -79,7 +79,7 @@ public class ClusterShardingTest { ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter"); counterRegion.tell(new Counter.Get(123), getSelf()); - counterRegion.tell(new Counter.EntryEnvelope(123, + counterRegion.tell(new Counter.EntityEnvelope(123, Counter.CounterOp.INCREMENT), getSelf()); counterRegion.tell(new Counter.Get(123), getSelf()); //#counter-usage @@ -100,11 +100,11 @@ public class ClusterShardingTest { } } - public static class EntryEnvelope { + public static class EntityEnvelope { final public long id; final public Object payload; - public EntryEnvelope(long id, Object payload) { + public EntityEnvelope(long id, Object payload) { this.id = id; this.payload = payload; } @@ -121,7 +121,7 @@ public class ClusterShardingTest { int count = 0; // getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded) - // getSelf().path().name() is the entry identifier (utf-8 URL-encoded) + // getSelf().path().name() is the entity identifier (utf-8 URL-encoded) @Override public String persistenceId() { return getSelf().path().parent().parent().name() + "-" + getSelf().path().name(); diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index f2ec29584f..b5819ddfcc 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -238,6 +238,9 @@ These settings can be defined differently per entry type if needed. 
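As a sketch of how the rename shows up in Scala application code (``EntityEnvelope``, ``Get`` and ``numberOfShards`` are application-defined sample names used in the sharding samples of this patch, not library API)::

    import akka.cluster.sharding.ShardRegion

    // The sample envelope is now EntityEnvelope (previously EntryEnvelope); the
    // extractor maps it to the entity identifier plus the unwrapped payload, and
    // the resolver maps it to a shard identifier.
    val idExtractor: ShardRegion.IdExtractor = {
      case EntityEnvelope(id, payload) => (id.toString, payload)
      case msg @ Get(id)               => (id.toString, msg)
    }

    val numberOfShards = 100

    val shardResolver: ShardRegion.ShardResolver = {
      case EntityEnvelope(id, _) => (id % numberOfShards).toString
      case Get(id)               => (id % numberOfShards).toString
    }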
Starting the ``ShardRegion`` in proxy mode is now done with the ``startProxy`` method of the ``ClusterSharding`` extension instead of the optional ``entryProps`` parameter. +Entry was renamed to Entity, for example in the ``MessagesExtractor`` in the Java API +and the ``EntityId`` type in the Scala API. + ClusterSingletonManager and ClusterSingletonProxy construction ============================================================== diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 27b79e8d8c..733bded617 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -8,131 +8,131 @@ be able to interact with them using their logical identifier, but without having their physical location in the cluster, which might also change over time. It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. -Here we call these actors "entries". These actors typically have persistent (durable) state, +Here we call these actors "entities". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state. Cluster sharding is typically used when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors it might be easier to run them on a :ref:`cluster-singleton` node. -In this context sharding means that actors with an identifier, so called entries, -can be automatically distributed across multiple nodes in the cluster. Each entry -actor runs only at one place, and messages can be sent to the entry without requiring +In this context sharding means that actors with an identifier, so called entities, +can be automatically distributed across multiple nodes in the cluster. Each entity +actor runs only at one place, and messages can be sent to the entity without requiring the sender to know the location of the destination actor. This is achieved by sending the messages via a ``ShardRegion`` actor provided by this extension, which knows how -to route the message with the entry id to the final destination. +to route the message with the entity id to the final destination. An Example in Java ------------------ -This is how an entry actor may look like: +This is how an entity actor may look like: .. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state. -It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover +It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable. Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique. When using the sharding extension you are first, typically at system startup on each node -in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` +in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start`` method. ``ClusterSharding.start`` gives you the reference which you can pass along. .. 
includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-start -The ``messageExtractor`` defines application specific methods to extract the entry +The ``messageExtractor`` defines application specific methods to extract the entity identifier and the shard identifier from incoming messages. .. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-extractor -This example illustrates two different ways to define the entry identifier in the messages: +This example illustrates two different ways to define the entity identifier in the messages: * The ``Get`` message includes the identifier itself. - * The ``EntryEnvelope`` holds the identifier, and the actual message that is - sent to the entry actor is wrapped in the envelope. + * The ``EntityEnvelope`` holds the identifier, and the actual message that is + sent to the entity actor is wrapped in the envelope. -Note how these two messages types are handled in the ``entryId`` and ``entryMessage`` methods shown above. -The message sent to the entry actor is what ``entryMessage`` returns and that makes it possible to unwrap envelopes +Note how these two messages types are handled in the ``entityId`` and ``entityMessage`` methods shown above. +The message sent to the entity actor is what ``entityMessage`` returns and that makes it possible to unwrap envelopes if needed. -A shard is a group of entries that will be managed together. The grouping is defined by the -``shardResolver`` function shown above. For a specific entry identifier the shard identifier must always -be the same. Otherwise the entry actor might accidentally be started in several places at the same time. +A shard is a group of entities that will be managed together. The grouping is defined by the +``shardResolver`` function shown above. For a specific entity identifier the shard identifier must always +be the same. Otherwise the entity actor might accidentally be started in several places at the same time. Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, -i.e. same amount of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater +i.e. same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number of cluster nodes. Less shards than number of nodes will result in that some nodes will not host any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping all nodes in the cluster. -A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo +A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entity identifier modulo number of shards. -Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a -named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``. -The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. 
It will -delegate the message to the right node and it will create the entry actor on demand, i.e. when the -first message for a specific entry is delivered. +Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a +named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``. +The ``ShardRegion`` will lookup the location of the shard for the entity if it does not already know its location. It will +delegate the message to the right node and it will create the entity actor on demand, i.e. when the +first message for a specific entity is delivered. .. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-usage An Example in Scala ------------------- -This is how an entry actor may look like: +This is how an entity actor may look like: .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-actor The above actor uses event sourcing and the support provided in ``PersistentActor`` to store its state. -It does not have to be a persistent actor, but in case of failure or migration of entries between nodes it must be able to recover +It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover its state if it is valuable. Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique. When using the sharding extension you are first, typically at system startup on each node -in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` +in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start`` method. ``ClusterSharding.start`` gives you the reference which you can pass along. .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-start -The ``idExtractor`` and ``shardResolver`` are two application specific functions to extract the entry +The ``idExtractor`` and ``shardResolver`` are two application specific functions to extract the entity identifier and the shard identifier from incoming messages. .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-extractor -This example illustrates two different ways to define the entry identifier in the messages: +This example illustrates two different ways to define the entity identifier in the messages: * The ``Get`` message includes the identifier itself. - * The ``EntryEnvelope`` holds the identifier, and the actual message that is - sent to the entry actor is wrapped in the envelope. + * The ``EntityEnvelope`` holds the identifier, and the actual message that is + sent to the entity actor is wrapped in the envelope. Note how these two messages types are handled in the ``idExtractor`` function shown above. -The message sent to the entry actor is the second part of the tuple return by the ``idExtractor`` and that makes it +The message sent to the entity actor is the second part of the tuple return by the ``idExtractor`` and that makes it possible to unwrap envelopes if needed. -A shard is a group of entries that will be managed together. The grouping is defined by the -``shardResolver`` function shown above. 
For a specific entry identifier the shard identifier must always +A shard is a group of entities that will be managed together. The grouping is defined by the +``shardResolver`` function shown above. For a specific entity identifier the shard identifier must always be the same. Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, -i.e. same amount of entries in each shard. As a rule of thumb, the number of shards should be a factor ten greater +i.e. same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number of cluster nodes. Less shards than number of nodes will result in that some nodes will not host any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping all nodes in the cluster. -A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entry identifier modulo +A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entity identifier modulo number of shards. -Messages to the entries are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a -named entry type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``. -The ``ShardRegion`` will lookup the location of the shard for the entry if it does not already know its location. It will -delegate the message to the right node and it will create the entry actor on demand, i.e. when the -first message for a specific entry is delivered. +Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a +named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``. +The ``ShardRegion`` will lookup the location of the shard for the entity if it does not already know its location. It will +delegate the message to the right node and it will create the entity actor on demand, i.e. when the +first message for a specific entity is delivered. .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala#counter-usage @@ -144,16 +144,16 @@ How it works The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ``ShardRegion`` is created with two application specific -functions to extract the entry identifier and the shard identifier from incoming messages. -A shard is a group of entries that will be managed together. For the first message in a +functions to extract the entity identifier and the shard identifier from incoming messages. +A shard is a group of entities that will be managed together. For the first message in a specific shard the ``ShardRegion`` request the location of the shard from a central coordinator, the ``ShardCoordinator``. The ``ShardCoordinator`` decides which ``ShardRegion`` shall own the ``Shard`` and informs that ``ShardRegion``. The region will confirm this request and create the ``Shard`` supervisor -as a child actor. The individual ``Entries`` will then be created when needed by the ``Shard`` +as a child actor. 
The individual ``Entities`` will then be created when needed by the ``Shard`` actor. Incoming messages thus travel via the ``ShardRegion`` and the ``Shard`` to the target -``Entry``. +``Entity``. If the shard home is another ``ShardRegion`` instance messages will be forwarded to that ``ShardRegion`` instance instead. While resolving the location of a @@ -166,8 +166,8 @@ Scenario 1: #. Incoming message M1 to ``ShardRegion`` instance R1. #. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1. #. C answers that the home of S1 is R1. -#. R1 creates child actor for the entry E1 and sends buffered messages for S1 to E1 child -#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entry children as needed, and forwards messages to them. +#. R1 creates child actor for the entity E1 and sends buffered messages for S1 to E1 child +#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entity children as needed, and forwards messages to them. Scenario 2: @@ -178,7 +178,7 @@ Scenario 2: #. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2. #. R2 receives message for S2, ask C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2). -To make sure that at most one instance of a specific entry actor is running somewhere +To make sure that at most one instance of a specific entity actor is running somewhere in the cluster it is important that all nodes have the same view of where the shards are located. Therefore the shard allocation decisions are taken by the central ``ShardCoordinator``, which is running as a cluster singleton, i.e. one instance on @@ -191,19 +191,19 @@ allocates new shards to the ``ShardRegion`` with least number of previously allo This strategy can be replaced by an application specific implementation. To be able to use newly added members in the cluster the coordinator facilitates rebalancing -of shards, i.e. migrate entries from one node to another. In the rebalance process the +of shards, i.e. migrate entities from one node to another. In the rebalance process the coordinator first notifies all ``ShardRegion`` actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard -will stop all entries in that shard by sending the specified ``handOffStopMessage`` -(default ``PoisonPill``) to them. When all entries have been terminated the ``ShardRegion`` -owning the entries will acknowledge the handoff as completed to the coordinator. +will stop all entities in that shard by sending the specified ``handOffStopMessage`` +(default ``PoisonPill``) to them. When all entities have been terminated the ``ShardRegion`` +owning the entities will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard and thereby allocate a new home for the shard and then buffered messages in the -``ShardRegion`` actors are delivered to the new location. This means that the state of the entries -are not transferred or migrated. 
If the state of the entries are of importance it should be +``ShardRegion`` actors are delivered to the new location. This means that the state of the entities +are not transferred or migrated. If the state of the entities are of importance it should be persistent (durable), e.g. with ``akka-persistence``, so that it can be recovered at the new location. @@ -223,7 +223,7 @@ actor will take over and the state is recovered. During such a failure period sh with known location are still available, while messages for new (unknown) shards are buffered until the new ``ShardCoordinator`` becomes available. -As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entry +As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entity actor the order of the messages is preserved. As long as the buffer limit is not reached messages are delivered on a best effort basis, with at-most once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging, with @@ -238,42 +238,42 @@ Proxy Only Mode --------------- The ``ShardRegion`` actor can also be started in proxy only mode, i.e. it will not -host any entries itself, but knows how to delegate messages to the right location. +host any entities itself, but knows how to delegate messages to the right location. A ``ShardRegion`` is started in proxy only mode with the method ``ClusterSharding.startProxy`` method. Passivation ----------- -If the state of the entries are persistent you may stop entries that are not used to +If the state of the entities are persistent you may stop entities that are not used to reduce memory consumption. This is done by the application specific implementation of -the entry actors for example by defining receive timeout (``context.setReceiveTimeout``). -If a message is already enqueued to the entry when it stops itself the enqueued message +the entity actors for example by defining receive timeout (``context.setReceiveTimeout``). +If a message is already enqueued to the entity when it stops itself the enqueued message in the mailbox will be dropped. To support graceful passivation without loosing such -messages the entry actor can send ``ShardRegion.Passivate`` to its parent ``Shard``. -The specified wrapped message in ``Passivate`` will be sent back to the entry, which is +messages the entity actor can send ``ShardRegion.Passivate`` to its parent ``Shard``. +The specified wrapped message in ``Passivate`` will be sent back to the entity, which is then supposed to stop itself. Incoming messages will be buffered by the ``Shard`` -between reception of ``Passivate`` and termination of the entry. Such buffered messages -are thereafter delivered to a new incarnation of the entry. +between reception of ``Passivate`` and termination of the entity. Such buffered messages +are thereafter delivered to a new incarnation of the entity. -Remembering Entries -------------------- +Remembering Entities +-------------------- -The list of entries in each ``Shard`` can be made persistent (durable) by setting -the ``rememberEntries`` flag to true in ``ClusterShardingSettings`` when calling -``ClusterSharding.start``. When configured to remember entries, whenever a ``Shard`` +The list of entities in each ``Shard`` can be made persistent (durable) by setting +the ``rememberEntities`` flag to true in ``ClusterShardingSettings`` when calling +``ClusterSharding.start``. 
When configured to remember entities, whenever a ``Shard`` is rebalanced onto another node or recovers after a crash it will recreate all the -entries which were previously running in that ``Shard``. To permanently stop entries, +entities which were previously running in that ``Shard``. To permanently stop entities, a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the -entry will be automatically restarted after the entry restart backoff specified in +entity will be automatically restarted after the entity restart backoff specified in the configuration. -When ``rememberEntries`` is set to false, a ``Shard`` will not automatically restart any entries -after a rebalance or recovering from a crash. Entries will only be started once the first message -for that entry has been received in the ``Shard``. Entries will not be restarted if they stop without +When ``rememberEntities`` is set to false, a ``Shard`` will not automatically restart any entities +after a rebalance or recovering from a crash. Entities will only be started once the first message +for that entity has been received in the ``Shard``. Entities will not be restarted if they stop without using a ``Passivate``. -Note that the state of the entries themselves will not be restored unless they have been made persistent, +Note that the state of the entities themselves will not be restored unless they have been made persistent, e.g. with ``akka-persistence``. Graceful Shutdown