Add support for delta-CRDT, #21875

* delta GCounter and PNCounter
* first stab at delta propagation protocol
* send delta in the direct write
* possibility to turn off delta propagation
* tests
* protobuf serializer for DeltaPropagation
* documentation
This commit is contained in:
Patrik Nordwall 2017-01-04 15:25:12 +01:00
parent 2a9fa234a1
commit 3e7ffd6b96
18 changed files with 2408 additions and 98 deletions

View file

@ -44,6 +44,7 @@ import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
import akka.actor.Cancellable
object ReplicatorSettings {
@ -81,7 +82,8 @@ object ReplicatorSettings {
durableStoreProps = Left((config.getString("durable.store-actor-class"), config.getConfig("durable"))),
durableKeys = config.getStringList("durable.keys").asScala.toSet,
pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis,
durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis)
durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis,
deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"))
}
/**
@ -128,20 +130,21 @@ final class ReplicatorSettings(
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String],
val pruningMarkerTimeToLive: FiniteDuration,
val durablePruningMarkerTimeToLive: FiniteDuration) {
val durablePruningMarkerTimeToLive: FiniteDuration,
val deltaCrdtEnabled: Boolean) {
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days)
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true)
// For backwards compatibility
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days)
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true)
def withRole(role: String): ReplicatorSettings = copy(role = ReplicatorSettings.roleOption(role))
@ -191,6 +194,9 @@ final class ReplicatorSettings(
withDurableKeys(durableKeys.asScala.toSet)
}
/**
 * Returns a copy of these settings with delta-CRDT propagation switched on or off.
 * All other settings keep their current values.
 */
