diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/replicated-event-sourcing.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/replicated-event-sourcing.excludes new file mode 100644 index 0000000000..4e794fd578 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/replicated-event-sourcing.excludes @@ -0,0 +1,2 @@ +# new private method on a type that is not meant to be inherited by user code +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.asJava") \ No newline at end of file diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala index 156037b46d..3d39c47991 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ReplicatedShardingSpec.scala @@ -96,14 +96,13 @@ object ReplicatedShardingSpec extends MultiNodeConfig { } } - def provider(): ReplicatedEntityProvider[Command, ShardingEnvelope[Command]] = { - ReplicatedEntityProvider[Command, ShardingEnvelope[Command]]("TestRES", AllReplicas) { - (entityTypeKey, replicaId) => - ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => - Behaviors.setup { ctx => - TestRES(ReplicationId.fromString(entityContext.entityId), ctx) - } - }) + def provider(): ReplicatedEntityProvider[Command] = { + ReplicatedEntityProvider[Command]("TestRES", AllReplicas) { (entityTypeKey, replicaId) => + ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext => + Behaviors.setup { ctx => + TestRES(ReplicationId.fromString(entityContext.entityId), ctx) + } + }) }.withDirectReplication(true) // this is required as we don't have a shared read journal } } @@ -134,7 +133,7 @@ abstract class ReplicatedShardingSpec } "start replicated entities" in { - val replicatedSharding: ReplicatedSharding[TestRES.Command, ShardingEnvelope[TestRES.Command]] = + val replicatedSharding: ReplicatedSharding[TestRES.Command] = ReplicatedShardingExtension(typedSystem).init(TestRES.provider()) runOn(first) { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 925da2e7f3..13b7f8bc25 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -358,6 +358,8 @@ private[akka] object Shard { // only called once during handoff def activeEntities(): Set[ActorRef] = byRef.keySet.asScala.toSet + def nrActiveEntities() = byRef.size() + // only called for getting shard stats def activeEntityIds(): Set[EntityId] = byRef.values.asScala.toSet diff --git a/akka-docs/src/main/paradox/project/examples.md b/akka-docs/src/main/paradox/project/examples.md index d790e8590e..d108262df8 100644 --- a/akka-docs/src/main/paradox/project/examples.md +++ b/akka-docs/src/main/paradox/project/examples.md @@ -61,7 +61,7 @@ from the events, or publish the events to other services.
## Multi-DC Persistence -This commercial feature has now been superseeded by @ref[Replicated Event Sourcing](../typed/replicated-eventsourcing.md) +This commercial feature has now been superseded by @ref[Replicated Event Sourcing](../typed/replicated-eventsourcing.md) ## Cluster with Docker diff --git a/akka-docs/src/main/paradox/typed/cluster-dc.md b/akka-docs/src/main/paradox/typed/cluster-dc.md index ca0003bb71..ea5d3921ce 100644 --- a/akka-docs/src/main/paradox/typed/cluster-dc.md +++ b/akka-docs/src/main/paradox/typed/cluster-dc.md @@ -193,7 +193,7 @@ Especially when used together with Akka Persistence that is based on the single- it is important to avoid running the same entity at multiple locations at the same time with a shared data store. That would result in corrupt data since the events stored by different instances may be interleaved and would be interpreted differently in a later replay. For replicated persistent -entities see @ref[Replciated Event Sourcing](replicated-eventsourcing.md). +entities see @ref[Replicated Event Sourcing](replicated-eventsourcing.md). If you need global entities you have to pick one data center to host that entity type and only start `ClusterSharding` on nodes of that data center. If the data center is unreachable from another data center the diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-auction.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-auction.md index bc2a7040c3..a10bc9ba9f 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-auction.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-auction.md @@ -25,7 +25,7 @@ Scala Java : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #events } -The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event. +The winner does not have to pay the highest bid but only enough to beat the second highest, so the `highestCounterOffer` is in the `AuctionFinished` event. Let's have a look at the auction entity that will handle incoming commands: diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index 8a9413d935..52770e4b95 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -8,7 +8,8 @@ warning or deprecation period. It is also not recommended to use this module in @@@ -@ref[Event sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior` with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state. +@ref[Event sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of an `EventSourcedBehavior` +with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state.
This restriction means that in the event of network partitions, and for a short time during rolling re-deploys, some `EventSourcedBehavior` actors are unavailable. @@ -28,7 +29,7 @@ The motivations are: * Balance the load over many servers However, the event handler must be able to **handle concurrent events** as when replication is enabled -there is no longer the single writer principle as there is with a normal `EventSourcedBehavior`. +the single-writer guarantee is not maintained like it is with a normal `EventSourcedBehavior`. The state of a replicated `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed due to network partitions and outages, which means that the event handler and those reading the state must be designed @@ -36,7 +37,7 @@ to handle this. To be able to use Replicated Event Sourcing the journal and snapshot store used is required to have specific support for the metadata that the replication needs (see @ref[Journal Support](#journal-support)). -## Relaxing the single writer principle for availability +## Relaxing the single-writer principle for availability Taking the example of using Replicated Event Sourcing to run a replica per data center. diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala index 4257e324f8..cc7867f5dd 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/MultiJournalReplicationSpec.scala @@ -32,10 +32,10 @@ object MultiJournalReplicationSpec { private val writeJournalPerReplica = Map("R1" -> "journal1.journal", "R2" -> "journal2.journal") def apply(entityId: String, replicaId: String): Behavior[Command] = { - ReplicatedEventSourcing( - ReplicationId("MultiJournalSpec", entityId, ReplicaId(replicaId)), - Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))( - replicationContext => + ReplicatedEventSourcing + .perReplicaJournalConfig( + ReplicationId("MultiJournalSpec", entityId, ReplicaId(replicaId)), + Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query"))(replicationContext => EventSourcedBehavior[Command, String, Set[String]]( replicationContext.persistenceId, Set.empty[String], @@ -47,7 +47,8 @@ object MultiJournalReplicationSpec { case StoreMe(evt, ack) => Effect.persist(evt).thenRun(_ => ack ! 
Done) }, - (state, event) => state + event)).withJournalPluginId(writeJournalPerReplica(replicaId)) + (state, event) => state + event)) + .withJournalPluginId(writeJournalPerReplica(replicaId)) } } diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala index 002f2cd0ef..8ab4ebec6e 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala @@ -34,7 +34,7 @@ object ReplicatedEventSourcingCompileOnlySpec { //#factory-shared //#factory - ReplicatedEventSourcing( + ReplicatedEventSourcing.perReplicaJournalConfig( ReplicationId("entityTypeHint", "entityId", DCA), Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context => EventSourcedBehavior[Command, State, Event](???, ???, ???, ???) diff --git a/build.sbt b/build.sbt index 0794926692..b8c1e1ca4e 100644 --- a/build.sbt +++ b/build.sbt @@ -82,6 +82,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = List[ProjectReference]( persistenceShared, persistenceTck, persistenceTyped, + persistenceTypedTests, persistenceTestkit, protobuf, protobufV3,
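
Illustrative usage sketch (not part of the patch): the Scala below stitches together the two API changes in this diff, the single-type-parameter ReplicatedEntityProvider and the ReplicatedEventSourcing.perReplicaJournalConfig factory, to show how they are expected to fit together. GreeterReplica, Greet and the "Greeter" type name are invented for the example, the journal plugin paths reuse the placeholder ids from MultiJournalReplicationSpec, and the imports assume the package layout of the Akka 2.6 snapshot this diff targets; treat anything not visible in the diff above as an assumption.

import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.{ ReplicatedEntity, ReplicatedEntityProvider }
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.persistence.typed.{ ReplicaId, ReplicationId }
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing }

object GreeterReplica {
  sealed trait Command
  final case class Greet(name: String) extends Command

  // Two replicas, each with its own write journal and query plugin
  // (placeholder config paths, reusing the ids from MultiJournalReplicationSpec).
  val AllReplicas: Set[ReplicaId] = Set(ReplicaId("R1"), ReplicaId("R2"))
  private val queryJournalPerReplica =
    Map(ReplicaId("R1") -> "journal1.query", ReplicaId("R2") -> "journal2.query")
  private val writeJournalPerReplica =
    Map(ReplicaId("R1") -> "journal1.journal", ReplicaId("R2") -> "journal2.journal")

  // perReplicaJournalConfig is the renamed factory used when every replica reads from its
  // own query journal; the write journal is still selected via withJournalPluginId.
  def apply(replicationId: ReplicationId): Behavior[Command] =
    ReplicatedEventSourcing
      .perReplicaJournalConfig(replicationId, queryJournalPerReplica) { replicationContext =>
        EventSourcedBehavior[Command, String, Set[String]](
          replicationContext.persistenceId,
          Set.empty[String],
          (_, command) =>
            command match {
              case Greet(name) => Effect.persist(name)
            },
          (state, event) => state + event)
      }
      .withJournalPluginId(writeJournalPerReplica(replicationId.replicaId))

  // After this diff ReplicatedEntityProvider takes only the message type; the
  // ShardingEnvelope type parameter seen on the removed lines is gone.
  def provider(): ReplicatedEntityProvider[Command] =
    ReplicatedEntityProvider[Command]("Greeter", AllReplicas) { (entityTypeKey, replicaId) =>
      ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
        GreeterReplica(ReplicationId.fromString(entityContext.entityId))
      })
    }.withDirectReplication(true) // needed because the replicas do not share a read journal
}

In a cluster the provider would then be handed to ReplicatedShardingExtension(system).init(...), exactly as ReplicatedShardingSpec does above.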