Merge pull request #26421 from akka/wip-26284-removals-ClusterReceptionist-patriknw

Avoid false removals in ClusterReceptionist, #26284
This commit is contained in:
Patrik Nordwall 2019-03-28 10:08:57 +01:00 committed by GitHub
commit 7d58a1a2b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1714 additions and 396 deletions

View file

@ -18,9 +18,15 @@ import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator }
import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress }
import akka.remote.AddressUidExtension
import akka.util.TypedMultiMap
import scala.concurrent.duration._
import akka.actor.Address
import akka.cluster.ClusterEvent.ClusterDomainEvent
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.ClusterEvent.MemberJoined
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberWeaklyUp
// just to provide a log class
/** INTERNAL API */
@InternalApi
@ -39,16 +45,19 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// values contain system uid to make it possible to discern actors at the same
// path in different incarnations of a cluster node
final case class Entry(ref: ActorRef[_], systemUid: Long) {
def uniqueAddress(selfUniqueAddress: UniqueAddress): UniqueAddress =
if (ref.path.address.hasLocalScope) selfUniqueAddress
def uniqueAddress(selfAddress: Address): UniqueAddress =
if (ref.path.address.hasLocalScope) UniqueAddress(selfAddress, systemUid)
else UniqueAddress(ref.path.address, systemUid)
override def toString = ref.path.toString + "#" + ref.path.uid
override def toString: String =
s"${ref.path.toString}#${ref.path.uid} @ $systemUid"
}
private sealed trait InternalCommand extends Command
private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]])
extends InternalCommand
private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand
private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
private final case class ChangeFromReplicator(key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry])
extends InternalCommand
@ -76,7 +85,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
ctx.setLoggerClass(classOf[ClusterReceptionist])
Behaviors.withTimers { timers =>
val setup = new Setup(ctx)
val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount)
// include selfUniqueAddress so that it can be used locally before joining cluster
val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress)
// subscribe to changes from other nodes
val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
@ -90,13 +100,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
registry.allDdataKeys.foreach(key =>
setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped))
// keep track of cluster members
// remove entries when members are removed
val clusterEventMessageAdapter: ActorRef[MemberRemoved] =
ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) }
val clusterEventMessageAdapter: ActorRef[ClusterDomainEvent] =
ctx.messageAdapter[ClusterDomainEvent] {
case MemberJoined(member) => NodeAdded(member.uniqueAddress)
case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress)
case MemberUp(member) => NodeAdded(member.uniqueAddress)
case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress)
case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress)
case other =>
throw new IllegalStateException(s"Unexpected ClusterDomainEvent $other. Please report bug.")
}
setup.cluster.subscribe(
clusterEventMessageAdapter.toUntyped,
ClusterEvent.InitialStateAsEvents,
classOf[MemberRemoved])
classOf[MemberJoined],
classOf[MemberWeaklyUp],
classOf[MemberUp],
classOf[MemberRemoved],
ClusterShuttingDown.getClass)
// also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update,
// which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message
@ -129,28 +152,21 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit =
ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx =>
innerCtx.watch(target)
Behaviors.receive[Nothing]((_, _) => Behaviors.same).receiveSignal {
Behaviors.receiveSignal[Nothing] {
case (_, Terminated(`target`)) =>
ctx.self ! msg
Behaviors.stopped
}
})
def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = {
// filter tombstoned refs to avoid an extra update
// to subscribers in the case of lost removals (because of how ORMultiMap works)
val refsForKey = state.actorRefsFor(key)
val refsWithoutTombstoned =
if (registry.tombstones.isEmpty) refsForKey
else refsForKey.filterNot(registry.hasTombstone)
val msg = ReceptionistMessages.Listing(key.asServiceKey, refsWithoutTombstoned)
subscriptions.get(key).foreach(_ ! msg)
def isLeader = {
cluster.state.leader.contains(cluster.selfAddress)
}
def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Command] = {
def nodesRemoved(addresses: Set[UniqueAddress]): Unit = {
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) {
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress))
if (isLeader) {
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
val removals = {
registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
@ -164,7 +180,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
if (removals.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug(
"Node(s) [{}] removed, updating registry removing: [{}]",
"ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
cluster.selfAddress,
addresses.mkString(","),
removals
.map {
@ -185,33 +202,38 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
}
Behaviors.same
}
def onCommand(cmd: Command): Behavior[Command] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log.debug("Actor was registered: [{}] [{}]", key, entry)
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
maybeReplyTo match {
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
case None =>
}
val ddataKey = registry.ddataKeyFor(key)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
if (serviceInstance.path.address.hasLocalScope) {
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log.debug("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
maybeReplyTo match {
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
case None =>
}
val ddataKey = registry.ddataKeyFor(key)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
}
} else {
ctx.log.error(s"ClusterReceptionist [{}] - Register of non-local [{}] is not supported", serviceInstance)
}
Behaviors.same
case ReceptionistMessages.Find(key, replyTo) =>
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key))
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress))
Behaviors.same
case ReceptionistMessages.Subscribe(key, subscriber) =>
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
// immediately reply with initial listings to the new subscriber
subscriber ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key))
val listing =
ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress))
subscriber ! listing
next(newSubscriptions = subscriptions.inserted(key)(subscriber))
}
@ -223,7 +245,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
case RegisteredActorTerminated(key, serviceInstance) =>
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry)
ctx.log.debug(
"ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]",
cluster.selfAddress,
key.asServiceKey.id,
entry)
val ddataKey = registry.ddataKeyFor(key)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
@ -240,25 +266,42 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
if (changedKeys.nonEmpty) {
if (ctx.log.isDebugEnabled) {
ctx.log.debug(
"Change from replicator: [{}], changes: [{}], tombstones [{}]",
"ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]",
cluster.selfAddress,
newState.entries.entries,
changedKeys
.map(key => key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]"))
.mkString(", "),
registry.tombstones.mkString(", "))
newRegistry.tombstones.mkString(", "))
}
changedKeys.foreach { changedKey =>
notifySubscribersFor(changedKey, newState)
val serviceKey = changedKey.asServiceKey
val subscribers = subscriptions.get(changedKey)
if (subscribers.nonEmpty) {
val listing =
ReceptionistMessages
.Listing(serviceKey, newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress))
subscribers.foreach(_ ! listing)
}
// because of how ORMultiMap/ORset works, we could have a case where an actor we removed
// is re-introduced because of a concurrent update, in that case we need to re-remove it
val serviceKey = changedKey.asServiceKey
val tombstonedButReAdded =
newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone)
tombstonedButReAdded.foreach { actorRef =>
ctx.log.debug("Saw actorref that was tomstoned {}, re-removing.", actorRef)
val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone)
if (tombstonedButReAdded.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug(
"ClusterReceptionist [{}] - Saw ActorRefs that were tomstoned [{}], re-removing.",
cluster.selfAddress,
tombstonedButReAdded.mkString(", "))
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).removeBinding(serviceKey, Entry(actorRef, setup.selfSystemUid)).toORMultiMap
tombstonedButReAdded
.foldLeft(ServiceRegistry(registry)) { (acc, ref) =>
acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid))
}
.toORMultiMap
}
}
}
@ -268,34 +311,51 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
Behaviors.same
}
case NodeAdded(uniqueAddress) =>
next(registry.addNode(uniqueAddress))
case NodeRemoved(uniqueAddress) =>
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress)) {
ctx.log.debug(s"Leader node observed removed address [{}]", uniqueAddress)
nodesRemoved(Set(uniqueAddress))
} else Behaviors.same
if (uniqueAddress == selfUniqueAddress) {
ctx.log.debug("ClusterReceptionist [{}] - terminated/removed", cluster.selfAddress)
// If self cluster node is shutting down our own entries should have been removed via
// watch-Terminated or will be removed by other nodes. This point is anyway too late.
Behaviors.stopped
} else {
// Ok to update from several nodes but more efficient to try to do it from one node.
if (isLeader) {
ctx.log.debug(
"ClusterReceptionist [{}] - Leader node observed removed node [{}]",
cluster.selfAddress,
uniqueAddress)
nodesRemoved(Set(uniqueAddress))
}
next(registry.removeNode(uniqueAddress))
}
case RemoveTick =>
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress)) {
if (isLeader) {
val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress)
val clusterAddresses = cluster.state.members.map(_.uniqueAddress)
val notInCluster = allAddressesInState.diff(clusterAddresses)
val notInCluster = allAddressesInState.diff(registry.nodes)
if (notInCluster.isEmpty) Behavior.same
else {
if (ctx.log.isDebugEnabled)
ctx.log.debug("Leader node cleanup tick, removed nodes: [{}]", notInCluster.mkString(","))
ctx.log.debug(
"ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]",
cluster.selfAddress,
notInCluster.mkString(","))
nodesRemoved(notInCluster)
}
} else
Behavior.same
}
Behavior.same
case PruneTombstonesTick =>
val prunedRegistry = registry.pruneTombstones()
if (prunedRegistry eq registry) Behaviors.same
else {
ctx.log.debug(s"Pruning tombstones")
ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress)
next(prunedRegistry)
}
}
@ -309,4 +369,5 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
}
}
}

