Avoid false removals in ClusterReceptionist, #26284

* The scenario was (probably) that a node was restarted with the
  same host:port and then didn't join the same cluster. The DData
  Replicator in the original cluster would continue sending messages
  to the new incarnation, resulting in false removals.
* The fix is that the DData Replicator includes the system uid of the sending
  or target system in its messages. If the recipient gets a message that is from or to
  an unknown system, it discards it and thereby doesn't spread information across
  different clusters (see the sketch after this list).
* Reproduced in ClusterReceptionistSpec
* Much hardening of other parts of ClusterReceptionistSpec
* There are also some improvements to ClusterReceptionist so that it doesn't leak
  Listing with refs of removed nodes.
* Use ClusterShuttingDown
* The reason for using the sender system uid instead of the target uid in messages
  like Read and Write is that the optimization of sending the same message
  instance to many destinations can then be kept.
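
A minimal, self-contained sketch of the guard described above (the types here are
simplified stand-ins for illustration, not the real akka.cluster.ddata classes):

  // Simplified stand-ins; the real messages live in akka.cluster.ddata.Replicator.Internal.
  final case class UniqueAddress(hostPort: String, longUid: Long)

  sealed trait ReplicatorMessage
  // Read/Write carry the uid of the *sending* system (Option for rolling-update compatibility).
  final case class Write(key: String, data: String, fromNode: Option[UniqueAddress]) extends ReplicatorMessage
  // Status/Gossip carry the uid of the intended *destination* system.
  final case class Status(digests: Map[String, Int], toSystemUid: Option[Long]) extends ReplicatorMessage

  class GuardedReplicator(selfUniqueAddress: UniqueAddress, knownNodes: Set[UniqueAddress]) {
    private def isKnownNode(node: UniqueAddress): Boolean =
      knownNodes(node) || node == selfUniqueAddress

    // true = process the message, false = discard it and thereby stop cross-cluster spread
    def accept(msg: ReplicatorMessage): Boolean = msg match {
      case Write(_, _, Some(fromNode)) =>
        // A restarted system reuses host:port but gets a new uid, so a stale sender is rejected.
        isKnownNode(fromNode)
      case Status(_, Some(toUid)) =>
        // A message addressed to the previous incarnation of this host:port is rejected.
        toUid == selfUniqueAddress.longUid
      case _ =>
        // No uid in the message (older sender during a rolling update): accept as before.
        true
    }
  }

  object GuardDemo extends App {
    val self = UniqueAddress("a:2552", longUid = 2L) // new incarnation, the old one had uid 1
    val peer = UniqueAddress("b:2552", longUid = 7L)
    val replicator = new GuardedReplicator(self, knownNodes = Set(peer))

    // Because Write only carries the sender uid, the same instance can be sent to every node.
    val write = Write("key", "value", fromNode = Some(peer))
    println(replicator.accept(write))                       // true: peer is a known member
    println(replicator.accept(Status(Map.empty, Some(1L)))) // false: addressed to the old uid 1
  }
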
Patrik Nordwall 2019-02-21 09:09:20 +01:00
parent 3cbda93496
commit 825d90bf63
16 changed files with 1714 additions and 396 deletions


@@ -5,15 +5,18 @@
package akka.cluster.ddata
import java.security.MessageDigest
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.ThreadLocalRandom
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
@@ -34,24 +37,31 @@ import akka.serialization.SerializationExtension
import akka.util.ByteString
import com.typesafe.config.Config
import java.util.function.{ Function => JFunction }
import akka.dispatch.Dispatchers
import akka.actor.DeadLetterSuppression
import akka.cluster.ddata.Key.KeyR
import java.util.Optional
import akka.cluster.ddata.DurableStore._
import akka.actor.ExtendedActorSystem
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
import akka.actor.Cancellable
import scala.util.control.NonFatal
import akka.cluster.ddata.Key.KeyId
import akka.annotation.InternalApi
import scala.collection.immutable.TreeSet
import akka.cluster.MemberStatus
import scala.annotation.varargs
import akka.event.Logging
import akka.util.JavaDurationConverters._
import akka.util.ccompat._
@@ -790,10 +800,20 @@ object Replicator {
case object DeltaPropagationTick
case object RemovedNodePruningTick
case object ClockTick
final case class Write(key: KeyId, envelope: DataEnvelope) extends ReplicatorMessage
sealed trait SendingSystemUid {
// FIXME #26566: we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions
def fromNode: Option[UniqueAddress]
}
sealed trait DestinationSystemUid {
// FIXME #26566: we can change from Option to Long after supporting mixed rolling updates for some versions
def toSystemUid: Option[Long]
}
final case class Write(key: KeyId, envelope: DataEnvelope, fromNode: Option[UniqueAddress])
extends ReplicatorMessage
with SendingSystemUid
case object WriteAck extends ReplicatorMessage with DeadLetterSuppression
case object WriteNack extends ReplicatorMessage with DeadLetterSuppression
final case class Read(key: KeyId) extends ReplicatorMessage
final case class Read(key: KeyId, fromNode: Option[UniqueAddress]) extends ReplicatorMessage with SendingSystemUid
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: KeyId, envelope: DataEnvelope)
case object ReadRepairAck
@@ -946,7 +966,14 @@ object Replicator {
override def merge(that: ReplicatedData): ReplicatedData = DeletedData
}
final case class Status(digests: Map[KeyId, Digest], chunk: Int, totChunks: Int) extends ReplicatorMessage {
final case class Status(
digests: Map[KeyId, Digest],
chunk: Int,
totChunks: Int,
toSystemUid: Option[Long],
fromSystemUid: Option[Long])
extends ReplicatorMessage
with DestinationSystemUid {
override def toString: String =
(digests
.map {
@@ -954,11 +981,22 @@ object Replicator {
})
.mkString("Status(", ", ", ")")
}
final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
final case class Gossip(
updatedData: Map[KeyId, DataEnvelope],
sendBack: Boolean,
toSystemUid: Option[Long],
fromSystemUid: Option[Long])
extends ReplicatorMessage
with DestinationSystemUid
final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta])
final case class DeltaPropagation(_fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta])
extends ReplicatorMessage
with SendingSystemUid {
// FIXME we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions,
// i.e. remove this and rename _fromNode
override def fromNode: Option[UniqueAddress] = Some(_fromNode)
}
object DeltaPropagation {
/**
@@ -1186,6 +1224,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val cluster = Cluster(context.system)
val selfAddress = cluster.selfAddress
val selfUniqueAddress = cluster.selfUniqueAddress
val selfFromSystemUid = Some(selfUniqueAddress.longUid)
require(!cluster.isTerminated, "Cluster node must not be terminated")
require(
@@ -1223,9 +1262,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val deltaPropagationSelector = new DeltaPropagationSelector {
override val gossipIntervalDivisor = 5
override def allNodes: Vector[Address] = {
override def allNodes: Vector[UniqueAddress] = {
// TODO optimize, by maintaining a sorted instance variable instead
nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted
Replicator.this.allNodes.diff(unreachable).toVector.sorted
}
override def maxDeltaSize: Int = settings.maxDeltaSize
@@ -1256,11 +1295,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
.schedule(deltaPropagationInterval, deltaPropagationInterval, self, DeltaPropagationTick))
} else None
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
// cluster nodes, doesn't contain selfAddress, doesn't contain joining and weaklyUp
var nodes: Set[UniqueAddress] = Set.empty
// cluster weaklyUp nodes, doesn't contain selfAddress
var weaklyUpNodes: Set[Address] = Set.empty
var weaklyUpNodes: Set[UniqueAddress] = Set.empty
// cluster joining nodes, doesn't contain selfAddress
var joiningNodes: Set[UniqueAddress] = Set.empty
// up and weaklyUp nodes, doesn't contain joining and not selfAddress
private def allNodes: Set[UniqueAddress] = nodes.union(weaklyUpNodes)
private def isKnownNode(node: UniqueAddress): Boolean =
nodes(node) || weaklyUpNodes(node) || joiningNodes(node) || selfUniqueAddress == node
var removedNodes: Map[UniqueAddress, Long] = Map.empty
// all nodes sorted with the leader first
@@ -1271,7 +1319,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// for pruning timeouts are based on clock that is only increased when all nodes are reachable
var previousClockTime = System.nanoTime()
var allReachableClockTime = 0L
var unreachable = Set.empty[Address]
var unreachable = Set.empty[UniqueAddress]
// the actual data
var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]
@@ -1397,32 +1445,66 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// MUST use replyTo instead of sender() and forward from normalReceive, because of the stash in load
val normalReceive: Receive = {
case Get(key, consistency, req) => receiveGet(key, consistency, req)
case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req)
case Read(key) => receiveRead(key)
case Write(key, envelope) => receiveWrite(key, envelope)
case ReadRepair(key, envelope) => receiveReadRepair(key, envelope)
case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas)
case FlushChanges => receiveFlushChanges()
case DeltaPropagationTick => receiveDeltaPropagationTick()
case GossipTick => receiveGossipTick()
case ClockTick => receiveClockTick()
case Status(otherDigests, chunk, totChunks) => receiveStatus(otherDigests, chunk, totChunks)
case Gossip(updatedData, sendBack) => receiveGossip(updatedData, sendBack)
case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber)
case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber)
case Terminated(ref) => receiveTerminated(ref)
case MemberWeaklyUp(m) => receiveWeaklyUpMemberUp(m)
case MemberUp(m) => receiveMemberUp(m)
case MemberRemoved(m, _) => receiveMemberRemoved(m)
case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) => receiveUnreachable(m)
case ReachableMember(m) => receiveReachable(m)
case GetKeyIds => receiveGetKeyIds()
case Delete(key, consistency, req) => receiveDelete(key, consistency, req)
case RemovedNodePruningTick => receiveRemovedNodePruningTick()
case GetReplicaCount => receiveGetReplicaCount()
case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled
case msg: DestinationSystemUid =>
msg.toSystemUid match {
case Some(uid) if uid != selfUniqueAddress.longUid =>
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was intended for this incarnation and otherwise
// it is discarded.
log.info(
"Ignoring message [{}] from [{}] intended for system uid [{}], self uid is [{}]",
Logging.simpleName(msg),
replyTo,
uid,
selfUniqueAddress.longUid)
case _ =>
msg match {
case Status(otherDigests, chunk, totChunks, _, fromSystemUid) =>
receiveStatus(otherDigests, chunk, totChunks, fromSystemUid)
case Gossip(updatedData, sendBack, _, fromSystemUid) =>
receiveGossip(updatedData, sendBack, fromSystemUid)
}
}
case msg: SendingSystemUid =>
msg.fromNode match {
case Some(fromNode) if !isKnownNode(fromNode) =>
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was from a known cluster member
log.info("Ignoring message [{}] from [{}] unknown node [{}]", Logging.simpleName(msg), replyTo, fromNode)
case _ =>
msg match {
case Read(key, _) => receiveRead(key)
case Write(key, envelope, _) => receiveWrite(key, envelope)
case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas)
}
}
case Get(key, consistency, req) => receiveGet(key, consistency, req)
case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req)
case ReadRepair(key, envelope) => receiveReadRepair(key, envelope)
case FlushChanges => receiveFlushChanges()
case DeltaPropagationTick => receiveDeltaPropagationTick()
case GossipTick => receiveGossipTick()
case ClockTick => receiveClockTick()
case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber)
case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber)
case Terminated(ref) => receiveTerminated(ref)
case MemberJoined(m) => receiveMemberJoining(m)
case MemberWeaklyUp(m) => receiveMemberWeaklyUp(m)
case MemberUp(m) => receiveMemberUp(m)
case MemberRemoved(m, _) => receiveMemberRemoved(m)
case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) => receiveUnreachable(m)
case ReachableMember(m) => receiveReachable(m)
case GetKeyIds => receiveGetKeyIds()
case Delete(key, consistency, req) => receiveDelete(key, consistency, req)
case RemovedNodePruningTick => receiveRemovedNodePruningTick()
case GetReplicaCount => receiveGetReplicaCount()
case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled
}
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = {
@@ -1438,7 +1520,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} else
context.actorOf(
ReadAggregator
.props(key, consistency, req, nodes, unreachable, localValue, replyTo)
.props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo)
.withDispatcher(context.props.dispatcher))
}
@@ -1521,7 +1603,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val writeAggregator =
context.actorOf(
WriteAggregator
.props(key, writeEnvelope, writeDelta, writeConsistency, req, nodes, unreachable, replyTo, durable)
.props(
key,
writeEnvelope,
writeDelta,
writeConsistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(
@@ -1630,7 +1722,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val writeAggregator =
context.actorOf(
WriteAggregator
.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable)
.props(
key,
DeletedEnvelope,
None,
consistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(
@@ -1811,13 +1913,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveGossipTick(): Unit = {
if (fullStateGossipEnabled)
selectRandomNode(nodes.union(weaklyUpNodes).toVector).foreach(gossipTo)
selectRandomNode(allNodes.toVector).foreach(gossipTo)
}
def gossipTo(address: Address): Unit = {
def gossipTo(address: UniqueAddress): Unit = {
val to = replica(address)
val toSystemUid = Some(address.longUid)
if (dataEntries.size <= maxDeltaElements) {
val status = Status(dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) }, chunk = 0, totChunks = 1)
val status = Status(
dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) },
chunk = 0,
totChunks = 1,
toSystemUid,
selfFromSystemUid)
to ! status
} else {
val totChunks = dataEntries.size / maxDeltaElements
@@ -1831,19 +1939,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val chunk = (statusCount % totChunks).toInt
val status = Status(dataEntries.collect {
case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk => (key, getDigest(key))
}, chunk, totChunks)
}, chunk, totChunks, toSystemUid, selfFromSystemUid)
to ! status
}
}
}
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
def selectRandomNode(addresses: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size)))
def replica(address: Address): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(address))
def replica(node: UniqueAddress): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(node.address))
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int): Unit = {
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug(
"Received gossip status from [{}], chunk [{}] of [{}] containing [{}]",
@@ -1868,7 +1976,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (keys.nonEmpty) {
if (log.isDebugEnabled)
log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", "))
val g = Gossip(keys.iterator.map(k => k -> getData(k).get).toMap, sendBack = otherDifferentKeys.nonEmpty)
val g = Gossip(
keys.iterator.map(k => k -> getData(k).get).toMap,
sendBack = otherDifferentKeys.nonEmpty,
fromSystemUid,
selfFromSystemUid)
replyTo ! g
}
val myMissingKeys = otherKeys.diff(myKeys)
@@ -1878,12 +1990,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
"Sending gossip status to [{}], requesting missing [{}]",
replyTo.path.address,
myMissingKeys.mkString(", "))
val status = Status(myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap, chunk, totChunks)
val status = Status(
myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap,
chunk,
totChunks,
fromSystemUid,
selfFromSystemUid)
replyTo ! status
}
}
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean): Unit = {
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[KeyId, DataEnvelope]
@@ -1899,7 +2016,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
if (sendBack && replyData.nonEmpty)
replyTo ! Gossip(replyData, sendBack = false)
replyTo ! Gossip(replyData, sendBack = false, fromSystemUid, selfFromSystemUid)
}
def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {
@@ -1943,16 +2060,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveWeaklyUpMemberUp(m: Member): Unit =
def receiveMemberJoining(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress)
weaklyUpNodes += m.address
joiningNodes += m.uniqueAddress
def receiveMemberWeaklyUp(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress) {
weaklyUpNodes += m.uniqueAddress
joiningNodes -= m.uniqueAddress
}
def receiveMemberUp(m: Member): Unit =
if (matchingRole(m)) {
leader += m
if (m.address != selfAddress) {
nodes += m.address
weaklyUpNodes -= m.address
nodes += m.uniqueAddress
weaklyUpNodes -= m.uniqueAddress
joiningNodes -= m.uniqueAddress
}
}
@@ -1960,14 +2084,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (m.address == selfAddress)
context.stop(self)
else if (matchingRole(m)) {
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
// filter, it's possible that the ordering has changed since it is based on MemberStatus
leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress)
nodes -= m.address
weaklyUpNodes -= m.address
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
nodes -= m.uniqueAddress
weaklyUpNodes -= m.uniqueAddress
joiningNodes -= m.uniqueAddress
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
deltaPropagationSelector.cleanupRemovedNode(m.address)
unreachable -= m.uniqueAddress
deltaPropagationSelector.cleanupRemovedNode(m.uniqueAddress)
}
}
@@ -1979,10 +2104,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveUnreachable(m: Member): Unit =
if (matchingRole(m)) unreachable += m.address
if (matchingRole(m)) unreachable += m.uniqueAddress
def receiveReachable(m: Member): Unit =
if (matchingRole(m)) unreachable -= m.address
if (matchingRole(m)) unreachable -= m.uniqueAddress
def receiveClockTick(): Unit = {
val now = System.nanoTime()
@@ -2004,11 +2129,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def collectRemovedNodes(): Unit = {
val knownNodes = nodes.union(weaklyUpNodes).union(removedNodes.keySet.map(_.address))
val knownNodes = allNodes.union(removedNodes.keySet)
val newRemovedNodes =
dataEntries.foldLeft(Set.empty[UniqueAddress]) {
case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) =>
acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n.address)))
acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n)))
case (acc, _) =>
acc
}
@@ -2023,7 +2148,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// initiate pruning for removed nodes
val removedSet: Set[UniqueAddress] = removedNodes.iterator
.collect {
case (r, t) if ((allReachableClockTime - t) > maxPruningDisseminationNanos) => r
case (r, t) if (allReachableClockTime - t) > maxPruningDisseminationNanos => r
}
.to(immutable.Set)
@@ -2053,7 +2178,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def performRemovedNodePruning(): Unit = {
// perform pruning when all seen Init
val allNodes = nodes.union(weaklyUpNodes)
val all = allNodes
val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis)
val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
@@ -2061,7 +2186,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
pruning.foreach {
case (removed, PruningInitialized(owner, seen))
if owner == selfUniqueAddress
&& (allNodes.isEmpty || allNodes.forall(seen)) =>
&& (all.isEmpty || all.forall(n => seen(n.address))) =>
val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
@@ -2124,22 +2249,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import ReadWriteAggregator._
def timeout: FiniteDuration
def nodes: Set[Address]
def unreachable: Set[Address]
def reachableNodes: Set[Address] = nodes.diff(unreachable)
def nodes: Set[UniqueAddress]
def unreachable: Set[UniqueAddress]
def reachableNodes: Set[UniqueAddress] = nodes.diff(unreachable)
import context.dispatcher
var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, self, SendToSecondary)
var timeoutSchedule = context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout)
var remaining = nodes
var remaining = nodes.map(_.address)
def doneWhenRemainingSize: Int
def primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas: Boolean): (Vector[Address], Vector[Address]) = {
def primaryAndSecondaryNodes(
requiresCausalDeliveryOfDeltas: Boolean): (Vector[UniqueAddress], Vector[UniqueAddress]) = {
val primarySize = nodes.size - doneWhenRemainingSize
if (primarySize >= nodes.size)
(nodes.toVector, Vector.empty[Address])
(nodes.toVector, Vector.empty[UniqueAddress])
else {
// Prefer to use reachable nodes over the unreachable nodes first.
// When RequiresCausalDeliveryOfDeltas use deterministic order to so that sequence numbers of subsequent
@@ -2157,8 +2283,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
timeoutSchedule.cancel()
}
def replica(address: Address): ActorSelection =
context.actorSelection(context.parent.path.toStringWithAddress(address))
def replica(node: UniqueAddress): ActorSelection =
context.actorSelection(context.parent.path.toStringWithAddress(node.address))
}
@@ -2172,12 +2298,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
nodes: Set[Address],
unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean): Props =
Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
.withDeploy(Deploy.local)
Props(
new WriteAggregator(
key,
envelope,
delta,
consistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)).withDeploy(Deploy.local)
}
/**
@@ -2189,18 +2326,18 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
override val nodes: Set[Address],
override val unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
override val nodes: Set[UniqueAddress],
override val unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean)
extends ReadWriteAggregator {
extends ReadWriteAggregator
with ActorLogging {
import Replicator._
import Replicator.Internal._
import ReadWriteAggregator._
val selfUniqueAddress = Cluster(context.system).selfUniqueAddress
override def timeout: FiniteDuration = consistency.timeout
override val doneWhenRemainingSize = consistency match {
@@ -2214,7 +2351,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
}
val writeMsg = Write(key.id, envelope)
val writeMsg = Write(key.id, envelope, Some(selfUniqueAddress))
val deltaMsg = delta match {
case None => None
case Some(d) => Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id -> d)))
@@ -2267,7 +2404,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// Deltas must be applied in order and we can't keep track of ordering of
// simultaneous updates so there is a chance that the delta could not be applied.
// Try again with the full state to the primary nodes that have not acked.
primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg }
primaryNodes.foreach { to =>
if (remaining(to.address))
replica(to) ! writeMsg
}
}
secondaryNodes.foreach { replica(_) ! writeMsg }
case ReceiveTimeout =>
@@ -2309,11 +2449,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
key: KeyR,
consistency: Replicator.ReadConsistency,
req: Option[Any],
nodes: Set[Address],
unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
localValue: Option[Replicator.Internal.DataEnvelope],
replyTo: ActorRef): Props =
Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo)).withDeploy(Deploy.local)
Props(new ReadAggregator(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo))
.withDeploy(Deploy.local)
}
@@ -2324,8 +2466,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
key: KeyR,
consistency: Replicator.ReadConsistency,
req: Option[Any],
override val nodes: Set[Address],
override val unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
override val nodes: Set[UniqueAddress],
override val unreachable: Set[UniqueAddress],
localValue: Option[Replicator.Internal.DataEnvelope],
replyTo: ActorRef)
extends ReadWriteAggregator {
@@ -2348,7 +2491,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator")
}
val readMsg = Read(key.id)
val readMsg = Read(key.id, Some(selfUniqueAddress))
private val (primaryNodes, secondaryNodes) = {
primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false)