a few more things when reviewing replicated event sourcing (#29461)

This commit is contained in:
Patrik Nordwall 2020-08-06 09:23:11 +02:00 committed by Christopher Batey
parent f41f093372
commit fb5f5dc145
9 changed files with 43 additions and 56 deletions

View file

@ -1,29 +0,0 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.util
import java.util.concurrent.atomic.AtomicLong
import java.util.function.LongUnaryOperator
import akka.annotation.InternalApi
/**
* Always increasing wall clock time.
*/
@InternalApi
private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock {
override def currentTimeMillis(): Long = {
val currentSystemTime = System.currentTimeMillis()
updateAndGet {
new LongUnaryOperator {
override def applyAsLong(time: Long): Long = {
if (currentSystemTime <= time) time + 1
else currentSystemTime
}
}
}
}
}

View file

@ -5,6 +5,10 @@
package akka.util
import akka.annotation.ApiMayChange
import java.util.concurrent.atomic.AtomicLong
import java.util.function.LongUnaryOperator
import akka.annotation.InternalApi
/**
* A time source.
@ -17,7 +21,27 @@ trait WallClock {
object WallClock {
/**
* Always increasing time source.
* Always increasing time source. Based on `System.currentTimeMillis()` but
* guaranteed to always increase for each invocation.
*/
val AlwaysIncreasingClock: WallClock = new AlwaysIncreasingClock()
}
/**
* INTERNAL API: Always increasing wall clock time.
*/
@InternalApi
private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock {
override def currentTimeMillis(): Long = {
val currentSystemTime = System.currentTimeMillis()
updateAndGet {
new LongUnaryOperator {
override def applyAsLong(time: Long): Long = {
if (currentSystemTime <= time) time + 1
else currentSystemTime
}
}
}
}
}

View file

@ -21,12 +21,10 @@ With a [Lightbend Platform Subscription](https://www.lightbend.com/lightbend-sub
* [Configuration Checker](https://doc.akka.io/docs/akka-enhancements/current/config-checker.html) &#8212; Checks for potential configuration issues and logs suggestions.
* [Diagnostics Recorder](https://doc.akka.io/docs/akka-enhancements/current/diagnostics-recorder.html) &#8212; Captures configuration and system information in a format that makes it easy to troubleshoot issues during development and production.
* [Thread Starvation Detector](https://doc.akka.io/docs/akka-enhancements/current/starvation-detector.html) &#8212; Monitors an Akka system dispatcher and logs warnings if it becomes unresponsive.
* [Kubernetes Lease](https://doc.akka.io/docs/akka-enhancements/current/kubernetes-lease.html) &#8212; Monitors an Akka system dispatcher and logs warnings if it becomes unresponsive.
* [Fast Failover](https://doc.akka.io/docs/akka-enhancements/current/fast-failover.html) &#8212; Fast failover for Cluster Sharding.
[Akka Persistence Enhancements](https://doc.akka.io/docs/akka-enhancements/current/akka-persistence-enhancements.html):
* [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html) &#8212; For active-active persistent entities across multiple data centers.
* [GDPR for Akka Persistence](https://doc.akka.io/docs/akka-enhancements/current/gdpr/index.html) &#8212; Data shredding can be used to forget information in events.
This page does not list all available modules, but overviews the main functionality and gives you an idea of the level of sophistication you can reach when you start building systems on top of Akka.

View file

@ -657,9 +657,9 @@ cluster and address them by id.
Akka Persistence is based on the single-writer principle. For a particular `PersistenceId` only one `EventSourcedBehavior`
instance should be active at one time. If multiple instances were to persist events at the same time, the events would
be interleaved and might not be interpreted correctly on replay. Cluster Sharding ensures that there is only one
active entity (`EventSourcedBehavior`) for each id within a data center. Lightbend's
[Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
supports active-active persistent entities across data centers.
active entity (`EventSourcedBehavior`) for each id within a data center.
@ref:[Replicated Event Sourcing](replicated-eventsourcing.md) supports active-active persistent entities across
data centers.
## Configuration
@ -684,5 +684,5 @@ from the events, or publish the events to other services.
@java[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-java)]
@scala[@extref[Multi-DC Persistence example project](samples:akka-samples-persistence-dc-scala)]
illustrates how to use Lightbend's [Multi-DC Persistence](https://doc.akka.io/docs/akka-enhancements/current/persistence-dc/index.html)
with active-active persistent entities across data centers.
illustrates how to use @ref:[Replicated Event Sourcing](replicated-eventsourcing.md) that supports
active-active persistent entities across data centers.

View file

@ -2,16 +2,16 @@ akka.actor {
serialization-identifiers."akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer" = 40
serializers.active-active = "akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer"
serializers.replicated-event-sourcing = "akka.persistence.typed.serialization.ReplicatedEventSourcingSerializer"
serialization-bindings {
"akka.persistence.typed.internal.VersionVector" = active-active
"akka.persistence.typed.crdt.Counter" = active-active
"akka.persistence.typed.crdt.Counter$Updated" = active-active
"akka.persistence.typed.crdt.ORSet" = active-active
"akka.persistence.typed.crdt.ORSet$DeltaOp" = active-active
"akka.persistence.typed.internal.ReplicatedEventMetadata" = active-active
"akka.persistence.typed.internal.ReplicatedSnapshotMetadata" = active-active
"akka.persistence.typed.internal.VersionVector" = replicated-event-sourcing
"akka.persistence.typed.crdt.Counter" = replicated-event-sourcing
"akka.persistence.typed.crdt.Counter$Updated" = replicated-event-sourcing
"akka.persistence.typed.crdt.ORSet" = replicated-event-sourcing
"akka.persistence.typed.crdt.ORSet$DeltaOp" = replicated-event-sourcing
"akka.persistence.typed.internal.ReplicatedEventMetadata" = replicated-event-sourcing
"akka.persistence.typed.internal.ReplicatedSnapshotMetadata" = replicated-event-sourcing
}
}

View file

@ -32,7 +32,7 @@ object ORSet {
*/
@InternalApi private[akka] type Dot = VersionVector
sealed trait DeltaOp extends Serializable {
sealed trait DeltaOp {
def merge(that: DeltaOp): DeltaOp
}
@ -275,13 +275,11 @@ object ORSet {
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@ApiMayChange
@SerialVersionUID(1L)
final class ORSet[A] private[akka] (
val originReplica: String,
private[akka] val elementsMap: Map[A, ORSet.Dot],
private[akka] val vvector: VersionVector)
extends OpCrdt[DeltaOp]
with Serializable {
extends OpCrdt[DeltaOp] {
type T = ORSet[A]
type D = ORSet.DeltaOp

View file

@ -14,9 +14,6 @@ import akka.util.ccompat.JavaConverters._
/**
* INTERNAL API
*/
// FIXME, parts of this can be set during initialisation
// Other fields will be set before executing the event handler as they change per event
// https://github.com/akka/akka/issues/29258
@InternalApi
private[akka] final class ReplicationContextImpl(
val entityId: String,

View file

@ -165,7 +165,7 @@ private[akka] object Running {
// needs to be outside of the restart source so that it actually cancels when terminating the replica
.via(ActorFlow
.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) =>
// Need to handle this not being available migration from non-active-active is supported
// Need to handle this not being available migration from non-replicated is supported
val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetadata]
val re =
ReplicatedEvent[E](

View file

@ -13,7 +13,7 @@ import akka.annotation.InternalApi
* VersionVector module with helper classes and methods.
*/
@InternalApi
object VersionVector {
private[akka] object VersionVector {
private val emptyVersions: TreeMap[String, Long] = TreeMap.empty
val empty: VersionVector = ManyVersionVector(emptyVersions)
@ -71,9 +71,8 @@ object VersionVector {
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@SerialVersionUID(1L)
@InternalApi
sealed abstract class VersionVector extends Serializable {
private[akka] sealed abstract class VersionVector {
type T = VersionVector