View file

@ -23,23 +23,26 @@ import scala.concurrent.duration.Deadline
val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
key -> new ServiceRegistry(EmptyORMultiMap)
}.toMap
new ShardedServiceRegistry(emptyRegistries, Map.empty)
new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty)
}
}
/**
* INTERNAL API
*
* Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not
* get too large ddata messages)
*
* @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
* for a key. Since the only way to unregister is to stop, we don't need to keep track of
* the service key
* INTERNAL API
*
*/
@InternalApi private[akka] final case class ShardedServiceRegistry(
serviceRegistries: Map[DDataKey, ServiceRegistry],
tombstones: Map[ActorRef[_], Deadline]) {
tombstones: Map[ActorRef[_], Deadline],
nodes: Set[UniqueAddress]) {
private val keys = serviceRegistries.keySet.toArray
@ -56,22 +59,32 @@ import scala.concurrent.duration.Deadline
def allEntries: Iterator[Entry] = allServices.flatMap(_._2)
def actorRefsFor[T](key: ServiceKey[T]): Set[ActorRef[T]] = {
val dDataKey = ddataKeyFor(key)
serviceRegistries(dDataKey).actorRefsFor(key)
val ddataKey = ddataKeyFor(key)
serviceRegistries(ddataKey).actorRefsFor(key)
}
def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
copy(serviceRegistries + (dDataKey -> registry), tombstones)
def activeActorRefsFor[T](key: ServiceKey[T], selfUniqueAddress: UniqueAddress): Set[ActorRef[T]] = {
val ddataKey = ddataKeyFor(key)
val entries = serviceRegistries(ddataKey).entriesFor(key)
val selfAddress = selfUniqueAddress.address
entries.collect {
case entry if nodes.contains(entry.uniqueAddress(selfAddress)) && !hasTombstone(entry.ref) =>
entry.ref.asInstanceOf[ActorRef[key.Protocol]]
}
}
def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
copy(serviceRegistries + (ddataKey -> registry), tombstones)
def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] =
allEntries.collect {
// we don't care about local (empty host:port addresses)
case entry if entry.ref.path.address.hasGlobalScope =>
entry.uniqueAddress(selfUniqueAddress)
entry.uniqueAddress(selfUniqueAddress.address)
}.toSet
def collectChangedKeys(dDataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = {
val previousRegistry = registryFor(dDataKey)
def collectChangedKeys(ddataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = {
val previousRegistry = registryFor(ddataKey)
ServiceRegistry.collectChangedKeys(previousRegistry, newRegistry)
}
@ -88,7 +101,7 @@ import scala.concurrent.duration.Deadline
copy(tombstones = tombstones + (actorRef -> deadline))
def hasTombstone(actorRef: ActorRef[_]): Boolean =
tombstones.contains(actorRef)
tombstones.nonEmpty && tombstones.contains(actorRef)
def pruneTombstones(): ShardedServiceRegistry = {
copy(tombstones = tombstones.filter {
@ -96,6 +109,12 @@ import scala.concurrent.duration.Deadline
})
}
def addNode(node: UniqueAddress): ShardedServiceRegistry =
copy(nodes = nodes + node)
def removeNode(node: UniqueAddress): ShardedServiceRegistry =
copy(nodes = nodes - node)
}
/**

View file

@ -18,10 +18,13 @@ import akka.actor.testkit.typed.FishingOutcome
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, FishingOutcomes, TestProbe }
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.cluster.typed.Down
import akka.cluster.typed.JoinSeedNodes
import akka.cluster.typed.Leave
object ClusterReceptionistSpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel = DEBUG # issue #24960
@ -42,9 +45,17 @@ object ClusterReceptionistSpec {
akka.remote.netty.tcp.host = 127.0.0.1
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.remote.retry-gate-closed-for = 1 s
akka.cluster.typed.receptionist {
pruning-interval = 1 s
}
akka.cluster {
auto-down-unreachable-after = 0s
#auto-down-unreachable-after = 0s
jmx.multi-mbeans-in-same-jvm = on
failure-detector.acceptable-heartbeat-pause = 3s
}
""")
@ -109,7 +120,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
@ -132,7 +143,15 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
}
"remove registrations when node dies" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config)
testNodeRemoval(down = true)
}
"remove registrations when node leaves" in {
testNodeRemoval(down = false)
}
def testNodeRemoval(down: Boolean): Unit = {
val testKit1 = ActorTestKit(s"ClusterReceptionistSpec-test-3-$down", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
@ -146,72 +165,47 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service1 = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service1))
regProbe1.expectMessage(Listing(PingKey, Set(service1)))
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
val serviceRefs2 = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
serviceRefs2.size should ===(2)
if (down) {
// abrupt termination
Await.ready(system2.terminate(), 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address)
} else {
clusterNode1.manager ! Leave(clusterNode2.selfMember.address)
}
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1)))
// register another after removal
val service1b = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1b, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service1b))
regProbe1.expectMessage(Listing(PingKey, Set(service1, service1b)))
// abrupt termination
system2.terminate()
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"work with services registered before node joins cluster" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-3", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
// then we join the cluster
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up))
// and the subscriber on node1 should see the service
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// abrupt termination
system2.terminate()
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"handle a new incarnation of the same node well" in {
"not remove registrations when self is shutdown" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-4", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -226,7 +220,107 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service1 = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service1))
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
akka.cluster.Cluster(system1.toUntyped).shutdown()
regProbe2.expectNoMessage(3.seconds)
clusterNode2.manager ! Down(clusterNode1.selfMember.address)
// service1 removed
regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2)))
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"work with services registered before node joins cluster" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
val reply2 = TestProbe[Listing]()(system2)
// awaitAssert because it is not immediately included in the registry (round trip to ddata)
reply2.awaitAssert {
system2.receptionist ! Find(PingKey, reply2.ref)
reply2.receiveMessage().serviceInstances(PingKey) should ===(Set(service2))
}
// and it shouldn't be removed (wait longer than pruning-interval)
Thread.sleep(2000)
system2.receptionist ! Find(PingKey, reply2.ref)
reply2.receiveMessage().serviceInstances(PingKey) should ===(Set(service2))
// then we join the cluster
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
// and the subscriber on node1 should see the service
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// abrupt termination
Await.ready(system2.terminate(), 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address)
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"handle a new incarnation of the same node well" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-6", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
@ -241,42 +335,46 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// FIXME do we need to blackhole the connection to system2 before terminating
// right now it doesn't work anyways though ;D
// abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2, uid: [{}]", clusterNode2.selfMember.uniqueAddress.longUid)
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds)
val testKit3 = ActorTestKit(system1.name, testKit1.config)
val testKit3 = ActorTestKit(
system1.name,
ConfigFactory.parseString(s"""
akka.remote.netty.tcp.port = ${clusterNode2.selfMember.address.port.get}
akka.remote.artery.canonical.port = ${clusterNode2.selfMember.address.port.get}
# retry joining when existing member removed
akka.cluster.retry-unsuccessful-join-after = 1s
""").withFallback(config))
try {
val system3 = testKit3.system
system1.log.debug(
"Starting system3 at same hostname port as system2, uid: [{}]",
Cluster(system3).selfMember.uniqueAddress.longUid)
val clusterNode3 = Cluster(system3)
clusterNode3.manager ! Join(clusterNode1.selfMember.address)
system1.log
.debug("Starting system3 at same hostname port as system2: [{}]", clusterNode3.selfMember.uniqueAddress)
// using JoinSeedNodes instead of Join to retry the join when existing member removed
clusterNode3.manager ! JoinSeedNodes(List(clusterNode1.selfMember.address))
val regProbe3 = TestProbe[Any]()(system3)
// and registers the same service key
val service3 = testKit3.spawn(pingPongBehavior, "instance")
system3.log.debug(
"Spawning/registering ping service in new incarnation {}#{}",
service3.path,
service3.path.uid)
val service3Uid = service3.path.uid
system3.log.debug("Spawning/registering ping service in new incarnation {}", service3)
system3.receptionist ! Register(PingKey, service3, regProbe3.ref)
regProbe3.expectMessage(Registered(PingKey, service3))
system3.log.debug("Registered actor [{}#{}] for system3", service3.path, service3.path.uid)
system3.log.debug("Registered actor [{}] for system3", service3)
// make sure it joined fine and node1 has upped it
regProbe1.awaitAssert {
clusterNode1.state.members.exists(
m =>
m.uniqueAddress == clusterNode3.selfMember.uniqueAddress &&
m.status == MemberStatus.Up &&
!clusterNode1.state.unreachable(m))
}
regProbe1.awaitAssert(
{
clusterNode1.state.members.exists(
m =>
m.uniqueAddress == clusterNode3.selfMember.uniqueAddress &&
m.status == MemberStatus.Up &&
!clusterNode1.state.unreachable(m)) should ===(true)
},
10.seconds)
// we should get either empty message and then updated with the new incarnation actor
// or just updated with the new service directly
@ -289,6 +387,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
val ref = entries.head
val service3RemotePath = RootActorPath(clusterNode3.selfMember.address) / "user" / "instance"
ref.path should ===(service3RemotePath)
ref.path.uid should ===(service3Uid)
ref ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
@ -301,13 +401,117 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
}
}
// reproducer of issue #26284
"handle a new incarnation of the same node that is no longer part of same cluster" in {
val testKit1 = ActorTestKit(
"ClusterReceptionistSpec-test-7",
ConfigFactory.parseString("""
akka.cluster {
failure-detector.acceptable-heartbeat-pause = 20s
}
akka.cluster.typed.receptionist {
# it can be stressed more by using all
write-consistency = all
}
""").withFallback(ClusterReceptionistSpec.config))
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
val reply1 = TestProbe[Listing]()(system1)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service1 = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service1))
regProbe1.expectMessage(Listing(PingKey, Set(service1)))
val service2 = testKit2.spawn(pingPongBehavior, "instance")
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
// make sure we saw the first incarnation on node1
regProbe1.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
// abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress)
Await.ready(system2.terminate(), 10.seconds)
val testKit3 = ActorTestKit(
system1.name,
ConfigFactory.parseString(s"""
akka.remote.netty.tcp.port = ${clusterNode2.selfMember.address.port.get}
akka.remote.artery.canonical.port = ${clusterNode2.selfMember.address.port.get}
""").withFallback(config))
try {
val system3 = testKit3.system
val regProbe3 = TestProbe[Any]()(system3)
val clusterNode3 = Cluster(system3)
system1.log
.debug("Starting system3 at same hostname port as system2 [{}]", clusterNode3.selfMember.uniqueAddress)
// joining itself, i.e. not same cluster
clusterNode3.manager ! Join(clusterNode3.selfMember.address)
regProbe3.awaitAssert(clusterNode3.state.members.count(_.status == MemberStatus.Up) should ===(1))
// register another
Thread.sleep(2000)
val service1b = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1b, regProbe1.ref)
val service1c = testKit1.spawn(pingPongBehavior)
system1.receptionist ! Register(PingKey, service1c, regProbe1.ref)
system3.receptionist ! Subscribe(PingKey, regProbe3.ref)
// shouldn't get anything from the other cluster
regProbe3.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
// and registers the same service key
val service3 = testKit3.spawn(pingPongBehavior, "instance")
system3.log.debug("Spawning/registering ping service in new incarnation {}", service3)
system3.receptionist ! Register(PingKey, service3, regProbe3.ref)
regProbe3.expectMessage(Registered(PingKey, service3))
system3.log.debug("Registered actor [{}] for system3", service3)
// shouldn't get anything from the other cluster
regProbe3.expectMessage(Listing(PingKey, Set(service3)))
reply1.expectNoMessage(1.second)
system1.receptionist ! Find(PingKey, reply1.ref)
(reply1.receiveMessage().serviceInstances(PingKey) should contain).allOf(service1, service1b, service1c)
reply1.expectNoMessage(1.second)
system1.receptionist ! Find(PingKey, reply1.ref)
(reply1.receiveMessage().serviceInstances(PingKey) should contain).allOf(service1, service1b, service1c)
} finally {
testKit3.shutdownTestKit()
}
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"not lose removals on concurrent updates to same key" in {
val config = ConfigFactory.parseString("""
# disable delta propagation so we can have repeatable concurrent writes
# without delta reaching between nodes already
akka.cluster.distributed-data.delta-crdt.enabled=false
""").withFallback(ClusterReceptionistSpec.config)
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", config)
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-8", config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
@ -322,7 +526,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
val regProbe1 = TestProbe[AnyRef]()(system1)
val regProbe2 = TestProbe[AnyRef]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2))
// one actor on each node up front
val actor1 = testKit1.spawn(Behaviors.receive[AnyRef] {

View file

@ -1,5 +1,5 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!

View file

@ -2,3 +2,18 @@
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.ORSet.clear")
ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.cluster.ddata.PruningState.addSeen")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.PruningState.addSeen")
# #26284 adding uid to internal messages
ProblemFilters.exclude[Problem]("akka.cluster.ddata.Replicator#Internal*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.Replicator$Internal*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadAggregator*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.WriteAggregator*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadWriteAggregator*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages*")
ProblemFilters.exclude[Problem]("akka.cluster.ddata.DeltaPropagationSelector*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveGossip")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveStatus")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.gossipTo")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.replica")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveWeaklyUpMemberUp")

View file

@ -11,7 +11,7 @@ message Get {
required OtherMessage key = 1;
required sint32 consistency = 2;
required uint32 timeout = 3;
optional OtherMessage request = 4;
optional OtherMessage request = 4;
}
message GetSuccess {
@ -48,6 +48,7 @@ message Changed {
message Write {
required string key = 1;
required DataEnvelope envelope = 2;
optional UniqueAddress fromNode = 3;
}
// message WriteAck, via Empty
@ -57,6 +58,7 @@ message Empty {
message Read {
required string key = 1;
optional UniqueAddress fromNode = 2;
}
message ReadResult {
@ -86,6 +88,8 @@ message Status {
required uint32 chunk = 1;
required uint32 totChunks = 2;
repeated Entry entries = 3;
optional sfixed64 toSystemUid = 4;
optional sfixed64 fromSystemUid = 5;
}
message Gossip {
@ -96,6 +100,8 @@ message Gossip {
required bool sendBack = 1;
repeated Entry entries = 2;
optional sfixed64 toSystemUid = 3;
optional sfixed64 fromSystemUid = 4;
}
message DeltaPropagation {
@ -108,7 +114,7 @@ message DeltaPropagation {
required UniqueAddress fromNode = 1;
repeated Entry entries = 2;
optional bool reply = 3; // no reply if not set
optional bool reply = 3; // no reply if not set
}
message UniqueAddress {

View file

@ -6,8 +6,8 @@ package akka.cluster.ddata
import scala.collection.immutable.TreeMap
import akka.actor.Address
import akka.annotation.InternalApi
import akka.cluster.UniqueAddress
import akka.cluster.ddata.Key.KeyId
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation
import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder
@ -25,12 +25,12 @@ private[akka] trait DeltaPropagationSelector {
def propagationCount: Long = _propagationCount
private var deltaCounter = Map.empty[KeyId, Long]
private var deltaEntries = Map.empty[KeyId, TreeMap[Long, ReplicatedData]]
private var deltaSentToNode = Map.empty[KeyId, Map[Address, Long]]
private var deltaSentToNode = Map.empty[KeyId, Map[UniqueAddress, Long]]
private var deltaNodeRoundRobinCounter = 0L
def gossipIntervalDivisor: Int
def allNodes: Vector[Address]
def allNodes: Vector[UniqueAddress]
def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation
@ -68,7 +68,7 @@ private[akka] trait DeltaPropagationSelector {
math.min(math.max((allNodesSize / gossipIntervalDivisor) + 1, 2), math.min(allNodesSize, 10))
}
def collectPropagations(): Map[Address, DeltaPropagation] = {
def collectPropagations(): Map[UniqueAddress, DeltaPropagation] = {
_propagationCount += 1
val all = allNodes
if (all.isEmpty)
@ -90,7 +90,7 @@ private[akka] trait DeltaPropagationSelector {
}
deltaNodeRoundRobinCounter += sliceSize
var result = Map.empty[Address, DeltaPropagation]
var result = Map.empty[UniqueAddress, DeltaPropagation]
var cache = Map.empty[(KeyId, Long, Long), ReplicatedData]
slice.foreach { node =>
@ -99,7 +99,7 @@ private[akka] trait DeltaPropagationSelector {
var deltas = Map.empty[KeyId, (ReplicatedData, Long, Long)]
deltaEntries.foreach {
case (key, entries) =>
val deltaSentToNodeForKey = deltaSentToNode.getOrElse(key, TreeMap.empty[Address, Long])
val deltaSentToNodeForKey = deltaSentToNode.getOrElse(key, TreeMap.empty[UniqueAddress, Long])
val j = deltaSentToNodeForKey.getOrElse(node, 0L)
val deltaEntriesAfterJ = deltaEntriesAfter(entries, j)
if (deltaEntriesAfterJ.nonEmpty) {
@ -160,7 +160,7 @@ private[akka] trait DeltaPropagationSelector {
}
}
private def findSmallestVersionPropagatedToAllNodes(key: KeyId, all: Vector[Address]): Long = {
private def findSmallestVersionPropagatedToAllNodes(key: KeyId, all: Vector[UniqueAddress]): Long = {
deltaSentToNode.get(key) match {
case None => 0L
case Some(deltaSentToNodeForKey) =>
@ -188,7 +188,7 @@ private[akka] trait DeltaPropagationSelector {
}
}
def cleanupRemovedNode(address: Address): Unit = {
def cleanupRemovedNode(address: UniqueAddress): Unit = {
deltaSentToNode = deltaSentToNode.map {
case (key, deltaSentToNodeForKey) =>
key -> (deltaSentToNodeForKey - address)

View file

@ -5,15 +5,18 @@
package akka.cluster.ddata
import java.security.MessageDigest
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.ThreadLocalRandom
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
@ -34,24 +37,31 @@ import akka.serialization.SerializationExtension
import akka.util.ByteString
import com.typesafe.config.Config
import java.util.function.{ Function => JFunction }
import akka.dispatch.Dispatchers
import akka.actor.DeadLetterSuppression
import akka.cluster.ddata.Key.KeyR
import java.util.Optional
import akka.cluster.ddata.DurableStore._
import akka.actor.ExtendedActorSystem
import akka.actor.SupervisorStrategy
import akka.actor.OneForOneStrategy
import akka.actor.ActorInitializationException
import java.util.concurrent.TimeUnit
import akka.util.Helpers.toRootLowerCase
import akka.actor.Cancellable
import scala.util.control.NonFatal
import akka.cluster.ddata.Key.KeyId
import akka.annotation.InternalApi
import scala.collection.immutable.TreeSet
import akka.cluster.MemberStatus
import scala.annotation.varargs
import akka.event.Logging
import akka.util.JavaDurationConverters._
import akka.util.ccompat._
@ -790,10 +800,20 @@ object Replicator {
case object DeltaPropagationTick
case object RemovedNodePruningTick
case object ClockTick
final case class Write(key: KeyId, envelope: DataEnvelope) extends ReplicatorMessage
sealed trait SendingSystemUid {
// FIXME #26566: we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions
def fromNode: Option[UniqueAddress]
}
sealed trait DestinationSystemUid {
// FIXME #26566: we can change from Option to Long after supporting mixed rolling updates for some versions
def toSystemUid: Option[Long]
}
final case class Write(key: KeyId, envelope: DataEnvelope, fromNode: Option[UniqueAddress])
extends ReplicatorMessage
with SendingSystemUid
case object WriteAck extends ReplicatorMessage with DeadLetterSuppression
case object WriteNack extends ReplicatorMessage with DeadLetterSuppression
final case class Read(key: KeyId) extends ReplicatorMessage
final case class Read(key: KeyId, fromNode: Option[UniqueAddress]) extends ReplicatorMessage with SendingSystemUid
final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression
final case class ReadRepair(key: KeyId, envelope: DataEnvelope)
case object ReadRepairAck
@ -946,7 +966,14 @@ object Replicator {
override def merge(that: ReplicatedData): ReplicatedData = DeletedData
}
final case class Status(digests: Map[KeyId, Digest], chunk: Int, totChunks: Int) extends ReplicatorMessage {
final case class Status(
digests: Map[KeyId, Digest],
chunk: Int,
totChunks: Int,
toSystemUid: Option[Long],
fromSystemUid: Option[Long])
extends ReplicatorMessage
with DestinationSystemUid {
override def toString: String =
(digests
.map {
@ -954,11 +981,22 @@ object Replicator {
})
.mkString("Status(", ", ", ")")
}
final case class Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage
final case class Gossip(
updatedData: Map[KeyId, DataEnvelope],
sendBack: Boolean,
toSystemUid: Option[Long],
fromSystemUid: Option[Long])
extends ReplicatorMessage
with DestinationSystemUid
final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long)
final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta])
final case class DeltaPropagation(_fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta])
extends ReplicatorMessage
with SendingSystemUid {
// FIXME we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions,
// i.e. remove this and rename _fromNode
override def fromNode: Option[UniqueAddress] = Some(_fromNode)
}
object DeltaPropagation {
/**
@ -1186,6 +1224,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val cluster = Cluster(context.system)
val selfAddress = cluster.selfAddress
val selfUniqueAddress = cluster.selfUniqueAddress
val selfFromSystemUid = Some(selfUniqueAddress.longUid)
require(!cluster.isTerminated, "Cluster node must not be terminated")
require(
@ -1223,9 +1262,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val deltaPropagationSelector = new DeltaPropagationSelector {
override val gossipIntervalDivisor = 5
override def allNodes: Vector[Address] = {
override def allNodes: Vector[UniqueAddress] = {
// TODO optimize, by maintaining a sorted instance variable instead
nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted
Replicator.this.allNodes.diff(unreachable).toVector.sorted
}
override def maxDeltaSize: Int = settings.maxDeltaSize
@ -1256,11 +1295,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
.schedule(deltaPropagationInterval, deltaPropagationInterval, self, DeltaPropagationTick))
} else None
// cluster nodes, doesn't contain selfAddress
var nodes: Set[Address] = Set.empty
// cluster nodes, doesn't contain selfAddress, doesn't contain joining and weaklyUp
var nodes: Set[UniqueAddress] = Set.empty
// cluster weaklyUp nodes, doesn't contain selfAddress
var weaklyUpNodes: Set[Address] = Set.empty
var weaklyUpNodes: Set[UniqueAddress] = Set.empty
// cluster joining nodes, doesn't contain selfAddress
var joiningNodes: Set[UniqueAddress] = Set.empty
// up and weaklyUp nodes, doesn't contain joining and not selfAddress
private def allNodes: Set[UniqueAddress] = nodes.union(weaklyUpNodes)
private def isKnownNode(node: UniqueAddress): Boolean =
nodes(node) || weaklyUpNodes(node) || joiningNodes(node) || selfUniqueAddress == node
var removedNodes: Map[UniqueAddress, Long] = Map.empty
// all nodes sorted with the leader first
@ -1271,7 +1319,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// for pruning timeouts are based on clock that is only increased when all nodes are reachable
var previousClockTime = System.nanoTime()
var allReachableClockTime = 0L
var unreachable = Set.empty[Address]
var unreachable = Set.empty[UniqueAddress]
// the actual data
var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]
@ -1397,32 +1445,66 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// MUST use replyTo instead of sender() and forward from normalReceive, because of the stash in load
val normalReceive: Receive = {
case Get(key, consistency, req) => receiveGet(key, consistency, req)
case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req)
case Read(key) => receiveRead(key)
case Write(key, envelope) => receiveWrite(key, envelope)
case ReadRepair(key, envelope) => receiveReadRepair(key, envelope)
case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas)
case FlushChanges => receiveFlushChanges()
case DeltaPropagationTick => receiveDeltaPropagationTick()
case GossipTick => receiveGossipTick()
case ClockTick => receiveClockTick()
case Status(otherDigests, chunk, totChunks) => receiveStatus(otherDigests, chunk, totChunks)
case Gossip(updatedData, sendBack) => receiveGossip(updatedData, sendBack)
case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber)
case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber)
case Terminated(ref) => receiveTerminated(ref)
case MemberWeaklyUp(m) => receiveWeaklyUpMemberUp(m)
case MemberUp(m) => receiveMemberUp(m)
case MemberRemoved(m, _) => receiveMemberRemoved(m)
case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) => receiveUnreachable(m)
case ReachableMember(m) => receiveReachable(m)
case GetKeyIds => receiveGetKeyIds()
case Delete(key, consistency, req) => receiveDelete(key, consistency, req)
case RemovedNodePruningTick => receiveRemovedNodePruningTick()
case GetReplicaCount => receiveGetReplicaCount()
case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled
case msg: DestinationSystemUid =>
msg.toSystemUid match {
case Some(uid) if uid != selfUniqueAddress.longUid =>
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was intended for this incarnation and otherwise
// it is discarded.
log.info(
"Ignoring message [{}] from [{}] intended for system uid [{}], self uid is [{}]",
Logging.simpleName(msg),
replyTo,
uid,
selfUniqueAddress.longUid)
case _ =>
msg match {
case Status(otherDigests, chunk, totChunks, _, fromSystemUid) =>
receiveStatus(otherDigests, chunk, totChunks, fromSystemUid)
case Gossip(updatedData, sendBack, _, fromSystemUid) =>
receiveGossip(updatedData, sendBack, fromSystemUid)
}
}
case msg: SendingSystemUid =>
msg.fromNode match {
case Some(fromNode) if !isKnownNode(fromNode) =>
// When restarting a node with same host:port it is possible that a Replicator on another node
// is sending messages to the restarted node even if it hasn't joined the same cluster.
// Therefore we check that the message was from a known cluster member
log.info("Ignoring message [{}] from [{}] unknown node [{}]", Logging.simpleName(msg), replyTo, fromNode)
case _ =>
msg match {
case Read(key, _) => receiveRead(key)
case Write(key, envelope, _) => receiveWrite(key, envelope)
case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas)
}
}
case Get(key, consistency, req) => receiveGet(key, consistency, req)
case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req)
case ReadRepair(key, envelope) => receiveReadRepair(key, envelope)
case FlushChanges => receiveFlushChanges()
case DeltaPropagationTick => receiveDeltaPropagationTick()
case GossipTick => receiveGossipTick()
case ClockTick => receiveClockTick()
case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber)
case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber)
case Terminated(ref) => receiveTerminated(ref)
case MemberJoined(m) => receiveMemberJoining(m)
case MemberWeaklyUp(m) => receiveMemberWeaklyUp(m)
case MemberUp(m) => receiveMemberUp(m)
case MemberRemoved(m, _) => receiveMemberRemoved(m)
case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) => receiveUnreachable(m)
case ReachableMember(m) => receiveReachable(m)
case GetKeyIds => receiveGetKeyIds()
case Delete(key, consistency, req) => receiveDelete(key, consistency, req)
case RemovedNodePruningTick => receiveRemovedNodePruningTick()
case GetReplicaCount => receiveGetReplicaCount()
case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled
}
def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = {
@ -1438,7 +1520,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
} else
context.actorOf(
ReadAggregator
.props(key, consistency, req, nodes, unreachable, localValue, replyTo)
.props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo)
.withDispatcher(context.props.dispatcher))
}
@ -1521,7 +1603,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val writeAggregator =
context.actorOf(
WriteAggregator
.props(key, writeEnvelope, writeDelta, writeConsistency, req, nodes, unreachable, replyTo, durable)
.props(
key,
writeEnvelope,
writeDelta,
writeConsistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(
@ -1630,7 +1722,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val writeAggregator =
context.actorOf(
WriteAggregator
.props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable)
.props(
key,
DeletedEnvelope,
None,
consistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)
.withDispatcher(context.props.dispatcher))
if (durable) {
durableStore ! Store(
@ -1811,13 +1913,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def receiveGossipTick(): Unit = {
if (fullStateGossipEnabled)
selectRandomNode(nodes.union(weaklyUpNodes).toVector).foreach(gossipTo)
selectRandomNode(allNodes.toVector).foreach(gossipTo)
}
def gossipTo(address: Address): Unit = {
def gossipTo(address: UniqueAddress): Unit = {
val to = replica(address)
val toSystemUid = Some(address.longUid)
if (dataEntries.size <= maxDeltaElements) {
val status = Status(dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) }, chunk = 0, totChunks = 1)
val status = Status(
dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) },
chunk = 0,
totChunks = 1,
toSystemUid,
selfFromSystemUid)
to ! status
} else {
val totChunks = dataEntries.size / maxDeltaElements
@ -1831,19 +1939,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val chunk = (statusCount % totChunks).toInt
val status = Status(dataEntries.collect {
case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk => (key, getDigest(key))
}, chunk, totChunks)
}, chunk, totChunks, toSystemUid, selfFromSystemUid)
to ! status
}
}
}
def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] =
def selectRandomNode(addresses: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] =
if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size)))
def replica(address: Address): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(address))
def replica(node: UniqueAddress): ActorSelection =
context.actorSelection(self.path.toStringWithAddress(node.address))
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int): Unit = {
def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug(
"Received gossip status from [{}], chunk [{}] of [{}] containing [{}]",
@ -1868,7 +1976,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (keys.nonEmpty) {
if (log.isDebugEnabled)
log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", "))
val g = Gossip(keys.iterator.map(k => k -> getData(k).get).toMap, sendBack = otherDifferentKeys.nonEmpty)
val g = Gossip(
keys.iterator.map(k => k -> getData(k).get).toMap,
sendBack = otherDifferentKeys.nonEmpty,
fromSystemUid,
selfFromSystemUid)
replyTo ! g
}
val myMissingKeys = otherKeys.diff(myKeys)
@ -1878,12 +1990,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
"Sending gossip status to [{}], requesting missing [{}]",
replyTo.path.address,
myMissingKeys.mkString(", "))
val status = Status(myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap, chunk, totChunks)
val status = Status(
myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap,
chunk,
totChunks,
fromSystemUid,
selfFromSystemUid)
replyTo ! status
}
}
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean): Unit = {
def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = {
if (log.isDebugEnabled)
log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", "))
var replyData = Map.empty[KeyId, DataEnvelope]
@ -1899,7 +2016,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
if (sendBack && replyData.nonEmpty)
replyTo ! Gossip(replyData, sendBack = false)
replyTo ! Gossip(replyData, sendBack = false, fromSystemUid, selfFromSystemUid)
}
def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {
@ -1943,16 +2060,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveWeaklyUpMemberUp(m: Member): Unit =
def receiveMemberJoining(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress)
weaklyUpNodes += m.address
joiningNodes += m.uniqueAddress
def receiveMemberWeaklyUp(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress) {
weaklyUpNodes += m.uniqueAddress
joiningNodes -= m.uniqueAddress
}
def receiveMemberUp(m: Member): Unit =
if (matchingRole(m)) {
leader += m
if (m.address != selfAddress) {
nodes += m.address
weaklyUpNodes -= m.address
nodes += m.uniqueAddress
weaklyUpNodes -= m.uniqueAddress
joiningNodes -= m.uniqueAddress
}
}
@ -1960,14 +2084,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
if (m.address == selfAddress)
context.stop(self)
else if (matchingRole(m)) {
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
// filter, it's possible that the ordering is changed since it based on MemberStatus
leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress)
nodes -= m.address
weaklyUpNodes -= m.address
log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress)
nodes -= m.uniqueAddress
weaklyUpNodes -= m.uniqueAddress
joiningNodes -= m.uniqueAddress
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.address
deltaPropagationSelector.cleanupRemovedNode(m.address)
unreachable -= m.uniqueAddress
deltaPropagationSelector.cleanupRemovedNode(m.uniqueAddress)
}
}
@ -1979,10 +2104,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def receiveUnreachable(m: Member): Unit =
if (matchingRole(m)) unreachable += m.address
if (matchingRole(m)) unreachable += m.uniqueAddress
def receiveReachable(m: Member): Unit =
if (matchingRole(m)) unreachable -= m.address
if (matchingRole(m)) unreachable -= m.uniqueAddress
def receiveClockTick(): Unit = {
val now = System.nanoTime()
@ -2004,11 +2129,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
def collectRemovedNodes(): Unit = {
val knownNodes = nodes.union(weaklyUpNodes).union(removedNodes.keySet.map(_.address))
val knownNodes = allNodes.union(removedNodes.keySet)
val newRemovedNodes =
dataEntries.foldLeft(Set.empty[UniqueAddress]) {
case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) =>
acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n.address)))
acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n)))
case (acc, _) =>
acc
}
@ -2023,7 +2148,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// initiate pruning for removed nodes
val removedSet: Set[UniqueAddress] = removedNodes.iterator
.collect {
case (r, t) if ((allReachableClockTime - t) > maxPruningDisseminationNanos) => r
case (r, t) if (allReachableClockTime - t) > maxPruningDisseminationNanos => r
}
.to(immutable.Set)
@ -2053,7 +2178,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def performRemovedNodePruning(): Unit = {
// perform pruning when all seen Init
val allNodes = nodes.union(weaklyUpNodes)
val all = allNodes
val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis)
val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis)
dataEntries.foreach {
@ -2061,7 +2186,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
pruning.foreach {
case (removed, PruningInitialized(owner, seen))
if owner == selfUniqueAddress
&& (allNodes.isEmpty || allNodes.forall(seen)) =>
&& (all.isEmpty || all.forall(n => seen(n.address))) =>
val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed)
log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress)
setData(key, newEnvelope)
@ -2124,22 +2249,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
import ReadWriteAggregator._
def timeout: FiniteDuration
def nodes: Set[Address]
def unreachable: Set[Address]
def reachableNodes: Set[Address] = nodes.diff(unreachable)
def nodes: Set[UniqueAddress]
def unreachable: Set[UniqueAddress]
def reachableNodes: Set[UniqueAddress] = nodes.diff(unreachable)
import context.dispatcher
var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, self, SendToSecondary)
var timeoutSchedule = context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout)
var remaining = nodes
var remaining = nodes.map(_.address)
def doneWhenRemainingSize: Int
def primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas: Boolean): (Vector[Address], Vector[Address]) = {
def primaryAndSecondaryNodes(
requiresCausalDeliveryOfDeltas: Boolean): (Vector[UniqueAddress], Vector[UniqueAddress]) = {
val primarySize = nodes.size - doneWhenRemainingSize
if (primarySize >= nodes.size)
(nodes.toVector, Vector.empty[Address])
(nodes.toVector, Vector.empty[UniqueAddress])
else {
// Prefer to use reachable nodes over the unreachable nodes first.
// When RequiresCausalDeliveryOfDeltas use deterministic order to so that sequence numbers of subsequent
@ -2157,8 +2283,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
timeoutSchedule.cancel()
}
def replica(address: Address): ActorSelection =
context.actorSelection(context.parent.path.toStringWithAddress(address))
def replica(node: UniqueAddress): ActorSelection =
context.actorSelection(context.parent.path.toStringWithAddress(node.address))
}
@ -2172,12 +2298,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
nodes: Set[Address],
unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean): Props =
Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
.withDeploy(Deploy.local)
Props(
new WriteAggregator(
key,
envelope,
delta,
consistency,
req,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable)).withDeploy(Deploy.local)
}
/**
@ -2189,18 +2326,18 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
delta: Option[Replicator.Internal.Delta],
consistency: Replicator.WriteConsistency,
req: Option[Any],
override val nodes: Set[Address],
override val unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
override val nodes: Set[UniqueAddress],
override val unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean)
extends ReadWriteAggregator {
extends ReadWriteAggregator
with ActorLogging {
import Replicator._
import Replicator.Internal._
import ReadWriteAggregator._
val selfUniqueAddress = Cluster(context.system).selfUniqueAddress
override def timeout: FiniteDuration = consistency.timeout
override val doneWhenRemainingSize = consistency match {
@ -2214,7 +2351,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
}
val writeMsg = Write(key.id, envelope)
val writeMsg = Write(key.id, envelope, Some(selfUniqueAddress))
val deltaMsg = delta match {
case None => None
case Some(d) => Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id -> d)))
@ -2267,7 +2404,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// Deltas must be applied in order and we can't keep track of ordering of
// simultaneous updates so there is a chance that the delta could not be applied.
// Try again with the full state to the primary nodes that have not acked.
primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg }
primaryNodes.foreach { to =>
if (remaining(to.address))
replica(to) ! writeMsg
}
}
secondaryNodes.foreach { replica(_) ! writeMsg }
case ReceiveTimeout =>
@ -2309,11 +2449,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
key: KeyR,
consistency: Replicator.ReadConsistency,
req: Option[Any],
nodes: Set[Address],
unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
localValue: Option[Replicator.Internal.DataEnvelope],
replyTo: ActorRef): Props =
Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo)).withDeploy(Deploy.local)
Props(new ReadAggregator(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo))
.withDeploy(Deploy.local)
}
@ -2324,8 +2466,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
key: KeyR,
consistency: Replicator.ReadConsistency,
req: Option[Any],
override val nodes: Set[Address],
override val unreachable: Set[Address],
selfUniqueAddress: UniqueAddress,
override val nodes: Set[UniqueAddress],
override val unreachable: Set[UniqueAddress],
localValue: Option[Replicator.Internal.DataEnvelope],
replyTo: ActorRef)
extends ReadWriteAggregator {
@ -2348,7 +2491,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator")
}
val readMsg = Read(key.id)
val readMsg = Read(key.id, Some(selfUniqueAddress))
private val (primaryNodes, secondaryNodes) = {
primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false)

View file

@ -50,14 +50,14 @@ private object ReplicatedDataSerializer {
@silent
private final def compareKeys(t1: Any, t2: Any): Int = (t1, t2) match {
case (k1: String, k2: String) => k1.compareTo(k2)
case (k1: String, k2) => -1
case (k1, k2: String) => 1
case (_: String, _) => -1
case (_, _: String) => 1
case (k1: Int, k2: Int) => k1.compareTo(k2)
case (k1: Int, k2) => -1
case (k1, k2: Int) => 1
case (_: Int, _) => -1
case (_, _: Int) => 1
case (k1: Long, k2: Long) => k1.compareTo(k2)
case (k1: Long, k2) => -1
case (k1, k2: Long) => 1
case (_: Long, _) => -1
case (_, _: Long) => 1
case (k1: OtherMessage, k2: OtherMessage) => OtherMessageComparator.compare(k1, k2)
case (k1, k2) =>
throw new IllegalStateException(

View file

@ -267,15 +267,21 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case (key, digest) =>
b.addEntries(dm.Status.Entry.newBuilder().setKey(key).setDigest(ByteString.copyFrom(digest.toArray)))
}
b.setToSystemUid(status.toSystemUid.get)
b.setFromSystemUid(status.fromSystemUid.get)
b.build()
}
private def statusFromBinary(bytes: Array[Byte]): Status = {
val status = dm.Status.parseFrom(bytes)
val toSystemUid = if (status.hasToSystemUid) Some(status.getToSystemUid) else None
val fromSystemUid = if (status.hasFromSystemUid) Some(status.getFromSystemUid) else None
Status(
status.getEntriesList.asScala.iterator.map(e => e.getKey -> AkkaByteString(e.getDigest.toByteArray())).toMap,
status.getChunk,
status.getTotChunks)
status.getTotChunks,
toSystemUid,
fromSystemUid)
}
private def gossipToProto(gossip: Gossip): dm.Gossip = {
@ -284,18 +290,24 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
case (key, data) =>
b.addEntries(dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data)))
}
b.setToSystemUid(gossip.toSystemUid.get)
b.setFromSystemUid(gossip.fromSystemUid.get)
b.build()
}
private def gossipFromBinary(bytes: Array[Byte]): Gossip = {
val gossip = dm.Gossip.parseFrom(decompress(bytes))
val toSystemUid = if (gossip.hasToSystemUid) Some(gossip.getToSystemUid) else None
val fromSystemUid = if (gossip.hasFromSystemUid) Some(gossip.getFromSystemUid) else None
Gossip(
gossip.getEntriesList.asScala.iterator.map(e => e.getKey -> dataEnvelopeFromProto(e.getEnvelope)).toMap,
sendBack = gossip.getSendBack)
sendBack = gossip.getSendBack,
toSystemUid,
fromSystemUid)
}
private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = {
val b = dm.DeltaPropagation.newBuilder().setFromNode(uniqueAddressToProto(deltaPropagation.fromNode))
val b = dm.DeltaPropagation.newBuilder().setFromNode(uniqueAddressToProto(deltaPropagation._fromNode))
if (deltaPropagation.reply)
b.setReply(deltaPropagation.reply)
deltaPropagation.deltas.foreach {
@ -513,18 +525,27 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
}
private def writeToProto(write: Write): dm.Write =
dm.Write.newBuilder().setKey(write.key).setEnvelope(dataEnvelopeToProto(write.envelope)).build()
dm.Write
.newBuilder()
.setKey(write.key)
.setEnvelope(dataEnvelopeToProto(write.envelope))
.setFromNode(uniqueAddressToProto(write.fromNode.get))
.build()
private def writeFromBinary(bytes: Array[Byte]): Write = {
val write = dm.Write.parseFrom(bytes)
Write(write.getKey, dataEnvelopeFromProto(write.getEnvelope))
val fromNode = if (write.hasFromNode) Some(uniqueAddressFromProto(write.getFromNode)) else None
Write(write.getKey, dataEnvelopeFromProto(write.getEnvelope), fromNode)
}
private def readToProto(read: Read): dm.Read =
dm.Read.newBuilder().setKey(read.key).build()
dm.Read.newBuilder().setKey(read.key).setFromNode(uniqueAddressToProto(read.fromNode.get)).build()
private def readFromBinary(bytes: Array[Byte]): Read =
Read(dm.Read.parseFrom(bytes).getKey)
private def readFromBinary(bytes: Array[Byte]): Read = {
val read = dm.Read.parseFrom(bytes)
val fromNode = if (read.hasFromNode) Some(uniqueAddressFromProto(read.getFromNode)) else None
Read(read.getKey, fromNode)
}
private def readResultToProto(readResult: ReadResult): dm.ReadResult = {
val b = dm.ReadResult.newBuilder()

View file

@ -181,6 +181,12 @@ class DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN
values should ===(Set(10))
}
// all must at least have seen it as joining
awaitAssert({
cluster3.state.members.size should ===(4)
cluster3.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
}, 10.seconds)
// after merging with others
replicator3 ! Get(KeyA, ReadAll(remainingOrDefault))
val counter5 = expectMsgType[GetSuccess[GCounter]].dataValue

View file

@ -105,6 +105,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult
expectMsg(ReplicaCount(5))
}
}
enterBarrier("all-joined")
runOn(first) {
for (_ <- 0 until 5) {

View file

@ -16,7 +16,7 @@ import org.scalatest.Matchers
import org.scalatest.WordSpec
object DeltaPropagationSelectorSpec {
class TestSelector(val selfUniqueAddress: UniqueAddress, override val allNodes: Vector[Address])
class TestSelector(val selfUniqueAddress: UniqueAddress, override val allNodes: Vector[UniqueAddress])
extends DeltaPropagationSelector {
override val gossipIntervalDivisor = 5
override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation =
@ -33,14 +33,14 @@ object DeltaPropagationSelectorSpec {
class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheckedTripleEquals {
import DeltaPropagationSelectorSpec._
val selfUniqueAddress = UniqueAddress(Address("akka", "Sys", "localhost", 4999), 1L)
val nodes = (2500 until 2600).map(n => Address("akka", "Sys", "localhost", n)).toVector
val selfUniqueAddress = UniqueAddress(Address("akka", "Sys", "localhost", 4999), 17L)
val nodes = (2500 until 2600).map(n => UniqueAddress(Address("akka", "Sys", "localhost", n), 17L)).toVector
"DeltaPropagationSelector" must {
"collect none when no nodes" in {
val selector = new TestSelector(selfUniqueAddress, Vector.empty)
selector.update("A", deltaA)
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(false)
}
@ -56,9 +56,9 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
DeltaPropagation(
selfUniqueAddress,
false,
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L)))
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(false)
selector.hasDeltaEntries("B") should ===(false)
@ -72,13 +72,13 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
DeltaPropagation(
selfUniqueAddress,
false,
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) -> expected))
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) expected))
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(true)
selector.hasDeltaEntries("B") should ===(true)
selector.collectPropagations() should ===(Map(nodes(2) -> expected))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(false)
selector.hasDeltaEntries("B") should ===(false)
@ -92,8 +92,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
DeltaPropagation(
selfUniqueAddress,
false,
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) -> expected1))
Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" Delta(DataEnvelope(deltaB), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) expected1))
// new update before previous was propagated to all nodes
selector.update("C", deltaC)
val expected2 = DeltaPropagation(
@ -103,14 +103,15 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
"A" -> Delta(DataEnvelope(deltaA), 1L, 1L),
"B" -> Delta(DataEnvelope(deltaB), 1L, 1L),
"C" -> Delta(DataEnvelope(deltaC), 1L, 1L)))
val expected3 = DeltaPropagation(selfUniqueAddress, false, Map("C" -> Delta(DataEnvelope(deltaC), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) -> expected3))
val expected3 =
DeltaPropagation(selfUniqueAddress, false, Map("C" -> Delta(DataEnvelope(deltaC), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) expected3))
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("A") should ===(false)
selector.hasDeltaEntries("B") should ===(false)
selector.hasDeltaEntries("C") should ===(true)
selector.collectPropagations() should ===(Map(nodes(1) -> expected3))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
selector.cleanupDeltaEntries()
selector.hasDeltaEntries("C") should ===(false)
}
@ -129,9 +130,10 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
selector.collectPropagations() should ===(Map(nodes(0) -> expected1))
selector.update("A", delta3)
selector.currentVersion("A") should ===(3L)
val expected2 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L)))
val expected2 =
DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected2))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
}
"merge deltas" in {
@ -142,7 +144,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
override def nodesSliceSize(allNodesSize: Int): Int = 1
}
selector.update("A", delta1)
val expected1 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta1), 1L, 1L)))
val expected1 =
DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta1), 1L, 1L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected1))
selector.update("A", delta2)
@ -161,10 +164,11 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck
DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L)))
selector.collectPropagations() should ===(Map(nodes(0) -> expected4))
val expected5 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L)))
val expected5 =
DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L)))
selector.collectPropagations() should ===(Map(nodes(1) -> expected5))
selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation])
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
}
"discard too large deltas" in {

View file

@ -5,6 +5,7 @@
package akka.cluster.ddata
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSelection
@ -15,9 +16,11 @@ import akka.testkit._
import akka.cluster.ddata.Replicator.Internal._
import akka.cluster.ddata.Replicator._
import akka.remote.RARP
import scala.concurrent.Future
import akka.cluster.Cluster
import akka.cluster.UniqueAddress
object WriteAggregatorSpec {
val KeyA = GSetKey[String]("A")
@ -26,41 +29,76 @@ object WriteAggregatorSpec {
def writeAggregatorProps(
data: GSet[String],
consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef],
nodes: Set[Address],
unreachable: Set[Address],
probes: Map[UniqueAddress, ActorRef],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean): Props =
Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable))
Props(
new TestWriteAggregator(
KeyA,
data,
None,
consistency,
probes,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable))
def writeAggregatorPropsWithDelta(
data: ORSet[String],
delta: Delta,
consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef],
nodes: Set[Address],
unreachable: Set[Address],
probes: Map[UniqueAddress, ActorRef],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean): Props =
Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable))
Props(
new TestWriteAggregator(
KeyB,
data,
Some(delta),
consistency,
probes,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable))
class TestWriteAggregator(
key: Key.KeyR,
data: ReplicatedData,
delta: Option[Delta],
consistency: Replicator.WriteConsistency,
probes: Map[Address, ActorRef],
nodes: Set[Address],
unreachable: Set[Address],
probes: Map[UniqueAddress, ActorRef],
selfUniqueAddress: UniqueAddress,
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress],
replyTo: ActorRef,
durable: Boolean)
extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) {
extends WriteAggregator(
key,
DataEnvelope(data),
delta,
consistency,
None,
selfUniqueAddress,
nodes,
unreachable,
replyTo,
durable) {
override def replica(address: Address): ActorSelection =
override def replica(address: UniqueAddress): ActorSelection =
context.actorSelection(probes(address).path)
override def senderAddress(): Address =
probes.find { case (_, r) => r == sender() }.get._1
probes.find { case (_, r) => r == sender() }.get._1.address
}
def writeAckAdapterProps(replica: ActorRef): Props =
@ -105,10 +143,10 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka"
else "akka.tcp"
val nodeA = Address(protocol, "Sys", "a", 2552)
val nodeB = nodeA.copy(host = Some("b"))
val nodeC = nodeA.copy(host = Some("c"))
val nodeD = nodeA.copy(host = Some("d"))
val nodeA = UniqueAddress(Address(protocol, "Sys", "a", 2552), 17L)
val nodeB = UniqueAddress(Address(protocol, "Sys", "b", 2552), 17L)
val nodeC = UniqueAddress(Address(protocol, "Sys", "c", 2552), 17L)
val nodeD = UniqueAddress(Address(protocol, "Sys", "d", 2552), 17L)
// 4 replicas + the local => 5
val nodes = Set(nodeA, nodeB, nodeC, nodeD)
@ -118,13 +156,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
val writeMajority = WriteMajority(timeout)
val writeAll = WriteAll(timeout)
def probes(probe: ActorRef): Map[Address, ActorRef] =
val selfUniqueAddress: UniqueAddress = Cluster(system).selfUniqueAddress
def probes(probe: ActorRef): Map[UniqueAddress, ActorRef] =
nodes.toSeq.map(_ -> system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap
/**
* Create a tuple for each node with the WriteAckAdapter and the TestProbe
*/
def probes(): Map[Address, TestMock] = {
def probes(): Map[UniqueAddress, TestMock] = {
nodes.toSeq.map(_ -> TestMock()).toMap
}
@ -132,8 +172,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"send to at least N/2+1 replicas when WriteMajority" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeMajority,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = false))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
@ -147,8 +194,16 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"send to more when no immediate reply" in {
val testProbes = probes()
val testProbeRefs = testProbes.map { case (a, tm) => a -> tm.writeAckAdapter }
val aggr = system.actorOf(WriteAggregatorSpec
.writeAggregatorProps(data, writeMajority, testProbeRefs, nodes, Set(nodeC, nodeD), testActor, durable = false))
val aggr = system.actorOf(
WriteAggregatorSpec.writeAggregatorProps(
data,
writeMajority,
testProbeRefs,
selfUniqueAddress,
nodes,
Set(nodeC, nodeD),
testActor,
durable = false))
testProbes(nodeA).expectMsgType[Write]
// no reply
@ -173,8 +228,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeMajority,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = false))
probe.expectMsgType[Write]
// no reply
@ -221,6 +283,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
delta,
writeMajority,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
@ -244,6 +307,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
delta,
writeAll,
testProbeRefs,
selfUniqueAddress,
nodes,
Set.empty,
testActor,
@ -279,6 +343,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
delta,
writeAll,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
@ -311,8 +376,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"not reply before local confirmation" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeThree,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = true))
probe.expectMsgType[Write]
probe.lastSender ! WriteAck
@ -331,8 +403,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"tolerate WriteNack if enough WriteAck" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeThree,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = true))
aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write
probe.expectMsgType[Write]
@ -350,8 +429,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"reply with StoreFailure when too many nacks" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeMajority,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = true))
probe.expectMsgType[Write]
probe.lastSender ! WriteNack
@ -371,8 +457,15 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
"timeout when less than required acks" in {
val probe = TestProbe()
val aggr = system.actorOf(
WriteAggregatorSpec
.writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true))
WriteAggregatorSpec.writeAggregatorProps(
data,
writeMajority,
probes(probe.ref),
selfUniqueAddress,
nodes,
Set.empty,
testActor,
durable = true))
probe.expectMsgType[Write]
// no reply