def withDeltaCrdtEnabled(deltaCrdtEnabled: Boolean): ReplicatorSettings = {
  val enabled = deltaCrdtEnabled
  copy(deltaCrdtEnabled = enabled)
}
private def copy(
role: Option[String] = role,
gossipInterval: FiniteDuration = gossipInterval,
@ -202,10 +208,11 @@ final class ReplicatorSettings(
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys,
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive): ReplicatorSettings =
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings =
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive)
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled)
}
object Replicator {
@ -572,6 +579,7 @@ object Replicator {
private[akka] object Internal {
case object GossipTick
case object DeltaPropagationTick
case object RemovedNodePruningTick
case object ClockTick
final case class Write(key: String, envelope: DataEnvelope) extends ReplicatorMessage
@ -581,6 +589,8 @@ object Replicator {
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: String, envelope: DataEnvelope)
case object ReadRepairAck
// for testing purposes
final case class TestFullStateGossip(enabled: Boolean)
// Gossip Status message contains SHA-1 digests of the data to determine when
// to send the full data
@ -691,6 +701,8 @@ object Replicator {
}
final case class Gossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
// Message sent to other replicas on a DeltaPropagationTick and handled by
// `receiveDeltaPropagation`. Maps each key to an envelope whose data is the delta.
final case class DeltaPropagation(deltas: Map[String, DataEnvelope]) extends ReplicatorMessage
}
}
@ -704,8 +716,8 @@ object Replicator {
* The data types must be convergent CRDTs and implement [[ReplicatedData]], i.e.
* they provide a monotonic merge function and the state changes always converge.
*
* You can use your own custom [[ReplicatedData]] types, and several types are provided
* by this package, such as:
* You can use your own custom [[ReplicatedData]] or [[DeltaReplicatedData]] types,
* and several types are provided by this package, such as:
*
* <ul>
* <li>Counters: [[GCounter]], [[PNCounter]]</li>
@ -726,7 +738,24 @@ object Replicator {
* The `Replicator` actor must be started on each node in the cluster, or group of
* nodes tagged with a specific role. It communicates with other `Replicator` instances
* with the same path (without address) that are running on other nodes . For convenience it
* can be used with the [[DistributedData]] extension.
* can be used with the [[DistributedData]] extension but it can also be started as an ordinary
* actor using the `Replicator.props`. If it is started as an ordinary actor it is important
* that it is given the same name, i.e. started on the same path, on all nodes.
*
* <a href="http://arxiv.org/abs/1603.01529">Delta State Replicated Data Types</a>
* are supported. A delta-CRDT is a way to reduce the need for sending the full state
* for updates. For example, adding elements 'c' and 'd' to set {'a', 'b'} would
* result in sending the delta {'c', 'd'} and merging that with the state on the
* receiving side, resulting in set {'a', 'b', 'c', 'd'}.
*
* The current protocol for replicating the deltas does not support causal consistency.
* It is only eventually consistent. This means that if elements 'c' and 'd' are
* added in two separate `Update` operations these deltas may occasionally be propagated
* to nodes in a different order than the causal order of the updates. In this example
* the set {'a', 'b', 'd'} may be seen before element 'c' is seen. Eventually
* it will be {'a', 'b', 'c', 'd'}. If causal consistency is needed, delta propagation
* should be disabled with the configuration property
* `akka.cluster.distributed-data.delta-crdt.enabled=off`.
*
* == Update ==
*
@ -910,6 +939,34 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} else
context.system.deadLetters // not used
// Selector that decides which pending deltas go to which nodes; it is consulted on
// every DeltaPropagationTick and updated whenever a local Update produces a delta.
val deltaPropagationSelector = new DeltaPropagationSelector {
  // Also used to derive the delta propagation interval from the gossip interval
  // and to decide how often the selector's delta entries are cleaned up.
  override val divisor = 5

  // All known peers (regular and weakly-up members) in sorted order.
  override def allNodes: Vector[Address] = {
    // TODO optimize, by maintaining a sorted instance variable instead
    nodes.union(weaklyUpNodes).toVector.sorted
  }

  // Wraps each delta in a DataEnvelope, reusing the local envelope for the key when
  // one exists so that its pruning state travels together with the delta.
  override def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation = {
    // Important to include the pruning state in the deltas. For example if the delta is based
    // on an entry that has been pruned but that has not yet been performed on the target node.
    DeltaPropagation(deltas.map {
      case (key, d) =>
        getData(key) match {
          case Some(envelope) => key -> envelope.copy(data = d)
          case None           => key -> DataEnvelope(d)
        }
    }(collection.breakOut))
  }
}
// Handle of the periodic DeltaPropagationTick sent to self; None when delta-CRDT is disabled.
val deltaPropagationTask: Option[Cancellable] =
  if (!deltaCrdtEnabled) None
  else {
    // Derive the tick interval from the gossipInterval (never below 200 ms).
    // Normally the delta is propagated to all nodes within the gossip tick, so that
    // full state gossip is not needed.
    val tickInterval = (gossipInterval / deltaPropagationSelector.divisor).max(200.millis)
    val scheduled = context.system.scheduler.schedule(tickInterval, tickInterval, self, DeltaPropagationTick)
    Some(scheduled)
  }
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
@ -933,6 +990,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// for splitting up gossip in chunks
var statusCount = 0L
var statusTotChunks = 0
// possibility to disable Gossip for testing purpose
var fullStateGossipEnabled = true
val subscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
val newSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
@ -965,6 +1024,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
override def postStop(): Unit = {
cluster.unsubscribe(self)
gossipTask.cancel()
deltaPropagationTask.foreach(_.cancel())
notifyTask.cancel()
pruningTask.foreach(_.cancel())
clockTask.cancel()
@ -1011,7 +1071,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case (key, d)
write(key, d.dataEnvelope) match {
case Some(newEnvelope)
if (newEnvelope.data ne d.dataEnvelope.data)
if (newEnvelope ne d.dataEnvelope)
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None
}
@ -1030,6 +1090,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case RemovedNodePruningTick | FlushChanges | GossipTick
// ignore scheduled ticks when loading durable data
case TestFullStateGossip(enabled)
fullStateGossipEnabled = enabled
case m @ (_: Read | _: Write | _: Status | _: Gossip)
// ignore gossip and replication when loading durable data
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
@ -1046,7 +1108,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Read(key) receiveRead(key)
case Write(key, envelope) receiveWrite(key, envelope)
case ReadRepair(key, envelope) receiveReadRepair(key, envelope)
case DeltaPropagation(deltas) receiveDeltaPropagation(deltas)
case FlushChanges receiveFlushChanges()
case DeltaPropagationTick receiveDeltaPropagationTick()
case GossipTick receiveGossipTick()
case ClockTick receiveClockTick()
case Status(otherDigests, chunk, totChunks) receiveStatus(otherDigests, chunk, totChunks)
@ -1066,6 +1130,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Delete(key, consistency, req) receiveDelete(key, consistency, req)
case RemovedNodePruningTick receiveRemovedNodePruningTick()
case GetReplicaCount receiveGetReplicaCount()
case TestFullStateGossip(enabled) fullStateGossipEnabled = enabled
}
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = {
@ -1103,13 +1168,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
localValue match {
case Some(DataEnvelope(DeletedData, _)) throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _))
envelope.merge(modify(Some(existing)).asInstanceOf[existing.T])
case None DataEnvelope(modify(None))
modify(Some(existing)) match {
case d: DeltaReplicatedData if deltaCrdtEnabled
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), Some(d.delta))
case d
(envelope.merge(d.asInstanceOf[existing.T]), None)
}
case None modify(None) match {
case d: DeltaReplicatedData if deltaCrdtEnabled (DataEnvelope(d.resetDelta), Some(d.delta))
case d (DataEnvelope(d), None)
}
}
} match {
case Success(envelope)
log.debug("Received Update for key [{}], old data [{}], new data [{}]", key, localValue, envelope.data)
case Success((envelope, delta))
log.debug("Received Update for key [{}], old data [{}], new data [{}], delta [{}]", key, localValue, envelope.data, delta)
setData(key.id, envelope)
// handle the delta
delta match {
case Some(d) deltaPropagationSelector.update(key.id, d)
case None // not DeltaReplicatedData
}
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
@ -1118,8 +1198,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else
replyTo ! UpdateSuccess(key, req)
} else {
val writeEnvelope = delta match {
case Some(d) DataEnvelope(d)
case None envelope
}
val writeAggregator =
context.actorOf(WriteAggregator.props(key, envelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
@ -1176,13 +1260,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
Some(writeEnvelope2)
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
/**
 * Applies `writeEnvelope` to the entry for `key` through `write`. If `write` yields
 * a resulting envelope and the key is durable, that envelope is sent to the durable store.
 *
 * @param key           identifier of the data entry
 * @param writeEnvelope incoming envelope (full state or delta) to apply
 */
def writeAndStore(key: String, writeEnvelope: DataEnvelope): Unit = {
  // `write` returns None when there is nothing to persist, so foreach covers both cases.
  write(key, writeEnvelope).foreach { newEnvelope =>
    if (isDurable(key))
      durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
  }
}
// Handles a ReadRepair message: applies the envelope through writeAndStore
// (persisting it when the key is durable) and then acknowledges with ReadRepairAck.
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
writeAndStore(key, writeEnvelope)
// NOTE(review): replyTo is defined outside this view; presumably the sender of the ReadRepair — confirm
replyTo ! ReadRepairAck
}
@ -1231,6 +1319,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else LazyDigest
dataEntries = dataEntries.updated(key, (envelope, dig))
if (envelope.data == DeletedData)
deltaPropagationSelector.delete(key)
}
def getDigest(key: String): Digest = {
@ -1282,7 +1372,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
changed = Set.empty[String]
}
def receiveGossipTick(): Unit = selectRandomNode(nodes.union(weaklyUpNodes).toVector) foreach gossipTo
/**
 * Handles a DeltaPropagationTick: sends each DeltaPropagation collected by the selector
 * to the replica on its target node. Whenever the selector's `propagationCount` is a
 * multiple of `divisor`, the selector is asked to clean up its delta entries.
 */
def receiveDeltaPropagationTick(): Unit = {
  deltaPropagationSelector.collectPropagations().foreach {
    case (node, deltaPropagation) =>
      // TODO split it to several DeltaPropagation if too many entries
      replica(node) ! deltaPropagation
  }
  if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.divisor == 0)
    deltaPropagationSelector.cleanupDeltaEntries()
}
/**
 * Handles a DeltaPropagation from another replica: every (key, envelope) pair is
 * applied through `writeAndStore`.
 *
 * @param deltas delta envelopes keyed by data entry identifier
 */
