parent
bbae718ef7
commit
8573c70883
12 changed files with 36 additions and 48 deletions
|
|
@ -15,12 +15,10 @@ import akka.util.ccompat.JavaConverters._
|
||||||
import java.util.{ Set => JSet }
|
import java.util.{ Set => JSet }
|
||||||
|
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
|
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
|
||||||
import akka.persistence.typed.ReplicationId
|
import akka.persistence.typed.ReplicationId
|
||||||
import akka.persistence.typed.ReplicationId.Separator
|
import akka.persistence.typed.ReplicationId.Separator
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
object ReplicatedEntityProvider {
|
object ReplicatedEntityProvider {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -138,7 +136,6 @@ object ReplicatedEntityProvider {
|
||||||
*
|
*
|
||||||
* @tparam M The type of messages the replicated entity accepts
|
* @tparam M The type of messages the replicated entity accepts
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
final class ReplicatedEntityProvider[M] private (
|
final class ReplicatedEntityProvider[M] private (
|
||||||
val replicas: immutable.Seq[(ReplicatedEntity[M], String)],
|
val replicas: immutable.Seq[(ReplicatedEntity[M], String)],
|
||||||
val directReplication: Boolean) {
|
val directReplication: Boolean) {
|
||||||
|
|
@ -155,7 +152,6 @@ final class ReplicatedEntityProvider[M] private (
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
object ReplicatedEntity {
|
object ReplicatedEntity {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -181,5 +177,4 @@ object ReplicatedEntity {
|
||||||
* Settings for a specific replica id in replicated sharding
|
* Settings for a specific replica id in replicated sharding
|
||||||
* Currently only Entity's with ShardingEnvelope are supported but this may change in the future
|
* Currently only Entity's with ShardingEnvelope are supported but this may change in the future
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]])
|
final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]])
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ package akka.cluster.sharding.typed
|
||||||
import akka.actor.typed.ActorSystem
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.actor.typed.Extension
|
import akka.actor.typed.Extension
|
||||||
import akka.actor.typed.ExtensionId
|
import akka.actor.typed.ExtensionId
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
import akka.annotation.DoNotInherit
|
import akka.annotation.DoNotInherit
|
||||||
import akka.cluster.sharding.typed.internal.ReplicatedShardingExtensionImpl
|
import akka.cluster.sharding.typed.internal.ReplicatedShardingExtensionImpl
|
||||||
import akka.cluster.sharding.typed.scaladsl.EntityRef
|
import akka.cluster.sharding.typed.scaladsl.EntityRef
|
||||||
|
|
@ -18,7 +17,6 @@ import java.util.{ Map => JMap }
|
||||||
* Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica.
|
* Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica.
|
||||||
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
|
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtension] {
|
object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtension] {
|
||||||
|
|
||||||
override def createExtension(system: ActorSystem[_]): ReplicatedShardingExtension =
|
override def createExtension(system: ActorSystem[_]): ReplicatedShardingExtension =
|
||||||
|
|
@ -32,7 +30,6 @@ object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtensi
|
||||||
* Not for user extension.
|
* Not for user extension.
|
||||||
*/
|
*/
|
||||||
@DoNotInherit
|
@DoNotInherit
|
||||||
@ApiMayChange
|
|
||||||
trait ReplicatedShardingExtension extends Extension {
|
trait ReplicatedShardingExtension extends Extension {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -61,7 +58,6 @@ trait ReplicatedShardingExtension extends Extension {
|
||||||
* Not for user extension.
|
* Not for user extension.
|
||||||
*/
|
*/
|
||||||
@DoNotInherit
|
@DoNotInherit
|
||||||
@ApiMayChange
|
|
||||||
trait ReplicatedSharding[M] {
|
trait ReplicatedSharding[M] {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,5 @@ These are the current complete modules marked as **may change**:
|
||||||
* @ref:[Multi Node Testing](../multi-node-testing.md)
|
* @ref:[Multi Node Testing](../multi-node-testing.md)
|
||||||
* @ref:[Reliable Delivery](../typed/reliable-delivery.md)
|
* @ref:[Reliable Delivery](../typed/reliable-delivery.md)
|
||||||
* @ref:[Sharded Daemon Process](../typed/cluster-sharded-daemon-process.md)
|
* @ref:[Sharded Daemon Process](../typed/cluster-sharded-daemon-process.md)
|
||||||
* @ref:[Replicated Event Sourcing](../typed/replicated-eventsourcing.md)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,5 @@
|
||||||
# Replicated Event Sourcing
|
# Replicated Event Sourcing
|
||||||
|
|
||||||
@@@ warning
|
|
||||||
|
|
||||||
This module is marked as @ref:[may change](../common/may-change.md) because it is a new feature that
|
|
||||||
needs feedback from real usage before finalizing the API. This means that API or semantics can change without
|
|
||||||
warning or deprecation period. It is also not recommended to use this module in production just yet.
|
|
||||||
|
|
||||||
@@@
|
|
||||||
|
|
||||||
@ref[Event Sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior`
|
@ref[Event Sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior`
|
||||||
with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state.
|
with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,10 +4,13 @@
|
||||||
|
|
||||||
package docs.akka.persistence.typed
|
package docs.akka.persistence.typed
|
||||||
|
|
||||||
|
import scala.annotation.nowarn
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorSystem
|
||||||
import akka.persistence.typed.ReplicaId
|
import akka.persistence.typed.ReplicaId
|
||||||
import akka.persistence.typed.ReplicationId
|
import akka.persistence.typed.ReplicationId
|
||||||
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, ReplicatedEventSourcing }
|
import akka.persistence.typed.scaladsl.EventSourcedBehavior
|
||||||
import scala.annotation.nowarn
|
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
|
||||||
|
|
||||||
@nowarn("msg=never used")
|
@nowarn("msg=never used")
|
||||||
object ReplicatedEventSourcingCompileOnlySpec {
|
object ReplicatedEventSourcingCompileOnlySpec {
|
||||||
|
|
@ -24,21 +27,36 @@ object ReplicatedEventSourcingCompileOnlySpec {
|
||||||
trait State
|
trait State
|
||||||
trait Event
|
trait Event
|
||||||
|
|
||||||
//#factory-shared
|
object Shared {
|
||||||
ReplicatedEventSourcing.commonJournalConfig(
|
//#factory-shared
|
||||||
ReplicationId("entityTypeHint", "entityId", DCA),
|
def apply(
|
||||||
AllReplicas,
|
system: ActorSystem[_],
|
||||||
queryPluginId) { context =>
|
entityId: String,
|
||||||
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
|
||||||
|
ReplicatedEventSourcing.commonJournalConfig(
|
||||||
|
ReplicationId("MyReplicatedEntity", entityId, replicaId),
|
||||||
|
AllReplicas,
|
||||||
|
queryPluginId) { replicationContext =>
|
||||||
|
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#factory-shared
|
||||||
}
|
}
|
||||||
//#factory-shared
|
|
||||||
|
|
||||||
//#factory
|
object PerReplica {
|
||||||
ReplicatedEventSourcing.perReplicaJournalConfig(
|
//#factory
|
||||||
ReplicationId("entityTypeHint", "entityId", DCA),
|
def apply(
|
||||||
Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
|
system: ActorSystem[_],
|
||||||
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
entityId: String,
|
||||||
|
replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
|
||||||
|
ReplicatedEventSourcing.perReplicaJournalConfig(
|
||||||
|
ReplicationId("MyReplicatedEntity", entityId, replicaId),
|
||||||
|
Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { replicationContext =>
|
||||||
|
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#factory
|
||||||
}
|
}
|
||||||
//#factory
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package akka.persistence.typed.crdt
|
package akka.persistence.typed.crdt
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
object Counter {
|
object Counter {
|
||||||
val empty: Counter = Counter(0)
|
val empty: Counter = Counter(0)
|
||||||
|
|
||||||
|
|
@ -23,7 +21,6 @@ object Counter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
final case class Counter(value: BigInt) extends OpCrdt[Counter.Updated] {
|
final case class Counter(value: BigInt) extends OpCrdt[Counter.Updated] {
|
||||||
|
|
||||||
override type T = Counter
|
override type T = Counter
|
||||||
|
|
|
||||||
|
|
@ -3,14 +3,12 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package akka.persistence.typed.crdt
|
package akka.persistence.typed.crdt
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
import akka.persistence.typed.ReplicaId
|
import akka.persistence.typed.ReplicaId
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class for comparing timestamp replica
|
* Utility class for comparing timestamp replica
|
||||||
* identifier when implementing last-writer wins.
|
* identifier when implementing last-writer wins.
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
final case class LwwTime(timestamp: Long, originReplica: ReplicaId) {
|
final case class LwwTime(timestamp: Long, originReplica: ReplicaId) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -7,12 +7,11 @@ package akka.persistence.typed.crdt
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.util.HashCode
|
import akka.util.HashCode
|
||||||
import akka.annotation.{ ApiMayChange, InternalApi }
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.typed.ReplicaId
|
import akka.persistence.typed.ReplicaId
|
||||||
import akka.persistence.typed.crdt.ORSet.DeltaOp
|
import akka.persistence.typed.crdt.ORSet.DeltaOp
|
||||||
import akka.persistence.typed.internal.{ ManyVersionVector, OneVersionVector, VersionVector }
|
import akka.persistence.typed.internal.{ ManyVersionVector, OneVersionVector, VersionVector }
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
object ORSet {
|
object ORSet {
|
||||||
def empty[A](originReplica: ReplicaId): ORSet[A] = new ORSet(originReplica.id, Map.empty, VersionVector.empty)
|
def empty[A](originReplica: ReplicaId): ORSet[A] = new ORSet(originReplica.id, Map.empty, VersionVector.empty)
|
||||||
def apply[A](originReplica: ReplicaId): ORSet[A] = empty(originReplica)
|
def apply[A](originReplica: ReplicaId): ORSet[A] = empty(originReplica)
|
||||||
|
|
@ -274,7 +273,6 @@ object ORSet {
|
||||||
*
|
*
|
||||||
* This class is immutable, i.e. "modifying" methods return a new instance.
|
* This class is immutable, i.e. "modifying" methods return a new instance.
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
final class ORSet[A] private[akka] (
|
final class ORSet[A] private[akka] (
|
||||||
val originReplica: String,
|
val originReplica: String,
|
||||||
private[akka] val elementsMap: Map[A, ORSet.Dot],
|
private[akka] val elementsMap: Map[A, ORSet.Dot],
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,8 @@
|
||||||
|
|
||||||
package akka.persistence.typed.crdt
|
package akka.persistence.typed.crdt
|
||||||
|
|
||||||
import akka.annotation.{ ApiMayChange, DoNotInherit }
|
import akka.annotation.DoNotInherit
|
||||||
|
|
||||||
@ApiMayChange
|
|
||||||
@DoNotInherit
|
@DoNotInherit
|
||||||
trait OpCrdt[Operation] { self =>
|
trait OpCrdt[Operation] { self =>
|
||||||
type T <: OpCrdt[Operation] { type T = self.T }
|
type T <: OpCrdt[Operation] { type T = self.T }
|
||||||
|
|
|
||||||
|
|
@ -305,7 +305,6 @@ object ReplicatedEventMetadata {
|
||||||
* For a journal supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
|
* For a journal supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
|
||||||
* to the built in serializer for serialization format
|
* to the built in serializer for serialization format
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
def instanceForJournalTest: Any = ReplicatedEventMetadata(ReplicaId("DC-A"), 1L, VersionVector.empty + "DC-A", true)
|
def instanceForJournalTest: Any = ReplicatedEventMetadata(ReplicaId("DC-A"), 1L, VersionVector.empty + "DC-A", true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -328,7 +327,6 @@ object ReplicatedSnapshotMetadata {
|
||||||
* For a snapshot store supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
|
* For a snapshot store supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
|
||||||
* to the built in serializer for serialization format
|
* to the built in serializer for serialization format
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
def instanceForSnapshotStoreTest: Any =
|
def instanceForSnapshotStoreTest: Any =
|
||||||
ReplicatedSnapshotMetadata(
|
ReplicatedSnapshotMetadata(
|
||||||
VersionVector.empty + "DC-B" + "DC-A",
|
VersionVector.empty + "DC-B" + "DC-A",
|
||||||
|
|
|
||||||
|
|
@ -268,7 +268,7 @@ private[akka] object Running {
|
||||||
state: Running.RunningState[S],
|
state: Running.RunningState[S],
|
||||||
envelope: ReplicatedEventEnvelope[E],
|
envelope: ReplicatedEventEnvelope[E],
|
||||||
replication: ReplicationSetup): Behavior[InternalProtocol] = {
|
replication: ReplicationSetup): Behavior[InternalProtocol] = {
|
||||||
setup.internalLogger.infoN(
|
setup.internalLogger.debugN(
|
||||||
"Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}",
|
"Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}",
|
||||||
setup.replication,
|
setup.replication,
|
||||||
state.seenPerReplica,
|
state.seenPerReplica,
|
||||||
|
|
|
||||||
|
|
@ -9,14 +9,12 @@ import java.util.Optional
|
||||||
import akka.actor.typed.BackoffSupervisorStrategy
|
import akka.actor.typed.BackoffSupervisorStrategy
|
||||||
import akka.actor.typed.Behavior
|
import akka.actor.typed.Behavior
|
||||||
import akka.actor.typed.TypedActorContext
|
import akka.actor.typed.TypedActorContext
|
||||||
import akka.annotation.ApiMayChange
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.typed.internal.ReplicationContextImpl
|
import akka.persistence.typed.internal.ReplicationContextImpl
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for replicated event sourced behaviors.
|
* Base class for replicated event sourced behaviors.
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
|
||||||
abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
|
abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
|
||||||
replicationContext: ReplicationContext,
|
replicationContext: ReplicationContext,
|
||||||
onPersistFailure: Optional[BackoffSupervisorStrategy])
|
onPersistFailure: Optional[BackoffSupervisorStrategy])
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue