Feedback from creating replicated entity sample (#29510)

* Fix javadsl and remove shard regions from ReplicatedSharding

* Simplyfy sharding API for replicated event sourcing

As the ShardRegion access has been removed then we will only initially
support Entity's with ShardingEnvelope meaning we can remove the type
param.

Also provide convenience constructors for running a replica on a role
and a replica in each DC

* Compile

* Review feedback

* feedback
This commit is contained in:
Christopher Batey 2020-08-24 10:52:28 +01:00 committed by GitHub
parent 90e09549e8
commit d1114495dd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 208 additions and 162 deletions

View file

@ -14,8 +14,10 @@ import scala.reflect.ClassTag
import akka.util.ccompat.JavaConverters._
import java.util.{ Set => JSet }
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.ReplicationId.Separator
@ApiMayChange
@ -24,29 +26,33 @@ object ReplicatedEntityProvider {
/**
* Java API:
*
* Provides full control over the [[ReplicatedEntity]] and the [[Entity]]
* Most use cases can use the [[createPerDataCenter]] and [[createPerRole]]
*
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def create[M, E](
def create[M](
messageClass: Class[M],
typeName: String,
allReplicaIds: JSet[ReplicaId],
settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M, E]])
: ReplicatedEntityProvider[M, E] = {
settingsPerReplicaFactory: akka.japi.function.Function2[JEntityTypeKey[M], ReplicaId, ReplicatedEntity[M]])
: ReplicatedEntityProvider[M] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply[M, E](typeName, allReplicaIds.asScala.toSet)((key, replica) =>
apply[M](typeName, allReplicaIds.asScala.toSet)((key, replica) =>
settingsPerReplicaFactory(key.asInstanceOf[EntityTypeKeyImpl[M]], replica))
}
/**
* Scala API:
*
* Provides full control over the [[ReplicatedEntity]] and the [[Entity]]
* Most use cases can use the [[perDataCenter]] and [[perRole]]
*
* @param typeName The type name used in the [[EntityTypeKey]]
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
def apply[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M, E])
: ReplicatedEntityProvider[M, E] = {
def apply[M: ClassTag](typeName: String, allReplicaIds: Set[ReplicaId])(
settingsPerReplicaFactory: (EntityTypeKey[M], ReplicaId) => ReplicatedEntity[M]): ReplicatedEntityProvider[M] = {
new ReplicatedEntityProvider(allReplicaIds.map { replicaId =>
if (typeName.contains(Separator))
throw new IllegalArgumentException(s"typeName [$typeName] contains [$Separator] which is a reserved character")
@ -55,15 +61,86 @@ object ReplicatedEntityProvider {
(settingsPerReplicaFactory(typeKey, replicaId), typeName)
}.toVector, directReplication = true)
}
/**
* Scala API
*
* Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in
* ClusterSharding. A replica will be run per data center.
*/
def perDataCenter[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])(
create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = {
apply(typeName, allReplicaIds) { (typeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(typeKey) { entityContext =>
create(ReplicationId.fromString(entityContext.entityId))
}.withDataCenter(replicaId.id))
}
}
/**
* Scala API
*
* Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in
* ClusterSharding. The replicas in allReplicaIds should be roles used by nodes. A replica for each
* entity will run on each role.
*/
def perRole[M: ClassTag, E](typeName: String, allReplicaIds: Set[ReplicaId])(
create: ReplicationId => Behavior[M]): ReplicatedEntityProvider[M] = {
apply(typeName, allReplicaIds) { (typeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(typeKey) { entityContext =>
create(ReplicationId.fromString(entityContext.entityId))
}.withRole(replicaId.id))
}
}
/**
* Java API
*
* Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in
* ClusterSharding. A replica will be run per data center.
*/
def createPerDataCenter[M](
messageClass: Class[M],
typeName: String,
allReplicaIds: JSet[ReplicaId],
createBehavior: java.util.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(typeKey) { entityContext =>
createBehavior(ReplicationId.fromString(entityContext.entityId))
}.withDataCenter(replicaId.id))
}
}
/**
* Java API
*
* Create a [[ReplicatedEntityProvider]] that uses the defaults for [[Entity]] when running in
* ClusterSharding.
*
* Map replicas to roles and then there will be a replica per role e.g. to match to availability zones/racks
*/
def createPerRole[M](
messageClass: Class[M],
typeName: String,
allReplicaIds: JSet[ReplicaId],
createBehavior: akka.japi.function.Function[ReplicationId, Behavior[M]]): ReplicatedEntityProvider[M] = {
implicit val classTag: ClassTag[M] = ClassTag(messageClass)
apply(typeName, allReplicaIds.asScala.toSet) { (typeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(typeKey) { entityContext =>
createBehavior(ReplicationId.fromString(entityContext.entityId))
}.withRole(replicaId.id))
}
}
}
/**
*
* @tparam M The type of messages the replicated entity accepts
* @tparam E The type for envelopes used for sending `M`s over sharding
*/
@ApiMayChange
final class ReplicatedEntityProvider[M, E] private (
val replicas: immutable.Seq[(ReplicatedEntity[M, E], String)],
final class ReplicatedEntityProvider[M] private (
val replicas: immutable.Seq[(ReplicatedEntity[M], String)],
val directReplication: Boolean) {
/**
@ -73,7 +150,7 @@ final class ReplicatedEntityProvider[M, E] private (
* to work.
*
*/
def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M, E] =
def withDirectReplication(enabled: Boolean): ReplicatedEntityProvider[M] =
new ReplicatedEntityProvider(replicas, directReplication = enabled)
}
@ -87,7 +164,7 @@ object ReplicatedEntity {
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.javadsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
def create[M, E](replicaId: ReplicaId, entity: JEntity[M, E]): ReplicatedEntity[M, E] =
def create[M](replicaId: ReplicaId, entity: JEntity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] =
apply(replicaId, entity.toScala)
/**
@ -96,12 +173,13 @@ object ReplicatedEntity {
* [[akka.actor.typed.Behavior]] but must never be a regular [[akka.persistence.typed.scaladsl.EventSourcedBehavior]]
* as that requires a single writer and that would cause it to have multiple writers.
*/
def apply[M, E](replicaId: ReplicaId, entity: Entity[M, E]): ReplicatedEntity[M, E] =
def apply[M](replicaId: ReplicaId, entity: Entity[M, ShardingEnvelope[M]]): ReplicatedEntity[M] =
new ReplicatedEntity(replicaId, entity)
}
/**
* Settings for a specific replica id in replicated sharding
* Currently only Entity's with ShardingEnvelope are supported but this may change in the future
*/
@ApiMayChange
final class ReplicatedEntity[M, E] private (val replicaId: ReplicaId, val entity: Entity[M, E])
final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]])

View file

@ -14,8 +14,6 @@ import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.persistence.typed.ReplicaId
import java.util.{ Map => JMap }
import akka.actor.typed.ActorRef
/**
* Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica.
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
@ -41,22 +39,20 @@ trait ReplicatedShardingExtension extends Extension {
* Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those.
*
* @tparam M The type of messages the replicated event sourced actor accepts
* @tparam E The type of envelope used for routing messages to actors, the same for all replicas
*
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
*/
def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E]
def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M]
/**
* Init one instance sharding per replica in the given settings and return a [[ReplicatedSharding]] representing those.
*
* @param thisReplica If provided saves messages being forwarded to sharding for this replica
* @tparam M The type of messages the replicated event sourced actor accepts
* @tparam E The type of envelope used for routing messages to actors, the same for all replicas
*
* Note, multiple calls on the same node will not start new sharding instances but will return a new instance of [[ReplicatedSharding]]
*/
def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E]
def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M]
}
/**
@ -66,34 +62,15 @@ trait ReplicatedShardingExtension extends Extension {
*/
@DoNotInherit
@ApiMayChange
trait ReplicatedSharding[M, E] {
/**
* Scala API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined
* routing/replica selection.
*/
def shardingRefs: Map[ReplicaId, ActorRef[E]]
/**
* Java API: Returns the actor refs for the shard region or proxies of sharding for each replica for user defined
* routing/replica selection.
*/
def getShardingRefs: JMap[ReplicaId, ActorRef[E]]
trait ReplicatedSharding[M] {
/**
* Scala API: Returns the entity ref for each replica for user defined routing/replica selection
*
* This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message
* entity ids you will need to use [[#shardingRefs]]
*/
def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]]
/**
* Java API: Returns the entity ref for each replica for user defined routing/replica selection
*
* This can only be used if the default [[ShardingEnvelope]] is used, when using custom envelopes or in message
* entity ids you will need to use [[#getShardingRefs]]
*/
def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]]
def getEntityRefsFor(entityId: String): JMap[ReplicaId, javadsl.EntityRef[M]]
}

View file

@ -385,6 +385,11 @@ import akka.util.JavaDurationConverters._
}
override def toString: String = s"EntityRef($typeKey, $entityId)"
/**
* INTERNAL API
*/
override private[akka] def asJava: javadsl.EntityRef[M] = this
}
/**

View file

@ -7,7 +7,6 @@ package akka.cluster.sharding.typed.internal
import java.util.concurrent.atomic.AtomicLong
import java.util.{ Map => JMap }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.ReplicatedShardingExtension
@ -34,15 +33,15 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
private val logger = LoggerFactory.getLogger(getClass)
override def init[M, E](settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] =
override def init[M](settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] =
initInternal(None, settings)
override def init[M, E](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] =
override def init[M](thisReplica: ReplicaId, settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] =
initInternal(Some(thisReplica), settings)
private def initInternal[M, E](
private def initInternal[M](
thisReplica: Option[ReplicaId],
settings: ReplicatedEntityProvider[M, E]): ReplicatedSharding[M, E] = {
settings: ReplicatedEntityProvider[M]): ReplicatedSharding[M] = {
val sharding = ClusterSharding(system)
val initializedReplicas = settings.replicas.map {
case (replicaSettings, typeName) =>
@ -72,7 +71,7 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
val replicaToTypeKey = initializedReplicas.map {
case (typeName, id, typeKey, _, dc) => id -> ((typeKey, dc, typeName))
}.toMap
new ReplicatedShardingImpl(sharding, replicaToRegionOrProxy, replicaToTypeKey)
new ReplicatedShardingImpl(sharding, replicaToTypeKey)
}
}
@ -80,15 +79,10 @@ private[akka] final class ReplicatedShardingExtensionImpl(system: ActorSystem[_]
* INTERNAL API
*/
@InternalApi
private[akka] final class ReplicatedShardingImpl[M, E](
private[akka] final class ReplicatedShardingImpl[M](
sharding: ClusterSharding,
shardingPerReplica: Map[ReplicaId, ActorRef[E]],
replicaTypeKeys: Map[ReplicaId, (EntityTypeKey[M], Option[DataCenter], String)])
extends ReplicatedSharding[M, E] {
// FIXME add test coverage for these
override def shardingRefs: Map[ReplicaId, ActorRef[E]] = shardingPerReplica
override def getShardingRefs: JMap[ReplicaId, ActorRef[E]] = shardingRefs.asJava
extends ReplicatedSharding[M] {
override def entityRefsFor(entityId: String): Map[ReplicaId, EntityRef[M]] =
replicaTypeKeys.map {
@ -100,7 +94,7 @@ private[akka] final class ReplicatedShardingImpl[M, E](
})
}
override def getEntityRefsFor(entityId: String): JMap[ReplicaId, EntityRef[M]] =
entityRefsFor(entityId).asJava
override def getEntityRefsFor(entityId: String): JMap[ReplicaId, akka.cluster.sharding.typed.javadsl.EntityRef[M]] =
entityRefsFor(entityId).transform((_, v) => v.asJava).asJava
}

View file

@ -15,6 +15,7 @@ import akka.actor.typed.Scheduler
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.InternalApi
import akka.cluster.sharding.typed.javadsl
import akka.cluster.sharding.typed.javadsl.EntityRef
import akka.cluster.sharding.typed.scaladsl
import akka.japi.function.{ Function => JFunction }
import akka.pattern.StatusReply
@ -59,4 +60,6 @@ import akka.util.Timeout
}
override def toString: String = s"TestEntityRef($entityId)"
override private[akka] def asJava: EntityRef[M] = this
}

View file

@ -441,7 +441,7 @@ object EntityTypeKey {
*
* Not for user extension.
*/
@DoNotInherit abstract class EntityRef[M] extends RecipientRef[M] {
@DoNotInherit abstract class EntityRef[-M] extends RecipientRef[M] {
scaladslSelf: scaladsl.EntityRef[M] with InternalRecipientRef[M] =>
/**

View file

@ -500,6 +500,11 @@ object EntityTypeKey {
def ?[Res](message: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] =
this.ask(message)(timeout)
/**
* INTERNAL API
*/
@InternalApi private[akka] def asJava: javadsl.EntityRef[M]
}
object ClusterShardingSetup {

View file

@ -70,7 +70,7 @@ object ReplicatedShardingSpec extends MultiNodeConfig {
def apply(id: ReplicationId, ctx: ActorContext[Command]): EventSourcedBehavior[Command, String, State] = {
// Relies on direct replication as there is no proxy query journal
ReplicatedEventSourcing.withSharedJournal(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) {
ReplicatedEventSourcing.commonJournalConfig(id, AllReplicas, PersistenceTestKitReadJournal.Identifier) {
replicationContext =>
ctx.log.info("Creating replica {}", replicationContext.replicationId)
EventSourcedBehavior[Command, String, State](

View file

@ -15,7 +15,7 @@ import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.MemberStatus;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.scaladsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.persistence.testkit.PersistenceTestKitPlugin;
@ -67,7 +67,7 @@ public class ReplicatedShardingTest extends JUnitSuite {
}
static Behavior<Command> create(ReplicationId replicationId) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
replicationId,
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
@ -143,17 +143,13 @@ public class ReplicatedShardingTest extends JUnitSuite {
Arrays.asList(
new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C"))));
private final ReplicatedSharding<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
replicatedSharding;
private final ReplicatedSharding<MyReplicatedStringSet.Command> replicatedSharding;
private ProxyActor(ActorContext<Command> context) {
super(context);
// #bootstrap
ReplicatedEntityProvider<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
replicatedEntityProvider =
ReplicatedEntityProvider<MyReplicatedStringSet.Command> replicatedEntityProvider =
ReplicatedEntityProvider.create(
MyReplicatedStringSet.Command.class,
"StringSet",
@ -178,9 +174,8 @@ public class ReplicatedShardingTest extends JUnitSuite {
ReplicatedShardingExtension extension =
ReplicatedShardingExtension.get(getContext().getSystem());
ReplicatedSharding<
MyReplicatedStringSet.Command, ShardingEnvelope<MyReplicatedStringSet.Command>>
replicatedSharding = extension.init(replicatedEntityProvider);
ReplicatedSharding<MyReplicatedStringSet.Command> replicatedSharding =
extension.init(replicatedEntityProvider);
// #bootstrap
this.replicatedSharding = replicatedSharding;

View file

@ -4,12 +4,11 @@
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.cluster.sharding.typed.*;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.scaladsl.EntityRef;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.ReplicationId;
@ -30,7 +29,7 @@ public class ReplicatedShardingCompileOnlySpec {
new HashSet<>(
Arrays.asList(new ReplicaId("DC-A"), new ReplicaId("DC-B"), new ReplicaId("DC-C"))));
public static ReplicatedEntityProvider<Command, ShardingEnvelope<Command>> provider() {
public static ReplicatedEntityProvider<Command> provider() {
// #bootstrap
return ReplicatedEntityProvider.create(
Command.class,
@ -67,7 +66,7 @@ public class ReplicatedShardingCompileOnlySpec {
// #bootstrap-dc
}
public static ReplicatedEntityProvider<Command, ShardingEnvelope<Command>> role() {
public static ReplicatedEntityProvider<Command> role() {
// #bootstrap-role
return ReplicatedEntityProvider.create(
Command.class,
@ -90,13 +89,10 @@ public class ReplicatedShardingCompileOnlySpec {
// #sending-messages
ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system);
ReplicatedSharding<Command, ShardingEnvelope<Command>> replicatedSharding =
extension.init(provider());
ReplicatedSharding<Command> replicatedSharding = extension.init(provider());
Map<ReplicaId, EntityRef<Command>> myEntityId =
replicatedSharding.getEntityRefsFor("myEntityId");
Map<ReplicaId, ActorRef<ShardingEnvelope<Command>>> shardingRefs =
replicatedSharding.getShardingRefs();
// #sending-messages
}

View file

@ -76,7 +76,7 @@ object ReplicatedShardingSpec {
case class Texts(texts: Set[String]) extends CborSerializable
def apply(replicationId: ReplicationId): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory
ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory
replicationId,
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
@ -97,7 +97,7 @@ object ReplicatedShardingSpec {
}
def provider(replicationType: ReplicationType) =
ReplicatedEntityProvider[MyReplicatedStringSet.Command, ShardingEnvelope[MyReplicatedStringSet.Command]](
ReplicatedEntityProvider[MyReplicatedStringSet.Command](
// all replicas
"StringSet",
AllReplicas) { (entityTypeKey, replicaId) =>
@ -127,7 +127,7 @@ object ReplicatedShardingSpec {
case class Ints(ints: Set[Int]) extends CborSerializable
def apply(id: ReplicationId, allReplicas: Set[ReplicaId]): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal( // it isn't really shared as it is in memory
ReplicatedEventSourcing.commonJournalConfig( // it isn't really shared as it is in memory
id,
allReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
@ -148,9 +148,7 @@ object ReplicatedShardingSpec {
}
def provider(replicationType: ReplicationType) =
ReplicatedEntityProvider[MyReplicatedIntSet.Command, ShardingEnvelope[MyReplicatedIntSet.Command]](
"IntSet",
AllReplicas) { (entityTypeKey, replicaId) =>
ReplicatedEntityProvider[MyReplicatedIntSet.Command]("IntSet", AllReplicas) { (entityTypeKey, replicaId) =>
val entity = {
val e = Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)

View file

@ -4,14 +4,12 @@
package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.ReplicatedEntity
import akka.cluster.sharding.typed.ReplicatedEntityProvider
import akka.cluster.sharding.typed.ReplicatedSharding
import akka.cluster.sharding.typed.ReplicatedShardingExtension
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.persistence.typed.ReplicaId
@ -30,9 +28,8 @@ object ReplicatedShardingCompileOnlySpec {
}
//#bootstrap
ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntityProvider[Command]("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
(entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
// the sharding entity id contains the business entityId, entityType, and replica id
// which you'll need to create a ReplicatedEventSourcedBehavior
@ -43,32 +40,22 @@ object ReplicatedShardingCompileOnlySpec {
//#bootstrap
//#bootstrap-dc
ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)
ReplicatedEntityProvider.perDataCenter("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { replicationId =>
MyEventSourcedBehavior(replicationId)
}.withDataCenter(replicaId.id))
}
//#bootstrap-dc
//#bootstrap-role
val provider = ReplicatedEntityProvider[Command, ShardingEnvelope[Command]](
"MyEntityType",
Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { (entityTypeKey, replicaId) =>
ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
val replicationId = ReplicationId.fromString(entityContext.entityId)
val provider = ReplicatedEntityProvider.perRole("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
replicationId =>
MyEventSourcedBehavior(replicationId)
}.withRole(replicaId.id))
}
//#bootstrap-role
//#sending-messages
val myReplicatedSharding: ReplicatedSharding[Command, ShardingEnvelope[Command]] =
val myReplicatedSharding: ReplicatedSharding[Command] =
ReplicatedShardingExtension(system).init(provider)
val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId")
val actorRefs: Map[ReplicaId, ActorRef[ShardingEnvelope[Command]]] = myReplicatedSharding.shardingRefs
//#sending-messages
}

View file

@ -80,7 +80,7 @@ public class ReplicatedEventSourcingTest extends JUnitSuite {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("ReplicatedEventSourcingTest", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -33,7 +33,7 @@ public class MyReplicatedBehavior
// #factory-shared
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, String queryPluginId) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("MyReplicatedEntity", entityId, replicaId),
ALL_REPLICAS,
queryPluginId,
@ -47,7 +47,7 @@ public class MyReplicatedBehavior
allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
return ReplicatedEventSourcing.create(
return ReplicatedEventSourcing.perReplicaJournalConfig(
new ReplicationId("MyReplicatedEntity", entityId, replicaId),
allReplicasAndQueryPlugins,
MyReplicatedBehavior::new);

View file

@ -262,7 +262,7 @@ class AuctionEntity extends ReplicatedEventSourcedBehavior<Command, Event, Aucti
ctx ->
Behaviors.withTimers(
timers ->
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("Auction", name, replica),
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -187,7 +187,7 @@ interface ReplicatedBlogExample {
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return Behaviors.setup(
context ->
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("blog", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -61,7 +61,7 @@ interface ReplicatedMovieExample {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("movies", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -85,7 +85,7 @@ interface ReplicatedShoppingCartExample {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("blog", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -27,7 +27,7 @@ public final class ReplicatedStringSet
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
return ReplicatedEventSourcing.commonJournalConfig(
new ReplicationId("StringSet", entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier(),

View file

@ -30,7 +30,7 @@ object ReplicatedEventPublishingSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicas: Set[ReplicaId]): Behavior[Command] =
Behaviors.setup { ctx =>
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId(EntityType, entityId, replicaId),
allReplicas,
PersistenceTestKitReadJournal.Identifier)(

View file

@ -71,7 +71,7 @@ object ReplicatedEventSourcingSpec {
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(replicationContext => eventSourcedBehavior(replicationContext, probe))

View file

@ -42,7 +42,7 @@ object ReplicatedEventSourcingTaggingSpec {
replica: ReplicaId,
allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = {
// #tagging
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("TaggingSpec", entityId, replica),
allReplicas,
queryPluginId)(

View file

@ -28,7 +28,7 @@ object ReplicationIllegalAccessSpec {
case class State(all: List[String]) extends CborSerializable
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("IllegalAccessSpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(
@ -88,7 +88,7 @@ class ReplicationIllegalAccessSpec
}
"detect illegal access in the factory" in {
val exception = intercept[UnsupportedOperationException] {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("IllegalAccessSpec", "id2", R1),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -35,7 +35,7 @@ object ReplicationSnapshotSpec {
entityId: String,
replicaId: ReplicaId,
probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId(EntityType, entityId, replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(replicationContext =>

View file

@ -29,7 +29,7 @@ object CounterSpec {
snapshotEvery: Long = 100,
eventProbe: Option[ActorRef[Counter.Updated]] = None) =
Behaviors.setup[PlainCounter.Command] { context =>
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("CounterSpec", entityId, replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { ctx =>

View file

@ -27,7 +27,7 @@ object LwwSpec {
object LwwRegistry {
def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("LwwRegistrySpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -28,7 +28,7 @@ object ORSetSpec {
def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("ORSetSpec", entityId, replica),
AllReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -143,7 +143,7 @@ object ReplicatedAuctionExampleSpec {
responsibleForClosing: Boolean,
allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers { timers =>
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("auction", name, replica),
allReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationCtx =>

View file

@ -52,7 +52,7 @@ object ReplicatedBlogExampleSpec {
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
Behaviors.setup[Command] { ctx =>
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("blog", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -25,7 +25,7 @@ object ReplicatedEventSourcingCompileOnlySpec {
trait Event
//#factory-shared
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("entityTypeHint", "entityId", DCA),
AllReplicas,
queryPluginId) { context =>

View file

@ -28,7 +28,7 @@ object ReplicatedMovieWatchListExampleSpec {
final case class MovieList(movieIds: Set[String])
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("movies", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -41,7 +41,7 @@ object ReplicatedShoppingCartExampleSpec {
final case class State(items: Map[ProductId, Counter])
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("blog", entityId, replicaId),
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>

View file

@ -72,7 +72,8 @@ trait ReplicationContext {
object ReplicatedEventSourcing {
/**
* Initialize a replicated event sourced behavior where all entity replicas are stored in the same journal.
* Initialize a replicated event sourced behavior where all entity replicas are share the same journal configuration.
* This is typical if there is a shared database and no replica specific configuratin is required.
*
* Events from each replica for the same entityId will be replicated to every copy.
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
@ -85,16 +86,20 @@ object ReplicatedEventSourcing {
*
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
*/
def withSharedJournal[Command, Event, State](
def commonJournalConfig[Command, Event, State](
replicationId: ReplicationId,
allReplicaIds: JSet[ReplicaId],
queryPluginId: String,
behaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])
: EventSourcedBehavior[Command, Event, State] =
create(replicationId, allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava, behaviorFactory)
perReplicaJournalConfig(
replicationId,
allReplicaIds.asScala.map(id => id -> queryPluginId).toMap.asJava,
behaviorFactory)
/**
* Initialize a replicated event sourced behavior.
* Initialize a replicated event sourced behavior where each journal has different journal configuration e.g.
* each replica uses a different database or requires different database configuration for a shared database.
*
* Events from each replica for the same entityId will be replicated to every copy.
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
@ -108,7 +113,7 @@ object ReplicatedEventSourcing {
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
* and configured with the query plugin for the journal that each replica uses.
*/
def create[Command, Event, State](
def perReplicaJournalConfig[Command, Event, State](
replicationId: ReplicationId,
allReplicasAndQueryPlugins: JMap[ReplicaId, String],
eventSourcedBehaviorFactory: JFunction[ReplicationContext, EventSourcedBehavior[Command, Event, State]])

View file

@ -82,13 +82,14 @@ object ReplicatedEventSourcing {
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
* @param queryPluginId A single query plugin used to read the events from other replicas. Must be the query side of your configured journal plugin.
*/
def withSharedJournal[Command, Event, State](
def commonJournalConfig[Command, Event, State](
replicationId: ReplicationId,
allReplicaIds: Set[ReplicaId],
queryPluginId: String)(
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] =
apply(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(eventSourcedBehaviorFactory)
perReplicaJournalConfig(replicationId, allReplicaIds.map(id => id -> queryPluginId).toMap)(
eventSourcedBehaviorFactory)
/**
* Initialize a replicated event sourced behavior.
@ -104,7 +105,9 @@ object ReplicatedEventSourcing {
* @param allReplicasAndQueryPlugins All replica ids and a query plugin per replica id. These need to be known to receive events from all replicas
* and configured with the query plugin for the journal that each replica uses.
*/
def apply[Command, Event, State](replicationId: ReplicationId, allReplicasAndQueryPlugins: Map[ReplicaId, String])(
def perReplicaJournalConfig[Command, Event, State](
replicationId: ReplicationId,
allReplicasAndQueryPlugins: Map[ReplicaId, String])(
eventSourcedBehaviorFactory: ReplicationContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ReplicationContextImpl(replicationId, allReplicasAndQueryPlugins)