causal delivery of deltas, #22188

* keep track of delta interval versions and skip deltas
  that are not consecutive, i.e. when some delta message was lost
  (see the sketch after this list)
* send the delta versions in the full state gossip to sync up the
  expected versions after dropped deltas
* implementation of deltas for ORSet
* refactoring of the delta types to allow a different type for the
  delta and the full state
* extensive tests
* mima filter
* performance optimizations
* simple pruning of deltas
* Java API
* update documentation
* KeyId type alias
* Use InternalApi annotation
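
In essence, the check from the first two bullets works like the sketch below (a hypothetical `deltaAction` helper for illustration only; the real logic lives in `Replicator.receiveDeltaPropagation` further down in this diff, where the current sequence number is read from the `deltaVersions` vector carried by the key's `DataEnvelope`):

    // Sketch: decide what to do with a delta covering the sequence number
    // interval [fromSeqNr, toSeqNr] from one node, given the highest sequence
    // number already merged locally for that (key, node) pair.
    def deltaAction(currentSeqNr: Long, fromSeqNr: Long, toSeqNr: Long): String =
      if (currentSeqNr >= toSeqNr)
        "skip: interval already handled"       // duplicate delivery
      else if (fromSeqNr > currentSeqNr + 1)
        "skip: gap, an earlier delta was lost" // full state gossip re-syncs deltaVersions
      else
        "apply: merge delta, record toSeqNr"   // consecutive with what was already seen

    deltaAction(currentSeqNr = 2, fromSeqNr = 4, toSeqNr = 5) // delta 3 was lost ⇒ skip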
Patrik Nordwall 2017-02-07 11:21:56 +01:00
parent 94afbee179
commit b700b840d1
41 changed files with 5010 additions and 1950 deletions

@@ -45,6 +9 @@ import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
import akka.actor.Cancellable
import scala.util.control.NonFatal
import akka.cluster.ddata.Key.KeyId
import akka.annotation.InternalApi
object ReplicatorSettings {
@@ -89,7 +92,7 @@ object ReplicatorSettings {
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
@InternalApi private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
}
@@ -128,7 +131,7 @@ final class ReplicatorSettings(
val pruningInterval: FiniteDuration,
val maxPruningDissemination: FiniteDuration,
val durableStoreProps: Either[(String, Config), Props],
val durableKeys: Set[String],
val durableKeys: Set[KeyId],
val pruningMarkerTimeToLive: FiniteDuration,
val durablePruningMarkerTimeToLive: FiniteDuration,
val deltaCrdtEnabled: Boolean) {
@@ -183,7 +186,7 @@ final class ReplicatorSettings(
/**
* Scala API
*/
def withDurableKeys(durableKeys: Set[String]): ReplicatorSettings =
def withDurableKeys(durableKeys: Set[KeyId]): ReplicatorSettings =
copy(durableKeys = durableKeys)
/**
@@ -206,7 +209,7 @@ final class ReplicatorSettings(
pruningInterval: FiniteDuration = pruningInterval,
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
durableKeys: Set[String] = durableKeys,
durableKeys: Set[KeyId] = durableKeys,
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings =
@@ -270,12 +273,12 @@ object Replicator {
/**
* INTERNAL API
*/
private[akka] case object GetKeyIds
@InternalApi private[akka] case object GetKeyIds
/**
* INTERNAL API
*/
private[akka] final case class GetKeyIdsResult(keyIds: Set[String]) {
@InternalApi private[akka] final case class GetKeyIdsResult(keyIds: Set[KeyId]) {
/**
* Java API
*/
@@ -576,18 +579,18 @@ object Replicator {
/**
* INTERNAL API
*/
private[akka] object Internal {
@InternalApi private[akka] object Internal {
case object GossipTick
case object DeltaPropagationTick
case object RemovedNodePruningTick
case object ClockTick
final case class Write(key: String, envelope: DataEnvelope) extends ReplicatorMessage
final case class Write(key: KeyId, envelope: DataEnvelope) extends ReplicatorMessage
case object WriteAck extends ReplicatorMessage with DeadLetterSuppression
case object WriteNack extends ReplicatorMessage with DeadLetterSuppression
final case class Read(key: String) extends ReplicatorMessage
final case class Read(key: KeyId) extends ReplicatorMessage
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: String, envelope: DataEnvelope)
final case class ReadRepair(key: KeyId, envelope: DataEnvelope)
case object ReadRepairAck
// for testing purposes
final case class TestFullStateGossip(enabled: Boolean)
@@ -603,12 +606,24 @@ object Replicator {
* The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
*/
final case class DataEnvelope(
data: ReplicatedData,
pruning: Map[UniqueAddress, PruningState] = Map.empty)
data: ReplicatedData,
pruning: Map[UniqueAddress, PruningState] = Map.empty,
deltaVersions: VersionVector = VersionVector.empty)
extends ReplicatorMessage {
import PruningState._
def withoutDeltaVersions: DataEnvelope =
if (deltaVersions.isEmpty) this
else copy(deltaVersions = VersionVector.empty)
/**
* We only use the deltaVersions to track versions per node, not for ordering comparisons,
* so we can just remove the entry for the removed node.
*/
private def cleanedDeltaVersions(from: UniqueAddress): VersionVector =
deltaVersions.pruningCleanup(from)
def needPruningFrom(removedNode: UniqueAddress): Boolean =
data match {
case r: RemovedNodePruning ⇒ r.needPruningFrom(removedNode)
@@ -616,7 +631,9 @@ object Replicator {
}
def initRemovedNodePruning(removed: UniqueAddress, owner: UniqueAddress): DataEnvelope = {
copy(pruning = pruning.updated(removed, PruningInitialized(owner, Set.empty)))
copy(
pruning = pruning.updated(removed, PruningInitialized(owner, Set.empty)),
deltaVersions = cleanedDeltaVersions(removed))
}
def prune(from: UniqueAddress, pruningPerformed: PruningPerformed): DataEnvelope = {
@@ -626,7 +643,8 @@ object Replicator {
pruning(from) match {
case PruningInitialized(owner, _) ⇒
val prunedData = dataWithRemovedNodePruning.prune(from, owner)
copy(data = prunedData, pruning = pruning.updated(from, pruningPerformed))
copy(data = prunedData, pruning = pruning.updated(from, pruningPerformed),
deltaVersions = cleanedDeltaVersions(from))
case _ ⇒
this
}
@@ -659,13 +677,36 @@ object Replicator {
}
}
// cleanup both sides before merging, `merge((otherData: ReplicatedData)` will cleanup other.data
copy(data = cleaned(data, filteredMergedPruning), pruning = filteredMergedPruning).merge(other.data)
// cleanup and merge deltaVersions
val removedNodes = filteredMergedPruning.keys
val cleanedDV = removedNodes.foldLeft(deltaVersions) { (acc, node) ⇒ acc.pruningCleanup(node) }
val cleanedOtherDV = removedNodes.foldLeft(other.deltaVersions) { (acc, node) ⇒ acc.pruningCleanup(node) }
val mergedDeltaVersions = cleanedDV.merge(cleanedOtherDV)
// cleanup both sides before merging, `merge(otherData: ReplicatedData)` will cleanup other.data
copy(
data = cleaned(data, filteredMergedPruning),
deltaVersions = mergedDeltaVersions,
pruning = filteredMergedPruning).merge(other.data)
}
def merge(otherData: ReplicatedData): DataEnvelope =
def merge(otherData: ReplicatedData): DataEnvelope = {
if (otherData == DeletedData) DeletedEnvelope
else copy(data = data merge cleaned(otherData, pruning).asInstanceOf[data.T])
else {
val mergedData =
cleaned(otherData, pruning) match {
case d: ReplicatedDelta ⇒ data match {
case drd: DeltaReplicatedData ⇒ drd.mergeDelta(d.asInstanceOf[drd.D])
case _ ⇒ throw new IllegalArgumentException("Expected DeltaReplicatedData")
}
case c ⇒ data.merge(c.asInstanceOf[data.T])
}
if (data.getClass != mergedData.getClass)
throw new IllegalArgumentException(
s"Wrong type, existing type [${data.getClass.getName}], got [${mergedData.getClass.getName}]")
copy(data = mergedData)
}
}
private def cleaned(c: ReplicatedData, p: Map[UniqueAddress, PruningState]): ReplicatedData = p.foldLeft(c) {
case (c: RemovedNodePruning, (removed, _: PruningPerformed)) ⇒
@@ -693,15 +734,16 @@ object Replicator {
override def merge(that: ReplicatedData): ReplicatedData = DeletedData
}
final case class Status(digests: Map[String, Digest], chunk: Int, totChunks: Int) extends ReplicatorMessage {
final case class Status(digests: Map[KeyId, Digest], chunk: Int, totChunks: Int) extends ReplicatorMessage {
override def toString: String =
(digests.map {
case (key, bytes) ⇒ key + " -> " + bytes.map(byte ⇒ f"$byte%02x").mkString("")
}).mkString("Status(", ", ", ")")
}
final case class Gossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
final case class DeltaPropagation(deltas: Map[String, DataEnvelope]) extends ReplicatorMessage
final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]) extends ReplicatorMessage
}
}
@@ -748,14 +790,13 @@ object Replicator {
* result in sending the delta {'c', 'd'} and merge that with the state on the
* receiving side, resulting in set {'a', 'b', 'c', 'd'}.
*
* Current protocol for replicating the deltas does not support causal consistency.
* It is only eventually consistent. This means that if elements 'c' and 'd' are
* The protocol for replicating the deltas supports causal consistency if the data type
* is marked with [[RequiresCausalDeliveryOfDeltas]]. Otherwise it is only eventually
* consistent. Without causal consistency it means that if elements 'c' and 'd' are
* added in two separate `Update` operations these deltas may occasionally be propagated
* to nodes in a different order than the causal order of the updates. For this example it
* can result in the set {'a', 'b', 'd'} being seen before element 'c' is seen. Eventually
* it will be {'a', 'b', 'c', 'd'}. If causal consistency is needed the delta propagation
* should be disabled with configuration property
* `akka.cluster.distributed-data.delta-crdt.enabled=off`.
* it will be {'a', 'b', 'c', 'd'}.
*
* == Update ==
*
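
As a sketch of how a data type opts in: the marker trait goes on the delta type, as the `RequiresCausalDeliveryOfDeltas` matches further down in this diff show. `EventLog` and `EventLogDelta` below are hypothetical, not part of the commit; the trait members used (`zero`, `mergeDelta`, `resetDelta`, `delta`) are the ones this diff calls:

    import akka.cluster.ddata.{ DeltaReplicatedData, ReplicatedDelta, RequiresCausalDeliveryOfDeltas }

    // Hypothetical append-only log. Each delta carries only the entries appended
    // since the last propagation, so applying deltas out of order would reorder
    // the log; marking the delta with RequiresCausalDeliveryOfDeltas makes the
    // Replicator skip non-consecutive deltas and rely on full state gossip instead.
    final case class EventLog(
      entries:      Vector[String] = Vector.empty,
      pendingDelta: Vector[String] = Vector.empty) extends DeltaReplicatedData {
      type T = EventLog
      type D = EventLogDelta

      def append(entry: String): EventLog =
        copy(entries = entries :+ entry, pendingDelta = pendingDelta :+ entry)

      override def delta: Option[EventLogDelta] =
        if (pendingDelta.isEmpty) None else Some(EventLogDelta(pendingDelta))
      override def resetDelta: EventLog = copy(pendingDelta = Vector.empty)
      override def mergeDelta(thatDelta: EventLogDelta): EventLog =
        copy(entries = entries ++ thatDelta.entries)
      override def merge(that: EventLog): EventLog =        // toy full-state merge:
        copy(entries = (entries ++ that.entries).distinct)  // concatenate and deduplicate
    }

    final case class EventLogDelta(entries: Vector[String])
      extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas {
      type T = EventLogDelta
      override def zero: EventLog = EventLog()              // used when no local state exists yet
      override def merge(that: EventLogDelta): EventLogDelta = // consecutive deltas can be grouped
        EventLogDelta(entries ++ that.entries)
    }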
@@ -940,19 +981,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
context.system.deadLetters // not used
val deltaPropagationSelector = new DeltaPropagationSelector {
override val divisor = 5
override val gossipIntervalDivisor = 5
override def allNodes: Vector[Address] = {
// TODO optimize, by maintaining a sorted instance variable instead
nodes.union(weaklyUpNodes).toVector.sorted
nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted
}
override def createDeltaPropagation(deltas: Map[String, ReplicatedData]): DeltaPropagation = {
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = {
// Important to include the pruning state in the deltas. For example if the delta is based
// on an entry that has been pruned but that has not yet been performed on the target node.
DeltaPropagation(deltas.map {
case (key, d) ⇒ getData(key) match {
case Some(envelope) ⇒ key → envelope.copy(data = d)
case None ⇒ key → DataEnvelope(d)
DeltaPropagation(selfUniqueAddress, deltas.map {
case (key, (d, fromSeqNr, toSeqNr)) ⇒ getData(key) match {
case Some(envelope) ⇒ key → Delta(envelope.copy(data = d), fromSeqNr, toSeqNr)
case None ⇒ key → Delta(DataEnvelope(d), fromSeqNr, toSeqNr)
}
}(collection.breakOut))
}
@@ -962,7 +1003,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// Derive the deltaPropagationInterval from the gossipInterval.
// Normally the delta is propagated to all nodes within the gossip tick, so that
// full state gossip is not needed.
val deltaPropagationInterval = (gossipInterval / deltaPropagationSelector.divisor).max(200.millis)
val deltaPropagationInterval = (gossipInterval / deltaPropagationSelector.gossipIntervalDivisor).max(200.millis)
Some(context.system.scheduler.schedule(deltaPropagationInterval, deltaPropagationInterval,
self, DeltaPropagationTick))
} else None
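
(For example, with a 2 second gossip interval this schedules delta propagation every 2 s / 5 = 400 ms; the 200 ms floor only comes into play for gossip intervals below 1 second.)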
@@ -983,9 +1024,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
var unreachable = Set.empty[Address]
// the actual data
var dataEntries = Map.empty[String, (DataEnvelope, Digest)]
var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]
// keys that have changed, Changed event published to subscribers on FlushChanges
var changed = Set.empty[String]
var changed = Set.empty[KeyId]
// for splitting up gossip in chunks
var statusCount = 0L
@@ -993,9 +1034,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// possibility to disable Gossip for testing purpose
var fullStateGossipEnabled = true
val subscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
val newSubscribers = new mutable.HashMap[String, mutable.Set[ActorRef]] with mutable.MultiMap[String, ActorRef]
var subscriptionKeys = Map.empty[String, KeyR]
val subscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with mutable.MultiMap[KeyId, ActorRef]
val newSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with mutable.MultiMap[KeyId, ActorRef]
var subscriptionKeys = Map.empty[KeyId, KeyR]
// To be able to do efficient stashing we use this field instead of sender().
// Using internal buffer instead of Stash to avoid the overhead of the Stash mailbox.
@@ -1108,7 +1149,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case Read(key) ⇒ receiveRead(key)
case Write(key, envelope) ⇒ receiveWrite(key, envelope)
case ReadRepair(key, envelope) ⇒ receiveReadRepair(key, envelope)
case DeltaPropagation(deltas) ⇒ receiveDeltaPropagation(deltas)
case DeltaPropagation(from, deltas) ⇒ receiveDeltaPropagation(from, deltas)
case FlushChanges ⇒ receiveFlushChanges()
case DeltaPropagationTick ⇒ receiveDeltaPropagationTick()
case GossipTick ⇒ receiveGossipTick()
@@ -1138,9 +1179,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
log.debug("Received Get for key [{}], local data [{}]", key, localValue)
if (isLocalGet(consistency)) {
val reply = localValue match {
case Some(DataEnvelope(DeletedData, _)) ⇒ DataDeleted(key, req)
case Some(DataEnvelope(data, _)) ⇒ GetSuccess(key, req)(data)
case None ⇒ NotFound(key, req)
case Some(DataEnvelope(DeletedData, _, _)) ⇒ DataDeleted(key, req)
case Some(DataEnvelope(data, _, _)) ⇒ GetSuccess(key, req)(data)
case None ⇒ NotFound(key, req)
}
replyTo ! reply
} else
@@ -1155,7 +1196,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case _ ⇒ false
}
def receiveRead(key: String): Unit = {
def receiveRead(key: KeyId): Unit = {
replyTo ! ReadResult(getData(key))
}
@@ -1166,23 +1207,22 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val localValue = getData(key.id)
Try {
localValue match {
case Some(DataEnvelope(DeletedData, _)) ⇒ throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _)) ⇒
case Some(DataEnvelope(DeletedData, _, _)) ⇒ throw new DataDeleted(key, req)
case Some(envelope @ DataEnvelope(existing, _, _)) ⇒
modify(Some(existing)) match {
case d: DeltaReplicatedData if deltaCrdtEnabled ⇒
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), Some(d.delta))
(envelope.merge(d.resetDelta.asInstanceOf[existing.T]), d.delta)
case d ⇒
(envelope.merge(d.asInstanceOf[existing.T]), None)
}
case None ⇒ modify(None) match {
case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ (DataEnvelope(d.resetDelta), Some(d.delta))
case d: DeltaReplicatedData if deltaCrdtEnabled ⇒ (DataEnvelope(d.resetDelta), d.delta)
case d ⇒ (DataEnvelope(d), None)
}
}
} match {
case Success((envelope, delta)) ⇒
log.debug("Received Update for key [{}], old data [{}], new data [{}], delta [{}]", key, localValue, envelope.data, delta)
setData(key.id, envelope)
// handle the delta
delta match {
@@ -1190,23 +1230,28 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case None ⇒ // not DeltaReplicatedData
}
// note that it's important to do deltaPropagationSelector.update before setData,
// so that the latest delta version is used
val newEnvelope = setData(key.id, envelope)
val durable = isDurable(key.id)
if (isLocalUpdate(writeConsistency)) {
if (durable)
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
else
replyTo ! UpdateSuccess(key, req)
} else {
val writeEnvelope = delta match {
case Some(d) ⇒ DataEnvelope(d)
case None ⇒ envelope
case Some(d: RequiresCausalDeliveryOfDeltas) ⇒ newEnvelope
case Some(d) ⇒ DataEnvelope(d)
case None ⇒ newEnvelope
}
val writeAggregator =
context.actorOf(WriteAggregator.props(key, writeEnvelope, writeConsistency, req, nodes, unreachable, replyTo, durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(key.id, new DurableDataEnvelope(envelope),
durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
}
}
@@ -1219,7 +1264,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def isDurable(key: String): Boolean =
def isDurable(key: KeyId): Boolean =
durable(key) || (durableWildcards.nonEmpty && durableWildcards.exists(key.startsWith))
def isLocalUpdate(writeConsistency: WriteConsistency): Boolean =
@@ -1229,7 +1274,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case _ ⇒ false
}
def receiveWrite(key: String, envelope: DataEnvelope): Unit = {
def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit = {
write(key, envelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
@@ -1240,27 +1285,38 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope] =
def write(key: KeyId, writeEnvelope: DataEnvelope): Option[DataEnvelope] = {
getData(key) match {
case Some(DataEnvelope(DeletedData, _)) ⇒ Some(DeletedEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _)) ⇒
if (existing.getClass == writeEnvelope.data.getClass || writeEnvelope.data == DeletedData) {
case someEnvelope @ Some(envelope) if envelope eq writeEnvelope ⇒ someEnvelope
case Some(DataEnvelope(DeletedData, _, _)) ⇒ Some(DeletedEnvelope) // already deleted
case Some(envelope @ DataEnvelope(existing, _, _)) ⇒
try {
// DataEnvelope will mergeDelta when needed
val merged = envelope.merge(writeEnvelope).addSeen(selfAddress)
setData(key, merged)
Some(merged)
} else {
log.warning(
"Wrong type for writing [{}], existing type [{}], got [{}]",
key, existing.getClass.getName, writeEnvelope.data.getClass.getName)
None
Some(setData(key, merged))
} catch {
case e: IllegalArgumentException ⇒
log.warning(
"Couldn't merge [{}], due to: {}", key, e.getMessage)
None
}
case None ⇒
val writeEnvelope2 = writeEnvelope.addSeen(selfAddress)
setData(key, writeEnvelope2)
Some(writeEnvelope2)
}
// no existing data for the key
val writeEnvelope2 =
writeEnvelope.data match {
case d: ReplicatedDelta ⇒
val z = d.zero
writeEnvelope.copy(data = z.mergeDelta(d.asInstanceOf[z.D]))
case _ ⇒
writeEnvelope
}
def writeAndStore(key: String, writeEnvelope: DataEnvelope): Unit = {
val writeEnvelope3 = writeEnvelope2.addSeen(selfAddress)
Some(setData(key, writeEnvelope3))
}
}
def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
write(key, writeEnvelope) match {
case Some(newEnvelope) ⇒
if (isDurable(key))
@@ -1269,21 +1325,21 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit = {
def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit = {
writeAndStore(key, writeEnvelope)
replyTo ! ReadRepairAck
}
def receiveGetKeyIds(): Unit = {
val keys: Set[String] = dataEntries.collect {
case (key, (DataEnvelope(data, _), _)) if data != DeletedData ⇒ key
val keys: Set[KeyId] = dataEntries.collect {
case (key, (DataEnvelope(data, _, _), _)) if data != DeletedData ⇒ key
}(collection.breakOut)
replyTo ! GetKeyIdsResult(keys)
}
def receiveDelete(key: KeyR, consistency: WriteConsistency, req: Option[Any]): Unit = {
getData(key.id) match {
case Some(DataEnvelope(DeletedData, _)) ⇒
case Some(DataEnvelope(DeletedData, _, _)) ⇒
// already deleted
replyTo ! DataDeleted(key, req)
case _ ⇒
@@ -1307,23 +1363,35 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def setData(key: String, envelope: DataEnvelope): Unit = {
def setData(key: KeyId, envelope: DataEnvelope): DataEnvelope = {
val newEnvelope = {
if (deltaCrdtEnabled) {
val deltaVersions = envelope.deltaVersions
val currVersion = deltaPropagationSelector.currentVersion(key)
if (currVersion == 0L || currVersion == deltaVersions.versionAt(selfUniqueAddress))
envelope
else
envelope.copy(deltaVersions = deltaVersions.merge(VersionVector(selfUniqueAddress, currVersion)))
} else envelope
}
val dig =
if (subscribers.contains(key) && !changed.contains(key)) {
val oldDigest = getDigest(key)
val dig = digest(envelope)
val dig = digest(newEnvelope)
if (dig != oldDigest)
changed += key // notify subscribers, later
dig
} else if (envelope.data == DeletedData) DeletedDigest
} else if (newEnvelope.data == DeletedData) DeletedDigest
else LazyDigest
dataEntries = dataEntries.updated(key, (envelope, dig))
if (envelope.data == DeletedData)
dataEntries = dataEntries.updated(key, (newEnvelope, dig))
if (newEnvelope.data == DeletedData)
deltaPropagationSelector.delete(key)
newEnvelope
}
def getDigest(key: String): Digest = {
def getDigest(key: KeyId): Digest = {
dataEntries.get(key) match {
case Some((envelope, LazyDigest)) ⇒
val d = digest(envelope)
@@ -1337,14 +1405,27 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def digest(envelope: DataEnvelope): Digest =
if (envelope.data == DeletedData) DeletedDigest
else {
val bytes = serializer.toBinary(envelope)
val bytes = serializer.toBinary(envelope.withoutDeltaVersions)
ByteString.fromArray(MessageDigest.getInstance("SHA-1").digest(bytes))
}
def getData(key: String): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) ⇒ envelope }
def getData(key: KeyId): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) ⇒ envelope }
def getDeltaSeqNr(key: KeyId, fromNode: UniqueAddress): Long =
dataEntries.get(key) match {
case Some((DataEnvelope(_, _, deltaVersions), _)) ⇒ deltaVersions.versionAt(fromNode)
case None ⇒ 0L
}
def isNodeRemoved(node: UniqueAddress, keys: Iterable[KeyId]): Boolean = {
removedNodes.contains(node) || (keys.exists(key ⇒ dataEntries.get(key) match {
case Some((DataEnvelope(_, pruning, _), _)) ⇒ pruning.contains(node)
case None ⇒ false
}))
}
def receiveFlushChanges(): Unit = {
def notify(keyId: String, subs: mutable.Set[ActorRef]): Unit = {
def notify(keyId: KeyId, subs: mutable.Set[ActorRef]): Unit = {
val key = subscriptionKeys(keyId)
getData(keyId) match {
case Some(envelope) ⇒
@@ -1369,7 +1450,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
newSubscribers.clear()
}
changed = Set.empty[String]
changed = Set.empty[KeyId]
}
def receiveDeltaPropagationTick(): Unit = {
@@ -1378,17 +1459,55 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// TODO split it to several DeltaPropagation if too many entries
replica(node) ! deltaPropagation
}
if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.divisor == 0)
if (deltaPropagationSelector.propagationCount % deltaPropagationSelector.gossipIntervalDivisor == 0)
deltaPropagationSelector.cleanupDeltaEntries()
}
def receiveDeltaPropagation(deltas: Map[String, DataEnvelope]): Unit = {
if (log.isDebugEnabled)
log.debug("Received DeltaPropagation from [{}], containing [{}]", sender().path.address, deltas.keys.mkString(", "))
deltas.foreach {
case (key, envelope) ⇒ writeAndStore(key, envelope)
def receiveDeltaPropagation(fromNode: UniqueAddress, deltas: Map[KeyId, Delta]): Unit =
if (deltaCrdtEnabled) {
try {
val isDebugEnabled = log.isDebugEnabled
if (isDebugEnabled)
log.debug("Received DeltaPropagation from [{}], containing [{}]", fromNode.address,
deltas.collect { case (key, Delta(_, fromSeqNr, toSeqNr)) ⇒ s"$key $fromSeqNr-$toSeqNr" }.mkString(", "))
if (isNodeRemoved(fromNode, deltas.keys)) {
// Late message from a removed node.
// Drop it to avoid merging deltas that have been pruned on one side.
if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] because that node has been removed", fromNode.address)
} else {
deltas.foreach {
case (key, Delta(envelope @ DataEnvelope(_: RequiresCausalDeliveryOfDeltas, _, _), fromSeqNr, toSeqNr)) ⇒
val currentSeqNr = getDeltaSeqNr(key, fromNode)
if (currentSeqNr >= toSeqNr) {
if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because toSeqNr [{}] already handled [{}]",
fromNode.address, key, toSeqNr, currentSeqNr)
} else if (fromSeqNr > (currentSeqNr + 1)) {
if (isDebugEnabled) log.debug(
"Skipping DeltaPropagation from [{}] for [{}] because missing deltas between [{}-{}]",
fromNode.address, key, currentSeqNr + 1, fromSeqNr - 1)
} else {
if (isDebugEnabled) log.debug(
"Applying DeltaPropagation from [{}] for [{}] with sequence numbers [{}], current was [{}]",
fromNode.address, key, s"$fromSeqNr-$toSeqNr", currentSeqNr)
val newEnvelope = envelope.copy(deltaVersions = VersionVector(fromNode, toSeqNr))
writeAndStore(key, newEnvelope)
}
case (key, Delta(envelope, _, _)) ⇒
// causal delivery of deltas not needed, just apply it
writeAndStore(key, envelope)
}
}
} catch {
case NonFatal(e) ⇒
// catching in case we need to support rolling upgrades that are
// mixing nodes with incompatible delta-CRDT types
log.warning("Couldn't process DeltaPropagation from [{}] due to {}", fromNode, e)
}
}
}
def receiveGossipTick(): Unit = {
if (fullStateGossipEnabled)
@@ -1424,12 +1543,12 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def replica(address: Address): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(address))
def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit = {
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", replyTo.path.address,
(chunk + 1), totChunks, otherDigests.keys.mkString(", "))
def isOtherDifferent(key: String, otherDigest: Digest): Boolean = {
def isOtherDifferent(key: KeyId, otherDigest: Digest): Boolean = {
val d = getDigest(key)
d != NotFoundDigest && d != otherDigest
}
@@ -1457,10 +1576,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveGossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean): Unit = {
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[String, DataEnvelope]
var replyData = Map.empty[KeyId, DataEnvelope]
updatedData.foreach {
case (key, envelope) ⇒
val hadData = dataEntries.contains(key)
@@ -1568,7 +1687,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val knownNodes = nodes union weaklyUpNodes union removedNodes.keySet.map(_.address)
val newRemovedNodes =
dataEntries.foldLeft(Set.empty[UniqueAddress]) {
case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _), _))) ⇒
case (acc, (_, (envelope @ DataEnvelope(data: RemovedNodePruning, _, _), _))) ⇒
acc union data.modifiedByNodes.filterNot(n ⇒ n == selfUniqueAddress || knownNodes(n.address))
case (acc, _) ⇒
acc
@@ -1616,7 +1735,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis)
val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning), _)) ⇒
case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning, _), _)) ⇒
pruning.foreach {
case (removed, PruningInitialized(owner, seen)) if owner == selfUniqueAddress
&& (allNodes.isEmpty || allNodes.forall(seen)) ⇒
@@ -1634,7 +1753,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def deleteObsoletePruningPerformed(): Unit = {
val currentTime = System.currentTimeMillis()
dataEntries.foreach {
case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning), _)) ⇒
case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning, _), _)) ⇒
val newEnvelope = pruning.foldLeft(envelope) {
case (acc, (removed, p: PruningPerformed)) if p.isObsolete(currentTime) ⇒
log.debug("Removing obsolete pruning marker for [{}] in [{}]", removed, key)
@@ -1660,7 +1779,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
/**
* INTERNAL API
*/
private[akka] object ReadWriteAggregator {
@InternalApi private[akka] object ReadWriteAggregator {
case object SendToSecondary
val MaxSecondaryNodes = 10
@@ -1678,7 +1797,7 @@ private[akka] object ReadWriteAggregator {
/**
* INTERNAL API
*/
private[akka] abstract class ReadWriteAggregator extends Actor {
@InternalApi private[akka] abstract class ReadWriteAggregator extends Actor {
import ReadWriteAggregator._
def timeout: FiniteDuration
@@ -1719,7 +1838,7 @@ private[akka] abstract class ReadWriteAggregator extends Actor {
/**
* INTERNAL API
*/
private[akka] object WriteAggregator {
@InternalApi private[akka] object WriteAggregator {
def props(
key: KeyR,
envelope: Replicator.Internal.DataEnvelope,
@@ -1736,7 +1855,7 @@ private[akka] object WriteAggregator {
/**
* INTERNAL API
*/
private[akka] class WriteAggregator(
@InternalApi private[akka] class WriteAggregator(
key: KeyR,
envelope: Replicator.Internal.DataEnvelope,
consistency: Replicator.WriteConsistency,
@@ -1826,7 +1945,7 @@ private[akka] class WriteAggregator(
/**
* INTERNAL API
*/
private[akka] object ReadAggregator {
@InternalApi private[akka] object ReadAggregator {
def props(
key: KeyR,
consistency: Replicator.ReadConsistency,
@@ -1843,7 +1962,7 @@ private[akka] object ReadAggregator {
/**
* INTERNAL API
*/
private[akka] class ReadAggregator(
@InternalApi private[akka] class ReadAggregator(
key: KeyR,
consistency: Replicator.ReadConsistency,
req: Option[Any],