def receiveDeltaPropagation(deltas: Map[String, DataEnvelope]): Unit = {
  // Guarded because building the key list is wasted work when debug logging is off.
  if (log.isDebugEnabled)
    log.debug("Received DeltaPropagation from [{}], containing [{}]", sender().path.address, deltas.keys.mkString(", "))
  deltas.foreach {
    case (key, envelope) => writeAndStore(key, envelope)
  }
}
// On a GossipTick, gossips to one randomly selected peer (regular or weakly-up member),
// unless full-state gossip has been switched off via TestFullStateGossip.
def receiveGossipTick(): Unit =
  if (fullStateGossipEnabled) {
    val candidates = nodes.union(weaklyUpNodes).toVector
    selectRandomNode(candidates).foreach(gossipTo)
  }
def gossipTo(address: Address): Unit = {
val to = replica(address)
@ -1353,12 +1464,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
updatedData.foreach {
case (key, envelope)
val hadData = dataEntries.contains(key)
write(key, envelope) match {
case Some(newEnvelope)
if (isDurable(key))
durableStore ! Store(key, new DurableDataEnvelope(newEnvelope), None)
case None
}
writeAndStore(key, envelope)
if (sendBack) getData(key) match {
case Some(d)
if (hadData || d.pruning.nonEmpty)
@ -1426,6 +1532,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
deltaPropagationSelector.cleanupRemovedNode(m.address)
}
}