From d1114495dd2399a6e5989bd1002018cefb7b278c Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 24 Aug 2020 10:52:28 +0100 Subject: [PATCH] Feedback from creating replicated entity sample (#29510) * Fix javadsl and remove shard regions from ReplicatedSharding * Simplyfy sharding API for replicated event sourcing As the ShardRegion access has been removed then we will only initially support Entity's with ShardingEnvelope meaning we can remove the type param. Also provide convenience constructors for running a replica on a role and a replica in each DC * Compile * Review feedback * feedback --- .../typed/ReplicatedEntityProvider.scala | 110 +++++++++++++++--- .../typed/ReplicatedShardingExtension.scala | 31 +---- .../typed/internal/ClusterShardingImpl.scala | 5 + .../ReplicatedShardingExtensionImpl.scala | 24 ++-- .../internal/testkit/TestEntityRefImpl.scala | 3 + .../typed/javadsl/ClusterSharding.scala | 2 +- .../typed/scaladsl/ClusterSharding.scala | 5 + .../typed/ReplicatedShardingSpec.scala | 2 +- .../typed/ReplicatedShardingTest.java | 59 +++++----- .../ReplicatedShardingCompileOnlySpec.java | 12 +- .../typed/ReplicatedShardingSpec.scala | 10 +- .../ReplicatedShardingCompileOnlySpec.scala | 39 +++---- .../typed/ReplicatedEventSourcingTest.java | 2 +- .../typed/MyReplicatedBehavior.java | 4 +- .../typed/ReplicatedAuctionExampleTest.java | 2 +- .../typed/ReplicatedBlogExample.java | 2 +- .../typed/ReplicatedMovieExample.java | 2 +- .../typed/ReplicatedShoppingCartExample.java | 2 +- .../typed/ReplicatedStringSet.java | 2 +- .../typed/ReplicatedEventPublishingSpec.scala | 2 +- .../typed/ReplicatedEventSourcingSpec.scala | 2 +- .../ReplicatedEventSourcingTaggingSpec.scala | 2 +- .../typed/ReplicationIllegalAccessSpec.scala | 4 +- .../typed/ReplicationSnapshotSpec.scala | 2 +- .../persistence/typed/crdt/CounterSpec.scala | 2 +- .../akka/persistence/typed/crdt/LwwSpec.scala | 2 +- .../persistence/typed/crdt/ORSetSpec.scala | 2 +- .../typed/ReplicatedAuctionExampleSpec.scala | 2 +- .../typed/ReplicatedBlogExampleSpec.scala | 2 +- ...plicatedEventSourcingCompileOnlySpec.scala | 2 +- .../ReplicatedMovieWatchListExampleSpec.scala | 2 +- .../ReplicatedShoppingCartExampleSpec.scala | 2 +- .../javadsl/ReplicatedEventSourcing.scala | 15 ++- .../scaladsl/ReplicatedEventSourcing.scala | 9 +- 34 files changed, 208 insertions(+), 162 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala index 6773b6ef65..4a02335d49 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala @@ -14,8 +14,10 @@ import scala.reflect.ClassTag import akka.util.ccompat.JavaConverters._ import java.util.{ Set => JSet } +import akka.actor.typed.Behavior import akka.annotation.ApiMayChange import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.persistence.typed.ReplicationId import akka.persistence.typed.ReplicationId.Separator @ApiMayChange @@ -24,29 +26,33 @@ object ReplicatedEntityProvider { /** * Java API: * + * Provides full control over the [[ReplicatedEntity]] and the [[Entity]] + * Most use cases can use the [[createPerDataCenter]] and [[createPerRole]] + * * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ - def create[M, E]( + def create[M]( messageClass: Class[M], typeName: String, allReplicaIds: JSet[ReplicaId], - settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M, E]]) - : ReplicatedEntityProvider[M, E] = { + settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M]]) + : ReplicatedEntityProvider[M] = { implicit val classTag: ClassTag[M] = ClassTag(messageClass) - apply[M, E](typeName, allReplicaIds.asScala.toSet)((key, replica) => + apply[M](typeName, allReplicaIds.asScala.toSet)((key, replica) => settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica)) } /** * Scala API: + * + * Provides full control over the [[ReplicatedEntity]] and the [[Entity]] + * Most use cases can use the [[perDataCenter]] and [[perRole]] + * * @param typeName The type name used in the [[EntityTypeKey]] * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ - def apply[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( - settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M, E]) - : ReplicatedEntityProvider[M, E] = { + def apply[M: ClassTag](typeName: String, allReplicaIds: Set[ReplicaId])( + settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M]): ReplicatedEntityProvider[M] = { new ReplicatedEntityProvider(allReplicaIds.map { replicaId => if (typeName.contains(Separator)) throw new IllegalArgumentException(s"typeName [$typeName] contains [$Separator] which is a reserved character") @@ -55,15 +61,86 @@ object ReplicatedEntityProvider { (settingsPerReplicaFactory(typeKey, replicaId), typeName) }.toVector, directReplication = true) } + + /** + * Scala API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. A replica will be run per data center. + */ + def perDataCenter[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( + create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = { + apply(typeName, allReplicaIds) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + create(ReplicationId.fromString(entityContext.entityId)) + }.withDataCenter(replicaId.id)) + } + } + + /** + * Scala API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. The replicas in allReplicaIds should be roles used by nodes. A replica for each + * entity will run on each role. + */ + def perRole[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( + create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = { + apply(typeName, allReplicaIds) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + create(ReplicationId.fromString(entityContext.entityId)) + }.withRole(replicaId.id)) + } + } + + /** + * Java API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. A replica will be run per data center. + */ + def createPerDataCenter[M]( + messageClass: Class[M], + typeName: String, + allReplicaIds: JSet[ReplicaId], + createBehavior: java.util.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = { + implicit val classTag: ClassTag[M] = ClassTag(messageClass) + apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + createBehavior(ReplicationId.fromString(entityContext.entityId)) + }.withDataCenter(replicaId.id)) + } + } + + /** + * Java API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. + * + * Map replicas to roles and then there will be a replica per role e.g. to match to availability zones/racks + */ + def createPerRole[M]( + messageClass: Class[M], + typeName: String, + allReplicaIds: JSet[ReplicaId], + createBehavior: akka.japi.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = { + implicit val classTag: ClassTag[M] = ClassTag(messageClass) + apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + createBehavior(ReplicationId.fromString(entityContext.entityId)) + }.withRole(replicaId.id)) + } + } } /** + * * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ @ApiMayChange -final class ReplicatedEntityProvider[M, E] private ( - val replicas: immutable.Seq[(ReplicatedEntity[M, E], String)], +final class ReplicatedEntityProvider[M] private ( + val replicas: immutable.Seq[(ReplicatedEntity[M], String)], val directReplication: Boolean) { /** @@ -73,7 +150,7 @@ final class ReplicatedEntityProvider[M, E] private ( * to work. * */ - def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M, E] = + def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M] = new ReplicatedEntityProvider(replicas, directReplication = enabled) } @@ -87,7 +164,7 @@ object ReplicatedEntity { * [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]] * as that requires a single writer and that would cause it to have multiple writers. */ - def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicatedEntity[M, E] = + def create[M](replicaId: ReplicaId, entity: JEntity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] = apply(replicaId, entity.toScala) /** @@ -96,12 +173,13 @@ object ReplicatedEntity { * [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]] * as that requires a single writer and that would cause it to have multiple writers. */ - def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicatedEntity[M, E] = + def apply[M](replicaId: ReplicaId, entity: Entity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] = new ReplicatedEntity(replicaId, entity) } /** * Settings for a specific replica id in replicated sharding + * Currently only Entity's with ShardingEnvelope are supported but this may change in the future */ @ApiMayChange -final class ReplicatedEntity[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E]) +final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]]) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala index 37654f6bb6..af35f26cc7 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala @@ -14,8 +14,6 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.persistence.typed.ReplicaId import java.util.{ Map => JMap } -import akka.actor.typed.ActorRef - /** * Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica. * The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes. @@ -41,22 +39,20 @@ trait ReplicatedShardingExtension extends Extension { * Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those. * * @tparam M The type of messages the replicated event sourced actor accepts - * @tparam E The type of envelope used for routing messages to actors, the same for all replicas * * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] */ - def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] + def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] /** * Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those. * * @param thisReplica If provided saves messages being forwarded to sharding for this replica * @tparam M The type of messages the replicated event sourced actor accepts - * @tparam E The type of envelope used for routing messages to actors, the same for all replicas * * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] */ - def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] + def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] } /** @@ -66,34 +62,15 @@ trait ReplicatedShardingExtension extends Extension { */ @DoNotInherit @ApiMayChange -trait ReplicatedSharding[M, E] { - - /** - * Scala API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined - * routing/replica selection. - */ - def shardingRefs: Map[ReplicaId, ActorRef[E]] - - /** - * Java API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined - * routing/replica selection. - */ - def getShardingRefs: JMap[ReplicaId, ActorRef[E]] +trait ReplicatedSharding[M] { /** * Scala API: Returns the entity ref for each replica for user defined routing/replica selection - * - * This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message - * entity ids you will need to use [[#shardingRefs]] */ def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] /** * Java API: Returns the entity ref for each replica for user defined routing/replica selection - * - * This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message - * entity ids you will need to use [[#getShardingRefs]] */ - def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] - + def getEntityRefsFor(entityId: String): JMap[ReplicaId, javadsl.EntityRef[M]] } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index ad30aeb6dd..f7a7138b02 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -385,6 +385,11 @@ import akka.util.JavaDurationConverters._ } override def toString: String = s"EntityRef($typeKey, $entityId)" + + /** + * INTERNAL API + */ + override private[akka] def asJava: javadsl.EntityRef[M] = this } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala index a9f218a0a3..57ef3965e1 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala @@ -7,7 +7,6 @@ package akka.cluster.sharding.typed.internal import java.util.concurrent.atomic.AtomicLong import java.util.{ Map => JMap } -import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.cluster.sharding.typed.ReplicatedShardingExtension @@ -34,15 +33,15 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] private val logger = LoggerFactory.getLogger(getClass) - override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + override def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = initInternal(None, settings) - override def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + override def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = initInternal(Some(thisReplica), settings) - private def initInternal[M, E]( + private def initInternal[M]( thisReplica: Option[ReplicaId], - settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = { + settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = { val sharding = ClusterSharding(system) val initializedReplicas = settings.replicas.map { case (replicaSettings, typeName) => @@ -72,7 +71,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] val replicaToTypeKey = initializedReplicas.map { case (typeName, id, typeKey, _, dc) => id -> ((typeKey, dc, typeName)) }.toMap - new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey) + new ReplicatedShardingImpl(sharding, replicaToTypeKey) } } @@ -80,15 +79,10 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] * INTERNAL API */ @InternalApi -private[akka] final class ReplicatedShardingImpl[M, E]( +private[akka] final class ReplicatedShardingImpl[M]( sharding: ClusterSharding, - shardingPerReplica: Map[ReplicaId, ActorRef[E]], replicaTypeKeys: Map[ReplicaId, (EntityTypeKey[M], Option[DataCenter], String)]) - extends ReplicatedSharding[M, E] { - - // FIXME add test coverage for these - override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica - override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava + extends ReplicatedSharding[M] { override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] = replicaTypeKeys.map { @@ -100,7 +94,7 @@ private[akka] final class ReplicatedShardingImpl[M, E]( }) } - override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] = - entityRefsFor(entityId).asJava + override def getEntityRefsFor(entityId: String): JMap[ReplicaId, akka.cluster.sharding.typed.javadsl.EntityRef[M]] = + entityRefsFor(entityId).transform((_, v) => v.asJava).asJava } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala index 2c11f5bb8f..08a8d82217 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala @@ -15,6 +15,7 @@ import akka.actor.typed.Scheduler import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.InternalApi import akka.cluster.sharding.typed.javadsl +import akka.cluster.sharding.typed.javadsl.EntityRef import akka.cluster.sharding.typed.scaladsl import akka.japi.function.{ Function => JFunction } import akka.pattern.StatusReply @@ -59,4 +60,6 @@ import akka.util.Timeout } override def toString: String = s"TestEntityRef($entityId)" + + override private[akka] def asJava: EntityRef[M] = this } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 2211719e48..140631eddd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -441,7 +441,7 @@ object EntityTypeKey { * * Not for user extension. */ -@DoNotInherit abstract class EntityRef[M] extends RecipientRef[M] { +@DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] { scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] => /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 9e46aaabdb..5bd2287750 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -500,6 +500,11 @@ object EntityTypeKey { def ?[Res](message: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] = this.ask(message)(timeout) + /** + * INTERNAL API + */ + @InternalApi private[akka] def asJava: javadsl.EntityRef[M] + } object ClusterShardingSetup { diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index 548b9945be..156037b46d 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -70,7 +70,7 @@ object ReplicatedShardingSpec extends MultiNodeConfig { def apply(id: ReplicationId, ctx: ActorContext[Command]): EventSourcedBehavior[Command, String, State] = { // Relies on direct replication as there is no proxy query journal - ReplicatedEventSourcing.withSharedJournal(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) { + ReplicatedEventSourcing.commonJournalConfig(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => ctx.log.info("Creating replica {}", replicationContext.replicationId) EventSourcedBehavior[Command, String, State]( diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java index 0f9c105b1d..da813a698b 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java @@ -15,7 +15,7 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.MemberStatus; import akka.cluster.sharding.typed.javadsl.Entity; -import akka.cluster.sharding.typed.scaladsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.cluster.typed.Cluster; import akka.cluster.typed.Join; import akka.persistence.testkit.PersistenceTestKitPlugin; @@ -67,7 +67,7 @@ public class ReplicatedShardingTest extends JUnitSuite { } static Behavior create(ReplicationId replicationId) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( replicationId, ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), @@ -143,44 +143,39 @@ public class ReplicatedShardingTest extends JUnitSuite { Arrays.asList( new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C")))); - private final ReplicatedSharding< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedSharding; + private final ReplicatedSharding replicatedSharding; private ProxyActor(ActorContext context) { super(context); // #bootstrap - ReplicatedEntityProvider< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedEntityProvider = - ReplicatedEntityProvider.create( - MyReplicatedStringSet.Command.class, - "StringSet", - ALL_REPLICAS, - // factory for replicated entity for a given replica - (entityTypeKey, replicaId) -> - ReplicatedEntity.create( - replicaId, - // use the replica id as typekey for sharding to get one sharding instance - // per replica - Entity.of( - entityTypeKey, - entityContext -> - // factory for the entity for a given entity in that replica - MyReplicatedStringSet.create( - ReplicationId.fromString(entityContext.getEntityId()))) - // potentially use replica id as role or dc in Akka multi dc for the - // sharding instance - // to control where replicas will live - // .withDataCenter(replicaId.id())) - .withRole(replicaId.id()))); + ReplicatedEntityProvider replicatedEntityProvider = + ReplicatedEntityProvider.create( + MyReplicatedStringSet.Command.class, + "StringSet", + ALL_REPLICAS, + // factory for replicated entity for a given replica + (entityTypeKey, replicaId) -> + ReplicatedEntity.create( + replicaId, + // use the replica id as typekey for sharding to get one sharding instance + // per replica + Entity.of( + entityTypeKey, + entityContext -> + // factory for the entity for a given entity in that replica + MyReplicatedStringSet.create( + ReplicationId.fromString(entityContext.getEntityId()))) + // potentially use replica id as role or dc in Akka multi dc for the + // sharding instance + // to control where replicas will live + // .withDataCenter(replicaId.id())) + .withRole(replicaId.id()))); ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(getContext().getSystem()); - ReplicatedSharding< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedSharding = extension.init(replicatedEntityProvider); + ReplicatedSharding replicatedSharding = + extension.init(replicatedEntityProvider); // #bootstrap this.replicatedSharding = replicatedSharding; diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java index a65a7337c5..afc488bf64 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java @@ -4,12 +4,11 @@ package jdocs.akka.cluster.sharding.typed; -import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.cluster.sharding.typed.*; import akka.cluster.sharding.typed.javadsl.Entity; -import akka.cluster.sharding.typed.scaladsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.persistence.typed.ReplicaId; import akka.persistence.typed.ReplicationId; @@ -30,7 +29,7 @@ public class ReplicatedShardingCompileOnlySpec { new HashSet<>( Arrays.asList(new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C")))); - public static ReplicatedEntityProvider> provider() { + public static ReplicatedEntityProvider provider() { // #bootstrap return ReplicatedEntityProvider.create( Command.class, @@ -67,7 +66,7 @@ public class ReplicatedShardingCompileOnlySpec { // #bootstrap-dc } - public static ReplicatedEntityProvider> role() { + public static ReplicatedEntityProvider role() { // #bootstrap-role return ReplicatedEntityProvider.create( Command.class, @@ -90,13 +89,10 @@ public class ReplicatedShardingCompileOnlySpec { // #sending-messages ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system); - ReplicatedSharding> replicatedSharding = - extension.init(provider()); + ReplicatedSharding replicatedSharding = extension.init(provider()); Map> myEntityId = replicatedSharding.getEntityRefsFor("myEntityId"); - Map>> shardingRefs = - replicatedSharding.getShardingRefs(); // #sending-messages } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index 51908fe605..6fd6d0925a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -76,7 +76,7 @@ object ReplicatedShardingSpec { case class Texts(texts: Set[String]) extends CborSerializable def apply(replicationId: ReplicationId): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory replicationId, AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => @@ -97,7 +97,7 @@ object ReplicatedShardingSpec { } def provider(replicationType: ReplicationType) = - ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( + ReplicatedEntityProvider[MyReplicatedStringSet.Command]( // all replicas "StringSet", AllReplicas) { (entityTypeKey, replicaId) => @@ -127,7 +127,7 @@ object ReplicatedShardingSpec { case class Ints(ints: Set[Int]) extends CborSerializable def apply(id: ReplicationId, allReplicas: Set[ReplicaId]): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory id, allReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => @@ -148,9 +148,7 @@ object ReplicatedShardingSpec { } def provider(replicationType: ReplicationType) = - ReplicatedEntityProvider[MyReplicatedIntSet.Command, ShardingEnvelope[MyReplicatedIntSet.Command]]( - "IntSet", - AllReplicas) { (entityTypeKey, replicaId) => + ReplicatedEntityProvider[MyReplicatedIntSet.Command]("IntSet", AllReplicas) { (entityTypeKey, replicaId) => val entity = { val e = Entity(entityTypeKey) { entityContext => val replicationId = ReplicationId.fromString(entityContext.entityId) diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala index 45bb443191..3954ea139c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala @@ -4,14 +4,12 @@ package docs.akka.cluster.sharding.typed -import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.cluster.sharding.typed.ReplicatedEntity import akka.cluster.sharding.typed.ReplicatedEntityProvider import akka.cluster.sharding.typed.ReplicatedSharding import akka.cluster.sharding.typed.ReplicatedShardingExtension -import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.persistence.typed.ReplicaId @@ -30,45 +28,34 @@ object ReplicatedShardingCompileOnlySpec { } //#bootstrap - ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - // the sharding entity id contains the business entityId, entityType, and replica id - // which you'll need to create a ReplicatedEventSourcedBehavior - val replicationId = ReplicationId.fromString(entityContext.entityId) - MyEventSourcedBehavior(replicationId) - }) + ReplicatedEntityProvider[Command]("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { + (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + // the sharding entity id contains the business entityId, entityType, and replica id + // which you'll need to create a ReplicatedEventSourcedBehavior + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyEventSourcedBehavior(replicationId) + }) } //#bootstrap //#bootstrap-dc - ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - val replicationId = ReplicationId.fromString(entityContext.entityId) - MyEventSourcedBehavior(replicationId) - }.withDataCenter(replicaId.id)) + ReplicatedEntityProvider.perDataCenter("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { replicationId => + MyEventSourcedBehavior(replicationId) } //#bootstrap-dc //#bootstrap-role - val provider = ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - val replicationId = ReplicationId.fromString(entityContext.entityId) + val provider = ReplicatedEntityProvider.perRole("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { + replicationId => MyEventSourcedBehavior(replicationId) - }.withRole(replicaId.id)) } //#bootstrap-role //#sending-messages - val myReplicatedSharding: ReplicatedSharding[Command, ShardingEnvelope[Command]] = + val myReplicatedSharding: ReplicatedSharding[Command] = ReplicatedShardingExtension(system).init(provider) val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId") - val actorRefs: Map[ReplicaId, ActorRef[ShardingEnvelope[Command]]] = myReplicatedSharding.shardingRefs //#sending-messages } diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java index 7cbc460f10..8badd779ad 100644 --- a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java @@ -80,7 +80,7 @@ public class ReplicatedEventSourcingTest extends JUnitSuite { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("ReplicatedEventSourcingTest", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java index 5a04ddf503..25166dccd7 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java @@ -33,7 +33,7 @@ public class MyReplicatedBehavior // #factory-shared public static Behavior create( String entityId, ReplicaId replicaId, String queryPluginId) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("MyReplicatedEntity", entityId, replicaId), ALL_REPLICAS, queryPluginId, @@ -47,7 +47,7 @@ public class MyReplicatedBehavior allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); - return ReplicatedEventSourcing.create( + return ReplicatedEventSourcing.perReplicaJournalConfig( new ReplicationId("MyReplicatedEntity", entityId, replicaId), allReplicasAndQueryPlugins, MyReplicatedBehavior::new); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java index 8d1091b7c3..e5b2497ddf 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java @@ -262,7 +262,7 @@ class AuctionEntity extends ReplicatedEventSourcedBehavior Behaviors.withTimers( timers -> - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("Auction", name, replica), ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java index b32c811ea5..fb550b04fe 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -187,7 +187,7 @@ interface ReplicatedBlogExample { String entityId, ReplicaId replicaId, Set allReplicas) { return Behaviors.setup( context -> - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java index 94b219c6c1..da80971b95 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java @@ -61,7 +61,7 @@ interface ReplicatedMovieExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("movies", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java index 629065b614..069f0982aa 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java @@ -85,7 +85,7 @@ interface ReplicatedShoppingCartExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java index c9198f65d0..011108922f 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java @@ -27,7 +27,7 @@ public final class ReplicatedStringSet public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("StringSet", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala index 368f9885dc..eb8e701238 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala @@ -30,7 +30,7 @@ object ReplicatedEventPublishingSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup { ctx => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId(EntityType, entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier)( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index c6ba98a771..919bda434b 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -71,7 +71,7 @@ object ReplicatedEventSourcingSpec { entityId: String, replicaId: String, probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe)) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala index 626c07daf1..f5a7e61601 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala @@ -42,7 +42,7 @@ object ReplicatedEventSourcingTaggingSpec { replica: ReplicaId, allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = { // #tagging - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("TaggingSpec", entityId, replica), allReplicas, queryPluginId)( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala index 79503a5d3b..cf457d5e4d 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala @@ -28,7 +28,7 @@ object ReplicationIllegalAccessSpec { case class State(all: List[String]) extends CborSerializable def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("IllegalAccessSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier)( @@ -88,7 +88,7 @@ class ReplicationIllegalAccessSpec } "detect illegal access in the factory" in { val exception = intercept[UnsupportedOperationException] { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("IllegalAccessSpec", "id2", R1), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala index 3fe5722b95..620c8ffbb4 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala @@ -35,7 +35,7 @@ object ReplicationSnapshotSpec { entityId: String, replicaId: ReplicaId, probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId(EntityType, entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala index aeecc51881..3d6a9468c9 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala @@ -29,7 +29,7 @@ object CounterSpec { snapshotEvery: Long = 100, eventProbe: Option[ActorRef[Counter.Updated]] = None) = Behaviors.setup[PlainCounter.Command] { context => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("CounterSpec", entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier) { ctx => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala index 1de011f629..f7a1cfbc73 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala @@ -27,7 +27,7 @@ object LwwSpec { object LwwRegistry { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("LwwRegistrySpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala index 95a2f383a9..6f9a384ad8 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala @@ -28,7 +28,7 @@ object ORSetSpec { def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("ORSetSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index fcae4417a1..71d868a6a1 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -143,7 +143,7 @@ object ReplicatedAuctionExampleSpec { responsibleForClosing: Boolean, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx => Behaviors.withTimers { timers => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("auction", name, replica), allReplicas, PersistenceTestKitReadJournal.Identifier) { replicationCtx => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index 53d3949e0c..cba33aee8e 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -52,7 +52,7 @@ object ReplicatedBlogExampleSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { Behaviors.setup[Command] { ctx => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala index f6edecc687..002f2cd0ef 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala @@ -25,7 +25,7 @@ object ReplicatedEventSourcingCompileOnlySpec { trait Event //#factory-shared - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("entityTypeHint", "entityId", DCA), AllReplicas, queryPluginId) { context => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala index c67a0bc203..53a726bb3a 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala @@ -28,7 +28,7 @@ object ReplicatedMovieWatchListExampleSpec { final case class MovieList(movieIds: Set[String]) def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("movies", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala index a8ed48b856..53a9fd2106 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala @@ -41,7 +41,7 @@ object ReplicatedShoppingCartExampleSpec { final case class State(items: Map[ProductId, Counter]) def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala index 3f3e6c049c..1ba2d88151 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala @@ -72,7 +72,8 @@ trait ReplicationContext { object ReplicatedEventSourcing { /** - * Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal. + * Initialize a replicated event sourced behavior where all entity replicas are share the same journal configuration. + * This is typical if there is a shared database and no replica specific configuratin is required. * * Events from each replica for the same entityId will be replicated to every copy. * Care must be taken to handle events in any order as events can happen concurrently at different replicas. @@ -85,16 +86,20 @@ object ReplicatedEventSourcing { * * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ - def withSharedJournal[Command, Event, State]( + def commonJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicaIds: JSet[ReplicaId], queryPluginId: String, behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) : EventSourcedBehavior[Command, Event, State] = - create(replicationId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory) + perReplicaJournalConfig( + replicationId, + allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, + behaviorFactory) /** - * Initialize a replicated event sourced behavior. + * Initialize a replicated event sourced behavior where each journal has different journal configuration e.g. + * each replica uses a different database or requires different database configuration for a shared database. * * Events from each replica for the same entityId will be replicated to every copy. * Care must be taken to handle events in any order as events can happen concurrently at different replicas. @@ -108,7 +113,7 @@ object ReplicatedEventSourcing { * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ - def create[Command, Event, State]( + def perReplicaJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicasAndQueryPlugins: JMap[ReplicaId, String], eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala index d5ceaff944..c9d12d6655 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala @@ -82,13 +82,14 @@ object ReplicatedEventSourcing { * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ - def withSharedJournal[Command, Event, State]( + def commonJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicaIds: Set[ReplicaId], queryPluginId: String)( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = - apply(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory) + perReplicaJournalConfig(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)( + eventSourcedBehaviorFactory) /** * Initialize a replicated event sourced behavior. @@ -104,7 +105,9 @@ object ReplicatedEventSourcing { * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ - def apply[Command, Event, State](replicationId: ReplicationId, allReplicasAndQueryPlugins: Map[ReplicaId, String])( + def perReplicaJournalConfig[Command, Event, State]( + replicationId: ReplicationId, + allReplicasAndQueryPlugins: Map[ReplicaId, String])( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = { val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins)