Port CRDTs from multi dc (#29372)
* Metadata for snapshots for active active
* Port CRDTs from multi dc
* Review feedback
This commit is contained in:
parent
7e91428428
commit
116c13677a
15 changed files with 7965 additions and 28 deletions
@@ -0,0 +1,30 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin }
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike

object ActiveActiveBaseSpec {
  val R1 = ReplicaId("R1")
  val R2 = ReplicaId("R2")
  val AllReplicas = Set(R1, R2)
}

abstract class ActiveActiveBaseSpec
    extends ScalaTestWithActorTestKit(
      PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
    with AnyWordSpecLike
    with LogCapturing
    with Eventually {

  val ids = new AtomicInteger(0)
  def nextEntityId: String = s"e-${ids.getAndIncrement()}"
}
@@ -0,0 +1,132 @@
/*
 * Copyright (C) 2017-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt

import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.crdt.CounterSpec.PlainCounter.{ Decrement, Get, Increment }
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }

object CounterSpec {

  object PlainCounter {
    sealed trait Command
    case class Get(reply: ActorRef[Long]) extends Command
    case object Increment extends Command
    case object Decrement extends Command
  }

  import ActiveActiveBaseSpec._

  def apply(
      entityId: String,
      replicaId: ReplicaId,
      snapshotEvery: Long = 100,
      eventProbe: Option[ActorRef[Counter.Updated]] = None) =
    Behaviors.setup[PlainCounter.Command] { context =>
      ActiveActiveEventSourcing.withSharedJournal(
        entityId,
        replicaId,
        AllReplicas,
        PersistenceTestKitReadJournal.Identifier) { ctx =>
        EventSourcedBehavior[PlainCounter.Command, Counter.Updated, Counter](
          ctx.persistenceId,
          Counter.empty,
          (state, command) =>
            command match {
              case PlainCounter.Increment =>
                context.log.info("Increment. Current state {}", state.value)
                Effect.persist(Counter.Updated(1))
              case PlainCounter.Decrement =>
                Effect.persist(Counter.Updated(-1))
              case Get(replyTo) =>
                context.log.info("Get request. {} {}", state.value, state.value.longValue())
                replyTo ! state.value.longValue()
                Effect.none
            },
          (counter, event) => {
            eventProbe.foreach(_ ! event)
            counter.applyOperation(event)
          }).snapshotWhen { (_, _, seqNr) =>
          seqNr % snapshotEvery == 0
        }
      }
    }
}

class CounterSpec extends ActiveActiveBaseSpec {

  import CounterSpec._
  import ActiveActiveBaseSpec._

  "Active active entity using CRDT counter" should {
    "replicate" in {
      val id = nextEntityId
      val r1 = spawn(apply(id, R1))
      val r2 = spawn(apply(id, R2))
      val r1Probe = createTestProbe[Long]()
      val r2Probe = createTestProbe[Long]()

      r1 ! Increment
      r1 ! Increment

      eventually {
        r1 ! Get(r1Probe.ref)
        r1Probe.expectMessage(2L)
        r2 ! Get(r2Probe.ref)
        r2Probe.expectMessage(2L)
      }

      for (n <- 1 to 10) {
        if (n % 2 == 0) r1 ! Increment
        else r1 ! Decrement
      }
      for (_ <- 1 to 10) {
        r2 ! Increment
      }

      eventually {
        r1 ! Get(r1Probe.ref)
        r1Probe.expectMessage(12L)
        r2 ! Get(r2Probe.ref)
        r2Probe.expectMessage(12L)
      }
    }
  }

  "recover from snapshot" in {
    val id = nextEntityId

    {
      val r1 = spawn(apply(id, R1, 2))
      val r2 = spawn(apply(id, R2, 2))
      val r1Probe = createTestProbe[Long]()
      val r2Probe = createTestProbe[Long]()

      r1 ! Increment
      r1 ! Increment

      eventually {
        r1 ! Get(r1Probe.ref)
        r1Probe.expectMessage(2L)
        r2 ! Get(r2Probe.ref)
        r2Probe.expectMessage(2L)
      }
    }
    {
      val r2EventProbe = createTestProbe[Counter.Updated]()
      val r2 = spawn(apply(id, R2, 2, Some(r2EventProbe.ref)))
      val r2Probe = createTestProbe[Long]()
      eventually {
        r2 ! Get(r2Probe.ref)
        r2Probe.expectMessage(2L)
      }

      r2EventProbe.expectNoMessage()
    }
  }
}
@@ -0,0 +1,110 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt

import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }
import akka.serialization.jackson.CborSerializable

object LwwSpec {

  import ActiveActiveBaseSpec._

  sealed trait Command
  final case class Update(item: String, timestamp: Long, error: ActorRef[String]) extends Command
  final case class Get(replyTo: ActorRef[Registry]) extends Command

  sealed trait Event extends CborSerializable
  final case class Changed(item: String, timestamp: LwwTime) extends Event

  final case class Registry(item: String, updatedTimestamp: LwwTime) extends CborSerializable

  object LwwRegistry {

    def apply(entityId: String, replica: ReplicaId): Behavior[Command] = {
      ActiveActiveEventSourcing.withSharedJournal(
        entityId,
        replica,
        AllReplicas,
        PersistenceTestKitReadJournal.Identifier) { aaContext =>
        EventSourcedBehavior[Command, Event, Registry](
          aaContext.persistenceId,
          Registry("", LwwTime(Long.MinValue, aaContext.replicaId)),
          (state, command) =>
            command match {
              case Update(s, timestamp, error) =>
                if (s == "") {
                  error ! "bad value"
                  Effect.none
                } else {
                  Effect.persist(Changed(s, state.updatedTimestamp.increase(timestamp, aaContext.replicaId)))
                }
              case Get(replyTo) =>
                replyTo ! state
                Effect.none
            },
          (state, event) =>
            event match {
              case Changed(s, timestamp) =>
                if (timestamp.isAfter(state.updatedTimestamp)) Registry(s, timestamp)
                else state
            })
      }
    }

  }
}

class LwwSpec extends ActiveActiveBaseSpec {
  import LwwSpec._
  import ActiveActiveBaseSpec._

  class Setup {
    val entityId = nextEntityId
    val r1 = spawn(LwwRegistry.apply(entityId, R1))
    val r2 = spawn(LwwRegistry.apply(entityId, R2))
    val r1Probe = createTestProbe[String]()
    val r2Probe = createTestProbe[String]()
    val r1GetProbe = createTestProbe[Registry]()
    val r2GetProbe = createTestProbe[Registry]()
  }

  "Lww Active Active Event Sourced Behavior" should {
    "replicate a single event" in new Setup {
      r1 ! Update("a1", 1L, r1Probe.ref)
      eventually {
        val probe = createTestProbe[Registry]()
        r2 ! Get(probe.ref)
        probe.expectMessage(Registry("a1", LwwTime(1L, R1)))
      }
    }

    "resolve conflict" in new Setup {
      r1 ! Update("a1", 1L, r1Probe.ref)
      r2 ! Update("b1", 2L, r2Probe.ref)
      eventually {
        r1 ! Get(r1GetProbe.ref)
        r2 ! Get(r2GetProbe.ref)
        r1GetProbe.expectMessage(Registry("b1", LwwTime(2L, R2)))
        r2GetProbe.expectMessage(Registry("b1", LwwTime(2L, R2)))
      }
    }

    "have deterministic tiebreak when the same time" in new Setup {
      r1 ! Update("a1", 1L, r1Probe.ref)
      r2 ! Update("b1", 1L, r2Probe.ref)
      // R1 < R2
      eventually {
        r1 ! Get(r1GetProbe.ref)
        r2 ! Get(r2GetProbe.ref)
        r1GetProbe.expectMessage(Registry("a1", LwwTime(1L, R1)))
        r2GetProbe.expectMessage(Registry("a1", LwwTime(1L, R1)))
      }
    }
  }

}
@@ -0,0 +1,102 @@
/*
 * Copyright (C) 2017-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt

import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.persistence.typed.{ ActiveActiveBaseSpec, ReplicaId }
import ORSetSpec.ORSetEntity._
import akka.persistence.typed.ActiveActiveBaseSpec.{ R1, R2 }
import akka.persistence.typed.crdt.ORSetSpec.ORSetEntity

import scala.util.Random

object ORSetSpec {

  import ActiveActiveBaseSpec._

  object ORSetEntity {
    sealed trait Command
    final case class Get(replyTo: ActorRef[Set[String]]) extends Command
    final case class Add(elem: String) extends Command
    final case class AddAll(elems: Set[String]) extends Command
    final case class Remove(elem: String) extends Command

    def apply(entityId: String, replica: ReplicaId): Behavior[ORSetEntity.Command] = {

      ActiveActiveEventSourcing.withSharedJournal(
        entityId,
        replica,
        AllReplicas,
        PersistenceTestKitReadJournal.Identifier) { aaContext =>
        EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](
          aaContext.persistenceId,
          ORSet(replica),
          (state, command) =>
            command match {
              case Add(elem) =>
                Effect.persist(state + elem)
              case AddAll(elems) =>
                Effect.persist(state.addAll(elems.toSet))
              case Remove(elem) =>
                Effect.persist(state - elem)
              case Get(replyTo) =>
                Effect.none.thenRun(state => replyTo ! state.elements)
            },
          (state, operation) => state.applyOperation(operation))
      }
    }
  }

}

class ORSetSpec extends ActiveActiveBaseSpec {

  class Setup {
    val entityId = nextEntityId
    val r1 = spawn(ORSetEntity.apply(entityId, R1))
    val r2 = spawn(ORSetEntity.apply(entityId, R2))
    val r1GetProbe = createTestProbe[Set[String]]()
    val r2GetProbe = createTestProbe[Set[String]]()

    def assertForAllReplicas(state: Set[String]): Unit = {
      eventually {
        r1 ! Get(r1GetProbe.ref)
        r1GetProbe.expectMessage(state)
        r2 ! Get(r2GetProbe.ref)
        r2GetProbe.expectMessage(state)
      }
    }
  }

  def randomDelay(): Unit = {
    // exercise different timing scenarios
    Thread.sleep(Random.nextInt(200).toLong)
  }

  "ORSet Active Active Entity" should {

    "support concurrent updates" in new Setup {
      r1 ! Add("a1")
      r2 ! Add("b1")
      assertForAllReplicas(Set("a1", "b1"))
      r2 ! Remove("b1")
      assertForAllReplicas(Set("a1"))
      r2 ! Add("b1")
      for (n <- 2 to 10) {
        r1 ! Add(s"a$n")
        if (n % 3 == 0)
          randomDelay()
        r2 ! Add(s"b$n")
      }
      r1 ! AddAll((11 to 13).map(n => s"a$n").toSet)
      r2 ! AddAll((11 to 13).map(n => s"b$n").toSet)
      val expected = (1 to 13).flatMap(n => List(s"a$n", s"b$n")).toSet
      assertForAllReplicas(expected)
    }
  }
}
@@ -11,6 +11,7 @@ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.LwwTime
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
File diff suppressed because it is too large.

akka-persistence-typed/src/main/protobuf/Crdts.proto (new file, 50 lines)
@@ -0,0 +1,50 @@
/*
 * Copyright (C) 2017-2020 Lightbend Inc. <https://www.lightbend.com>
 */

syntax = "proto2";

option java_package = "akka.persistence.typed.serialization";
option optimize_for = SPEED;
import "ContainerFormats.proto";

message Counter {
  required bytes value = 1;
}

message CounterUpdate {
  required bytes delta = 1;
}

message ORSet {
  required string originDc = 1;
  required VersionVector vvector = 2;
  repeated VersionVector dots = 3;
  repeated string stringElements = 4;
  repeated sint32 intElements = 5 [packed=true];
  repeated sint64 longElements = 6 [packed=true];
  repeated Payload otherElements = 7;
}

message ORSetDeltaGroup {
  message Entry {
    required ORSetDeltaOp operation = 1;
    required ORSet underlying = 2;
  }

  repeated Entry entries = 1;
}

enum ORSetDeltaOp {
  Add = 0;
  Remove = 1;
  Full = 2;
}

message VersionVector {
  message Entry {
    required string key = 1;
    required int64 version = 2;
  }
  repeated Entry entries = 1;
}
@@ -1,3 +1,18 @@
akka.actor {

  serialization-identifiers."akka.persistence.typed.serialization.CrdtSerializer" = 40

  serializers.replicated-crdts = "akka.persistence.typed.serialization.CrdtSerializer"

  serialization-bindings {
    "akka.persistence.typed.crdt.Counter" = replicated-crdts
    "akka.persistence.typed.crdt.Counter$Updated" = replicated-crdts
    "akka.persistence.typed.internal.VersionVector" = replicated-crdts
    "akka.persistence.typed.crdt.ORSet" = replicated-crdts
    "akka.persistence.typed.crdt.ORSet$DeltaOp" = replicated-crdts
  }
}

akka.persistence.typed {

  # Persistent actors stash while recovering or persisting events,
@@ -0,0 +1,33 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt
import akka.annotation.ApiMayChange

@ApiMayChange
object Counter {
  val empty: Counter = Counter(0)

  final case class Updated(delta: BigInt) {

    /**
     * JAVA API
     */
    def this(delta: java.math.BigInteger) = this(delta: BigInt)

    /**
     * JAVA API
     */
    def this(delta: Int) = this(delta: BigInt)
  }
}

@ApiMayChange
final case class Counter(value: BigInt) extends OpCrdt[Counter.Updated] {

  override type T = Counter

  override def applyOperation(event: Counter.Updated): Counter =
    copy(value = value + event.delta)
}
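Counter is an operation-based CRDT: every Updated(delta) event commutes under addition, so replicas converge no matter in which order the deltas arrive. A minimal sketch of that property outside any actor, assuming only the Counter class above (the delta values and their orders are made up for illustration):

// Sketch only: the same set of Counter.Updated deltas applied in two
// different orders yields the same value, which is why replication order
// does not matter for this CRDT.
object CounterConvergenceSketch extends App {
  import akka.persistence.typed.crdt.Counter

  val deltas = List(Counter.Updated(1), Counter.Updated(-2), Counter.Updated(5))

  val a = deltas.foldLeft(Counter.empty)(_.applyOperation(_))
  val b = deltas.reverse.foldLeft(Counter.empty)(_.applyOperation(_))

  assert(a.value == b.value) // both end up at 4
}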
@@ -0,0 +1,36 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt
import akka.annotation.ApiMayChange
import akka.persistence.typed.ReplicaId

/**
 * Utility class for comparing timestamp and replica
 * identifier when implementing last-writer wins.
 */
@ApiMayChange
final case class LwwTime(timestamp: Long, originReplica: ReplicaId) {

  /**
   * Create a new `LwwTime` that has a `timestamp` that is
   * `max` of the given timestamp and previous timestamp + 1,
   * i.e. monotonically increasing.
   */
  def increase(t: Long, replicaId: ReplicaId): LwwTime =
    LwwTime(math.max(timestamp + 1, t), replicaId)

  /**
   * Compare this `LwwTime` with the `other`.
   * Greatest timestamp wins. If both timestamps are
   * equal the replica identifiers are compared and the
   * one sorted first in alphanumeric order wins.
   */
  def isAfter(other: LwwTime): Boolean = {
    if (timestamp > other.timestamp) true
    else if (timestamp < other.timestamp) false
    else if (other.originReplica.id.compareTo(originReplica.id) > 0) true
    else false
  }
}
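A small sketch of how `increase` and `isAfter` behave, using the ReplicaIds R1 and R2 from the specs above; the timestamps are made up. It shows the monotonic bump in `increase` and the alphanumeric replica-id tiebreak when timestamps are equal, which is what the "deterministic tiebreak" test in LwwSpec relies on:

// Sketch only: exercises LwwTime outside of an entity.
object LwwTimeSketch extends App {
  import akka.persistence.typed.ReplicaId
  import akka.persistence.typed.crdt.LwwTime

  val r1 = ReplicaId("R1")
  val r2 = ReplicaId("R2")

  // increase() never goes backwards, even if the supplied clock does
  val t1 = LwwTime(timestamp = 10L, originReplica = r1)
  val t2 = t1.increase(3L, r1) // max(10 + 1, 3) = 11
  assert(t2.timestamp == 11L)

  // equal timestamps: the replica id sorted first wins ("R1" < "R2")
  assert(LwwTime(5L, r1).isAfter(LwwTime(5L, r2)))
  assert(!LwwTime(5L, r2).isAfter(LwwTime(5L, r1)))
}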
@@ -0,0 +1,503 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt

import scala.annotation.tailrec
import scala.collection.immutable
import akka.util.HashCode
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.ORSet.DeltaOp
import akka.persistence.typed.internal.{ ManyVersionVector, OneVersionVector, VersionVector }

@ApiMayChange
object ORSet {
  def empty[A](originReplica: ReplicaId): ORSet[A] = new ORSet(originReplica.id, Map.empty, VersionVector.empty)
  def apply[A](originReplica: ReplicaId): ORSet[A] = empty(originReplica)

  /**
   * Java API
   */
  def create[A](originReplica: ReplicaId): ORSet[A] = empty(originReplica)

  /**
   * Extract the [[ORSet#elements]].
   */
  def unapply[A](s: ORSet[A]): Option[Set[A]] = Some(s.elements)

  /**
   * INTERNAL API
   */
  @InternalApi private[akka] type Dot = VersionVector

  sealed trait DeltaOp extends Serializable {
    def merge(that: DeltaOp): DeltaOp
  }

  /**
   * INTERNAL API
   */
  @InternalApi private[akka] sealed abstract class AtomicDeltaOp[A] extends DeltaOp {
    def underlying: ORSet[A]
  }

  /** INTERNAL API */
  @InternalApi private[akka] final case class AddDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] {

    override def merge(that: DeltaOp): DeltaOp = that match {
      case AddDeltaOp(u) =>
        // Note that we only merge deltas originating from the same DC
        AddDeltaOp(
          new ORSet(
            underlying.originReplica,
            concatElementsMap(u.elementsMap.asInstanceOf[Map[A, Dot]]),
            underlying.vvector.merge(u.vvector)))
      case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that))
      case DeltaGroup(ops)     => DeltaGroup(this +: ops)
    }

    private def concatElementsMap(thatMap: Map[A, Dot]): Map[A, Dot] = {
      if (thatMap.size == 1) {
        val head = thatMap.head
        underlying.elementsMap.updated(head._1, head._2)
      } else
        underlying.elementsMap ++ thatMap
    }
  }

  /** INTERNAL API */
  @InternalApi private[akka] final case class RemoveDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] {
    if (underlying.size != 1)
      throw new IllegalArgumentException(s"RemoveDeltaOp should contain one removed element, but was $underlying")

    override def merge(that: DeltaOp): DeltaOp = that match {
      case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that)) // keep it simple for removals
      case DeltaGroup(ops)     => DeltaGroup(this +: ops)
    }
  }

  /** INTERNAL API: Used for `clear` but could be used for other cases also */
  @InternalApi private[akka] final case class FullStateDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] {
    override def merge(that: DeltaOp): DeltaOp = that match {
      case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that))
      case DeltaGroup(ops)     => DeltaGroup(this +: ops)
    }
  }

  /**
   * INTERNAL API
   */
  @InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
    override def merge(that: DeltaOp): DeltaOp = that match {
      case thatAdd: AddDeltaOp[A] =>
        // merge AddDeltaOp into last AddDeltaOp in the group, if possible
        ops.last match {
          case thisAdd: AddDeltaOp[A] => DeltaGroup(ops.dropRight(1) :+ thisAdd.merge(thatAdd))
          case _                      => DeltaGroup(ops :+ thatAdd)
        }
      case DeltaGroup(thatOps) => DeltaGroup(ops ++ thatOps)
      case _                   => DeltaGroup(ops :+ that)
    }
  }

  /**
   * INTERNAL API
   * Subtract the `vvector` from the `dot`.
   * What this means is that any (dc, version) pair in
   * `dot` that is <= an entry in `vvector` is removed from `dot`.
   * Example [{a, 3}, {b, 2}, {d, 14}, {g, 22}] -
   *         [{a, 4}, {b, 1}, {c, 1}, {d, 14}, {e, 5}, {f, 2}] =
   *         [{b, 2}, {g, 22}]
   */
  @InternalApi private[akka] def subtractDots(dot: Dot, vvector: VersionVector): Dot = {

    @tailrec def dropDots(remaining: List[(String, Long)], acc: List[(String, Long)]): List[(String, Long)] =
      remaining match {
        case Nil => acc
        case (d @ (node, v1)) :: rest =>
          val v2 = vvector.versionAt(node)
          if (v2 >= v1)
            // dot is dominated by version vector, drop it
            dropDots(rest, acc)
          else
            dropDots(rest, d :: acc)
      }

    if (dot.isEmpty)
      VersionVector.empty
    else {
      dot match {
        case OneVersionVector(node, v1) =>
          // if dot is dominated by version vector, drop it
          if (vvector.versionAt(node) >= v1) VersionVector.empty
          else dot

        case ManyVersionVector(vs) =>
          val remaining = vs.toList
          val newDots = dropDots(remaining, Nil)
          VersionVector(newDots)
      }
    }
  }

  /**
   * INTERNAL API
   * @see [[ORSet#merge]]
   */
  @InternalApi private[akka] def mergeCommonKeys[A](
      commonKeys: Set[A],
      lhs: ORSet[A],
      rhs: ORSet[A]): Map[A, ORSet.Dot] =
    mergeCommonKeys(commonKeys.iterator, lhs, rhs)

  private def mergeCommonKeys[A](commonKeys: Iterator[A], lhs: ORSet[A], rhs: ORSet[A]): Map[A, ORSet.Dot] = {
    commonKeys.foldLeft(Map.empty[A, ORSet.Dot]) {
      case (acc, k) =>
        val lhsDots = lhs.elementsMap(k)
        val rhsDots = rhs.elementsMap(k)
        (lhsDots, rhsDots) match {
          case (OneVersionVector(n1, v1), OneVersionVector(n2, v2)) =>
            if (n1 == n2 && v1 == v2)
              // one single common dot
              acc.updated(k, lhsDots)
            else {
              // no common, lhsUniqueDots == lhsDots, rhsUniqueDots == rhsDots
              val lhsKeep = ORSet.subtractDots(lhsDots, rhs.vvector)
              val rhsKeep = ORSet.subtractDots(rhsDots, lhs.vvector)
              val merged = lhsKeep.merge(rhsKeep)
              // Perfectly possible that an item in both sets should be dropped
              if (merged.isEmpty) acc
              else acc.updated(k, merged)
            }
          case (ManyVersionVector(lhsVs), ManyVersionVector(rhsVs)) =>
            val commonDots = lhsVs.filter {
              case (thisDotNode, v) => rhsVs.get(thisDotNode).exists(_ == v)
            }
            val commonDotsKeys = commonDots.keys
            val lhsUniqueDots = lhsVs -- commonDotsKeys
            val rhsUniqueDots = rhsVs -- commonDotsKeys
            val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
            val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
            // Perfectly possible that an item in both sets should be dropped
            if (merged.isEmpty) acc
            else acc.updated(k, merged)
          case (ManyVersionVector(lhsVs), OneVersionVector(n2, v2)) =>
            val commonDots = lhsVs.filter {
              case (n1, v1) => v1 == v2 && n1 == n2
            }
            val commonDotsKeys = commonDots.keys
            val lhsUniqueDots = lhsVs -- commonDotsKeys
            val rhsUnique = if (commonDotsKeys.isEmpty) rhsDots else VersionVector.empty
            val lhsKeep = ORSet.subtractDots(VersionVector(lhsUniqueDots), rhs.vvector)
            val rhsKeep = ORSet.subtractDots(rhsUnique, lhs.vvector)
            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
            // Perfectly possible that an item in both sets should be dropped
            if (merged.isEmpty) acc
            else acc.updated(k, merged)
          case (OneVersionVector(n1, v1), ManyVersionVector(rhsVs)) =>
            val commonDots = rhsVs.filter {
              case (n2, v2) => v1 == v2 && n1 == n2
            }
            val commonDotsKeys = commonDots.keys
            val lhsUnique = if (commonDotsKeys.isEmpty) lhsDots else VersionVector.empty
            val rhsUniqueDots = rhsVs -- commonDotsKeys
            val lhsKeep = ORSet.subtractDots(lhsUnique, rhs.vvector)
            val rhsKeep = ORSet.subtractDots(VersionVector(rhsUniqueDots), lhs.vvector)
            val merged = lhsKeep.merge(rhsKeep).merge(VersionVector(commonDots))
            // Perfectly possible that an item in both sets should be dropped
            if (merged.isEmpty) acc
            else acc.updated(k, merged)
        }
    }
  }

  /**
   * INTERNAL API
   * @see [[ORSet#merge]]
   */
  @InternalApi private[akka] def mergeDisjointKeys[A](
      keys: Set[A],
      elementsMap: Map[A, ORSet.Dot],
      vvector: VersionVector,
      accumulator: Map[A, ORSet.Dot]): Map[A, ORSet.Dot] =
    mergeDisjointKeys(keys.iterator, elementsMap, vvector, accumulator)

  private def mergeDisjointKeys[A](
      keys: Iterator[A],
      elementsMap: Map[A, ORSet.Dot],
      vvector: VersionVector,
      accumulator: Map[A, ORSet.Dot]): Map[A, ORSet.Dot] = {
    keys.foldLeft(accumulator) {
      case (acc, k) =>
        val dots = elementsMap(k)
        if (vvector > dots || vvector == dots)
          acc
        else {
          // Optimise the set of stored dots to include only those unseen
          val newDots = subtractDots(dots, vvector)
          acc.updated(k, newDots)
        }
    }
  }
}

/**
 * Implements an 'Observed Remove Set' operation-based CRDT, also called an 'OR-Set'.
 * Elements can be added and removed any number of times. Concurrent add wins
 * over remove.
 *
 * It is not implemented as in the paper
 * <a href="http://hal.upmc.fr/file/index/docid/555588/filename/techreport.pdf">A comprehensive study of Convergent and Commutative Replicated Data Types</a>.
 * This is more space efficient and doesn't accumulate garbage for removed elements.
 * It is described in the paper
 * <a href="https://hal.inria.fr/file/index/docid/738680/filename/RR-8083.pdf">An optimized conflict-free replicated set</a>.
 * The implementation is inspired by the Riak DT <a href="https://github.com/basho/riak_dt/blob/develop/src/riak_dt_orswot.erl">
 * riak_dt_orswot</a>.
 *
 * The ORSet has a version vector that is incremented when an element is added to
 * the set. The `DC -> count` pair for that increment is stored against the
 * element as its "birth dot". Every time the element is re-added to the set,
 * its "birth dot" is updated to that of the `DC -> count` version vector entry
 * resulting from the add. When an element is removed, we simply drop it, no tombstones.
 *
 * When an element exists in replica A and not replica B, is it because A added
 * it and B has not yet seen that, or because B removed it and A has not yet seen that?
 * In this implementation we compare the `dot` of the present element to the version vector
 * in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
 * that means the other set has yet to see this add, and the item is in the merged
 * Set. If the Set version vector dominates the dot, that means the other Set has removed this
 * element already, and the item is not in the merged Set.
 *
 * This class is immutable, i.e. "modifying" methods return a new instance.
 */
@ApiMayChange
@SerialVersionUID(1L)
final class ORSet[A] private[akka] (
    val originReplica: String,
    private[akka] val elementsMap: Map[A, ORSet.Dot],
    private[akka] val vvector: VersionVector)
    extends OpCrdt[DeltaOp]
    with Serializable {

  type T = ORSet[A]
  type D = ORSet.DeltaOp

  /**
   * Scala API
   */
  def elements: Set[A] = elementsMap.keySet

  /**
   * Java API
   */
  def getElements(): java.util.Set[A] = {
    import scala.collection.JavaConverters._
    elements.asJava
  }

  def contains(a: A): Boolean = elementsMap.contains(a)

  def isEmpty: Boolean = elementsMap.isEmpty

  def size: Int = elementsMap.size

  /**
   * Adds an element to the set
   */
  def +(element: A): ORSet.DeltaOp = add(element)

  /**
   * Adds an element to the set
   */
  def add(element: A): ORSet.DeltaOp = {
    val newVvector = vvector + originReplica
    val newDot = VersionVector(originReplica, newVvector.versionAt(originReplica))
    ORSet.AddDeltaOp(new ORSet(originReplica, Map(element -> newDot), newDot))
  }

  /**
   * Java API: Add several elements to the set.
   * `elems` must not be empty.
   */
  def addAll(elems: java.util.Set[A]): ORSet.DeltaOp = {
    import scala.collection.JavaConverters._
    addAll(elems.asScala.toSet)
  }

  /**
   * Scala API: Add several elements to the set.
   * `elems` must not be empty.
   */
  def addAll(elems: Set[A]): ORSet.DeltaOp = {
    if (elems.size == 0) throw new IllegalArgumentException("addAll elems must not be empty")
    else if (elems.size == 1) add(elems.head)
    else {
      val (first, rest) = elems.splitAt(1)
      val firstOp = add(first.head)
      val (mergedOps, _) = rest.foldLeft((firstOp, applyOperation(firstOp))) {
        case ((op, state), elem) =>
          val nextOp = state.add(elem)
          val mergedOp = op.merge(nextOp)
          (mergedOp, state.applyOperation(nextOp))
      }
      mergedOps
    }
  }

  /**
   * Removes an element from the set.
   */
  def -(element: A): ORSet.DeltaOp = remove(element)

  /**
   * Removes an element from the set.
   */
  def remove(element: A): ORSet.DeltaOp = {
    val deltaDot = VersionVector(originReplica, vvector.versionAt(originReplica))
    ORSet.RemoveDeltaOp(new ORSet(originReplica, Map(element -> deltaDot), vvector))
  }

  /**
   * Java API: Remove several elements from the set.
   * `elems` must not be empty.
   */
  def removeAll(elems: java.util.Set[A]): ORSet.DeltaOp = {
    import scala.collection.JavaConverters._
    removeAll(elems.asScala.toSet)
  }

  /**
   * Scala API: Remove several elements from the set.
   * `elems` must not be empty.
   */
  def removeAll(elems: Set[A]): ORSet.DeltaOp = {
    if (elems.size == 0) throw new IllegalArgumentException("removeAll elems must not be empty")
    else if (elems.size == 1) remove(elems.head)
    else {
      val (first, rest) = elems.splitAt(1)
      val firstOp = remove(first.head)
      val (mergedOps, _) = rest.foldLeft((firstOp, applyOperation(firstOp))) {
        case ((op, state), elem) =>
          val nextOp = state.remove(elem)
          val mergedOp = op.merge(nextOp)
          (mergedOp, state.applyOperation(nextOp))
      }
      mergedOps
    }
  }

  /**
   * Removes all elements from the set, but keeps the history.
   * This has the same result as using [[#remove]] for each
   * element, but it is more efficient.
   */
  def clear(): ORSet.DeltaOp = {
    val newFullState = new ORSet[A](originReplica, elementsMap = Map.empty, vvector)
    ORSet.FullStateDeltaOp(newFullState)
  }

  /**
   * When an element is in this Set but not in that Set:
   * Compare the "birth dot" of the present element to the version vector in the Set it is absent from.
   * If the element dot is not "seen" by the other Set's version vector, that means the other set has yet to
   * see this add, and the element is to be in the merged Set.
   * If the other Set's version vector dominates the dot, that means the other Set has removed
   * the element already, and the element is not to be in the merged Set.
   *
   * When an element is in both this Set and that Set:
   * Some dots may still need to be shed. If this Set has dots that the other Set does not have,
   * and the other Set's version vector dominates those dots, then we need to drop those dots.
   * Keep only common dots, and dots that are not dominated by the other side's version vector.
   */
  private def merge(that: ORSet[A], addDeltaOp: Boolean): ORSet[A] = {
    if (this eq that) this
    else {
      val commonKeys =
        if (this.elementsMap.size < that.elementsMap.size)
          this.elementsMap.keysIterator.filter(that.elementsMap.contains)
        else
          that.elementsMap.keysIterator.filter(this.elementsMap.contains)
      val entries00 = ORSet.mergeCommonKeys(commonKeys, this, that)
      val entries0 =
        if (addDeltaOp)
          entries00 ++ this.elementsMap.filter { case (elem, _) => !that.elementsMap.contains(elem) } else {
          val thisUniqueKeys = this.elementsMap.keysIterator.filterNot(that.elementsMap.contains)
          ORSet.mergeDisjointKeys(thisUniqueKeys, this.elementsMap, that.vvector, entries00)
        }
      val thatUniqueKeys = that.elementsMap.keysIterator.filterNot(this.elementsMap.contains)
      val entries = ORSet.mergeDisjointKeys(thatUniqueKeys, that.elementsMap, this.vvector, entries0)
      val mergedVvector = this.vvector.merge(that.vvector)

      new ORSet(originReplica, entries, mergedVvector)
    }
  }

  override def applyOperation(thatDelta: ORSet.DeltaOp): ORSet[A] = {
    thatDelta match {
      case d: ORSet.AddDeltaOp[A]       => merge(d.underlying, addDeltaOp = true)
      case d: ORSet.RemoveDeltaOp[A]    => mergeRemoveDelta(d)
      case d: ORSet.FullStateDeltaOp[A] => merge(d.underlying, addDeltaOp = false)
      case ORSet.DeltaGroup(ops) =>
        ops.foldLeft(this) {
          case (acc, op: ORSet.AddDeltaOp[A])       => acc.merge(op.underlying, addDeltaOp = true)
          case (acc, op: ORSet.RemoveDeltaOp[A])    => acc.mergeRemoveDelta(op)
          case (acc, op: ORSet.FullStateDeltaOp[A]) => acc.merge(op.underlying, addDeltaOp = false)
          case (_, _: ORSet.DeltaGroup[A]) =>
            throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
        }
    }
  }

  private def mergeRemoveDelta(thatDelta: ORSet.RemoveDeltaOp[A]): ORSet[A] = {
    val that = thatDelta.underlying
    val (elem, thatDot) = that.elementsMap.head
    def deleteDots = that.vvector.versionsIterator
    def deleteDotsNodes = deleteDots.map { case (dotNode, _) => dotNode }
    val newElementsMap = {
      val thisDotOption = this.elementsMap.get(elem)
      val deleteDotsAreGreater = deleteDots.forall {
        case (dotNode, dotV) =>
          thisDotOption match {
            case Some(thisDot) => thisDot.versionAt(dotNode) <= dotV
            case None          => false
          }
      }
      if (deleteDotsAreGreater) {
        thisDotOption match {
          case Some(thisDot) =>
            if (thisDot.versionsIterator.forall { case (thisDotNode, _) => deleteDotsNodes.contains(thisDotNode) })
              elementsMap - elem
            else elementsMap
          case None =>
            elementsMap
        }
      } else
        elementsMap
    }

    val newVvector = vvector.merge(thatDot)
    new ORSet(originReplica, newElementsMap, newVvector)
  }

  // this class cannot be a `case class` because we need different `unapply`

  override def toString: String = s"OR$elements"

  override def equals(o: Any): Boolean = o match {
    case other: ORSet[_] =>
      originReplica == other.originReplica && vvector == other.vvector && elementsMap == other.elementsMap
    case _ => false
  }

  override def hashCode: Int = {
    var result = HashCode.SEED
    result = HashCode.hash(result, originReplica)
    result = HashCode.hash(result, elementsMap)
    result = HashCode.hash(result, vvector)
    result
  }
}
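A plain, non-persistent sketch of how two ORSet replicas converge when they exchange the DeltaOps produced by add/remove, using the ReplicaIds from the specs above. It mirrors what ORSetSpec does through the event journal, but directly on the data type; the element values are made up:

// Sketch only: deltas produced locally are "replicated" by applying them
// on the other replica; both replicas end with the same elements.
object ORSetConvergenceSketch extends App {
  import akka.persistence.typed.ReplicaId
  import akka.persistence.typed.crdt.ORSet

  var a = ORSet.empty[String](ReplicaId("R1"))
  var b = ORSet.empty[String](ReplicaId("R2"))

  // each replica applies its own operation locally and keeps the delta
  val addA = a.add("a1"); a = a.applyOperation(addA)
  val addB = b.add("b1"); b = b.applyOperation(addB)
  val removeB = b.remove("b1"); b = b.applyOperation(removeB)

  // "replication": apply the other replica's deltas in arrival order
  a = a.applyOperation(addB)
  a = a.applyOperation(removeB)
  b = b.applyOperation(addA)

  assert(a.elements == Set("a1"))
  assert(b.elements == Set("a1"))
}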
@@ -0,0 +1,15 @@
/*
 * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.crdt

import akka.annotation.{ ApiMayChange, DoNotInherit }

@ApiMayChange
@DoNotInherit
trait OpCrdt[Operation] { self =>
  type T <: OpCrdt[Operation] { type T = self.T }

  def applyOperation(op: Operation): T
}
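OpCrdt is the small abstraction that both Counter and ORSet implement: a state of type T applies operations of type Operation and returns a new T. The trait is marked @DoNotInherit for end users, so the following is purely a hypothetical sketch (not part of this commit) showing the shape the built-in implementations follow, here as a grow-only max register:

// Hypothetical example only: a max-register whose single operation is
// "observed a value"; applying the operations in any order converges to
// the same maximum, which is the commutativity OpCrdt implementations rely on.
final case class MaxRegister(value: Long) extends OpCrdt[Long] {
  override type T = MaxRegister

  override def applyOperation(op: Long): MaxRegister =
    if (op > value) MaxRegister(op) else this
}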
@@ -8,34 +8,6 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.util.WallClock

/**
 * Utility class for comparing timestamp and data center
 * identifier when implementing last-writer wins.
 */
final case class LwwTime(timestamp: Long, originDc: ReplicaId) {

  /**
   * Create a new `LwwTime` that has a `timestamp` that is
   * `max` of the given timestamp and previous timestamp + 1,
   * i.e. monotonically increasing.
   */
  def increase(t: Long, replicaId: ReplicaId): LwwTime =
    LwwTime(math.max(timestamp + 1, t), replicaId)

  /**
   * Compare this `LwwTime` with the `other`.
   * Greatest timestamp wins. If both timestamps are
   * equal the `dc` identifiers are compared and the
   * one sorted first in alphanumeric order wins.
   */
  def isAfter(other: LwwTime): Boolean = {
    if (timestamp > other.timestamp) true
    else if (timestamp < other.timestamp) false
    else if (other.originDc.id.compareTo(originDc.id) > 0) true
    else false
  }
}

// FIXME docs
trait ActiveActiveContext {
@@ -0,0 +1,269 @@
/*
 * Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.typed.serialization

import java.io.NotSerializableException
import java.util.{ ArrayList, Collections, Comparator }
import java.{ lang => jl }

import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.persistence.typed.crdt.{ Counter, ORSet }
import akka.persistence.typed.internal.VersionVector
import akka.protobufv3.internal.ByteString
import akka.remote.ContainerFormats.Payload
import akka.remote.serialization.WrappedPayloadSupport
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeMap

object CrdtSerializer {
  object Comparator extends Comparator[Payload] {
    override def compare(a: Payload, b: Payload): Int = {
      val aByteString = a.getEnclosedMessage
      val bByteString = b.getEnclosedMessage
      val aSize = aByteString.size
      val bSize = bByteString.size
      if (aSize == bSize) {
        val aIter = aByteString.iterator
        val bIter = bByteString.iterator
        @tailrec def findDiff(): Int = {
          if (aIter.hasNext) {
            val aByte = aIter.nextByte()
            val bByte = bIter.nextByte()
            if (aByte < bByte) -1
            else if (aByte > bByte) 1
            else findDiff()
          } else 0
        }
        findDiff()
      } else if (aSize < bSize) -1
      else 1
    }
  }
}

/**
 * INTERNAL API
 */
@InternalApi private[akka] final class CrdtSerializer(val system: ExtendedActorSystem)
    extends SerializerWithStringManifest
    with BaseSerializer {

  private val wrappedSupport = new WrappedPayloadSupport(system)

  private val CrdtCounterManifest = "AA"
  private val CrdtCounterUpdatedManifest = "AB"

  private val ORSetManifest = "CA"
  private val ORSetAddManifest = "CB"
  private val ORSetRemoveManifest = "CC"
  private val ORSetFullManifest = "CD"
  private val ORSetDeltaGroupManifest = "CE"

  private val VersionVectorManifest = "DA"

  def manifest(o: AnyRef) = o match {
    case _: ORSet[_]                  => ORSetManifest
    case _: ORSet.AddDeltaOp[_]       => ORSetAddManifest
    case _: ORSet.RemoveDeltaOp[_]    => ORSetRemoveManifest
    case _: ORSet.DeltaGroup[_]       => ORSetDeltaGroupManifest
    case _: ORSet.FullStateDeltaOp[_] => ORSetFullManifest

    case _: Counter         => CrdtCounterManifest
    case _: Counter.Updated => CrdtCounterUpdatedManifest

    case _: VersionVector => VersionVectorManifest
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
  }

  def toBinary(o: AnyRef) = o match {
    case m: ORSet[_]                  => orsetToProto(m).toByteArray
    case m: ORSet.AddDeltaOp[_]       => orsetToProto(m.underlying).toByteArray
    case m: ORSet.RemoveDeltaOp[_]    => orsetToProto(m.underlying).toByteArray
    case m: ORSet.DeltaGroup[_]       => orsetDeltaGroupToProto(m).toByteArray
    case m: ORSet.FullStateDeltaOp[_] => orsetToProto(m.underlying).toByteArray

    case m: Counter         => counterToProtoByteArray(m)
    case m: Counter.Updated => counterUpdatedToProtoBufByteArray(m)
    case m: VersionVector   => versionVectorToProto(m).toByteArray
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
  }

  def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
    case ORSetManifest           => orsetFromBinary(bytes)
    case ORSetAddManifest        => orsetAddFromBinary(bytes)
    case ORSetRemoveManifest     => orsetRemoveFromBinary(bytes)
    case ORSetFullManifest       => orsetFullFromBinary(bytes)
    case ORSetDeltaGroupManifest => orsetDeltaGroupFromBinary(bytes)

    case CrdtCounterManifest        => counterFromBinary(bytes)
    case CrdtCounterUpdatedManifest => counterUpdatedFromBinary(bytes)

    case VersionVectorManifest => versionVectorFromBinary(bytes)
    case _ =>
      throw new NotSerializableException(
        s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
  }

  def counterFromBinary(bytes: Array[Byte]): Counter =
    Counter(BigInt(Crdts.Counter.parseFrom(bytes).getValue.toByteArray))

  def counterUpdatedFromBinary(bytes: Array[Byte]): Counter.Updated =
    Counter.Updated(BigInt(Crdts.CounterUpdate.parseFrom(bytes).getDelta.toByteArray))

  def counterToProtoByteArray(counter: Counter): Array[Byte] =
    Crdts.Counter.newBuilder().setValue(ByteString.copyFrom(counter.value.toByteArray)).build().toByteArray

  def counterUpdatedToProtoBufByteArray(updated: Counter.Updated): Array[Byte] =
    Crdts.CounterUpdate.newBuilder().setDelta(ByteString.copyFrom(updated.delta.toByteArray)).build().toByteArray

  def orsetToProto(orset: ORSet[_]): Crdts.ORSet =
    orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]])

  private def orsetToProtoImpl(orset: ORSet[Any]): Crdts.ORSet = {
    val b = Crdts.ORSet.newBuilder().setOriginDc(orset.originReplica).setVvector(versionVectorToProto(orset.vvector))
    // using java collections and sorting for performance (avoid conversions)
    val stringElements = new ArrayList[String]
    val intElements = new ArrayList[Integer]
    val longElements = new ArrayList[jl.Long]
    val otherElements = new ArrayList[Payload]
    var otherElementsMap = Map.empty[Payload, Any]
    orset.elementsMap.keysIterator.foreach {
      case s: String => stringElements.add(s)
      case i: Int    => intElements.add(i)
      case l: Long   => longElements.add(l)
      case other =>
        val enclosedMsg = wrappedSupport.payloadBuilder(other).build()
        otherElements.add(enclosedMsg)
        // need the mapping back to the `other` when adding dots
        otherElementsMap = otherElementsMap.updated(enclosedMsg, other)
    }

    def addDots(elements: ArrayList[_]): Unit = {
      // add corresponding dots in same order
      val iter = elements.iterator
      while (iter.hasNext) {
        val element = iter.next() match {
          case enclosedMsg: Payload => otherElementsMap(enclosedMsg)
          case e                    => e
        }
        b.addDots(versionVectorToProto(orset.elementsMap(element)))
      }
    }

    if (!stringElements.isEmpty) {
      Collections.sort(stringElements)
      b.addAllStringElements(stringElements)
      addDots(stringElements)
    }
    if (!intElements.isEmpty) {
      Collections.sort(intElements)
      b.addAllIntElements(intElements)
      addDots(intElements)
    }
    if (!longElements.isEmpty) {
      Collections.sort(longElements)
      b.addAllLongElements(longElements)
      addDots(longElements)
    }
    if (!otherElements.isEmpty) {
      Collections.sort(otherElements, CrdtSerializer.Comparator)
      b.addAllOtherElements(otherElements)
      addDots(otherElements)
    }

    b.build()
  }

  def orsetFromBinary(bytes: Array[Byte]): ORSet[Any] =
    orsetFromProto(Crdts.ORSet.parseFrom(bytes))

  private def orsetAddFromBinary(bytes: Array[Byte]): ORSet.AddDeltaOp[Any] =
    new ORSet.AddDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes)))

  private def orsetRemoveFromBinary(bytes: Array[Byte]): ORSet.RemoveDeltaOp[Any] =
    new ORSet.RemoveDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes)))

  private def orsetFullFromBinary(bytes: Array[Byte]): ORSet.FullStateDeltaOp[Any] =
    new ORSet.FullStateDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes)))

  private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): Crdts.ORSetDeltaGroup = {
    def createEntry(opType: Crdts.ORSetDeltaOp, u: ORSet[_]) = {
      Crdts.ORSetDeltaGroup.Entry.newBuilder().setOperation(opType).setUnderlying(orsetToProto(u))
    }

    val b = Crdts.ORSetDeltaGroup.newBuilder()
    deltaGroup.ops.foreach {
      case ORSet.AddDeltaOp(u) =>
        b.addEntries(createEntry(Crdts.ORSetDeltaOp.Add, u))
      case ORSet.RemoveDeltaOp(u) =>
        b.addEntries(createEntry(Crdts.ORSetDeltaOp.Remove, u))
      case ORSet.FullStateDeltaOp(u) =>
        b.addEntries(createEntry(Crdts.ORSetDeltaOp.Full, u))
      case ORSet.DeltaGroup(_) =>
        throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
    }
    b.build()
  }

  private def orsetDeltaGroupFromBinary(bytes: Array[Byte]): ORSet.DeltaGroup[Any] = {
    val deltaGroup = Crdts.ORSetDeltaGroup.parseFrom(bytes)
    val ops: Vector[ORSet.DeltaOp] =
      deltaGroup.getEntriesList.asScala.map { entry =>
        if (entry.getOperation == Crdts.ORSetDeltaOp.Add)
          ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying))
        else if (entry.getOperation == Crdts.ORSetDeltaOp.Remove)
          ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying))
        else if (entry.getOperation == Crdts.ORSetDeltaOp.Full)
          ORSet.FullStateDeltaOp(orsetFromProto(entry.getUnderlying))
        else
          throw new NotSerializableException(s"Unknown ORSet delta operation ${entry.getOperation}")
      }.toVector
    ORSet.DeltaGroup(ops)
  }

  def orsetFromProto(orset: Crdts.ORSet): ORSet[Any] = {
    val elements: Iterator[Any] =
      (orset.getStringElementsList.iterator.asScala ++
      orset.getIntElementsList.iterator.asScala ++
      orset.getLongElementsList.iterator.asScala ++
      orset.getOtherElementsList.iterator.asScala.map(wrappedSupport.deserializePayload))

    val dots = orset.getDotsList.asScala.map(versionVectorFromProto).iterator
    val elementsMap = elements.zip(dots).toMap

    new ORSet(orset.getOriginDc, elementsMap, vvector = versionVectorFromProto(orset.getVvector))
  }

  def versionVectorToProto(versionVector: VersionVector): Crdts.VersionVector = {
    val b = Crdts.VersionVector.newBuilder()
    versionVector.versionsIterator.foreach {
      case (key, value) => b.addEntries(Crdts.VersionVector.Entry.newBuilder().setKey(key).setVersion(value))
    }
    b.build()
  }

  def versionVectorFromBinary(bytes: Array[Byte]): VersionVector =
    versionVectorFromProto(Crdts.VersionVector.parseFrom(bytes))

  def versionVectorFromProto(versionVector: Crdts.VersionVector): VersionVector = {
    val entries = versionVector.getEntriesList
    if (entries.isEmpty)
      VersionVector.empty
    else if (entries.size == 1)
      VersionVector(entries.get(0).getKey, entries.get(0).getVersion)
    else {
      val versions = TreeMap.empty[String, Long] ++ versionVector.getEntriesList.asScala.map(entry =>
        entry.getKey -> entry.getVersion)
      VersionVector(versions)
    }
  }

}
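A hedged sketch of the round trip this serializer provides. In practice Akka looks the serializer up through the serialization-bindings added to reference.conf above; the sketch below goes through SerializationExtension rather than instantiating the internal CrdtSerializer directly, and assumes an actor system with akka-persistence-typed on the classpath so that binding is active:

// Sketch only: serialize a Counter and read it back through the bound serializer.
object CrdtSerializerRoundTripSketch extends App {
  import akka.actor.typed.ActorSystem
  import akka.actor.typed.scaladsl.Behaviors
  import akka.persistence.typed.crdt.Counter
  import akka.serialization.{ SerializationExtension, Serializers }

  val system = ActorSystem(Behaviors.empty, "sketch")
  val serialization = SerializationExtension(system.classicSystem)

  val counter = Counter.empty.applyOperation(Counter.Updated(42))

  // reference.conf binds Counter to the replicated-crdts serializer (id 40)
  val serializer = serialization.findSerializerFor(counter)
  val manifest = Serializers.manifestFor(serializer, counter) // "AA" for Counter
  val bytes = serializer.toBinary(counter)

  val restored =
    serialization.deserialize(bytes, serializer.identifier, manifest).get.asInstanceOf[Counter]
  assert(restored.value == counter.value)

  system.terminate()
}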
@@ -461,6 +461,7 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
  .dependsOn(
    actorTyped,
    streamTyped,
    remote,
    persistence % "compile->compile;test->test",
    persistenceQuery,
    actorTestkitTyped % "test->test",

@@ -470,6 +471,9 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
  .settings(javacOptions += "-parameters") // for Jackson
  .settings(Dependencies.persistenceShared)
  .settings(AutomaticModuleName.settings("akka.persistence.typed"))
  .settings(Protobuf.settings)
  // To be able to import ContainerFormats.proto
  .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
  .settings(OSGi.persistenceTyped)

lazy val clusterTyped = akkaModule("akka-cluster-typed")