Rename replicated settings (#29460)
This commit is contained in:
parent
67eb74f076
commit
f41f093372
7 changed files with 30 additions and 30 deletions
|
|
@ -18,7 +18,7 @@ import akka.annotation.ApiMayChange
|
||||||
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
|
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
|
||||||
|
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
object ReplicatedShardingSettings {
|
object ReplicatedEntityProvider {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API:
|
* Java API:
|
||||||
|
|
@ -33,7 +33,7 @@ object ReplicatedShardingSettings {
|
||||||
JEntityTypeKey[M],
|
JEntityTypeKey[M],
|
||||||
ReplicaId,
|
ReplicaId,
|
||||||
JSet[ReplicaId],
|
JSet[ReplicaId],
|
||||||
ReplicaSettings[M, E]]): ReplicatedShardingSettings[M, E] = {
|
ReplicatedEntity[M, E]]): ReplicatedEntityProvider[M, E] = {
|
||||||
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
|
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
|
||||||
apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) =>
|
apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) =>
|
||||||
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds))
|
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds))
|
||||||
|
|
@ -46,9 +46,9 @@ object ReplicatedShardingSettings {
|
||||||
* @tparam E The type for envelopes used for sending `M`s over sharding
|
* @tparam E The type for envelopes used for sending `M`s over sharding
|
||||||
*/
|
*/
|
||||||
def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])(
|
def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])(
|
||||||
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicaSettings[M, E])
|
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicatedEntity[M, E])
|
||||||
: ReplicatedShardingSettings[M, E] = {
|
: ReplicatedEntityProvider[M, E] = {
|
||||||
new ReplicatedShardingSettings(allReplicaIds.map { replicaId =>
|
new ReplicatedEntityProvider(allReplicaIds.map { replicaId =>
|
||||||
val typeKey = EntityTypeKey[M](replicaId.id)
|
val typeKey = EntityTypeKey[M](replicaId.id)
|
||||||
settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds)
|
settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds)
|
||||||
}.toVector, directReplication = false)
|
}.toVector, directReplication = false)
|
||||||
|
|
@ -60,8 +60,8 @@ object ReplicatedShardingSettings {
|
||||||
* @tparam E The type for envelopes used for sending `M`s over sharding
|
* @tparam E The type for envelopes used for sending `M`s over sharding
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
final class ReplicatedShardingSettings[M, E] private (
|
final class ReplicatedEntityProvider[M, E] private (
|
||||||
val replicas: immutable.Seq[ReplicaSettings[M, E]],
|
val replicas: immutable.Seq[ReplicatedEntity[M, E]],
|
||||||
val directReplication: Boolean) {
|
val directReplication: Boolean) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -71,13 +71,13 @@ final class ReplicatedShardingSettings[M, E] private (
|
||||||
* to work.
|
* to work.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
def withDirectReplication(enabled: Boolean): ReplicatedShardingSettings[M, E] =
|
def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M, E] =
|
||||||
new ReplicatedShardingSettings(replicas, directReplication = enabled)
|
new ReplicatedEntityProvider(replicas, directReplication = enabled)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
object ReplicaSettings {
|
object ReplicatedEntity {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Defines the [[akka.cluster.sharding.typed.javadsl.Entity]] to use for a given replica, note that the behavior
|
* Java API: Defines the [[akka.cluster.sharding.typed.javadsl.Entity]] to use for a given replica, note that the behavior
|
||||||
|
|
@ -85,7 +85,7 @@ object ReplicaSettings {
|
||||||
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
|
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
|
||||||
* as that requires a single writer and that would cause it to have multiple writers.
|
* as that requires a single writer and that would cause it to have multiple writers.
|
||||||
*/
|
*/
|
||||||
def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicaSettings[M, E] =
|
def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicatedEntity[M, E] =
|
||||||
apply(replicaId, entity.toScala)
|
apply(replicaId, entity.toScala)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -94,12 +94,12 @@ object ReplicaSettings {
|
||||||
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
|
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
|
||||||
* as that requires a single writer and that would cause it to have multiple writers.
|
* as that requires a single writer and that would cause it to have multiple writers.
|
||||||
*/
|
*/
|
||||||
def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicaSettings[M, E] =
|
def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicatedEntity[M, E] =
|
||||||
new ReplicaSettings(replicaId, entity)
|
new ReplicatedEntity(replicaId, entity)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Settings for a specific replica id in replicated sharding
|
* Settings for a specific replica id in replicated sharding
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
final class ReplicaSettings[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E])
|
final class ReplicatedEntity[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E])
|
||||||
|
|
@ -45,7 +45,7 @@ trait ReplicatedShardingExtension extends Extension {
|
||||||
*
|
*
|
||||||
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
|
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
|
||||||
*/
|
*/
|
||||||
def init[M, E](settings: ReplicatedShardingSettings[M, E]): ReplicatedSharding[M, E]
|
def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ import akka.util.ccompat.JavaConverters._
|
||||||
* This actor should be started once on each node where Replicated Event Sourced entities will run (the same nodes that you start
|
* This actor should be started once on each node where Replicated Event Sourced entities will run (the same nodes that you start
|
||||||
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]]
|
* sharding on). The entities should be set up with [[akka.persistence.typed.scaladsl.EventSourcedBehavior.withEventPublishing]]
|
||||||
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
|
* or [[akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior#withEventPublishing()]]
|
||||||
* If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedShardingSettings.withDirectReplication]]
|
* If using [[ReplicatedSharding]] the replication can be enabled through [[ReplicatedEntityProvider.withDirectReplication]]
|
||||||
* instead of starting this actor manually.
|
* instead of starting this actor manually.
|
||||||
*
|
*
|
||||||
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas
|
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,7 @@ import akka.actor.typed.ActorSystem
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.cluster.sharding.typed.ReplicatedShardingExtension
|
import akka.cluster.sharding.typed.ReplicatedShardingExtension
|
||||||
import akka.cluster.sharding.typed.ReplicatedSharding
|
import akka.cluster.sharding.typed.ReplicatedSharding
|
||||||
import akka.cluster.sharding.typed.ReplicatedShardingSettings
|
import akka.cluster.sharding.typed.ReplicatedEntityProvider
|
||||||
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
|
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
|
||||||
import akka.cluster.sharding.typed.scaladsl.EntityRef
|
import akka.cluster.sharding.typed.scaladsl.EntityRef
|
||||||
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
|
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
|
||||||
|
|
@ -34,7 +34,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
|
||||||
|
|
||||||
private val logger = LoggerFactory.getLogger(getClass)
|
private val logger = LoggerFactory.getLogger(getClass)
|
||||||
|
|
||||||
override def init[M, E](settings: ReplicatedShardingSettings[M, E]): ReplicatedSharding[M, E] = {
|
override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = {
|
||||||
val sharding = ClusterSharding(system)
|
val sharding = ClusterSharding(system)
|
||||||
val initializedReplicas = settings.replicas.map { replicaSettings =>
|
val initializedReplicas = settings.replicas.map { replicaSettings =>
|
||||||
// start up a sharding instance per replica id
|
// start up a sharding instance per replica id
|
||||||
|
|
|
||||||
|
|
@ -151,15 +151,15 @@ public class ReplicatedShardingTest extends JUnitSuite {
|
||||||
super(context);
|
super(context);
|
||||||
|
|
||||||
// #bootstrap
|
// #bootstrap
|
||||||
ReplicatedShardingSettings<
|
ReplicatedEntityProvider<
|
||||||
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
|
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
|
||||||
replicatedShardingSettings =
|
replicatedEntityProvider =
|
||||||
ReplicatedShardingSettings.create(
|
ReplicatedEntityProvider.create(
|
||||||
MyReplicatedStringSet.Command.class,
|
MyReplicatedStringSet.Command.class,
|
||||||
ALL_REPLICAS,
|
ALL_REPLICAS,
|
||||||
// factory for replica settings for a given replica
|
// factory for replicated entity for a given replica
|
||||||
(entityTypeKey, replicaId, allReplicas) ->
|
(entityTypeKey, replicaId, allReplicas) ->
|
||||||
ReplicaSettings.create(
|
ReplicatedEntity.create(
|
||||||
replicaId,
|
replicaId,
|
||||||
// use the replica id as typekey for sharding to get one sharding instance
|
// use the replica id as typekey for sharding to get one sharding instance
|
||||||
// per replica
|
// per replica
|
||||||
|
|
@ -179,7 +179,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
|
||||||
ReplicatedShardingExtension.get(getContext().getSystem());
|
ReplicatedShardingExtension.get(getContext().getSystem());
|
||||||
ReplicatedSharding<
|
ReplicatedSharding<
|
||||||
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
|
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
|
||||||
replicatedSharding = extension.init(replicatedShardingSettings);
|
replicatedSharding = extension.init(replicatedEntityProvider);
|
||||||
// #bootstrap
|
// #bootstrap
|
||||||
|
|
||||||
this.replicatedSharding = replicatedSharding;
|
this.replicatedSharding = replicatedSharding;
|
||||||
|
|
|
||||||
|
|
@ -75,12 +75,12 @@ class ReplicatedShardingSpec
|
||||||
|
|
||||||
def apply(): Behavior[Command] = Behaviors.setup { context =>
|
def apply(): Behavior[Command] = Behaviors.setup { context =>
|
||||||
// #bootstrap
|
// #bootstrap
|
||||||
val replicatedShardingSettings =
|
val replicatedShardingProvider =
|
||||||
ReplicatedShardingSettings[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
|
ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
|
||||||
// all replicas
|
// all replicas
|
||||||
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
|
Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) =>
|
||||||
// factory for replica settings for a given replica
|
// factory for replicated entity for a given replica
|
||||||
ReplicaSettings(
|
ReplicatedEntity(
|
||||||
replicaId,
|
replicaId,
|
||||||
// use the provided entity type key for sharding to get one sharding instance per replica
|
// use the provided entity type key for sharding to get one sharding instance per replica
|
||||||
Entity(entityTypeKey) { entityContext =>
|
Entity(entityTypeKey) { entityContext =>
|
||||||
|
|
@ -93,7 +93,7 @@ class ReplicatedShardingSpec
|
||||||
.withRole(replicaId.id))
|
.withRole(replicaId.id))
|
||||||
}
|
}
|
||||||
|
|
||||||
val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingSettings)
|
val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingProvider)
|
||||||
// #bootstrap
|
// #bootstrap
|
||||||
|
|
||||||
Behaviors.receiveMessage {
|
Behaviors.receiveMessage {
|
||||||
|
|
|
||||||
|
|
@ -323,7 +323,7 @@ query is still needed as delivery is not guaranteed, but can be configured to po
|
||||||
events will arrive at the replicas through the cluster.
|
events will arrive at the replicas through the cluster.
|
||||||
|
|
||||||
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
|
To enable this feature you first need to enable event publishing on the @scala[`EventSourcedBehavior`]@java[`ReplicatedEventSourcedBehavior`] with `withEventPublishing`
|
||||||
and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedShardingSettings] (if not using
|
and then enable direct replication through `withDirectReplication(true)` on @apidoc[ReplicatedEntityProvider] (if not using
|
||||||
replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor).
|
replicated sharding the replication can be run standalone by starting the @apidoc[ShardingDirectReplication] actor).
|
||||||
|
|
||||||
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,
|
The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue