diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala index 6773b6ef65..4a02335d49 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala @@ -14,8 +14,10 @@ import scala.reflect.ClassTag import akka.util.ccompat.JavaConverters._ import java.util.{ Set => JSet } +import akka.actor.typed.Behavior import akka.annotation.ApiMayChange import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.persistence.typed.ReplicationId import akka.persistence.typed.ReplicationId.Separator @ApiMayChange @@ -24,29 +26,33 @@ object ReplicatedEntityProvider { /** * Java API: * + * Provides full control over the [[ReplicatedEntity]] and the [[Entity]] + * Most use cases can use the [[createPerDataCenter]] and [[createPerRole]] + * * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ - def create[M, E]( + def create[M]( messageClass: Class[M], typeName: String, allReplicaIds: JSet[ReplicaId], - settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M, E]]) - : ReplicatedEntityProvider[M, E] = { + settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M]]) + : ReplicatedEntityProvider[M] = { implicit val classTag: ClassTag[M] = ClassTag(messageClass) - apply[M, E](typeName, allReplicaIds.asScala.toSet)((key, replica) => + apply[M](typeName, allReplicaIds.asScala.toSet)((key, replica) => settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica)) } /** * Scala API: + * + * Provides full control over the [[ReplicatedEntity]] and the [[Entity]] + * Most use cases can use the [[perDataCenter]] and [[perRole]] + * * @param typeName The type name used in the [[EntityTypeKey]] * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ - def apply[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( - settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M, E]) - : ReplicatedEntityProvider[M, E] = { + def apply[M: ClassTag](typeName: String, allReplicaIds: Set[ReplicaId])( + settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M]): ReplicatedEntityProvider[M] = { new ReplicatedEntityProvider(allReplicaIds.map { replicaId => if (typeName.contains(Separator)) throw new IllegalArgumentException(s"typeName [$typeName] contains [$Separator] which is a reserved character") @@ -55,15 +61,86 @@ object ReplicatedEntityProvider { (settingsPerReplicaFactory(typeKey, replicaId), typeName) }.toVector, directReplication = true) } + + /** + * Scala API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. A replica will be run per data center. + */ + def perDataCenter[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( + create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = { + apply(typeName, allReplicaIds) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + create(ReplicationId.fromString(entityContext.entityId)) + }.withDataCenter(replicaId.id)) + } + } + + /** + * Scala API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. The replicas in allReplicaIds should be roles used by nodes. A replica for each + * entity will run on each role. + */ + def perRole[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( + create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = { + apply(typeName, allReplicaIds) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + create(ReplicationId.fromString(entityContext.entityId)) + }.withRole(replicaId.id)) + } + } + + /** + * Java API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. A replica will be run per data center. + */ + def createPerDataCenter[M]( + messageClass: Class[M], + typeName: String, + allReplicaIds: JSet[ReplicaId], + createBehavior: java.util.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = { + implicit val classTag: ClassTag[M] = ClassTag(messageClass) + apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + createBehavior(ReplicationId.fromString(entityContext.entityId)) + }.withDataCenter(replicaId.id)) + } + } + + /** + * Java API + * + * Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in + * ClusterSharding. + * + * Map replicas to roles and then there will be a replica per role e.g. to match to availability zones/racks + */ + def createPerRole[M]( + messageClass: Class[M], + typeName: String, + allReplicaIds: JSet[ReplicaId], + createBehavior: akka.japi.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = { + implicit val classTag: ClassTag[M] = ClassTag(messageClass) + apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(typeKey) { entityContext => + createBehavior(ReplicationId.fromString(entityContext.entityId)) + }.withRole(replicaId.id)) + } + } } /** + * * @tparam M The type of messages the replicated entity accepts - * @tparam E The type for envelopes used for sending `M`s over sharding */ @ApiMayChange -final class ReplicatedEntityProvider[M, E] private ( - val replicas: immutable.Seq[(ReplicatedEntity[M, E], String)], +final class ReplicatedEntityProvider[M] private ( + val replicas: immutable.Seq[(ReplicatedEntity[M], String)], val directReplication: Boolean) { /** @@ -73,7 +150,7 @@ final class ReplicatedEntityProvider[M, E] private ( * to work. * */ - def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M, E] = + def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M] = new ReplicatedEntityProvider(replicas, directReplication = enabled) } @@ -87,7 +164,7 @@ object ReplicatedEntity { * [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]] * as that requires a single writer and that would cause it to have multiple writers. */ - def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicatedEntity[M, E] = + def create[M](replicaId: ReplicaId, entity: JEntity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] = apply(replicaId, entity.toScala) /** @@ -96,12 +173,13 @@ object ReplicatedEntity { * [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]] * as that requires a single writer and that would cause it to have multiple writers. */ - def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicatedEntity[M, E] = + def apply[M](replicaId: ReplicaId, entity: Entity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] = new ReplicatedEntity(replicaId, entity) } /** * Settings for a specific replica id in replicated sharding + * Currently only Entity's with ShardingEnvelope are supported but this may change in the future */ @ApiMayChange -final class ReplicatedEntity[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E]) +final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]]) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala index 37654f6bb6..af35f26cc7 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedShardingExtension.scala @@ -14,8 +14,6 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.persistence.typed.ReplicaId import java.util.{ Map => JMap } -import akka.actor.typed.ActorRef - /** * Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica. * The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes. @@ -41,22 +39,20 @@ trait ReplicatedShardingExtension extends Extension { * Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those. * * @tparam M The type of messages the replicated event sourced actor accepts - * @tparam E The type of envelope used for routing messages to actors, the same for all replicas * * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] */ - def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] + def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] /** * Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those. * * @param thisReplica If provided saves messages being forwarded to sharding for this replica * @tparam M The type of messages the replicated event sourced actor accepts - * @tparam E The type of envelope used for routing messages to actors, the same for all replicas * * Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]] */ - def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] + def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] } /** @@ -66,34 +62,15 @@ trait ReplicatedShardingExtension extends Extension { */ @DoNotInherit @ApiMayChange -trait ReplicatedSharding[M, E] { - - /** - * Scala API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined - * routing/replica selection. - */ - def shardingRefs: Map[ReplicaId, ActorRef[E]] - - /** - * Java API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined - * routing/replica selection. - */ - def getShardingRefs: JMap[ReplicaId, ActorRef[E]] +trait ReplicatedSharding[M] { /** * Scala API: Returns the entity ref for each replica for user defined routing/replica selection - * - * This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message - * entity ids you will need to use [[#shardingRefs]] */ def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] /** * Java API: Returns the entity ref for each replica for user defined routing/replica selection - * - * This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message - * entity ids you will need to use [[#getShardingRefs]] */ - def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] - + def getEntityRefsFor(entityId: String): JMap[ReplicaId, javadsl.EntityRef[M]] } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index ad30aeb6dd..f7a7138b02 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -385,6 +385,11 @@ import akka.util.JavaDurationConverters._ } override def toString: String = s"EntityRef($typeKey, $entityId)" + + /** + * INTERNAL API + */ + override private[akka] def asJava: javadsl.EntityRef[M] = this } /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala index a9f218a0a3..57ef3965e1 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala @@ -7,7 +7,6 @@ package akka.cluster.sharding.typed.internal import java.util.concurrent.atomic.AtomicLong import java.util.{ Map => JMap } -import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.cluster.sharding.typed.ReplicatedShardingExtension @@ -34,15 +33,15 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] private val logger = LoggerFactory.getLogger(getClass) - override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + override def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = initInternal(None, settings) - override def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = + override def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = initInternal(Some(thisReplica), settings) - private def initInternal[M, E]( + private def initInternal[M]( thisReplica: Option[ReplicaId], - settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = { + settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = { val sharding = ClusterSharding(system) val initializedReplicas = settings.replicas.map { case (replicaSettings, typeName) => @@ -72,7 +71,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] val replicaToTypeKey = initializedReplicas.map { case (typeName, id, typeKey, _, dc) => id -> ((typeKey, dc, typeName)) }.toMap - new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey) + new ReplicatedShardingImpl(sharding, replicaToTypeKey) } } @@ -80,15 +79,10 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] * INTERNAL API */ @InternalApi -private[akka] final class ReplicatedShardingImpl[M, E]( +private[akka] final class ReplicatedShardingImpl[M]( sharding: ClusterSharding, - shardingPerReplica: Map[ReplicaId, ActorRef[E]], replicaTypeKeys: Map[ReplicaId, (EntityTypeKey[M], Option[DataCenter], String)]) - extends ReplicatedSharding[M, E] { - - // FIXME add test coverage for these - override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica - override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava + extends ReplicatedSharding[M] { override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] = replicaTypeKeys.map { @@ -100,7 +94,7 @@ private[akka] final class ReplicatedShardingImpl[M, E]( }) } - override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] = - entityRefsFor(entityId).asJava + override def getEntityRefsFor(entityId: String): JMap[ReplicaId, akka.cluster.sharding.typed.javadsl.EntityRef[M]] = + entityRefsFor(entityId).transform((_, v) => v.asJava).asJava } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala index 2c11f5bb8f..08a8d82217 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala @@ -15,6 +15,7 @@ import akka.actor.typed.Scheduler import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.InternalApi import akka.cluster.sharding.typed.javadsl +import akka.cluster.sharding.typed.javadsl.EntityRef import akka.cluster.sharding.typed.scaladsl import akka.japi.function.{ Function => JFunction } import akka.pattern.StatusReply @@ -59,4 +60,6 @@ import akka.util.Timeout } override def toString: String = s"TestEntityRef($entityId)" + + override private[akka] def asJava: EntityRef[M] = this } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 2211719e48..140631eddd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -441,7 +441,7 @@ object EntityTypeKey { * * Not for user extension. */ -@DoNotInherit abstract class EntityRef[M] extends RecipientRef[M] { +@DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] { scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] => /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 9e46aaabdb..5bd2287750 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -500,6 +500,11 @@ object EntityTypeKey { def ?[Res](message: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] = this.ask(message)(timeout) + /** + * INTERNAL API + */ + @InternalApi private[akka] def asJava: javadsl.EntityRef[M] + } object ClusterShardingSetup { diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index 548b9945be..156037b46d 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -70,7 +70,7 @@ object ReplicatedShardingSpec extends MultiNodeConfig { def apply(id: ReplicationId, ctx: ActorContext[Command]): EventSourcedBehavior[Command, String, State] = { // Relies on direct replication as there is no proxy query journal - ReplicatedEventSourcing.withSharedJournal(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) { + ReplicatedEventSourcing.commonJournalConfig(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => ctx.log.info("Creating replica {}", replicationContext.replicationId) EventSourcedBehavior[Command, String, State]( diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java index 0f9c105b1d..da813a698b 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java @@ -15,7 +15,7 @@ import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Receive; import akka.cluster.MemberStatus; import akka.cluster.sharding.typed.javadsl.Entity; -import akka.cluster.sharding.typed.scaladsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.cluster.typed.Cluster; import akka.cluster.typed.Join; import akka.persistence.testkit.PersistenceTestKitPlugin; @@ -67,7 +67,7 @@ public class ReplicatedShardingTest extends JUnitSuite { } static Behavior create(ReplicationId replicationId) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( replicationId, ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), @@ -143,44 +143,39 @@ public class ReplicatedShardingTest extends JUnitSuite { Arrays.asList( new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C")))); - private final ReplicatedSharding< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedSharding; + private final ReplicatedSharding replicatedSharding; private ProxyActor(ActorContext context) { super(context); // #bootstrap - ReplicatedEntityProvider< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedEntityProvider = - ReplicatedEntityProvider.create( - MyReplicatedStringSet.Command.class, - "StringSet", - ALL_REPLICAS, - // factory for replicated entity for a given replica - (entityTypeKey, replicaId) -> - ReplicatedEntity.create( - replicaId, - // use the replica id as typekey for sharding to get one sharding instance - // per replica - Entity.of( - entityTypeKey, - entityContext -> - // factory for the entity for a given entity in that replica - MyReplicatedStringSet.create( - ReplicationId.fromString(entityContext.getEntityId()))) - // potentially use replica id as role or dc in Akka multi dc for the - // sharding instance - // to control where replicas will live - // .withDataCenter(replicaId.id())) - .withRole(replicaId.id()))); + ReplicatedEntityProvider replicatedEntityProvider = + ReplicatedEntityProvider.create( + MyReplicatedStringSet.Command.class, + "StringSet", + ALL_REPLICAS, + // factory for replicated entity for a given replica + (entityTypeKey, replicaId) -> + ReplicatedEntity.create( + replicaId, + // use the replica id as typekey for sharding to get one sharding instance + // per replica + Entity.of( + entityTypeKey, + entityContext -> + // factory for the entity for a given entity in that replica + MyReplicatedStringSet.create( + ReplicationId.fromString(entityContext.getEntityId()))) + // potentially use replica id as role or dc in Akka multi dc for the + // sharding instance + // to control where replicas will live + // .withDataCenter(replicaId.id())) + .withRole(replicaId.id()))); ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(getContext().getSystem()); - ReplicatedSharding< - MyReplicatedStringSet.Command, ShardingEnvelope> - replicatedSharding = extension.init(replicatedEntityProvider); + ReplicatedSharding replicatedSharding = + extension.init(replicatedEntityProvider); // #bootstrap this.replicatedSharding = replicatedSharding; diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java index a65a7337c5..afc488bf64 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java @@ -4,12 +4,11 @@ package jdocs.akka.cluster.sharding.typed; -import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.cluster.sharding.typed.*; import akka.cluster.sharding.typed.javadsl.Entity; -import akka.cluster.sharding.typed.scaladsl.EntityRef; +import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.persistence.typed.ReplicaId; import akka.persistence.typed.ReplicationId; @@ -30,7 +29,7 @@ public class ReplicatedShardingCompileOnlySpec { new HashSet<>( Arrays.asList(new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C")))); - public static ReplicatedEntityProvider> provider() { + public static ReplicatedEntityProvider provider() { // #bootstrap return ReplicatedEntityProvider.create( Command.class, @@ -67,7 +66,7 @@ public class ReplicatedShardingCompileOnlySpec { // #bootstrap-dc } - public static ReplicatedEntityProvider> role() { + public static ReplicatedEntityProvider role() { // #bootstrap-role return ReplicatedEntityProvider.create( Command.class, @@ -90,13 +89,10 @@ public class ReplicatedShardingCompileOnlySpec { // #sending-messages ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system); - ReplicatedSharding> replicatedSharding = - extension.init(provider()); + ReplicatedSharding replicatedSharding = extension.init(provider()); Map> myEntityId = replicatedSharding.getEntityRefsFor("myEntityId"); - Map>> shardingRefs = - replicatedSharding.getShardingRefs(); // #sending-messages } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index 51908fe605..6fd6d0925a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -76,7 +76,7 @@ object ReplicatedShardingSpec { case class Texts(texts: Set[String]) extends CborSerializable def apply(replicationId: ReplicationId): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory replicationId, AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => @@ -97,7 +97,7 @@ object ReplicatedShardingSpec { } def provider(replicationType: ReplicationType) = - ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( + ReplicatedEntityProvider[MyReplicatedStringSet.Command]( // all replicas "StringSet", AllReplicas) { (entityTypeKey, replicaId) => @@ -127,7 +127,7 @@ object ReplicatedShardingSpec { case class Ints(ints: Set[Int]) extends CborSerializable def apply(id: ReplicationId, allReplicas: Set[ReplicaId]): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory id, allReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => @@ -148,9 +148,7 @@ object ReplicatedShardingSpec { } def provider(replicationType: ReplicationType) = - ReplicatedEntityProvider[MyReplicatedIntSet.Command, ShardingEnvelope[MyReplicatedIntSet.Command]]( - "IntSet", - AllReplicas) { (entityTypeKey, replicaId) => + ReplicatedEntityProvider[MyReplicatedIntSet.Command]("IntSet", AllReplicas) { (entityTypeKey, replicaId) => val entity = { val e = Entity(entityTypeKey) { entityContext => val replicationId = ReplicationId.fromString(entityContext.entityId) diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala index 45bb443191..3954ea139c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala @@ -4,14 +4,12 @@ package docs.akka.cluster.sharding.typed -import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.cluster.sharding.typed.ReplicatedEntity import akka.cluster.sharding.typed.ReplicatedEntityProvider import akka.cluster.sharding.typed.ReplicatedSharding import akka.cluster.sharding.typed.ReplicatedShardingExtension -import akka.cluster.sharding.typed.ShardingEnvelope import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.persistence.typed.ReplicaId @@ -30,45 +28,34 @@ object ReplicatedShardingCompileOnlySpec { } //#bootstrap - ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - // the sharding entity id contains the business entityId, entityType, and replica id - // which you'll need to create a ReplicatedEventSourcedBehavior - val replicationId = ReplicationId.fromString(entityContext.entityId) - MyEventSourcedBehavior(replicationId) - }) + ReplicatedEntityProvider[Command]("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { + (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + // the sharding entity id contains the business entityId, entityType, and replica id + // which you'll need to create a ReplicatedEventSourcedBehavior + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyEventSourcedBehavior(replicationId) + }) } //#bootstrap //#bootstrap-dc - ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - val replicationId = ReplicationId.fromString(entityContext.entityId) - MyEventSourcedBehavior(replicationId) - }.withDataCenter(replicaId.id)) + ReplicatedEntityProvider.perDataCenter("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { replicationId => + MyEventSourcedBehavior(replicationId) } //#bootstrap-dc //#bootstrap-role - val provider = ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( - "MyEntityType", - Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - val replicationId = ReplicationId.fromString(entityContext.entityId) + val provider = ReplicatedEntityProvider.perRole("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { + replicationId => MyEventSourcedBehavior(replicationId) - }.withRole(replicaId.id)) } //#bootstrap-role //#sending-messages - val myReplicatedSharding: ReplicatedSharding[Command, ShardingEnvelope[Command]] = + val myReplicatedSharding: ReplicatedSharding[Command] = ReplicatedShardingExtension(system).init(provider) val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId") - val actorRefs: Map[ReplicaId, ActorRef[ShardingEnvelope[Command]]] = myReplicatedSharding.shardingRefs //#sending-messages } diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java index 7cbc460f10..8badd779ad 100644 --- a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java @@ -80,7 +80,7 @@ public class ReplicatedEventSourcingTest extends JUnitSuite { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("ReplicatedEventSourcingTest", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java index 5a04ddf503..25166dccd7 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java @@ -33,7 +33,7 @@ public class MyReplicatedBehavior // #factory-shared public static Behavior create( String entityId, ReplicaId replicaId, String queryPluginId) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("MyReplicatedEntity", entityId, replicaId), ALL_REPLICAS, queryPluginId, @@ -47,7 +47,7 @@ public class MyReplicatedBehavior allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); - return ReplicatedEventSourcing.create( + return ReplicatedEventSourcing.perReplicaJournalConfig( new ReplicationId("MyReplicatedEntity", entityId, replicaId), allReplicasAndQueryPlugins, MyReplicatedBehavior::new); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java index 8d1091b7c3..e5b2497ddf 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java @@ -262,7 +262,7 @@ class AuctionEntity extends ReplicatedEventSourcedBehavior Behaviors.withTimers( timers -> - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("Auction", name, replica), ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java index b32c811ea5..fb550b04fe 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -187,7 +187,7 @@ interface ReplicatedBlogExample { String entityId, ReplicaId replicaId, Set allReplicas) { return Behaviors.setup( context -> - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java index 94b219c6c1..da80971b95 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java @@ -61,7 +61,7 @@ interface ReplicatedMovieExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("movies", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java index 629065b614..069f0982aa 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java @@ -85,7 +85,7 @@ interface ReplicatedShoppingCartExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java index c9198f65d0..011108922f 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java @@ -27,7 +27,7 @@ public final class ReplicatedStringSet public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { - return ReplicatedEventSourcing.withSharedJournal( + return ReplicatedEventSourcing.commonJournalConfig( new ReplicationId("StringSet", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala index 368f9885dc..eb8e701238 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala @@ -30,7 +30,7 @@ object ReplicatedEventPublishingSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup { ctx => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId(EntityType, entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier)( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index c6ba98a771..919bda434b 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -71,7 +71,7 @@ object ReplicatedEventSourcingSpec { entityId: String, replicaId: String, probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe)) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala index 626c07daf1..f5a7e61601 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala @@ -42,7 +42,7 @@ object ReplicatedEventSourcingTaggingSpec { replica: ReplicaId, allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = { // #tagging - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("TaggingSpec", entityId, replica), allReplicas, queryPluginId)( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala index 79503a5d3b..cf457d5e4d 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala @@ -28,7 +28,7 @@ object ReplicationIllegalAccessSpec { case class State(all: List[String]) extends CborSerializable def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("IllegalAccessSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier)( @@ -88,7 +88,7 @@ class ReplicationIllegalAccessSpec } "detect illegal access in the factory" in { val exception = intercept[UnsupportedOperationException] { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("IllegalAccessSpec", "id2", R1), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala index 3fe5722b95..620c8ffbb4 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala @@ -35,7 +35,7 @@ object ReplicationSnapshotSpec { entityId: String, replicaId: ReplicaId, probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId(EntityType, entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala index aeecc51881..3d6a9468c9 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala @@ -29,7 +29,7 @@ object CounterSpec { snapshotEvery: Long = 100, eventProbe: Option[ActorRef[Counter.Updated]] = None) = Behaviors.setup[PlainCounter.Command] { context => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("CounterSpec", entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier) { ctx => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala index 1de011f629..f7a1cfbc73 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala @@ -27,7 +27,7 @@ object LwwSpec { object LwwRegistry { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("LwwRegistrySpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala index 95a2f383a9..6f9a384ad8 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala @@ -28,7 +28,7 @@ object ORSetSpec { def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("ORSetSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index fcae4417a1..71d868a6a1 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -143,7 +143,7 @@ object ReplicatedAuctionExampleSpec { responsibleForClosing: Boolean, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx => Behaviors.withTimers { timers => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("auction", name, replica), allReplicas, PersistenceTestKitReadJournal.Identifier) { replicationCtx => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index 53d3949e0c..cba33aee8e 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -52,7 +52,7 @@ object ReplicatedBlogExampleSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { Behaviors.setup[Command] { ctx => - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala index f6edecc687..002f2cd0ef 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala @@ -25,7 +25,7 @@ object ReplicatedEventSourcingCompileOnlySpec { trait Event //#factory-shared - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("entityTypeHint", "entityId", DCA), AllReplicas, queryPluginId) { context => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala index c67a0bc203..53a726bb3a 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala @@ -28,7 +28,7 @@ object ReplicatedMovieWatchListExampleSpec { final case class MovieList(movieIds: Set[String]) def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("movies", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala index a8ed48b856..53a9fd2106 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala @@ -41,7 +41,7 @@ object ReplicatedShoppingCartExampleSpec { final case class State(items: Map[ProductId, Counter]) def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { - ReplicatedEventSourcing.withSharedJournal( + ReplicatedEventSourcing.commonJournalConfig( ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala index 3f3e6c049c..1ba2d88151 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala @@ -72,7 +72,8 @@ trait ReplicationContext { object ReplicatedEventSourcing { /** - * Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal. + * Initialize a replicated event sourced behavior where all entity replicas are share the same journal configuration. + * This is typical if there is a shared database and no replica specific configuratin is required. * * Events from each replica for the same entityId will be replicated to every copy. * Care must be taken to handle events in any order as events can happen concurrently at different replicas. @@ -85,16 +86,20 @@ object ReplicatedEventSourcing { * * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ - def withSharedJournal[Command, Event, State]( + def commonJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicaIds: JSet[ReplicaId], queryPluginId: String, behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) : EventSourcedBehavior[Command, Event, State] = - create(replicationId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory) + perReplicaJournalConfig( + replicationId, + allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, + behaviorFactory) /** - * Initialize a replicated event sourced behavior. + * Initialize a replicated event sourced behavior where each journal has different journal configuration e.g. + * each replica uses a different database or requires different database configuration for a shared database. * * Events from each replica for the same entityId will be replicated to every copy. * Care must be taken to handle events in any order as events can happen concurrently at different replicas. @@ -108,7 +113,7 @@ object ReplicatedEventSourcing { * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ - def create[Command, Event, State]( + def perReplicaJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicasAndQueryPlugins: JMap[ReplicaId, String], eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala index d5ceaff944..c9d12d6655 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala @@ -82,13 +82,14 @@ object ReplicatedEventSourcing { * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ - def withSharedJournal[Command, Event, State]( + def commonJournalConfig[Command, Event, State]( replicationId: ReplicationId, allReplicaIds: Set[ReplicaId], queryPluginId: String)( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = - apply(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory) + perReplicaJournalConfig(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)( + eventSourcedBehaviorFactory) /** * Initialize a replicated event sourced behavior. @@ -104,7 +105,9 @@ object ReplicatedEventSourcing { * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ - def apply[Command, Event, State](replicationId: ReplicationId, allReplicasAndQueryPlugins: Map[ReplicaId, String])( + def perReplicaJournalConfig[Command, Event, State]( + replicationId: ReplicationId, + allReplicasAndQueryPlugins: Map[ReplicaId, String])( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = { val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins)