diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala index 6b2e5a0586..2a73361093 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ReplicatedEntityProvider.scala @@ -16,6 +16,7 @@ import java.util.{ Set => JSet } import akka.annotation.ApiMayChange import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.persistence.typed.ReplicationId.Separator @ApiMayChange object ReplicatedEntityProvider { @@ -28,29 +29,30 @@ object ReplicatedEntityProvider { */ def create[M, E]( messageClass: Class[M], + typeName: String, allReplicaIds: JSet[ReplicaId], - settingsPerReplicaFactory: akka.japi.function.Function3[ - JEntityTypeKey[M], - ReplicaId, - JSet[ReplicaId], - ReplicatedEntity[M, E]]): ReplicatedEntityProvider[M, E] = { + settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M, E]]) + : ReplicatedEntityProvider[M, E] = { implicit val classTag: ClassTag[M] = ClassTag(messageClass) - apply[M, E](allReplicaIds.asScala.toSet)((key, replica, _) => - settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica, allReplicaIds)) + apply[M, E](typeName, allReplicaIds.asScala.toSet)((key, replica) => + settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica)) } /** * Scala API: - * + * @param typeName The type name used in the [[EntityTypeKey]] * @tparam M The type of messages the replicated entity accepts * @tparam E The type for envelopes used for sending `M`s over sharding */ - def apply[M: ClassTag, E](allReplicaIds: Set[ReplicaId])( - settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId, Set[ReplicaId]) => ReplicatedEntity[M, E]) + def apply[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])( + settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M, E]) : ReplicatedEntityProvider[M, E] = { new ReplicatedEntityProvider(allReplicaIds.map { replicaId => - val typeKey = EntityTypeKey[M](replicaId.id) - settingsPerReplicaFactory(typeKey, replicaId, allReplicaIds) + if (typeName.contains(Separator)) + throw new IllegalArgumentException(s"typeName [$typeName] contains [$Separator] which is a reserved character") + + val typeKey = EntityTypeKey[M](s"$typeName${Separator}${replicaId.id}") + (settingsPerReplicaFactory(typeKey, replicaId), typeName) }.toVector, directReplication = false) } } @@ -61,7 +63,7 @@ object ReplicatedEntityProvider { */ @ApiMayChange final class ReplicatedEntityProvider[M, E] private ( - val replicas: immutable.Seq[ReplicatedEntity[M, E]], + val replicas: immutable.Seq[(ReplicatedEntity[M, E], String)], val directReplication: Boolean) { /** diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala index aff3515bb9..f8f3e2383f 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ReplicatedShardingExtensionImpl.scala @@ -16,12 +16,12 @@ import akka.cluster.sharding.typed.ReplicatedEntityProvider import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.cluster.sharding.typed.scaladsl.EntityTypeKey -import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId import org.slf4j.LoggerFactory import akka.actor.typed.scaladsl.LoggerOps +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.typed.ShardingDirectReplication - +import akka.persistence.typed.ReplicationId import akka.util.ccompat.JavaConverters._ /** @@ -36,17 +36,23 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = { val sharding = ClusterSharding(system) - val initializedReplicas = settings.replicas.map { replicaSettings => - // start up a sharding instance per replica id - logger.infoN( - "Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])", - replicaSettings.replicaId.id, - replicaSettings.entity.typeKey.name) - val regionOrProxy = sharding.init(replicaSettings.entity) - (replicaSettings.replicaId, replicaSettings.entity.typeKey, regionOrProxy) + val initializedReplicas = settings.replicas.map { + case (replicaSettings, typeName) => + // start up a sharding instance per replica id + logger.infoN( + "Starting Replicated Event Sourcing sharding for replica [{}] (ShardType: [{}])", + replicaSettings.replicaId.id, + replicaSettings.entity.typeKey.name) + val regionOrProxy = sharding.init(replicaSettings.entity) + ( + typeName, + replicaSettings.replicaId, + replicaSettings.entity.typeKey, + regionOrProxy, + replicaSettings.entity.dataCenter) } val replicaToRegionOrProxy = initializedReplicas.map { - case (id, _, regionOrProxy) => id -> regionOrProxy + case (_, replicaId, _, regionOrProxy, _) => replicaId -> regionOrProxy }.toMap if (settings.directReplication) { logger.infoN("Starting Replicated Event Sourcing Direct Replication") @@ -55,7 +61,9 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] s"directReplication-${counter.incrementAndGet()}") } - val replicaToTypeKey = initializedReplicas.map { case (id, typeKey, _) => id -> typeKey }.toMap + val replicaToTypeKey = initializedReplicas.map { + case (typeName, id, typeKey, _, dc) => id -> ((typeKey, dc, typeName)) + }.toMap new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey) } } @@ -67,16 +75,21 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_] private[akka] final class ReplicatedShardingImpl[M, E]( sharding: ClusterSharding, shardingPerReplica: Map[ReplicaId, ActorRef[E]], - replicaTypeKeys: Map[ReplicaId, EntityTypeKey[M]]) + replicaTypeKeys: Map[ReplicaId, (EntityTypeKey[M], Option[DataCenter], String)]) extends ReplicatedSharding[M, E] { + // FIXME add test coverage for these override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] = replicaTypeKeys.map { - case (replicaId, typeKey) => - replicaId -> sharding.entityRefFor(typeKey, PersistenceId.ofUniqueId(entityId).id) + case (replicaId, (typeKey, dc, typeName)) => + replicaId -> (dc match { + case None => sharding.entityRefFor(typeKey, ReplicationId(typeName, entityId, replicaId).persistenceId.id) + case Some(dc) => + sharding.entityRefFor(typeKey, ReplicationId(typeName, entityId, replicaId).persistenceId.id, dc) + }) } override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] = diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java index be2ec24770..0f9c105b1d 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java @@ -21,6 +21,7 @@ import akka.cluster.typed.Join; import akka.persistence.testkit.PersistenceTestKitPlugin; import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.javadsl.*; import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; @@ -32,6 +33,7 @@ import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import static akka.cluster.sharding.typed.ReplicatedShardingTest.ProxyActor.ALL_REPLICAS; import static org.junit.Assert.assertEquals; public class ReplicatedShardingTest extends JUnitSuite { @@ -64,13 +66,10 @@ public class ReplicatedShardingTest extends JUnitSuite { } } - static Behavior create( - String entityId, ReplicaId replicaId, Set allReplicas) { + static Behavior create(ReplicationId replicationId) { return ReplicatedEventSourcing.withSharedJournal( - "StringSet", - entityId, - replicaId, - allReplicas, + replicationId, + ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), MyReplicatedStringSet::new); } @@ -157,9 +156,10 @@ public class ReplicatedShardingTest extends JUnitSuite { replicatedEntityProvider = ReplicatedEntityProvider.create( MyReplicatedStringSet.Command.class, + "StringSet", ALL_REPLICAS, // factory for replicated entity for a given replica - (entityTypeKey, replicaId, allReplicas) -> + (entityTypeKey, replicaId) -> ReplicatedEntity.create( replicaId, // use the replica id as typekey for sharding to get one sharding instance @@ -169,7 +169,7 @@ public class ReplicatedShardingTest extends JUnitSuite { entityContext -> // factory for the entity for a given entity in that replica MyReplicatedStringSet.create( - entityContext.getEntityId(), replicaId, allReplicas)) + ReplicationId.fromString(entityContext.getEntityId()))) // potentially use replica id as role or dc in Akka multi dc for the // sharding instance // to control where replicas will live diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java new file mode 100644 index 0000000000..a65a7337c5 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java @@ -0,0 +1,103 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.cluster.sharding.typed.*; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.sharding.typed.scaladsl.EntityRef; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; + +import java.util.*; + +public class ReplicatedShardingCompileOnlySpec { + + private static ActorSystem system = null; + + interface Command {} + + public static Behavior myEventSourcedBehavior(ReplicationId replicationId) { + return null; + } + + public static final Set ALL_REPLICAS = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList(new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C")))); + + public static ReplicatedEntityProvider> provider() { + // #bootstrap + return ReplicatedEntityProvider.create( + Command.class, + "MyReplicatedType", + ALL_REPLICAS, + (entityTypeKey, replicaId) -> + ReplicatedEntity.create( + replicaId, + Entity.of( + entityTypeKey, + entityContext -> + myEventSourcedBehavior( + ReplicationId.fromString(entityContext.getEntityId()))))); + + // #bootstrap + } + + public static void dc() { + // #bootstrap-dc + ReplicatedEntityProvider.create( + Command.class, + "MyReplicatedType", + ALL_REPLICAS, + (entityTypeKey, replicaId) -> + ReplicatedEntity.create( + replicaId, + Entity.of( + entityTypeKey, + entityContext -> + myEventSourcedBehavior( + ReplicationId.fromString(entityContext.getEntityId()))) + .withDataCenter(replicaId.id()))); + + // #bootstrap-dc + } + + public static ReplicatedEntityProvider> role() { + // #bootstrap-role + return ReplicatedEntityProvider.create( + Command.class, + "MyReplicatedType", + ALL_REPLICAS, + (entityTypeKey, replicaId) -> + ReplicatedEntity.create( + replicaId, + Entity.of( + entityTypeKey, + entityContext -> + myEventSourcedBehavior( + ReplicationId.fromString(entityContext.getEntityId()))) + .withRole(replicaId.id()))); + + // #bootstrap-role + } + + public static void sendingMessages() { + // #sending-messages + ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system); + + ReplicatedSharding> replicatedSharding = + extension.init(provider()); + + Map> myEntityId = + replicatedSharding.getEntityRefsFor("myEntityId"); + Map>> shardingRefs = + replicatedSharding.getShardingRefs(); + // #sending-messages + + } +} diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala index 871317b1ce..550ca2e296 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala @@ -10,10 +10,10 @@ import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.eventstream.EventStream import akka.persistence.typed -import akka.persistence.typed.PersistenceId import akka.persistence.typed.PublishedEvent import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector } import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { @@ -37,7 +37,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough val event = PublishedEventImpl( - PersistenceId.replicatedId("ReplicatedShardingSpec", "pid", ReplicaId("ReplicaA")), + ReplicationId("ReplicatedShardingSpec", "pid", ReplicaId("ReplicaA")).persistenceId, 1L, "event", System.currentTimeMillis(), diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index b37a6d26d6..af33673f36 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -6,12 +6,18 @@ package akka.cluster.sharding.typed import java.util.concurrent.ThreadLocalRandom +import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.cluster.MemberStatus +import akka.cluster.sharding.typed.ReplicatedShardingSpec.DataCenter +import akka.cluster.sharding.typed.ReplicatedShardingSpec.Normal +import akka.cluster.sharding.typed.ReplicatedShardingSpec.ReplicationType +import akka.cluster.sharding.typed.ReplicatedShardingSpec.Role import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.typed.Cluster import akka.cluster.typed.Join @@ -24,35 +30,54 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.serialization.jackson.CborSerializable import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike +import akka.actor.typed.scaladsl.LoggerOps +import akka.cluster.sharding.typed.ReplicatedShardingSpec.MyReplicatedIntSet +import akka.cluster.sharding.typed.ReplicatedShardingSpec.MyReplicatedStringSet +import akka.persistence.typed.ReplicationId +import com.typesafe.config.Config object ReplicatedShardingSpec { - def config = ConfigFactory.parseString(""" - akka.loglevel = DEBUG + def commonConfig = ConfigFactory.parseString(""" + akka.loglevel = INFO akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] akka.actor.provider = "cluster" - # pretend we're a node in all dc:s - akka.cluster.roles = ["DC-A", "DC-B", "DC-C"] akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0""").withFallback(PersistenceTestKitPlugin.config) -} -class ReplicatedShardingSpec - extends ScalaTestWithActorTestKit(ReplicatedShardingSpec.config) - with AnyWordSpecLike - with LogCapturing { + def roleAConfig = ConfigFactory.parseString(""" + akka.cluster.roles = ["DC-A"] + """.stripMargin).withFallback(commonConfig) + + def roleBConfig = ConfigFactory.parseString(""" + akka.cluster.roles = ["DC-B"] + """.stripMargin).withFallback(commonConfig) + + def dcAConfig = ConfigFactory.parseString(""" + akka.cluster.multi-data-center.self-data-center = "DC-A" + """).withFallback(commonConfig) + + def dcBConfig = ConfigFactory.parseString(""" + akka.cluster.multi-data-center.self-data-center = "DC-B" + """).withFallback(commonConfig) + + sealed trait ReplicationType + case object Role extends ReplicationType + case object DataCenter extends ReplicationType + case object Normal extends ReplicationType + + val AllReplicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B")) object MyReplicatedStringSet { trait Command extends CborSerializable case class Add(text: String) extends Command case class GetTexts(replyTo: ActorRef[Texts]) extends Command + case class Texts(texts: Set[String]) extends CborSerializable - def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] = - ReplicatedEventSourcing.withSharedJournal( - "StringSet", - entityId, - replicaId, - allReplicas, + def apply(replicationId: ReplicationId): Behavior[Command] = + ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + replicationId, + AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, String, Set[String]]( replicationContext.persistenceId, @@ -65,81 +90,205 @@ class ReplicatedShardingSpec replyTo ! Texts(state) Effect.none }, - (state, event) => state + event).withJournalPluginId(PersistenceTestKitPlugin.PluginId) + (state, event) => state + event) + .withJournalPluginId(PersistenceTestKitPlugin.PluginId) + .withEventPublishing(true) } + + def provider(replicationType: ReplicationType) = + ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( + // all replicas + "StringSet", + AllReplicas) { (entityTypeKey, replicaId) => + // factory for replicated entity for a given replica + val entity = { + val e = Entity(entityTypeKey) { entityContext => + MyReplicatedStringSet(ReplicationId.fromString(entityContext.entityId)) + } + replicationType match { + case Role => + e.withRole(replicaId.id) + case DataCenter => + e.withDataCenter(replicaId.id) + case Normal => + e + } + } + ReplicatedEntity(replicaId, entity) + }.withDirectReplication(true) + } - object ProxyActor { - sealed trait Command - case class ForwardToRandom(entityId: String, msg: MyReplicatedStringSet.Command) extends Command - case class ForwardToAll(entityId: String, msg: MyReplicatedStringSet.Command) extends Command + object MyReplicatedIntSet { + trait Command extends CborSerializable + case class Add(text: Int) extends Command + case class GetInts(replyTo: ActorRef[Ints]) extends Command + case class Ints(ints: Set[Int]) extends CborSerializable - def apply(): Behavior[Command] = Behaviors.setup { context => - // #bootstrap - val replicatedShardingProvider = - ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]]( - // all replicas - Set(ReplicaId("DC-A"), ReplicaId("DC-B"), ReplicaId("DC-C"))) { (entityTypeKey, replicaId, allReplicaIds) => - // factory for replicated entity for a given replica - ReplicatedEntity( - replicaId, - // use the provided entity type key for sharding to get one sharding instance per replica - Entity(entityTypeKey) { entityContext => - // factory for the entity for a given entity in that replica - MyReplicatedStringSet(entityContext.entityId, replicaId, allReplicaIds) - } - // potentially use replica id as role or dc in Akka multi dc for the sharding instance - // to control where replicas will live - // .withDataCenter(replicaId.id)) - .withRole(replicaId.id)) + def apply(id: ReplicationId, allReplicas: Set[ReplicaId]): Behavior[Command] = + ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory + id, + allReplicas, + PersistenceTestKitReadJournal.Identifier) { replicationContext => + EventSourcedBehavior[Command, Int, Set[Int]]( + replicationContext.persistenceId, + Set.empty[Int], + (state, command) => + command match { + case Add(int) => + Effect.persist(int) + case GetInts(replyTo) => + replyTo ! Ints(state) + Effect.none + }, + (state, event) => state + event) + .withJournalPluginId(PersistenceTestKitPlugin.PluginId) + .withEventPublishing(true) + } + + def provider(replicationType: ReplicationType) = + ReplicatedEntityProvider[MyReplicatedIntSet.Command, ShardingEnvelope[MyReplicatedIntSet.Command]]( + "IntSet", + AllReplicas) { (entityTypeKey, replicaId) => + val entity = { + val e = Entity(entityTypeKey) { entityContext => + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyReplicatedIntSet(replicationId, AllReplicas) + } + replicationType match { + case Role => + e.withRole(replicaId.id) + case DataCenter => + e.withDataCenter(replicaId.id) + case Normal => + e + } } + ReplicatedEntity(replicaId, entity) + }.withDirectReplication(true) + } +} - val replicatedSharding = ReplicatedShardingExtension(context.system).init(replicatedShardingProvider) - // #bootstrap +object ProxyActor { + sealed trait Command + case class ForwardToRandomString(entityId: String, msg: MyReplicatedStringSet.Command) extends Command + case class ForwardToAllString(entityId: String, msg: MyReplicatedStringSet.Command) extends Command + case class ForwardToRandomInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command + case class ForwardToAllInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command + def apply(replicationType: ReplicationType): Behavior[Command] = Behaviors.setup { context => + val replicatedShardingStringSet = + ReplicatedShardingExtension(context.system).init(MyReplicatedStringSet.provider(replicationType)) + val replicatedShardingIntSet = + ReplicatedShardingExtension(context.system).init(MyReplicatedIntSet.provider(replicationType)) + Behaviors.setup { ctx => Behaviors.receiveMessage { - case ForwardToAll(entityId, cmd) => - // #all-entity-refs - replicatedSharding.entityRefsFor(entityId).foreach { + case ForwardToAllString(entityId, cmd) => + val entityRefs = replicatedShardingStringSet.entityRefsFor(entityId) + + ctx.log.infoN("Entity refs {}", entityRefs) + + entityRefs.foreach { + case (replica, ref) => + ctx.log.infoN("Forwarding to replica {} ref {}", replica, ref) + ref ! cmd + } + Behaviors.same + case ForwardToRandomString(entityId, cmd) => + val refs = replicatedShardingStringSet.entityRefsFor(entityId) + val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size) + val chosen = refs.values.toIndexedSeq(chosenIdx) + ctx.log.info("Forwarding to {}", chosen) + chosen ! cmd + Behaviors.same + case ForwardToAllInt(entityId, cmd) => + replicatedShardingIntSet.entityRefsFor(entityId).foreach { case (_, ref) => ref ! cmd } - // #all-entity-refs Behaviors.same - case ForwardToRandom(entityId, cmd) => - val refs = replicatedSharding.entityRefsFor(entityId) + case ForwardToRandomInt(entityId, cmd) => + val refs = + replicatedShardingIntSet.entityRefsFor(entityId) val chosenIdx = ThreadLocalRandom.current().nextInt(refs.size) - refs.values.toIndexedSeq(chosenIdx) ! cmd; + refs.values.toIndexedSeq(chosenIdx) ! cmd Behaviors.same } } } +} + +class NormalReplicatedShardingSpec + extends ReplicatedShardingSpec(Normal, ReplicatedShardingSpec.commonConfig, ReplicatedShardingSpec.commonConfig) +class RoleReplicatedShardingSpec + extends ReplicatedShardingSpec(Role, ReplicatedShardingSpec.roleAConfig, ReplicatedShardingSpec.roleBConfig) +class DataCenterReplicatedShardingSpec + extends ReplicatedShardingSpec(DataCenter, ReplicatedShardingSpec.dcAConfig, ReplicatedShardingSpec.dcBConfig) + +abstract class ReplicatedShardingSpec(replicationType: ReplicationType, configA: Config, configB: Config) + extends ScalaTestWithActorTestKit(configA) + with AnyWordSpecLike + with LogCapturing { + + val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = configB) + + override protected def afterAll(): Unit = { + super.afterAll() + ActorTestKit.shutdown( + system2, + testKitSettings.DefaultActorSystemShutdownTimeout, + testKitSettings.ThrowOnShutdownTimeout) + } "Replicated sharding" should { "form a one node cluster" in { - val node = Cluster(system) - node.manager ! Join(node.selfMember.address) + Cluster(system).manager ! Join(Cluster(system).selfMember.address) + Cluster(system2).manager ! Join(Cluster(system).selfMember.address) + eventually { - node.selfMember.status should ===(MemberStatus.Up) + Cluster(system).state.members.size should ===(2) + Cluster(system).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up)) + } + eventually { + Cluster(system2).state.members.size should ===(2) + Cluster(system2).state.members.map(_.status) should ===(Set[MemberStatus](MemberStatus.Up)) } } - "forward to replicas" in { - val proxy = spawn(ProxyActor()) + "start replicated sharding on both nodes" in { + def start(sys: ActorSystem[_]) = { + ReplicatedShardingExtension(sys).init(MyReplicatedStringSet.provider(replicationType)) + ReplicatedShardingExtension(sys).init(MyReplicatedIntSet.provider(replicationType)) + } + start(system) + start(system2) + } - proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.Add("to-all")) - proxy ! ProxyActor.ForwardToRandom("id1", MyReplicatedStringSet.Add("to-random")) + "forward to replicas" in { + val proxy = spawn(ProxyActor(replicationType)) + + proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.Add("to-all")) + proxy ! ProxyActor.ForwardToRandomString("id1", MyReplicatedStringSet.Add("to-random")) eventually { val probe = createTestProbe[MyReplicatedStringSet.Texts]() - proxy ! ProxyActor.ForwardToAll("id1", MyReplicatedStringSet.GetTexts(probe.ref)) - val responses: Seq[MyReplicatedStringSet.Texts] = probe.receiveMessages(3) + proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.GetTexts(probe.ref)) + val responses: Seq[MyReplicatedStringSet.Texts] = probe.receiveMessages(2) val uniqueTexts = responses.flatMap(res => res.texts).toSet uniqueTexts should ===(Set("to-all", "to-random")) } - } + proxy ! ProxyActor.ForwardToAllInt("id1", MyReplicatedIntSet.Add(10)) + proxy ! ProxyActor.ForwardToRandomInt("id1", MyReplicatedIntSet.Add(11)) + eventually { + val probe = createTestProbe[MyReplicatedIntSet.Ints]() + proxy ! ProxyActor.ForwardToAllInt("id1", MyReplicatedIntSet.GetInts(probe.ref)) + val responses: Seq[MyReplicatedIntSet.Ints] = probe.receiveMessages(2) + val uniqueTexts = responses.flatMap(res => res.ints).toSet + uniqueTexts should ===(Set(10, 11)) + } + } } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala new file mode 100644 index 0000000000..45bb443191 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.akka.cluster.sharding.typed + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.ReplicatedEntity +import akka.cluster.sharding.typed.ReplicatedEntityProvider +import akka.cluster.sharding.typed.ReplicatedSharding +import akka.cluster.sharding.typed.ReplicatedShardingExtension +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityRef +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId +import com.github.ghik.silencer.silent + +@silent("never used") +object ReplicatedShardingCompileOnlySpec { + + sealed trait Command + + val system: ActorSystem[_] = ??? + + object MyEventSourcedBehavior { + def apply(replicationId: ReplicationId): Behavior[Command] = ??? + } + + //#bootstrap + ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( + "MyEntityType", + Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + // the sharding entity id contains the business entityId, entityType, and replica id + // which you'll need to create a ReplicatedEventSourcedBehavior + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyEventSourcedBehavior(replicationId) + }) + } + //#bootstrap + + //#bootstrap-dc + ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( + "MyEntityType", + Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyEventSourcedBehavior(replicationId) + }.withDataCenter(replicaId.id)) + } + //#bootstrap-dc + + //#bootstrap-role + val provider = ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]( + "MyEntityType", + Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + val replicationId = ReplicationId.fromString(entityContext.entityId) + MyEventSourcedBehavior(replicationId) + }.withRole(replicaId.id)) + } + //#bootstrap-role + + //#sending-messages + val myReplicatedSharding: ReplicatedSharding[Command, ShardingEnvelope[Command]] = + ReplicatedShardingExtension(system).init(provider) + + val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId") + val actorRefs: Map[ReplicaId, ActorRef[ShardingEnvelope[Command]]] = myReplicatedSharding.shardingRefs + //#sending-messages +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index c9be7625d0..b21b890397 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -632,7 +632,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { case null => proxies.get(proxyName(typeName, None)) match { case null => - throw new IllegalStateException(s"Shard type [$typeName] must be started first") + throw new IllegalStateException( + s"Shard type [$typeName] must be started first. Started ${regions.keySet()} proxies ${proxies.keySet()}") case ref => ref } case ref => ref diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 6af9765a09..1f01f53cbe 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -41,9 +41,9 @@ class Member private[cluster] ( } override def toString = if (dataCenter == ClusterSettings.DefaultDataCenter) - s"Member(address = $address, status = $status)" + s"Member(address = $address, status = $status, roles = $roles)" else - s"Member(address = $address, dataCenter = $dataCenter, status = $status)" + s"Member(address = $address, dataCenter = $dataCenter, status = $status, roles = $roles)" def hasRole(role: String): Boolean = roles.contains(role) diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index ae05d76266..72361f204b 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -298,27 +298,50 @@ When comparing two version vectors `v1` and `v2`: ## Sharded Replicated Event Sourced entities -To simplify what probably are the most common use cases for how you will want to distribute the replicated actors there is a minimal API for running multiple instances of @ref[Akka Cluster Sharding](cluster-sharding.md), -each instance holding the entities for a single replica. +There are three ways to integrate replicated event sourced entities with sharding: -The distribution of the replicas can be controlled either through cluster roles or using the @ref[multi datacenter](cluster-dc.md) support in Akka Cluster. +* Ensure that each replica has a unique entity id by using the replica id as part of the entity id +* Use @ref[multi datacenter](cluster-dc.md) to run a full copy of sharding per replica +* Use roles to run a full copy of sharding per replica -The API consists of bootstrapping logic for starting the sharding instances through @apidoc[ReplicatedShardingExtension] available from the + +To simplify all three cases the @apidoc[ReplicatedShardingExtension] is available from the `akka-cluster-sharding-typed` module. Scala -: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #bootstrap } +: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap } Java -: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #bootstrap } +: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap } + +This will run a single instance of sharding and the replicas will be differentiated by having the replica id in the sharding entity id. +Replicas could be on the same node if they end up in the same shard or if the shards get allocated to the same node. + +To prevent this roles can be used. You could for instance add a cluster role per availability zone / rack and have a replica per rack. + +Scala +: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap-role } + +Java +: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap-role } + +Lastly if your Akka Cluster is setup across DCs you can run a replica per DC. + +Scala +: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #bootstrap-dc } + +Java +: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #bootstrap-dc } + +Regardless of which replication strategy you use sending messages to the replicated entities is the same. `init` returns an @apidoc[ReplicatedSharding] instance which gives access to @apidoc[EntityRef]s for each of the replicas for arbitrary routing logic: Scala -: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala) { #all-entity-refs } +: @@snip [ReplicatedShardingSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.scala) { #sending-messages } Java -: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/ReplicatedShardingTest.java) { #all-entity-refs } +: @@snip [ReplicatedShardingTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ReplicatedShardingCompileOnlySpec.java) { #sending-messages } More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)). diff --git a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java index a5efaf67fa..7cbc460f10 100644 --- a/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java +++ b/akka-persistence-typed-tests/src/test/java/akka/persistence/typed/ReplicatedEventSourcingTest.java @@ -81,9 +81,7 @@ public class ReplicatedEventSourcingTest extends JUnitSuite { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { return ReplicatedEventSourcing.withSharedJournal( - "ReplicatedEventSourcingTest", - entityId, - replicaId, + new ReplicationId("ReplicatedEventSourcingTest", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), TestBehavior::new); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java index 911d91486f..5a04ddf503 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java @@ -6,6 +6,7 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.Behavior; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.javadsl.*; import java.util.*; @@ -33,9 +34,7 @@ public class MyReplicatedBehavior public static Behavior create( String entityId, ReplicaId replicaId, String queryPluginId) { return ReplicatedEventSourcing.withSharedJournal( - "MyReplicatedEntity", - entityId, - replicaId, + new ReplicationId("MyReplicatedEntity", entityId, replicaId), ALL_REPLICAS, queryPluginId, MyReplicatedBehavior::new); @@ -49,9 +48,7 @@ public class MyReplicatedBehavior allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); return ReplicatedEventSourcing.create( - "MyReplicatedEntity", - entityId, - replicaId, + new ReplicationId("MyReplicatedEntity", entityId, replicaId), allReplicasAndQueryPlugins, MyReplicatedBehavior::new); } diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java index 955f1ff8b2..8d1091b7c3 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java @@ -16,6 +16,7 @@ import akka.persistence.testkit.PersistenceTestKitPlugin; import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal; import akka.persistence.typed.RecoveryCompleted; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -262,9 +263,7 @@ class AuctionEntity extends ReplicatedEventSourcedBehavior ReplicatedEventSourcing.withSharedJournal( - "Auction", - name, - replica, + new ReplicationId("Auction", name, replica), ALL_REPLICAS, PersistenceTestKitReadJournal.Identifier(), replicationCtx -> diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java index 09c14b60c4..b32c811ea5 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -11,6 +11,7 @@ import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.crdt.LwwTime; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.Effect; @@ -187,9 +188,7 @@ interface ReplicatedBlogExample { return Behaviors.setup( context -> ReplicatedEventSourcing.withSharedJournal( - "blog", - entityId, - replicaId, + new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), replicationContext -> new BlogEntity(context, replicationContext))); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java index 16ae730de1..94b219c6c1 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java @@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.crdt.ORSet; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.EventHandler; @@ -61,9 +62,7 @@ interface ReplicatedMovieExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { return ReplicatedEventSourcing.withSharedJournal( - "movies", - entityId, - replicaId, + new ReplicationId("movies", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), MovieWatchList::new); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java index 2c49cd2fd9..629065b614 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java @@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.crdt.Counter; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.Effect; @@ -85,9 +86,7 @@ interface ReplicatedShoppingCartExample { public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { return ReplicatedEventSourcing.withSharedJournal( - "blog", - entityId, - replicaId, + new ReplicationId("blog", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), ShoppingCart::new); diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java index 0aceecac1e..c9198f65d0 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java @@ -7,6 +7,7 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.Behavior; import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.ReplicationId; import akka.persistence.typed.javadsl.*; import java.util.HashSet; @@ -27,9 +28,7 @@ public final class ReplicatedStringSet public static Behavior create( String entityId, ReplicaId replicaId, Set allReplicas) { return ReplicatedEventSourcing.withSharedJournal( - "StringSet", - entityId, - replicaId, + new ReplicationId("StringSet", entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier(), ReplicatedStringSet::new); diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala index 2c4f0614af..4257e324f8 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala @@ -33,9 +33,7 @@ object MultiJournalReplicationSpec { private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal") def apply(entityId: String, replicaId: String): Behavior[Command] = { ReplicatedEventSourcing( - "MultiJournalSpec", - entityId, - ReplicaId(replicaId), + ReplicationId("MultiJournalSpec", entityId, ReplicaId(replicaId)), Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))( replicationContext => EventSourcedBehavior[Command, String, Set[String]]( @@ -100,11 +98,17 @@ class MultiJournalReplicationSpec val readJournal2 = PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery]("journal2.query") val eventsForJournal1 = - readJournal1.currentEventsByPersistenceId("id1|R1", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + readJournal1 + .currentEventsByPersistenceId("MultiJournalSpec|id1|R1", 0L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue eventsForJournal1.map(_.event).toSet should ===(Set("r1 m1", "r2 m1")) val eventsForJournal2 = - readJournal2.currentEventsByPersistenceId("id1|R2", 0L, Long.MaxValue).runWith(Sink.seq).futureValue + readJournal2 + .currentEventsByPersistenceId("MultiJournalSpec|id1|R2", 0L, Long.MaxValue) + .runWith(Sink.seq) + .futureValue eventsForJournal2.map(_.event).toSet should ===(Set("r1 m1", "r2 m1")) } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala index 051be16df0..368f9885dc 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala @@ -31,9 +31,7 @@ object ReplicatedEventPublishingSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup { ctx => ReplicatedEventSourcing.withSharedJournal( - EntityType, - entityId, - replicaId, + ReplicationId(EntityType, entityId, replicaId), allReplicas, PersistenceTestKitReadJournal.Identifier)( replicationContext => @@ -86,7 +84,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 1L, "two", System.currentTimeMillis(), @@ -107,7 +105,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 2L, // missing 1L "two", System.currentTimeMillis(), @@ -128,7 +126,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCC), + ReplicationId(EntityType, id, DCC).persistenceId, 1L, "two", System.currentTimeMillis(), @@ -149,14 +147,14 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, "myId4", DCB), + ReplicationId(EntityType, "myId4", DCB).persistenceId, 1L, "two", System.currentTimeMillis(), Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty))) // simulate another published event from that replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 1L, "two-again", // ofc this would be the same in the real world, different just so we can detect System.currentTimeMillis(), @@ -188,7 +186,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 1L, "two", System.currentTimeMillis(), @@ -211,7 +209,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 1L, "two", System.currentTimeMillis(), @@ -224,7 +222,7 @@ class ReplicatedEventPublishingSpec // simulate a published event from another replica incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, id, DCB), + ReplicationId(EntityType, id, DCB).persistenceId, 2L, "three", System.currentTimeMillis(), diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index e84d7c91e8..c6ba98a771 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -72,9 +72,7 @@ object ReplicatedEventSourcingSpec { replicaId: String, probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] = ReplicatedEventSourcing.withSharedJournal( - "ReplicatedEventSourcingSpec", - entityId, - ReplicaId(replicaId), + ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe)) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala index fdfc44ab82..626c07daf1 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala @@ -42,7 +42,10 @@ object ReplicatedEventSourcingTaggingSpec { replica: ReplicaId, allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = { // #tagging - ReplicatedEventSourcing.withSharedJournal("TaggingSpec", entityId, replica, allReplicas, queryPluginId)( + ReplicatedEventSourcing.withSharedJournal( + ReplicationId("TaggingSpec", entityId, replica), + allReplicas, + queryPluginId)( replicationContext => EventSourcedBehavior[Command, String, State]( replicationContext.persistenceId, diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala index 5857a26fbc..79503a5d3b 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationIllegalAccessSpec.scala @@ -29,9 +29,7 @@ object ReplicationIllegalAccessSpec { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal( - "IllegalAccessSpec", - entityId, - replica, + ReplicationId("IllegalAccessSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier)( replicationContext => @@ -91,9 +89,7 @@ class ReplicationIllegalAccessSpec "detect illegal access in the factory" in { val exception = intercept[UnsupportedOperationException] { ReplicatedEventSourcing.withSharedJournal( - "IllegalAccessSpec", - "id2", - R1, + ReplicationId("IllegalAccessSpec", "id2", R1), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => replicationContext.origin diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala index b22514af28..3fe5722b95 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala @@ -20,7 +20,7 @@ object ReplicationSnapshotSpec { import ReplicatedEventSourcingSpec._ - val EntityType = "SpapsnotSpec" + val EntityType = "SnapshotSpec" def behaviorWithSnapshotting(entityId: String, replicaId: ReplicaId): Behavior[Command] = behaviorWithSnapshotting(entityId, replicaId, None) @@ -36,9 +36,7 @@ object ReplicationSnapshotSpec { replicaId: ReplicaId, probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal( - EntityType, - entityId, - replicaId, + ReplicationId(EntityType, entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0)) @@ -67,8 +65,8 @@ class ReplicationSnapshotSpec "ReplicatedEventSourcing" should { "recover state from snapshots" in { val entityId = nextEntityId - val persistenceIdR1 = s"$entityId|R1" - val persistenceIdR2 = s"$entityId|R2" + val persistenceIdR1 = s"$EntityType|$entityId|R1" + val persistenceIdR2 = s"$EntityType|$entityId|R2" val probe = createTestProbe[Done]() val r2EventProbe = createTestProbe[EventAndContext]() @@ -84,7 +82,7 @@ class ReplicationSnapshotSpec snapshotTestKit.expectNextPersisted(persistenceIdR2, State(List("r1 2", "r1 1"))) r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, entityId, R1), + ReplicationId(EntityType, entityId, R1).persistenceId, 1L, "two-again", System.currentTimeMillis(), @@ -98,7 +96,7 @@ class ReplicationSnapshotSpec { val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref)) r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - PersistenceId.replicatedId(EntityType, entityId, R1), + ReplicationId(EntityType, entityId, R1).persistenceId, 1L, "two-again", System.currentTimeMillis(), @@ -109,8 +107,6 @@ class ReplicationSnapshotSpec r2 ! GetState(stateProbe.ref) stateProbe.expectMessage(State(List("r1 2", "r1 1"))) } - } } - } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala index 16f6eeb6cf..aeecc51881 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/CounterSpec.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.crdt import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.Behaviors import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicationId import akka.persistence.typed.crdt.CounterSpec.PlainCounter.{ Decrement, Get, Increment } import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing } import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec } @@ -29,9 +30,7 @@ object CounterSpec { eventProbe: Option[ActorRef[Counter.Updated]] = None) = Behaviors.setup[PlainCounter.Command] { context => ReplicatedEventSourcing.withSharedJournal( - "CounterSpec", - entityId, - replicaId, + ReplicationId("CounterSpec", entityId, replicaId), AllReplicas, PersistenceTestKitReadJournal.Identifier) { ctx => EventSourcedBehavior[PlainCounter.Command, Counter.Updated, Counter]( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala index 07a2091ca8..1de011f629 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala @@ -6,6 +6,7 @@ package akka.persistence.typed.crdt import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing } import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec } import akka.serialization.jackson.CborSerializable @@ -27,9 +28,7 @@ object LwwSpec { def apply(entityId: String, replica: ReplicaId): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal( - "LwwRegistrySpec", - entityId, - replica, + ReplicationId("LwwRegistrySpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, Event, Registry]( diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala index 915bb8b64c..95a2f383a9 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/ORSetSpec.scala @@ -10,6 +10,7 @@ import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, Replicate import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec } import ORSetSpec.ORSetEntity._ import akka.persistence.typed.ReplicationBaseSpec.{ R1, R2 } +import akka.persistence.typed.ReplicationId import akka.persistence.typed.crdt.ORSetSpec.ORSetEntity import scala.util.Random @@ -28,9 +29,7 @@ object ORSetSpec { def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = { ReplicatedEventSourcing.withSharedJournal( - "ORSetSpec", - entityId, - replica, + ReplicationId("ORSetSpec", entityId, replica), AllReplicas, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]]( diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index 2d59b5ce97..fcae4417a1 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -23,6 +23,7 @@ import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.ReplicatedEventSourcing @@ -142,12 +143,13 @@ object ReplicatedAuctionExampleSpec { responsibleForClosing: Boolean, allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx => Behaviors.withTimers { timers => - ReplicatedEventSourcing - .withSharedJournal("auction", name, replica, allReplicas, PersistenceTestKitReadJournal.Identifier) { - replicationCtx => - new AuctionEntity(ctx, replicationCtx, timers, closingAt, responsibleForClosing, allReplicas) - .behavior(initialBid) - } + ReplicatedEventSourcing.withSharedJournal( + ReplicationId("auction", name, replica), + allReplicas, + PersistenceTestKitReadJournal.Identifier) { replicationCtx => + new AuctionEntity(ctx, replicationCtx, timers, closingAt, responsibleForClosing, allReplicas) + .behavior(initialBid) + } } } diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index 4d1d9592d6..53d3949e0c 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -16,6 +16,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.crdt.LwwTime import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable @@ -52,9 +53,7 @@ object ReplicatedBlogExampleSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { Behaviors.setup[Command] { ctx => ReplicatedEventSourcing.withSharedJournal( - "blog", - entityId, - replicaId, + ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, Event, BlogState]( diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala index e98b7418c6..f6edecc687 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala @@ -5,6 +5,7 @@ package docs.akka.persistence.typed import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, ReplicatedEventSourcing } import com.github.ghik.silencer.silent @@ -24,15 +25,19 @@ object ReplicatedEventSourcingCompileOnlySpec { trait Event //#factory-shared - ReplicatedEventSourcing.withSharedJournal("entityTypeHint", "entityId", DCA, AllReplicas, queryPluginId) { context => + ReplicatedEventSourcing.withSharedJournal( + ReplicationId("entityTypeHint", "entityId", DCA), + AllReplicas, + queryPluginId) { context => EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) } //#factory-shared //#factory - ReplicatedEventSourcing("entityTypeHint", "entityId", DCA, Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { - context => - EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) + ReplicatedEventSourcing( + ReplicationId("entityTypeHint", "entityId", DCA), + Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context => + EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) } //#factory diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala index ff9c14e8dd..c67a0bc203 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala @@ -5,7 +5,6 @@ package docs.akka.persistence.typed import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef @@ -13,6 +12,7 @@ import akka.actor.typed.Behavior import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.crdt.ORSet import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -29,9 +29,7 @@ object ReplicatedMovieWatchListExampleSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal( - "movies", - entityId, - replicaId, + ReplicationId("movies", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]]( diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala index 8a311df9a6..a8ed48b856 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala @@ -8,7 +8,6 @@ import java.util.UUID import docs.akka.persistence.typed.ReplicatedShoppingCartExampleSpec.ShoppingCart.CartItems import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef @@ -16,6 +15,7 @@ import akka.actor.typed.Behavior import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.crdt.Counter import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -42,9 +42,7 @@ object ReplicatedShoppingCartExampleSpec { def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { ReplicatedEventSourcing.withSharedJournal( - "blog", - entityId, - replicaId, + ReplicationId("blog", entityId, replicaId), allReplicaIds, PersistenceTestKitReadJournal.Identifier) { replicationContext => EventSourcedBehavior[Command, Event, State]( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala index 47b014a9c4..0f694fe4f8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala @@ -3,7 +3,6 @@ */ package akka.persistence.typed -import akka.annotation.ApiMayChange object PersistenceId { @@ -125,27 +124,6 @@ object PersistenceId { */ def ofUniqueId(id: String): PersistenceId = new PersistenceId(id) - - /** - * Constructs a [[PersistenceId]] from the given `entityTypeHint`, `entityId` and `replicaId` by - * concatenating them with the `|` separator. - */ - @ApiMayChange - def replicatedId(entityTypeHint: String, entityId: String, replicaId: ReplicaId): PersistenceId = { - if (entityTypeHint.contains(DefaultSeparator)) - throw new IllegalArgumentException( - s"entityTypeHint [$entityTypeHint] contains [$DefaultSeparator] which is a reserved character") - - if (entityId.contains(DefaultSeparator)) - throw new IllegalArgumentException( - s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character") - - if (replicaId.id.contains(DefaultSeparator)) - throw new IllegalArgumentException( - s"replicaId [${replicaId.id}] contains [$DefaultSeparator] which is a reserved character") - - new PersistenceId(entityTypeHint + DefaultSeparator + entityId + DefaultSeparator + replicaId.id) - } } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicationId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicationId.scala new file mode 100644 index 0000000000..a8e9a392ba --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ReplicationId.scala @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +object ReplicationId { + private[akka] val Separator = "|" + def fromString(id: String): ReplicationId = { + val split = id.split("\\|") + require(split.size == 3, s"invalid replication id $id") + ReplicationId(split(0), split(1), ReplicaId(split(2))) + } + + /** + * @param typeName The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities + * @param entityId The unique entity id + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + */ + def apply(typeName: String, entityId: String, replicaId: ReplicaId): ReplicationId = + new ReplicationId(typeName, entityId, replicaId) +} + +/** + * @param typeName The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities + * @param entityId The unique entity id + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + */ +final class ReplicationId(val typeName: String, val entityId: String, val replicaId: ReplicaId) { + import ReplicationId._ + if (typeName.contains(Separator)) + throw new IllegalArgumentException( + s"entityTypeHint [$typeName] contains [$Separator] which is a reserved character") + + if (entityId.contains(Separator)) + throw new IllegalArgumentException(s"entityId [$entityId] contains [$Separator] which is a reserved character") + + if (replicaId.id.contains(Separator)) + throw new IllegalArgumentException( + s"replicaId [${replicaId.id}] contains [$Separator] which is a reserved character") + + private val id: String = s"$typeName$Separator$entityId$Separator${replicaId.id}" + + def persistenceId: PersistenceId = PersistenceId.ofUniqueId(id) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 09503e67f2..6f0fb58463 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -257,7 +257,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override private[akka] def withReplication( context: ReplicationContextImpl): EventSourcedBehavior[Command, Event, State] = { - copy(replication = Some(ReplicationSetup(context.replicaId, context.replicasAndQueryPlugins, context))) + copy( + replication = Some(ReplicationSetup(context.replicationId.replicaId, context.replicasAndQueryPlugins, context))) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala index 76639ab35f..a118c27dea 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.internal import akka.annotation.InternalApi import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.util.OptionVal import akka.util.WallClock import akka.util.ccompat.JavaConverters._ @@ -16,14 +17,11 @@ import akka.util.ccompat.JavaConverters._ */ @InternalApi private[akka] final class ReplicationContextImpl( - val entityTypeHint: String, - val entityId: String, - val replicaId: ReplicaId, + val replicationId: ReplicationId, val replicasAndQueryPlugins: Map[ReplicaId, String]) extends akka.persistence.typed.scaladsl.ReplicationContext with akka.persistence.typed.javadsl.ReplicationContext { val allReplicas: Set[ReplicaId] = replicasAndQueryPlugins.keySet - // these are not volatile as they are set on the same thread as they should be accessed var _currentThread: OptionVal[Thread] = OptionVal.None var _origin: OptionVal[ReplicaId] = OptionVal.None @@ -65,7 +63,7 @@ private[akka] final class ReplicationContextImpl( _concurrent } - override def persistenceId: PersistenceId = PersistenceId.replicatedId(entityTypeHint, entityId, replicaId) + override def persistenceId: PersistenceId = replicationId.persistenceId override def currentTimeMillis(): Long = { WallClock.AlwaysIncreasingClock.currentTimeMillis() diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 3af686332b..9edd3c3696 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -35,6 +35,7 @@ import akka.persistence.journal.Tagged import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.{ DeleteEventsCompleted, DeleteEventsFailed, @@ -128,8 +129,8 @@ private[akka] object Running { val query = PersistenceQuery(system) replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) => if (replicaId != replicationSetup.replicaId) { - val pid = PersistenceId.replicatedId( - replicationSetup.replicationContext.entityTypeHint, + val pid = ReplicationId( + replicationSetup.replicationContext.replicationId.typeName, replicationSetup.replicationContext.entityId, replicaId) val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId) @@ -147,7 +148,7 @@ private[akka] object Running { Source.futureSource { setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr => replication - .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + .eventsByPersistenceId(pid.persistenceId.id, seqNr + 1, Long.MaxValue) // from each replica, only get the events that originated there, this prevents most of the event filtering // the downside is that events can't be received via other replicas in the event of an uneven network partition .filter(event => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala index 4bf71f8590..3f3e6c049c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ReplicatedEventSourcing.scala @@ -11,8 +11,8 @@ import java.util.{ Map => JMap } import akka.annotation.DoNotInherit import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl - import akka.util.ccompat.JavaConverters._ /** @@ -23,6 +23,8 @@ import akka.util.ccompat.JavaConverters._ @DoNotInherit trait ReplicationContext { + def replicationId: ReplicationId + /** * @return The replica id of this replicated event sourced actor */ @@ -81,25 +83,15 @@ object ReplicatedEventSourcing { * can be used for each replica. * The events from other replicas are read using PersistentQuery. * - * @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities - * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. - * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ def withSharedJournal[Command, Event, State]( - entityType: String, - entityId: String, - replicaId: ReplicaId, + replicationId: ReplicationId, allReplicaIds: JSet[ReplicaId], queryPluginId: String, behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) : EventSourcedBehavior[Command, Event, State] = - create( - entityType, - entityId, - replicaId, - allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, - behaviorFactory) + create(replicationId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory) /** * Initialize a replicated event sourced behavior. @@ -113,19 +105,15 @@ object ReplicatedEventSourcing { * A query side identifier is passed per replica allowing for separate database/journal configuration per * replica. The events from other replicas are read using PersistentQuery. * - * @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities - * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ def create[Command, Event, State]( - entityType: String, - entityId: String, - replicaId: ReplicaId, + replicationId: ReplicationId, allReplicasAndQueryPlugins: JMap[ReplicaId, String], eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]]) : EventSourcedBehavior[Command, Event, State] = { - val context = new ReplicationContextImpl(entityType, entityId, replicaId, allReplicasAndQueryPlugins.asScala.toMap) + val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins.asScala.toMap) eventSourcedBehaviorFactory(context) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala index 6d9e6a533f..d5ceaff944 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ReplicatedEventSourcing.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.scaladsl import akka.annotation.DoNotInherit import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationId import akka.persistence.typed.internal.ReplicationContextImpl /** @@ -17,6 +18,8 @@ import akka.persistence.typed.internal.ReplicationContextImpl @DoNotInherit trait ReplicationContext { + def replicationId: ReplicationId + /** * @return The unique id of this replica, including the replica id */ @@ -25,7 +28,7 @@ trait ReplicationContext { /** * @return The replica id of this replicated event sourced actor */ - def replicaId: ReplicaId + def replicaId: ReplicaId = replicationId.replicaId /** * @return The ids of all replicas of this replicated event sourced actor @@ -35,7 +38,7 @@ trait ReplicationContext { /** * @return The entity id of this replicated event sourced actor (not including the replica id) */ - def entityId: String + def entityId: String = replicationId.entityId /** * Must only be called from the event handler @@ -76,22 +79,16 @@ object ReplicatedEventSourcing { * can be used for each replica. * The events from other replicas are read using PersistentQuery. * - * @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities - * @param entityId The unique entity id - * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. * @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin. */ def withSharedJournal[Command, Event, State]( - entityType: String, - entityId: String, - replicaId: ReplicaId, + replicationId: ReplicationId, allReplicaIds: Set[ReplicaId], queryPluginId: String)( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = - apply(entityType, entityId, replicaId, allReplicaIds.map(id => id -> queryPluginId).toMap)( - eventSourcedBehaviorFactory) + apply(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory) /** * Initialize a replicated event sourced behavior. @@ -104,21 +101,13 @@ object ReplicatedEventSourcing { * The journal plugin id for the entity itself can be configured using withJournalPluginId after creation. * A query side identifier is passed per replica allowing for separate database/journal configuration per * replica. The events from other replicas are read using PersistentQuery. - * - * @param entityType The name of the entity type e.g. account, user. Made part of the persistence id so that entity ids don't need to be unique across different replicated entities - * @param entityId The unique entity id - * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. * @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas * and configured with the query plugin for the journal that each replica uses. */ - def apply[Command, Event, State]( - entityType: String, - entityId: String, - replicaId: ReplicaId, - allReplicasAndQueryPlugins: Map[ReplicaId, String])( + def apply[Command, Event, State](replicationId: ReplicationId, allReplicasAndQueryPlugins: Map[ReplicaId, String])( eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State]) : EventSourcedBehavior[Command, Event, State] = { - val context = new ReplicationContextImpl(entityType, entityId, replicaId, allReplicasAndQueryPlugins) + val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins) eventSourcedBehaviorFactory(context).withReplication(context) }