diff --git a/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala b/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala deleted file mode 100644 index 8bc84f8261..0000000000 --- a/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package akka.util - -import java.util.concurrent.atomic.AtomicLong -import java.util.function.LongUnaryOperator - -import akka.annotation.InternalApi - -/** - * Always increasing wall clock time. - */ -@InternalApi -private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock { - - override def currentTimeMillis(): Long = { - val currentSystemTime = System.currentTimeMillis() - updateAndGet { - new LongUnaryOperator { - override def applyAsLong(time: Long): Long = { - if (currentSystemTime <= time) time + 1 - else currentSystemTime - } - } - } - } -} diff --git a/akka-actor/src/main/scala/akka/util/WallClock.scala b/akka-actor/src/main/scala/akka/util/WallClock.scala index 1bda0e8485..3254f52e31 100644 --- a/akka-actor/src/main/scala/akka/util/WallClock.scala +++ b/akka-actor/src/main/scala/akka/util/WallClock.scala @@ -5,6 +5,10 @@ package akka.util import akka.annotation.ApiMayChange +import java.util.concurrent.atomic.AtomicLong +import java.util.function.LongUnaryOperator + +import akka.annotation.InternalApi /** * A time source. @@ -17,7 +21,27 @@ trait WallClock { object WallClock { /** - * Always increasing time source. + * Always increasing time source. Based on `System.currentTimeMillis()` but + * guaranteed to always increase for each invocation. */ val AlwaysIncreasingClock: WallClock = new AlwaysIncreasingClock() } + +/** + * INTERNAL API: Always increasing wall clock time. + */ +@InternalApi +private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock { + + override def currentTimeMillis(): Long = { + val currentSystemTime = System.currentTimeMillis() + updateAndGet { + new LongUnaryOperator { + override def applyAsLong(time: Long): Long = { + if (currentSystemTime <= time) time + 1 + else currentSystemTime + } + } + } + } +} diff --git a/akka-docs/src/main/paradox/typed/guide/modules.md b/akka-docs/src/main/paradox/typed/guide/modules.md index 18c703640c..8580d49a99 100644 --- a/akka-docs/src/main/paradox/typed/guide/modules.md +++ b/akka-docs/src/main/paradox/typed/guide/modules.md @@ -21,12 +21,10 @@ With a [Lightbend Platform Subscription](https://www.lightbend.com/lightbend-sub * [Configuration Checker](https://doc.akka.io/docs/akka-enhancements/current/config-checker.html) — Checks for potential configuration issues and logs suggestions. * [Diagnostics Recorder](https://doc.akka.io/docs/akka-enhancements/current/diagnostics-recorder.html) — Captures configuration and system information in a format that makes it easy to troubleshoot issues during development and production. * [Thread Starvation Detector](https://doc.akka.io/docs/akka-enhancements/current/starvation-detector.html) — Monitors an Akka system dispatcher and logs warnings if it becomes unresponsive. -* [Kubernetes Lease](https://doc.akka.io/docs/akka-enhancements/current/kubernetes-lease.html) — Monitors an Akka system dispatcher and logs warnings if it becomes unresponsive. * [Fast Failover](https://doc.akka.io/docs/akka-enhancements/current/fast-failover.html) — Fast failover for Cluster Sharding. [Akka Persistence Enhancements](https://doc.akka.io/docs/akka-enhancements/current/akka-persistence-enhancements.html): -* [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) — For active-active persistent entities across multiple data centers. * [GDPR for Akka Persistence](https://doc.akka.io/docs/akka-enhancements/current/gdpr/index.html) — Data shredding can be used to forget information in events. This page does not list all available modules, but overviews the main functionality and gives you an idea of the level of sophistication you can reach when you start building systems on top of Akka. diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index eae94d72e7..c55aa91aaa 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -657,9 +657,9 @@ cluster and address them by id. Akka Persistence is based on the single-writer principle. For a particular `PersistenceId` only one `EventSourcedBehavior` instance should be active at one time. If multiple instances were to persist events at the same time, the events would be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one -active entity (`EventSourcedBehavior`) for each id within a data center. Lightbend's -[Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) -supports active-active persistent entities across data centers. +active entity (`EventSourcedBehavior`) for each id within a data center. +@ref:[Replicated Event Sourcing](replicated-eventsourcing.md) supports active-active persistent entities across +data centers. ## Configuration @@ -684,5 +684,5 @@ from the events, or publish the events to other services. @java[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-java)] @scala[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-scala)] -illustrates how to use Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) -with active-active persistent entities across data centers. +illustrates how to use @ref:[Replicated Event Sourcing](replicated-eventsourcing.md) that supports +active-active persistent entities across data centers. diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index fb72910aca..390064ee69 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -2,16 +2,16 @@ akka.actor { serialization-identifiers."akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer" = 40 - serializers.active-active = "akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer" + serializers.replicated-event-sourcing = "akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer" serialization-bindings { - "akka.persistence.typed.internal.VersionVector" = active-active - "akka.persistence.typed.crdt.Counter" = active-active - "akka.persistence.typed.crdt.Counter$Updated" = active-active - "akka.persistence.typed.crdt.ORSet" = active-active - "akka.persistence.typed.crdt.ORSet$DeltaOp" = active-active - "akka.persistence.typed.internal.ReplicatedEventMetadata" = active-active - "akka.persistence.typed.internal.ReplicatedSnapshotMetadata" = active-active + "akka.persistence.typed.internal.VersionVector" = replicated-event-sourcing + "akka.persistence.typed.crdt.Counter" = replicated-event-sourcing + "akka.persistence.typed.crdt.Counter$Updated" = replicated-event-sourcing + "akka.persistence.typed.crdt.ORSet" = replicated-event-sourcing + "akka.persistence.typed.crdt.ORSet$DeltaOp" = replicated-event-sourcing + "akka.persistence.typed.internal.ReplicatedEventMetadata" = replicated-event-sourcing + "akka.persistence.typed.internal.ReplicatedSnapshotMetadata" = replicated-event-sourcing } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala index f881e1d433..b8e5f1ffc0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala @@ -32,7 +32,7 @@ object ORSet { */ @InternalApi private[akka] type Dot = VersionVector - sealed trait DeltaOp extends Serializable { + sealed trait DeltaOp { def merge(that: DeltaOp): DeltaOp } @@ -275,13 +275,11 @@ object ORSet { * This class is immutable, i.e. "modifying" methods return a new instance. */ @ApiMayChange -@SerialVersionUID(1L) final class ORSet[A] private[akka] ( val originReplica: String, private[akka] val elementsMap: Map[A, ORSet.Dot], private[akka] val vvector: VersionVector) - extends OpCrdt[DeltaOp] - with Serializable { + extends OpCrdt[DeltaOp] { type T = ORSet[A] type D = ORSet.DeltaOp diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala index 62b1e49851..e449469a92 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplicationSetup.scala @@ -14,9 +14,6 @@ import akka.util.ccompat.JavaConverters._ /** * INTERNAL API */ -// FIXME, parts of this can be set during initialisation -// Other fields will be set before executing the event handler as they change per event -// https://github.com/akka/akka/issues/29258 @InternalApi private[akka] final class ReplicationContextImpl( val entityId: String, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 05889a0101..96aaf7305d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -165,7 +165,7 @@ private[akka] object Running { // needs to be outside of the restart source so that it actually cancels when terminating the replica .via(ActorFlow .ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => - // Need to handle this not being available migration from non-active-active is supported + // Need to handle this not being available migration from non-replicated is supported val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetadata] val re = ReplicatedEvent[E]( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala index 42e5b7bbf5..3043a7ad30 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala @@ -13,7 +13,7 @@ import akka.annotation.InternalApi * VersionVector module with helper classes and methods. */ @InternalApi -object VersionVector { +private[akka] object VersionVector { private val emptyVersions: TreeMap[String, Long] = TreeMap.empty val empty: VersionVector = ManyVersionVector(emptyVersions) @@ -71,9 +71,8 @@ object VersionVector { * * This class is immutable, i.e. "modifying" methods return a new instance. */ -@SerialVersionUID(1L) @InternalApi -sealed abstract class VersionVector extends Serializable { +private[akka] sealed abstract class VersionVector { type T = VersionVector