View file

@ -103,17 +103,26 @@ class ReplicatorMessageSerializerSpec
pruning = Map(
address1 -> PruningPerformed(System.currentTimeMillis()),
address3 -> PruningInitialized(address2, Set(address1.address)))))
checkSerialization(Write("A", DataEnvelope(data1)))
checkSerialization(Write("A", DataEnvelope(data1), Some(address1)))
checkSerialization(WriteAck)
checkSerialization(WriteNack)
checkSerialization(DeltaNack)
checkSerialization(Read("A"))
checkSerialization(Read("A", Some(address1)))
checkSerialization(ReadResult(Some(DataEnvelope(data1))))
checkSerialization(ReadResult(None))
checkSerialization(
Status(Map("A" -> ByteString.fromString("a"), "B" -> ByteString.fromString("b")), chunk = 3, totChunks = 10))
Status(
Map("A" -> ByteString.fromString("a"), "B" ByteString.fromString("b")),
chunk = 3,
totChunks = 10,
Some(17),
Some(19)))
checkSerialization(
Gossip(Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + "c")), sendBack = true))
Gossip(
Map("A" -> DataEnvelope(data1), "B" DataEnvelope(GSet() + "b" + "c")),
sendBack = true,
Some(17),
Some(19)))
checkSerialization(
DeltaPropagation(
address1,
@ -153,10 +162,10 @@ class ReplicatorMessageSerializerSpec
"get added element" in {
val cache = new SmallCache[Read, String](2, 5.seconds, _ => null)
val a = Read("a")
val a = Read("a", Some(address1))
cache.add(a, "A")
cache.get(a) should be("A")
val b = Read("b")
val b = Read("b", Some(address1))
cache.add(b, "B")
cache.get(a) should be("A")
cache.get(b) should be("B")
@ -164,20 +173,20 @@ class ReplicatorMessageSerializerSpec
"return null for non-existing elements" in {
val cache = new SmallCache[Read, String](4, 5.seconds, _ => null)
val a = Read("a")
val a = Read("a", Some(address1))
cache.get(a) should be(null)
cache.add(a, "A")
val b = Read("b")
val b = Read("b", Some(address1))
cache.get(b) should be(null)
}
"hold latest added elements" in {
val cache = new SmallCache[Read, String](4, 5.seconds, _ => null)
val a = Read("a")
val b = Read("b")
val c = Read("c")
val d = Read("d")
val e = Read("e")
val a = Read("a", Some(address1))
val b = Read("b", Some(address1))
val c = Read("c", Some(address1))
val d = Read("d", Some(address1))
val e = Read("e", Some(address1))
cache.add(a, "A")
cache.get(a) should be("A")
cache.add(b, "B")
@ -204,7 +213,7 @@ class ReplicatorMessageSerializerSpec
"handle Int wrap around" ignore { // ignored because it takes 20 seconds (but it works)
val cache = new SmallCache[Read, String](2, 5.seconds, _ => null)
val a = Read("a")
val a = Read("a", Some(address1))
val x = a -> "A"
var n = 0
while (n <= Int.MaxValue - 3) {
@ -214,8 +223,8 @@ class ReplicatorMessageSerializerSpec
cache.get(a) should be("A")
val b = Read("b")
val c = Read("c")
val b = Read("b", Some(address1))
val c = Read("c", Some(address1))
cache.add(b, "B")
cache.get(a) should be("A")
cache.get(b) should be("B")
@ -241,7 +250,7 @@ class ReplicatorMessageSerializerSpec
}
val cache = new SmallCache[Read, AnyRef](4, 5.seconds, a => createValue(a))
val a = Read("a")
val a = Read("a", Some(address1))
val v1 = cache.getOrAdd(a)
v1.toString should be("v1")
(cache.getOrAdd(a) should be).theSameInstanceAs(v1)
@ -249,8 +258,8 @@ class ReplicatorMessageSerializerSpec
"evict cache after time-to-live" in {
val cache = new SmallCache[Read, AnyRef](4, 10.millis, _ => null)
val b = Read("b")
val c = Read("c")
val b = Read("b", Some(address1))
val c = Read("c", Some(address1))
cache.add(b, "B")
cache.add(c, "C")
@ -262,8 +271,8 @@ class ReplicatorMessageSerializerSpec
"not evict cache before time-to-live" in {
val cache = new SmallCache[Read, AnyRef](4, 5.seconds, _ => null)
val b = Read("b")
val c = Read("c")
val b = Read("b", Some(address1))
val c = Read("c", Some(address1))
cache.add(b, "B")
cache.add(c, "C")
cache.evict()