diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index eef2946c5f..6083d60425 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -18,9 +18,15 @@ import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator } import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } import akka.remote.AddressUidExtension import akka.util.TypedMultiMap - import scala.concurrent.duration._ +import akka.actor.Address +import akka.cluster.ClusterEvent.ClusterDomainEvent +import akka.cluster.ClusterEvent.ClusterShuttingDown +import akka.cluster.ClusterEvent.MemberJoined +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent.MemberWeaklyUp + // just to provide a log class /** INTERNAL API */ @InternalApi @@ -39,16 +45,19 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { // values contain system uid to make it possible to discern actors at the same // path in different incarnations of a cluster node final case class Entry(ref: ActorRef[_], systemUid: Long) { - def uniqueAddress(selfUniqueAddress: UniqueAddress): UniqueAddress = - if (ref.path.address.hasLocalScope) selfUniqueAddress + def uniqueAddress(selfAddress: Address): UniqueAddress = + if (ref.path.address.hasLocalScope) UniqueAddress(selfAddress, systemUid) else UniqueAddress(ref.path.address, systemUid) - override def toString = ref.path.toString + "#" + ref.path.uid + override def toString: String = + s"${ref.path.toString}#${ref.path.uid} @ $systemUid" + } private sealed trait InternalCommand extends Command private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand + private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand private final case class ChangeFromReplicator(key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand @@ -76,7 +85,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { ctx.setLoggerClass(classOf[ClusterReceptionist]) Behaviors.withTimers { timers => val setup = new Setup(ctx) - val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount) + // include selfUniqueAddress so that it can be used locally before joining cluster + val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress) // subscribe to changes from other nodes val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] = @@ -90,13 +100,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { registry.allDdataKeys.foreach(key => setup.replicator ! 
Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped)) + // keep track of cluster members // remove entries when members are removed - val clusterEventMessageAdapter: ActorRef[MemberRemoved] = - ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) } + val clusterEventMessageAdapter: ActorRef[ClusterDomainEvent] = + ctx.messageAdapter[ClusterDomainEvent] { + case MemberJoined(member) => NodeAdded(member.uniqueAddress) + case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress) + case MemberUp(member) => NodeAdded(member.uniqueAddress) + case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) + case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress) + case other => + throw new IllegalStateException(s"Unexpected ClusterDomainEvent $other. Please report bug.") + } setup.cluster.subscribe( clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, - classOf[MemberRemoved]) + classOf[MemberJoined], + classOf[MemberWeaklyUp], + classOf[MemberUp], + classOf[MemberRemoved], + ClusterShuttingDown.getClass) // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, // which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message @@ -129,28 +152,21 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit = ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx => innerCtx.watch(target) - Behaviors.receive[Nothing]((_, _) => Behaviors.same).receiveSignal { + Behaviors.receiveSignal[Nothing] { case (_, Terminated(`target`)) => ctx.self ! msg Behaviors.stopped } }) - def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = { - // filter tombstoned refs to avoid an extra update - // to subscribers in the case of lost removals (because of how ORMultiMap works) - val refsForKey = state.actorRefsFor(key) - val refsWithoutTombstoned = - if (registry.tombstones.isEmpty) refsForKey - else refsForKey.filterNot(registry.hasTombstone) - val msg = ReceptionistMessages.Listing(key.asServiceKey, refsWithoutTombstoned) - subscriptions.get(key).foreach(_ ! 
msg) + def isLeader = { + cluster.state.leader.contains(cluster.selfAddress) } - def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Command] = { + def nodesRemoved(addresses: Set[UniqueAddress]): Unit = { // ok to update from several nodes but more efficient to try to do it from one node - if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) { - def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress)) + if (isLeader) { + def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address)) val removals = { registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) { @@ -164,7 +180,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { if (removals.nonEmpty) { if (ctx.log.isDebugEnabled) ctx.log.debug( - "Node(s) [{}] removed, updating registry removing: [{}]", + "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]", + cluster.selfAddress, addresses.mkString(","), removals .map { @@ -185,33 +202,38 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } } - Behaviors.same } def onCommand(cmd: Command): Behavior[Command] = cmd match { case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) => - val entry = Entry(serviceInstance, setup.selfSystemUid) - ctx.log.debug("Actor was registered: [{}] [{}]", key, entry) - watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) - maybeReplyTo match { - case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance) - case None => - } - val ddataKey = registry.ddataKeyFor(key) - replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry => - ServiceRegistry(registry).addBinding(key, entry).toORMultiMap + if (serviceInstance.path.address.hasLocalScope) { + val entry = Entry(serviceInstance, setup.selfSystemUid) + ctx.log.debug("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry) + watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) + maybeReplyTo match { + case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance) + case None => + } + val ddataKey = registry.ddataKeyFor(key) + replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry => + ServiceRegistry(registry).addBinding(key, entry).toORMultiMap + } + } else { + ctx.log.error(s"ClusterReceptionist [{}] - Register of non-local [{}] is not supported", serviceInstance) } Behaviors.same case ReceptionistMessages.Find(key, replyTo) => - replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key)) + replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) Behaviors.same case ReceptionistMessages.Subscribe(key, subscriber) => watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) // immediately reply with initial listings to the new subscriber - subscriber ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key)) + val listing = + ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) + subscriber ! 
listing next(newSubscriptions = subscriptions.inserted(key)(subscriber)) } @@ -223,7 +245,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { case RegisteredActorTerminated(key, serviceInstance) => val entry = Entry(serviceInstance, setup.selfSystemUid) - ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry) + ctx.log.debug( + "ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]", + cluster.selfAddress, + key.asServiceKey.id, + entry) val ddataKey = registry.ddataKeyFor(key) replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry => ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap @@ -240,25 +266,42 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { if (changedKeys.nonEmpty) { if (ctx.log.isDebugEnabled) { ctx.log.debug( - "Change from replicator: [{}], changes: [{}], tombstones [{}]", + "ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]", + cluster.selfAddress, newState.entries.entries, changedKeys .map(key => key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]")) .mkString(", "), - registry.tombstones.mkString(", ")) + newRegistry.tombstones.mkString(", ")) } + changedKeys.foreach { changedKey => - notifySubscribersFor(changedKey, newState) + val serviceKey = changedKey.asServiceKey + + val subscribers = subscriptions.get(changedKey) + if (subscribers.nonEmpty) { + val listing = + ReceptionistMessages + .Listing(serviceKey, newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)) + subscribers.foreach(_ ! listing) + } // because of how ORMultiMap/ORset works, we could have a case where an actor we removed // is re-introduced because of a concurrent update, in that case we need to re-remove it - val serviceKey = changedKey.asServiceKey - val tombstonedButReAdded = - newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone) - tombstonedButReAdded.foreach { actorRef => - ctx.log.debug("Saw actorref that was tomstoned {}, re-removing.", actorRef) + val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone) + if (tombstonedButReAdded.nonEmpty) { + if (ctx.log.isDebugEnabled) + ctx.log.debug( + "ClusterReceptionist [{}] - Saw ActorRefs that were tomstoned [{}], re-removing.", + cluster.selfAddress, + tombstonedButReAdded.mkString(", ")) + replicator ! 
Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry => - ServiceRegistry(registry).removeBinding(serviceKey, Entry(actorRef, setup.selfSystemUid)).toORMultiMap + tombstonedButReAdded + .foldLeft(ServiceRegistry(registry)) { (acc, ref) => + acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid)) + } + .toORMultiMap } } } @@ -268,34 +311,51 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { Behaviors.same } + case NodeAdded(uniqueAddress) => + next(registry.addNode(uniqueAddress)) + case NodeRemoved(uniqueAddress) => - // ok to update from several nodes but more efficient to try to do it from one node - if (cluster.state.leader.contains(cluster.selfAddress)) { - ctx.log.debug(s"Leader node observed removed address [{}]", uniqueAddress) - nodesRemoved(Set(uniqueAddress)) - } else Behaviors.same + if (uniqueAddress == selfUniqueAddress) { + ctx.log.debug("ClusterReceptionist [{}] - terminated/removed", cluster.selfAddress) + // If self cluster node is shutting down our own entries should have been removed via + // watch-Terminated or will be removed by other nodes. This point is anyway too late. + Behaviors.stopped + } else { + // Ok to update from several nodes but more efficient to try to do it from one node. + if (isLeader) { + ctx.log.debug( + "ClusterReceptionist [{}] - Leader node observed removed node [{}]", + cluster.selfAddress, + uniqueAddress) + nodesRemoved(Set(uniqueAddress)) + } + + next(registry.removeNode(uniqueAddress)) + } case RemoveTick => // ok to update from several nodes but more efficient to try to do it from one node - if (cluster.state.leader.contains(cluster.selfAddress)) { + if (isLeader) { val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress) - val clusterAddresses = cluster.state.members.map(_.uniqueAddress) - val notInCluster = allAddressesInState.diff(clusterAddresses) + val notInCluster = allAddressesInState.diff(registry.nodes) if (notInCluster.isEmpty) Behavior.same else { if (ctx.log.isDebugEnabled) - ctx.log.debug("Leader node cleanup tick, removed nodes: [{}]", notInCluster.mkString(",")) + ctx.log.debug( + "ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]", + cluster.selfAddress, + notInCluster.mkString(",")) nodesRemoved(notInCluster) } - } else - Behavior.same + } + Behavior.same case PruneTombstonesTick => val prunedRegistry = registry.pruneTombstones() if (prunedRegistry eq registry) Behaviors.same else { - ctx.log.debug(s"Pruning tombstones") + ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress) next(prunedRegistry) } } @@ -309,4 +369,5 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } } } + } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala index 2494dde7ba..497aef1df9 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala @@ -23,23 +23,26 @@ import scala.concurrent.duration.Deadline val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n") key -> new ServiceRegistry(EmptyORMultiMap) }.toMap - new ShardedServiceRegistry(emptyRegistries, Map.empty) + new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty) } } /** + * INTERNAL API + * 
* Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not * get too large ddata messages) * * @param tombstones Local actors that were stopped and should not be re-added to the available set of actors * for a key. Since the only way to unregister is to stop, we don't need to keep track of * the service key - * INTERNAL API + * */ @InternalApi private[akka] final case class ShardedServiceRegistry( serviceRegistries: Map[DDataKey, ServiceRegistry], - tombstones: Map[ActorRef[_], Deadline]) { + tombstones: Map[ActorRef[_], Deadline], + nodes: Set[UniqueAddress]) { private val keys = serviceRegistries.keySet.toArray @@ -56,22 +59,32 @@ import scala.concurrent.duration.Deadline def allEntries: Iterator[Entry] = allServices.flatMap(_._2) def actorRefsFor[T](key: ServiceKey[T]): Set[ActorRef[T]] = { - val dDataKey = ddataKeyFor(key) - serviceRegistries(dDataKey).actorRefsFor(key) + val ddataKey = ddataKeyFor(key) + serviceRegistries(ddataKey).actorRefsFor(key) } - def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = - copy(serviceRegistries + (dDataKey -> registry), tombstones) + def activeActorRefsFor[T](key: ServiceKey[T], selfUniqueAddress: UniqueAddress): Set[ActorRef[T]] = { + val ddataKey = ddataKeyFor(key) + val entries = serviceRegistries(ddataKey).entriesFor(key) + val selfAddress = selfUniqueAddress.address + entries.collect { + case entry if nodes.contains(entry.uniqueAddress(selfAddress)) && !hasTombstone(entry.ref) => + entry.ref.asInstanceOf[ActorRef[key.Protocol]] + } + } + + def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = + copy(serviceRegistries + (ddataKey -> registry), tombstones) def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] = allEntries.collect { // we don't care about local (empty host:port addresses) case entry if entry.ref.path.address.hasGlobalScope => - entry.uniqueAddress(selfUniqueAddress) + entry.uniqueAddress(selfUniqueAddress.address) }.toSet - def collectChangedKeys(dDataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = { - val previousRegistry = registryFor(dDataKey) + def collectChangedKeys(ddataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = { + val previousRegistry = registryFor(ddataKey) ServiceRegistry.collectChangedKeys(previousRegistry, newRegistry) } @@ -88,7 +101,7 @@ import scala.concurrent.duration.Deadline copy(tombstones = tombstones + (actorRef -> deadline)) def hasTombstone(actorRef: ActorRef[_]): Boolean = - tombstones.contains(actorRef) + tombstones.nonEmpty && tombstones.contains(actorRef) def pruneTombstones(): ShardedServiceRegistry = { copy(tombstones = tombstones.filter { @@ -96,6 +109,12 @@ import scala.concurrent.duration.Deadline }) } + def addNode(node: UniqueAddress): ShardedServiceRegistry = + copy(nodes = nodes + node) + + def removeNode(node: UniqueAddress): ShardedServiceRegistry = + copy(nodes = nodes - node) + } /** diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 26f1a5c552..2a5828e592 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -18,10 
+18,13 @@ import akka.actor.testkit.typed.FishingOutcome import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, FishingOutcomes, TestProbe } import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } - import scala.concurrent.Await import scala.concurrent.duration._ +import akka.cluster.typed.Down +import akka.cluster.typed.JoinSeedNodes +import akka.cluster.typed.Leave + object ClusterReceptionistSpec { val config = ConfigFactory.parseString(s""" akka.loglevel = DEBUG # issue #24960 @@ -42,9 +45,17 @@ object ClusterReceptionistSpec { akka.remote.netty.tcp.host = 127.0.0.1 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.remote.retry-gate-closed-for = 1 s + + akka.cluster.typed.receptionist { + pruning-interval = 1 s + } + akka.cluster { - auto-down-unreachable-after = 0s + #auto-down-unreachable-after = 0s jmx.multi-mbeans-in-same-jvm = on + failure-detector.acceptable-heartbeat-pause = 3s } """) @@ -109,7 +120,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { val regProbe1 = TestProbe[Any]()(system1) val regProbe2 = TestProbe[Any]()(system2) - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) system2.receptionist ! Subscribe(PingKey, regProbe2.ref) regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) @@ -132,7 +143,15 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { } "remove registrations when node dies" in { - val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config) + testNodeRemoval(down = true) + } + + "remove registrations when node leaves" in { + testNodeRemoval(down = false) + } + + def testNodeRemoval(down: Boolean): Unit = { + val testKit1 = ActorTestKit(s"ClusterReceptionistSpec-test-3-$down", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) val system2 = testKit2.system @@ -146,72 +165,47 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { val regProbe1 = TestProbe[Any]()(system1) val regProbe2 = TestProbe[Any]()(system2) - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) system1.receptionist ! Subscribe(PingKey, regProbe1.ref) regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + val service1 = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1, regProbe1.ref) + regProbe1.expectMessage(Registered(PingKey, service1)) + + regProbe1.expectMessage(Listing(PingKey, Set(service1))) + val service2 = testKit2.spawn(pingPongBehavior) system2.receptionist ! Register(PingKey, service2, regProbe2.ref) regProbe2.expectMessage(Registered(PingKey, service2)) - val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) - val theRef = remoteServiceRefs.head - theRef ! Ping(regProbe1.ref) - regProbe1.expectMessage(Pong) + val serviceRefs2 = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) + serviceRefs2.size should ===(2) + + if (down) { + // abrupt termination + Await.ready(system2.terminate(), 10.seconds) + clusterNode1.manager ! Down(clusterNode2.selfMember.address) + } else { + clusterNode1.manager ! 
Leave(clusterNode2.selfMember.address) + } + + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1))) + + // register another after removal + val service1b = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1b, regProbe1.ref) + regProbe1.expectMessage(Registered(PingKey, service1b)) + regProbe1.expectMessage(Listing(PingKey, Set(service1, service1b))) - // abrupt termination - system2.terminate() - regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) } finally { testKit1.shutdownTestKit() testKit2.shutdownTestKit() } } - "work with services registered before node joins cluster" in { - val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-3", ClusterReceptionistSpec.config) - val system1 = testKit1.system - val testKit2 = ActorTestKit(system1.name, system1.settings.config) - val system2 = testKit2.system - try { - - val clusterNode1 = Cluster(system1) - clusterNode1.manager ! Join(clusterNode1.selfMember.address) - - val regProbe1 = TestProbe[Any]()(system1) - val regProbe2 = TestProbe[Any]()(system2) - - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) - - system1.receptionist ! Subscribe(PingKey, regProbe1.ref) - regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - - val service2 = testKit2.spawn(pingPongBehavior) - system2.receptionist ! Register(PingKey, service2, regProbe2.ref) - regProbe2.expectMessage(Registered(PingKey, service2)) - - // then we join the cluster - val clusterNode2 = Cluster(system2) - clusterNode2.manager ! Join(clusterNode1.selfMember.address) - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up)) - - // and the subscriber on node1 should see the service - val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) - val theRef = remoteServiceRefs.head - theRef ! Ping(regProbe1.ref) - regProbe1.expectMessage(Pong) - - // abrupt termination - system2.terminate() - regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - } finally { - testKit1.shutdownTestKit() - testKit2.shutdownTestKit() - } - } - - "handle a new incarnation of the same node well" in { + "not remove registrations when self is shutdown" in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-4", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -226,7 +220,107 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { val regProbe1 = TestProbe[Any]()(system1) val regProbe2 = TestProbe[Any]()(system2) - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) + + system2.receptionist ! Subscribe(PingKey, regProbe2.ref) + regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service1 = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1, regProbe1.ref) + regProbe1.expectMessage(Registered(PingKey, service1)) + + regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1) + + val service2 = testKit2.spawn(pingPongBehavior) + system2.receptionist ! 
Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2) + + akka.cluster.Cluster(system1.toUntyped).shutdown() + + regProbe2.expectNoMessage(3.seconds) + + clusterNode2.manager ! Down(clusterNode1.selfMember.address) + // service1 removed + regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2))) + } finally { + testKit1.shutdownTestKit() + testKit2.shutdownTestKit() + } + + } + + "work with services registered before node joins cluster" in { + val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", ClusterReceptionistSpec.config) + val system1 = testKit1.system + val testKit2 = ActorTestKit(system1.name, system1.settings.config) + val system2 = testKit2.system + try { + + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + + system1.receptionist ! Subscribe(PingKey, regProbe1.ref) + regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service2 = testKit2.spawn(pingPongBehavior) + system2.receptionist ! Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + val reply2 = TestProbe[Listing]()(system2) + // awaitAssert because it is not immediately included in the registry (round trip to ddata) + reply2.awaitAssert { + system2.receptionist ! Find(PingKey, reply2.ref) + reply2.receiveMessage().serviceInstances(PingKey) should ===(Set(service2)) + } + + // and it shouldn't be removed (wait longer than pruning-interval) + Thread.sleep(2000) + system2.receptionist ! Find(PingKey, reply2.ref) + reply2.receiveMessage().serviceInstances(PingKey) should ===(Set(service2)) + + // then we join the cluster + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) + + // and the subscriber on node1 should see the service + val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe1.ref) + regProbe1.expectMessage(Pong) + + // abrupt termination + Await.ready(system2.terminate(), 10.seconds) + clusterNode1.manager ! Down(clusterNode2.selfMember.address) + + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + } finally { + testKit1.shutdownTestKit() + testKit2.shutdownTestKit() + } + } + + "handle a new incarnation of the same node well" in { + val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-6", ClusterReceptionistSpec.config) + val system1 = testKit1.system + val testKit2 = ActorTestKit(system1.name, system1.settings.config) + val system2 = testKit2.system + try { + + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) system1.receptionist ! 
Subscribe(PingKey, regProbe1.ref) regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) @@ -241,42 +335,46 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { theRef ! Ping(regProbe1.ref) regProbe1.expectMessage(Pong) - // FIXME do we need to blackhole the connection to system2 before terminating - // right now it doesn't work anyways though ;D - // abrupt termination but then a node with the same host:port comes online quickly - system1.log.debug("Terminating system2, uid: [{}]", clusterNode2.selfMember.uniqueAddress.longUid) + system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) Await.ready(system2.terminate(), 10.seconds) - val testKit3 = ActorTestKit(system1.name, testKit1.config) + val testKit3 = ActorTestKit( + system1.name, + ConfigFactory.parseString(s""" + akka.remote.netty.tcp.port = ${clusterNode2.selfMember.address.port.get} + akka.remote.artery.canonical.port = ${clusterNode2.selfMember.address.port.get} + # retry joining when existing member removed + akka.cluster.retry-unsuccessful-join-after = 1s + """).withFallback(config)) try { val system3 = testKit3.system - system1.log.debug( - "Starting system3 at same hostname port as system2, uid: [{}]", - Cluster(system3).selfMember.uniqueAddress.longUid) val clusterNode3 = Cluster(system3) - clusterNode3.manager ! Join(clusterNode1.selfMember.address) + system1.log + .debug("Starting system3 at same hostname port as system2: [{}]", clusterNode3.selfMember.uniqueAddress) + // using JoinSeedNodes instead of Join to retry the join when existing member removed + clusterNode3.manager ! JoinSeedNodes(List(clusterNode1.selfMember.address)) val regProbe3 = TestProbe[Any]()(system3) // and registers the same service key val service3 = testKit3.spawn(pingPongBehavior, "instance") - system3.log.debug( - "Spawning/registering ping service in new incarnation {}#{}", - service3.path, - service3.path.uid) + val service3Uid = service3.path.uid + system3.log.debug("Spawning/registering ping service in new incarnation {}", service3) system3.receptionist ! Register(PingKey, service3, regProbe3.ref) regProbe3.expectMessage(Registered(PingKey, service3)) - system3.log.debug("Registered actor [{}#{}] for system3", service3.path, service3.path.uid) + system3.log.debug("Registered actor [{}] for system3", service3) // make sure it joined fine and node1 has upped it - regProbe1.awaitAssert { - clusterNode1.state.members.exists( - m => - m.uniqueAddress == clusterNode3.selfMember.uniqueAddress && - m.status == MemberStatus.Up && - !clusterNode1.state.unreachable(m)) - } + regProbe1.awaitAssert( + { + clusterNode1.state.members.exists( + m => + m.uniqueAddress == clusterNode3.selfMember.uniqueAddress && + m.status == MemberStatus.Up && + !clusterNode1.state.unreachable(m)) should ===(true) + }, + 10.seconds) // we should get either empty message and then updated with the new incarnation actor // or just updated with the new service directly @@ -289,6 +387,8 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { val ref = entries.head val service3RemotePath = RootActorPath(clusterNode3.selfMember.address) / "user" / "instance" ref.path should ===(service3RemotePath) + ref.path.uid should ===(service3Uid) + ref ! 
Ping(regProbe1.ref) regProbe1.expectMessage(Pong) @@ -301,13 +401,117 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { } } + // reproducer of issue #26284 + "handle a new incarnation of the same node that is no longer part of same cluster" in { + val testKit1 = ActorTestKit( + "ClusterReceptionistSpec-test-7", + ConfigFactory.parseString(""" + akka.cluster { + failure-detector.acceptable-heartbeat-pause = 20s + } + akka.cluster.typed.receptionist { + # it can be stressed more by using all + write-consistency = all + } + """).withFallback(ClusterReceptionistSpec.config)) + val system1 = testKit1.system + val testKit2 = ActorTestKit(system1.name, system1.settings.config) + val system2 = testKit2.system + try { + + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + val reply1 = TestProbe[Listing]()(system1) + + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds) + + system1.receptionist ! Subscribe(PingKey, regProbe1.ref) + regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service1 = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1, regProbe1.ref) + regProbe1.expectMessage(Registered(PingKey, service1)) + regProbe1.expectMessage(Listing(PingKey, Set(service1))) + + val service2 = testKit2.spawn(pingPongBehavior, "instance") + system2.receptionist ! Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + // make sure we saw the first incarnation on node1 + regProbe1.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2) + + // abrupt termination but then a node with the same host:port comes online quickly + system1.log.debug("Terminating system2: [{}]", clusterNode2.selfMember.uniqueAddress) + Await.ready(system2.terminate(), 10.seconds) + + val testKit3 = ActorTestKit( + system1.name, + ConfigFactory.parseString(s""" + akka.remote.netty.tcp.port = ${clusterNode2.selfMember.address.port.get} + akka.remote.artery.canonical.port = ${clusterNode2.selfMember.address.port.get} + """).withFallback(config)) + + try { + val system3 = testKit3.system + val regProbe3 = TestProbe[Any]()(system3) + val clusterNode3 = Cluster(system3) + system1.log + .debug("Starting system3 at same hostname port as system2 [{}]", clusterNode3.selfMember.uniqueAddress) + // joining itself, i.e. not same cluster + clusterNode3.manager ! Join(clusterNode3.selfMember.address) + regProbe3.awaitAssert(clusterNode3.state.members.count(_.status == MemberStatus.Up) should ===(1)) + + // register another + Thread.sleep(2000) + val service1b = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1b, regProbe1.ref) + + val service1c = testKit1.spawn(pingPongBehavior) + system1.receptionist ! Register(PingKey, service1c, regProbe1.ref) + + system3.receptionist ! Subscribe(PingKey, regProbe3.ref) + // shouldn't get anything from the other cluster + regProbe3.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + // and registers the same service key + val service3 = testKit3.spawn(pingPongBehavior, "instance") + system3.log.debug("Spawning/registering ping service in new incarnation {}", service3) + system3.receptionist ! 
Register(PingKey, service3, regProbe3.ref) + regProbe3.expectMessage(Registered(PingKey, service3)) + system3.log.debug("Registered actor [{}] for system3", service3) + + // shouldn't get anything from the other cluster + regProbe3.expectMessage(Listing(PingKey, Set(service3))) + + reply1.expectNoMessage(1.second) + system1.receptionist ! Find(PingKey, reply1.ref) + (reply1.receiveMessage().serviceInstances(PingKey) should contain).allOf(service1, service1b, service1c) + + reply1.expectNoMessage(1.second) + system1.receptionist ! Find(PingKey, reply1.ref) + (reply1.receiveMessage().serviceInstances(PingKey) should contain).allOf(service1, service1b, service1c) + + } finally { + testKit3.shutdownTestKit() + } + } finally { + testKit1.shutdownTestKit() + testKit2.shutdownTestKit() + } + } + "not lose removals on concurrent updates to same key" in { val config = ConfigFactory.parseString(""" # disable delta propagation so we can have repeatable concurrent writes # without delta reaching between nodes already akka.cluster.distributed-data.delta-crdt.enabled=false """).withFallback(ClusterReceptionistSpec.config) - val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", config) + val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-8", config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) val system2 = testKit2.system @@ -322,7 +526,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { val regProbe1 = TestProbe[AnyRef]()(system1) val regProbe2 = TestProbe[AnyRef]()(system2) - regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2)) // one actor on each node up front val actor1 = testKit1.spawn(Behaviors.receive[AnyRef] { diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java index 145705283f..19015cbd3c 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ // Generated by the protocol buffer compiler. DO NOT EDIT! diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java index 335b3a5850..f7d46da701 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2018-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ // Generated by the protocol buffer compiler. DO NOT EDIT! 
@@ -5442,6 +5442,20 @@ public final class ReplicatorMessages { * required .akka.cluster.ddata.DataEnvelope envelope = 2; */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder getEnvelopeOrBuilder(); + + // optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + boolean hasFromNode(); + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode(); + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder(); } /** * Protobuf type {@code akka.cluster.ddata.Write} @@ -5512,6 +5526,19 @@ public final class ReplicatorMessages { bitField0_ |= 0x00000002; break; } + case 26: { + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder subBuilder = null; + if (((bitField0_ & 0x00000004) == 0x00000004)) { + subBuilder = fromNode_.toBuilder(); + } + fromNode_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(fromNode_); + fromNode_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000004; + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -5617,9 +5644,32 @@ public final class ReplicatorMessages { return envelope_; } + // optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + public static final int FROMNODE_FIELD_NUMBER = 3; + private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress fromNode_; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public boolean hasFromNode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode() { + return fromNode_; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder() { + return fromNode_; + } + private void initFields() { key_ = ""; envelope_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance(); + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -5638,6 +5688,12 @@ public final class ReplicatorMessages { memoizedIsInitialized = 0; return false; } + if (hasFromNode()) { + if (!getFromNode().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -5651,6 +5707,9 @@ public final class ReplicatorMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, envelope_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeMessage(3, fromNode_); + } getUnknownFields().writeTo(output); } @@ -5668,6 +5727,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(2, envelope_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(3, fromNode_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -5777,6 +5840,7 @@ public final class ReplicatorMessages { private void 
maybeForceBuilderInitialization() { if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { getEnvelopeFieldBuilder(); + getFromNodeFieldBuilder(); } } private static Builder create() { @@ -5793,6 +5857,12 @@ public final class ReplicatorMessages { envelopeBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); + if (fromNodeBuilder_ == null) { + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + } else { + fromNodeBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -5833,6 +5903,14 @@ public final class ReplicatorMessages { } else { result.envelope_ = envelopeBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + if (fromNodeBuilder_ == null) { + result.fromNode_ = fromNode_; + } else { + result.fromNode_ = fromNodeBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5857,6 +5935,9 @@ public final class ReplicatorMessages { if (other.hasEnvelope()) { mergeEnvelope(other.getEnvelope()); } + if (other.hasFromNode()) { + mergeFromNode(other.getFromNode()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -5874,6 +5955,12 @@ public final class ReplicatorMessages { return false; } + if (hasFromNode()) { + if (!getFromNode().isInitialized()) { + + return false; + } + } return true; } @@ -6087,6 +6174,123 @@ public final class ReplicatorMessages { return envelopeBuilder_; } + // optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder> fromNodeBuilder_; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public boolean hasFromNode() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode() { + if (fromNodeBuilder_ == null) { + return fromNode_; + } else { + return fromNodeBuilder_.getMessage(); + } + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public Builder setFromNode(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress value) { + if (fromNodeBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + fromNode_ = value; + onChanged(); + } else { + fromNodeBuilder_.setMessage(value); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public Builder setFromNode( + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder builderForValue) { + if (fromNodeBuilder_ == null) { + fromNode_ = builderForValue.build(); + onChanged(); + } else { + fromNodeBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public Builder mergeFromNode(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress value) { + if (fromNodeBuilder_ == null) { + if (((bitField0_ & 0x00000004) == 0x00000004) && + fromNode_ != 
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance()) { + fromNode_ = + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.newBuilder(fromNode_).mergeFrom(value).buildPartial(); + } else { + fromNode_ = value; + } + onChanged(); + } else { + fromNodeBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public Builder clearFromNode() { + if (fromNodeBuilder_ == null) { + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + onChanged(); + } else { + fromNodeBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder getFromNodeBuilder() { + bitField0_ |= 0x00000004; + onChanged(); + return getFromNodeFieldBuilder().getBuilder(); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder() { + if (fromNodeBuilder_ != null) { + return fromNodeBuilder_.getMessageOrBuilder(); + } else { + return fromNode_; + } + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 3; + */ + private akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder> + getFromNodeFieldBuilder() { + if (fromNodeBuilder_ == null) { + fromNodeBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder>( + fromNode_, + getParentForChildren(), + isClean()); + fromNode_ = null; + } + return fromNodeBuilder_; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.Write) } @@ -6424,6 +6628,20 @@ public final class ReplicatorMessages { */ akka.protobuf.ByteString getKeyBytes(); + + // optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + boolean hasFromNode(); + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode(); + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder(); } /** * Protobuf type {@code akka.cluster.ddata.Read} @@ -6481,6 +6699,19 @@ public final class ReplicatorMessages { key_ = input.readBytes(); break; } + case 18: { + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = fromNode_.toBuilder(); + } + fromNode_ = input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(fromNode_); + fromNode_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -6564,8 +6795,31 @@ public final class ReplicatorMessages { } } + // optional 
.akka.cluster.ddata.UniqueAddress fromNode = 2; + public static final int FROMNODE_FIELD_NUMBER = 2; + private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress fromNode_; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public boolean hasFromNode() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode() { + return fromNode_; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder() { + return fromNode_; + } + private void initFields() { key_ = ""; + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6576,6 +6830,12 @@ public final class ReplicatorMessages { memoizedIsInitialized = 0; return false; } + if (hasFromNode()) { + if (!getFromNode().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -6586,6 +6846,9 @@ public final class ReplicatorMessages { if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeBytes(1, getKeyBytes()); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, fromNode_); + } getUnknownFields().writeTo(output); } @@ -6599,6 +6862,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeBytesSize(1, getKeyBytes()); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(2, fromNode_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -6707,6 +6974,7 @@ public final class ReplicatorMessages { } private void maybeForceBuilderInitialization() { if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getFromNodeFieldBuilder(); } } private static Builder create() { @@ -6717,6 +6985,12 @@ public final class ReplicatorMessages { super.clear(); key_ = ""; bitField0_ = (bitField0_ & ~0x00000001); + if (fromNodeBuilder_ == null) { + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + } else { + fromNodeBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -6749,6 +7023,14 @@ public final class ReplicatorMessages { to_bitField0_ |= 0x00000001; } result.key_ = key_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (fromNodeBuilder_ == null) { + result.fromNode_ = fromNode_; + } else { + result.fromNode_ = fromNodeBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -6770,6 +7052,9 @@ public final class ReplicatorMessages { key_ = other.key_; onChanged(); } + if (other.hasFromNode()) { + mergeFromNode(other.getFromNode()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6779,6 +7064,12 @@ public final class ReplicatorMessages { return false; } + if (hasFromNode()) { + if (!getFromNode().isInitialized()) { + + return false; + } + } return true; } @@ -6875,6 +7166,123 @@ public final class ReplicatorMessages { return this; } + // optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress fromNode_ = 
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder> fromNodeBuilder_; + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public boolean hasFromNode() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress getFromNode() { + if (fromNodeBuilder_ == null) { + return fromNode_; + } else { + return fromNodeBuilder_.getMessage(); + } + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public Builder setFromNode(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress value) { + if (fromNodeBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + fromNode_ = value; + onChanged(); + } else { + fromNodeBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public Builder setFromNode( + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder builderForValue) { + if (fromNodeBuilder_ == null) { + fromNode_ = builderForValue.build(); + onChanged(); + } else { + fromNodeBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public Builder mergeFromNode(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress value) { + if (fromNodeBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + fromNode_ != akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance()) { + fromNode_ = + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.newBuilder(fromNode_).mergeFrom(value).buildPartial(); + } else { + fromNode_ = value; + } + onChanged(); + } else { + fromNodeBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public Builder clearFromNode() { + if (fromNodeBuilder_ == null) { + fromNode_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.getDefaultInstance(); + onChanged(); + } else { + fromNodeBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder getFromNodeBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getFromNodeFieldBuilder().getBuilder(); + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder getFromNodeOrBuilder() { + if (fromNodeBuilder_ != null) { + return fromNodeBuilder_.getMessageOrBuilder(); + } else { + return fromNode_; + } + } + /** + * optional .akka.cluster.ddata.UniqueAddress fromNode = 2; + */ + private akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder> + 
getFromNodeFieldBuilder() { + if (fromNodeBuilder_ == null) { + fromNodeBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddress.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.UniqueAddressOrBuilder>( + fromNode_, + getParentForChildren(), + isClean()); + fromNode_ = null; + } + return fromNodeBuilder_; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.Read) } @@ -9828,6 +10236,26 @@ public final class ReplicatorMessages { */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Status.EntryOrBuilder getEntriesOrBuilder( int index); + + // optional sfixed64 toSystemUid = 4; + /** + * optional sfixed64 toSystemUid = 4; + */ + boolean hasToSystemUid(); + /** + * optional sfixed64 toSystemUid = 4; + */ + long getToSystemUid(); + + // optional sfixed64 fromSystemUid = 5; + /** + * optional sfixed64 fromSystemUid = 5; + */ + boolean hasFromSystemUid(); + /** + * optional sfixed64 fromSystemUid = 5; + */ + long getFromSystemUid(); } /** * Protobuf type {@code akka.cluster.ddata.Status} @@ -9898,6 +10326,16 @@ public final class ReplicatorMessages { entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Status.Entry.PARSER, extensionRegistry)); break; } + case 33: { + bitField0_ |= 0x00000004; + toSystemUid_ = input.readSFixed64(); + break; + } + case 41: { + bitField0_ |= 0x00000008; + fromSystemUid_ = input.readSFixed64(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -10580,10 +11018,44 @@ public final class ReplicatorMessages { return entries_.get(index); } + // optional sfixed64 toSystemUid = 4; + public static final int TOSYSTEMUID_FIELD_NUMBER = 4; + private long toSystemUid_; + /** + * optional sfixed64 toSystemUid = 4; + */ + public boolean hasToSystemUid() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional sfixed64 toSystemUid = 4; + */ + public long getToSystemUid() { + return toSystemUid_; + } + + // optional sfixed64 fromSystemUid = 5; + public static final int FROMSYSTEMUID_FIELD_NUMBER = 5; + private long fromSystemUid_; + /** + * optional sfixed64 fromSystemUid = 5; + */ + public boolean hasFromSystemUid() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional sfixed64 fromSystemUid = 5; + */ + public long getFromSystemUid() { + return fromSystemUid_; + } + private void initFields() { chunk_ = 0; totChunks_ = 0; entries_ = java.util.Collections.emptyList(); + toSystemUid_ = 0L; + fromSystemUid_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -10620,6 +11092,12 @@ public final class ReplicatorMessages { for (int i = 0; i < entries_.size(); i++) { output.writeMessage(3, entries_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeSFixed64(4, toSystemUid_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeSFixed64(5, fromSystemUid_); + } getUnknownFields().writeTo(output); } @@ -10641,6 +11119,14 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(3, entries_.get(i)); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeSFixed64Size(4, toSystemUid_); + } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += akka.protobuf.CodedOutputStream + .computeSFixed64Size(5, fromSystemUid_); + } size += getUnknownFields().getSerializedSize(); 
memoizedSerializedSize = size; return size; @@ -10768,6 +11254,10 @@ public final class ReplicatorMessages { } else { entriesBuilder_.clear(); } + toSystemUid_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); + fromSystemUid_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -10813,6 +11303,14 @@ public final class ReplicatorMessages { } else { result.entries_ = entriesBuilder_.build(); } + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.toSystemUid_ = toSystemUid_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.fromSystemUid_ = fromSystemUid_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -10861,6 +11359,12 @@ public final class ReplicatorMessages { } } } + if (other.hasToSystemUid()) { + setToSystemUid(other.getToSystemUid()); + } + if (other.hasFromSystemUid()) { + setFromSystemUid(other.getFromSystemUid()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11208,6 +11712,72 @@ public final class ReplicatorMessages { return entriesBuilder_; } + // optional sfixed64 toSystemUid = 4; + private long toSystemUid_ ; + /** + * optional sfixed64 toSystemUid = 4; + */ + public boolean hasToSystemUid() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional sfixed64 toSystemUid = 4; + */ + public long getToSystemUid() { + return toSystemUid_; + } + /** + * optional sfixed64 toSystemUid = 4; + */ + public Builder setToSystemUid(long value) { + bitField0_ |= 0x00000008; + toSystemUid_ = value; + onChanged(); + return this; + } + /** + * optional sfixed64 toSystemUid = 4; + */ + public Builder clearToSystemUid() { + bitField0_ = (bitField0_ & ~0x00000008); + toSystemUid_ = 0L; + onChanged(); + return this; + } + + // optional sfixed64 fromSystemUid = 5; + private long fromSystemUid_ ; + /** + * optional sfixed64 fromSystemUid = 5; + */ + public boolean hasFromSystemUid() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional sfixed64 fromSystemUid = 5; + */ + public long getFromSystemUid() { + return fromSystemUid_; + } + /** + * optional sfixed64 fromSystemUid = 5; + */ + public Builder setFromSystemUid(long value) { + bitField0_ |= 0x00000010; + fromSystemUid_ = value; + onChanged(); + return this; + } + /** + * optional sfixed64 fromSystemUid = 5; + */ + public Builder clearFromSystemUid() { + bitField0_ = (bitField0_ & ~0x00000010); + fromSystemUid_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.Status) } @@ -11256,6 +11826,26 @@ public final class ReplicatorMessages { */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Gossip.EntryOrBuilder getEntriesOrBuilder( int index); + + // optional sfixed64 toSystemUid = 3; + /** + * optional sfixed64 toSystemUid = 3; + */ + boolean hasToSystemUid(); + /** + * optional sfixed64 toSystemUid = 3; + */ + long getToSystemUid(); + + // optional sfixed64 fromSystemUid = 4; + /** + * optional sfixed64 fromSystemUid = 4; + */ + boolean hasFromSystemUid(); + /** + * optional sfixed64 fromSystemUid = 4; + */ + long getFromSystemUid(); } /** * Protobuf type {@code akka.cluster.ddata.Gossip} @@ -11321,6 +11911,16 @@ public final class ReplicatorMessages { entries_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Gossip.Entry.PARSER, extensionRegistry)); break; } + case 25: { + bitField0_ |= 0x00000002; + toSystemUid_ = input.readSFixed64(); + break; + } + case 33: { + bitField0_ |= 
0x00000004; + fromSystemUid_ = input.readSFixed64(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -12103,9 +12703,43 @@ public final class ReplicatorMessages { return entries_.get(index); } + // optional sfixed64 toSystemUid = 3; + public static final int TOSYSTEMUID_FIELD_NUMBER = 3; + private long toSystemUid_; + /** + * optional sfixed64 toSystemUid = 3; + */ + public boolean hasToSystemUid() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional sfixed64 toSystemUid = 3; + */ + public long getToSystemUid() { + return toSystemUid_; + } + + // optional sfixed64 fromSystemUid = 4; + public static final int FROMSYSTEMUID_FIELD_NUMBER = 4; + private long fromSystemUid_; + /** + * optional sfixed64 fromSystemUid = 4; + */ + public boolean hasFromSystemUid() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional sfixed64 fromSystemUid = 4; + */ + public long getFromSystemUid() { + return fromSystemUid_; + } + private void initFields() { sendBack_ = false; entries_ = java.util.Collections.emptyList(); + toSystemUid_ = 0L; + fromSystemUid_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -12135,6 +12769,12 @@ public final class ReplicatorMessages { for (int i = 0; i < entries_.size(); i++) { output.writeMessage(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeSFixed64(3, toSystemUid_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeSFixed64(4, fromSystemUid_); + } getUnknownFields().writeTo(output); } @@ -12152,6 +12792,14 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeMessageSize(2, entries_.get(i)); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeSFixed64Size(3, toSystemUid_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeSFixed64Size(4, fromSystemUid_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -12277,6 +12925,10 @@ public final class ReplicatorMessages { } else { entriesBuilder_.clear(); } + toSystemUid_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); + fromSystemUid_ = 0L; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -12318,6 +12970,14 @@ public final class ReplicatorMessages { } else { result.entries_ = entriesBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000002; + } + result.toSystemUid_ = toSystemUid_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.fromSystemUid_ = fromSystemUid_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -12363,6 +13023,12 @@ public final class ReplicatorMessages { } } } + if (other.hasToSystemUid()) { + setToSystemUid(other.getToSystemUid()); + } + if (other.hasFromSystemUid()) { + setFromSystemUid(other.getFromSystemUid()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -12673,6 +13339,72 @@ public final class ReplicatorMessages { return entriesBuilder_; } + // optional sfixed64 toSystemUid = 3; + private long toSystemUid_ ; + /** + * optional sfixed64 toSystemUid = 3; + */ + public boolean hasToSystemUid() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional sfixed64 toSystemUid = 3; + */ + public long getToSystemUid() { + return toSystemUid_; + } + /** + * optional sfixed64 
toSystemUid = 3; + */ + public Builder setToSystemUid(long value) { + bitField0_ |= 0x00000004; + toSystemUid_ = value; + onChanged(); + return this; + } + /** + * optional sfixed64 toSystemUid = 3; + */ + public Builder clearToSystemUid() { + bitField0_ = (bitField0_ & ~0x00000004); + toSystemUid_ = 0L; + onChanged(); + return this; + } + + // optional sfixed64 fromSystemUid = 4; + private long fromSystemUid_ ; + /** + * optional sfixed64 fromSystemUid = 4; + */ + public boolean hasFromSystemUid() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional sfixed64 fromSystemUid = 4; + */ + public long getFromSystemUid() { + return fromSystemUid_; + } + /** + * optional sfixed64 fromSystemUid = 4; + */ + public Builder setFromSystemUid(long value) { + bitField0_ |= 0x00000008; + fromSystemUid_ = value; + onChanged(); + return this; + } + /** + * optional sfixed64 fromSystemUid = 4; + */ + public Builder clearFromSystemUid() { + bitField0_ = (bitField0_ & ~0x00000008); + fromSystemUid_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.Gossip) } @@ -12731,7 +13463,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-     * no reply if not set 
+     * no reply if not set
      * 
*/ boolean hasReply(); @@ -12739,7 +13471,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-     * no reply if not set 
+     * no reply if not set
      * 
*/ boolean getReply(); @@ -13818,7 +14550,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-     * no reply if not set 
+     * no reply if not set
      * 
*/ public boolean hasReply() { @@ -13828,7 +14560,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-     * no reply if not set 
+     * no reply if not set
      * 
*/ public boolean getReply() { @@ -14529,7 +15261,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-       * no reply if not set 
+       * no reply if not set
        * 
*/ public boolean hasReply() { @@ -14539,7 +15271,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-       * no reply if not set 
+       * no reply if not set
        * 
*/ public boolean getReply() { @@ -14549,7 +15281,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-       * no reply if not set 
+       * no reply if not set
        * 
*/ public Builder setReply(boolean value) { @@ -14562,7 +15294,7 @@ public final class ReplicatorMessages { * optional bool reply = 3; * *
-       * no reply if not set 
+       * no reply if not set
        * 
*/ public Builder clearReply() { @@ -19307,50 +20039,54 @@ public final class ReplicatorMessages { "a.cluster.ddata.OtherMessage\022\013\n\003ref\030\002 \002(" + "\t\"h\n\007Changed\022-\n\003key\030\001 \002(\0132 .akka.cluster" + ".ddata.OtherMessage\022.\n\004data\030\002 \002(\0132 .akka", - ".cluster.ddata.OtherMessage\"H\n\005Write\022\013\n\003" + + ".cluster.ddata.OtherMessage\"}\n\005Write\022\013\n\003" + "key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.clust" + - "er.ddata.DataEnvelope\"\007\n\005Empty\"\023\n\004Read\022\013" + - "\n\003key\030\001 \002(\t\"@\n\nReadResult\0222\n\010envelope\030\001 " + - "\001(\0132 .akka.cluster.ddata.DataEnvelope\"\221\003" + - "\n\014DataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.clus" + - "ter.ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132" + - "-.akka.cluster.ddata.DataEnvelope.Prunin" + - "gEntry\0228\n\rdeltaVersions\030\003 \001(\0132!.akka.clu" + - "ster.ddata.VersionVector\032\326\001\n\014PruningEntr", - "y\0229\n\016removedAddress\030\001 \002(\0132!.akka.cluster" + - ".ddata.UniqueAddress\0227\n\014ownerAddress\030\002 \002" + - "(\0132!.akka.cluster.ddata.UniqueAddress\022\021\n" + - "\tperformed\030\003 \002(\010\022)\n\004seen\030\004 \003(\0132\033.akka.cl" + - "uster.ddata.Address\022\024\n\014obsoleteTime\030\005 \001(" + - "\022\"\203\001\n\006Status\022\r\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks" + - "\030\002 \002(\r\0221\n\007entries\030\003 \003(\0132 .akka.cluster.d" + - "data.Status.Entry\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022" + - "\016\n\006digest\030\002 \002(\014\"\227\001\n\006Gossip\022\020\n\010sendBack\030\001" + - " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda", - "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + - "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat" + - "aEnvelope\"\201\002\n\020DeltaPropagation\0223\n\010fromNo" + - "de\030\001 \002(\0132!.akka.cluster.ddata.UniqueAddr" + - "ess\022;\n\007entries\030\002 \003(\0132*.akka.cluster.ddat" + - "a.DeltaPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032" + - "l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132" + - " .akka.cluster.ddata.DataEnvelope\022\021\n\tfro" + - "mSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueA" + - "ddress\022,\n\007address\030\001 \002(\0132\033.akka.cluster.d", - "data.Address\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\"" + - ")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" + - "(\r\"\224\001\n\rVersionVector\0228\n\007entries\030\001 \003(\0132\'." 
+ - "akka.cluster.ddata.VersionVector.Entry\032I" + - "\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.cluster.dda" + - "ta.UniqueAddress\022\017\n\007version\030\002 \002(\003\"V\n\014Oth" + - "erMessage\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014se" + - "rializerId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(" + - "\014\"\036\n\nStringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023Du" + - "rableDataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.c", - "luster.ddata.OtherMessage\022>\n\007pruning\030\002 \003" + - "(\0132-.akka.cluster.ddata.DataEnvelope.Pru" + - "ningEntryB#\n\037akka.cluster.ddata.protobuf" + - ".msgH\001" + "er.ddata.DataEnvelope\0223\n\010fromNode\030\003 \001(\0132" + + "!.akka.cluster.ddata.UniqueAddress\"\007\n\005Em" + + "pty\"H\n\004Read\022\013\n\003key\030\001 \002(\t\0223\n\010fromNode\030\002 \001" + + "(\0132!.akka.cluster.ddata.UniqueAddress\"@\n" + + "\nReadResult\0222\n\010envelope\030\001 \001(\0132 .akka.clu" + + "ster.ddata.DataEnvelope\"\221\003\n\014DataEnvelope" + + "\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Othe" + + "rMessage\022>\n\007pruning\030\002 \003(\0132-.akka.cluster", + ".ddata.DataEnvelope.PruningEntry\0228\n\rdelt" + + "aVersions\030\003 \001(\0132!.akka.cluster.ddata.Ver" + + "sionVector\032\326\001\n\014PruningEntry\0229\n\016removedAd" + + "dress\030\001 \002(\0132!.akka.cluster.ddata.UniqueA" + + "ddress\0227\n\014ownerAddress\030\002 \002(\0132!.akka.clus" + + "ter.ddata.UniqueAddress\022\021\n\tperformed\030\003 \002" + + "(\010\022)\n\004seen\030\004 \003(\0132\033.akka.cluster.ddata.Ad" + + "dress\022\024\n\014obsoleteTime\030\005 \001(\022\"\257\001\n\006Status\022\r" + + "\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks\030\002 \002(\r\0221\n\007entr" + + "ies\030\003 \003(\0132 .akka.cluster.ddata.Status.En", + "try\022\023\n\013toSystemUid\030\004 \001(\020\022\025\n\rfromSystemUi" + + "d\030\005 \001(\020\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022\016\n\006digest\030" + + "\002 \002(\014\"\303\001\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007en" + + "tries\030\002 \003(\0132 .akka.cluster.ddata.Gossip." 
+ + "Entry\022\023\n\013toSystemUid\030\003 \001(\020\022\025\n\rfromSystem" + + "Uid\030\004 \001(\020\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envel" + + "ope\030\002 \002(\0132 .akka.cluster.ddata.DataEnvel" + + "ope\"\201\002\n\020DeltaPropagation\0223\n\010fromNode\030\001 \002" + + "(\0132!.akka.cluster.ddata.UniqueAddress\022;\n" + + "\007entries\030\002 \003(\0132*.akka.cluster.ddata.Delt", + "aPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032l\n\005Ent" + + "ry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka" + + ".cluster.ddata.DataEnvelope\022\021\n\tfromSeqNr" + + "\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress" + + "\022,\n\007address\030\001 \002(\0132\033.akka.cluster.ddata.A" + + "ddress\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Add" + + "ress\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n" + + "\rVersionVector\0228\n\007entries\030\001 \003(\0132\'.akka.c" + + "luster.ddata.VersionVector.Entry\032I\n\005Entr" + + "y\022/\n\004node\030\001 \002(\0132!.akka.cluster.ddata.Uni", + "queAddress\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMess" + + "age\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializ" + + "erId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nS" + + "tringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableD" + + "ataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.cluster" + + ".ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132-.a" + + "kka.cluster.ddata.DataEnvelope.PruningEn" + + "tryB#\n\037akka.cluster.ddata.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -19404,7 +20140,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_Write_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_Write_descriptor, - new java.lang.String[] { "Key", "Envelope", }); + new java.lang.String[] { "Key", "Envelope", "FromNode", }); internal_static_akka_cluster_ddata_Empty_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_akka_cluster_ddata_Empty_fieldAccessorTable = new @@ -19416,7 +20152,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_Read_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_Read_descriptor, - new java.lang.String[] { "Key", }); + new java.lang.String[] { "Key", "FromNode", }); internal_static_akka_cluster_ddata_ReadResult_descriptor = getDescriptor().getMessageTypes().get(10); internal_static_akka_cluster_ddata_ReadResult_fieldAccessorTable = new @@ -19440,7 +20176,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_Status_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_Status_descriptor, - new java.lang.String[] { "Chunk", "TotChunks", "Entries", }); + new java.lang.String[] { "Chunk", "TotChunks", "Entries", "ToSystemUid", "FromSystemUid", }); internal_static_akka_cluster_ddata_Status_Entry_descriptor = internal_static_akka_cluster_ddata_Status_descriptor.getNestedTypes().get(0); internal_static_akka_cluster_ddata_Status_Entry_fieldAccessorTable = new @@ -19452,7 +20188,7 @@ public final class ReplicatorMessages { 
internal_static_akka_cluster_ddata_Gossip_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_Gossip_descriptor, - new java.lang.String[] { "SendBack", "Entries", }); + new java.lang.String[] { "SendBack", "Entries", "ToSystemUid", "FromSystemUid", }); internal_static_akka_cluster_ddata_Gossip_Entry_descriptor = internal_static_akka_cluster_ddata_Gossip_descriptor.getNestedTypes().get(0); internal_static_akka_cluster_ddata_Gossip_Entry_fieldAccessorTable = new diff --git a/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes b/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes index 4a7d80ac34..7152ca4bca 100644 --- a/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-distributed-data/src/main/mima-filters/2.5.21.backwards.excludes @@ -2,3 +2,18 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.ORSet.clear") ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.cluster.ddata.PruningState.addSeen") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.PruningState.addSeen") + +# #26284 adding uid to internal messages +ProblemFilters.exclude[Problem]("akka.cluster.ddata.Replicator#Internal*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.Replicator$Internal*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadAggregator*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.WriteAggregator*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadWriteAggregator*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.DeltaPropagationSelector*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveGossip") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveStatus") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.gossipTo") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.replica") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator.receiveWeaklyUpMemberUp") + diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index dbf8c7ed80..599c6c742d 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -11,7 +11,7 @@ message Get { required OtherMessage key = 1; required sint32 consistency = 2; required uint32 timeout = 3; - optional OtherMessage request = 4; + optional OtherMessage request = 4; } message GetSuccess { @@ -48,6 +48,7 @@ message Changed { message Write { required string key = 1; required DataEnvelope envelope = 2; + optional UniqueAddress fromNode = 3; } // message WriteAck, via Empty @@ -57,6 +58,7 @@ message Empty { message Read { required string key = 1; + optional UniqueAddress fromNode = 2; } message ReadResult { @@ -86,6 +88,8 @@ message Status { required uint32 chunk = 1; required uint32 totChunks = 2; repeated Entry entries = 3; + optional sfixed64 toSystemUid = 4; + optional sfixed64 fromSystemUid = 5; } message Gossip { @@ -96,6 +100,8 @@ message Gossip { required bool sendBack = 1; repeated Entry entries = 2; + optional sfixed64 toSystemUid = 3; + optional sfixed64 fromSystemUid = 4; } message DeltaPropagation { @@ -108,7 +114,7 @@ message 
DeltaPropagation { required UniqueAddress fromNode = 1; repeated Entry entries = 2; - optional bool reply = 3; // no reply if not set + optional bool reply = 3; // no reply if not set } message UniqueAddress { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala index 75a4bb5063..d60df4c629 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/DeltaPropagationSelector.scala @@ -6,8 +6,8 @@ package akka.cluster.ddata import scala.collection.immutable.TreeMap -import akka.actor.Address import akka.annotation.InternalApi +import akka.cluster.UniqueAddress import akka.cluster.ddata.Key.KeyId import akka.cluster.ddata.Replicator.Internal.DeltaPropagation import akka.cluster.ddata.Replicator.Internal.DeltaPropagation.NoDeltaPlaceholder @@ -25,12 +25,12 @@ private[akka] trait DeltaPropagationSelector { def propagationCount: Long = _propagationCount private var deltaCounter = Map.empty[KeyId, Long] private var deltaEntries = Map.empty[KeyId, TreeMap[Long, ReplicatedData]] - private var deltaSentToNode = Map.empty[KeyId, Map[Address, Long]] + private var deltaSentToNode = Map.empty[KeyId, Map[UniqueAddress, Long]] private var deltaNodeRoundRobinCounter = 0L def gossipIntervalDivisor: Int - def allNodes: Vector[Address] + def allNodes: Vector[UniqueAddress] def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation @@ -68,7 +68,7 @@ private[akka] trait DeltaPropagationSelector { math.min(math.max((allNodesSize / gossipIntervalDivisor) + 1, 2), math.min(allNodesSize, 10)) } - def collectPropagations(): Map[Address, DeltaPropagation] = { + def collectPropagations(): Map[UniqueAddress, DeltaPropagation] = { _propagationCount += 1 val all = allNodes if (all.isEmpty) @@ -90,7 +90,7 @@ private[akka] trait DeltaPropagationSelector { } deltaNodeRoundRobinCounter += sliceSize - var result = Map.empty[Address, DeltaPropagation] + var result = Map.empty[UniqueAddress, DeltaPropagation] var cache = Map.empty[(KeyId, Long, Long), ReplicatedData] slice.foreach { node => @@ -99,7 +99,7 @@ private[akka] trait DeltaPropagationSelector { var deltas = Map.empty[KeyId, (ReplicatedData, Long, Long)] deltaEntries.foreach { case (key, entries) => - val deltaSentToNodeForKey = deltaSentToNode.getOrElse(key, TreeMap.empty[Address, Long]) + val deltaSentToNodeForKey = deltaSentToNode.getOrElse(key, TreeMap.empty[UniqueAddress, Long]) val j = deltaSentToNodeForKey.getOrElse(node, 0L) val deltaEntriesAfterJ = deltaEntriesAfter(entries, j) if (deltaEntriesAfterJ.nonEmpty) { @@ -160,7 +160,7 @@ private[akka] trait DeltaPropagationSelector { } } - private def findSmallestVersionPropagatedToAllNodes(key: KeyId, all: Vector[Address]): Long = { + private def findSmallestVersionPropagatedToAllNodes(key: KeyId, all: Vector[UniqueAddress]): Long = { deltaSentToNode.get(key) match { case None => 0L case Some(deltaSentToNodeForKey) => @@ -188,7 +188,7 @@ private[akka] trait DeltaPropagationSelector { } } - def cleanupRemovedNode(address: Address): Unit = { + def cleanupRemovedNode(address: UniqueAddress): Unit = { deltaSentToNode = deltaSentToNode.map { case (key, deltaSentToNodeForKey) => key -> (deltaSentToNodeForKey - address) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala 
b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 773713bca7..4654a7c4b7 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -5,15 +5,18 @@ package akka.cluster.ddata import java.security.MessageDigest + import scala.collection.immutable import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration import java.util.concurrent.ThreadLocalRandom + import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NoStackTrace + import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef @@ -34,24 +37,31 @@ import akka.serialization.SerializationExtension import akka.util.ByteString import com.typesafe.config.Config import java.util.function.{ Function => JFunction } + import akka.dispatch.Dispatchers import akka.actor.DeadLetterSuppression import akka.cluster.ddata.Key.KeyR import java.util.Optional + import akka.cluster.ddata.DurableStore._ import akka.actor.ExtendedActorSystem import akka.actor.SupervisorStrategy import akka.actor.OneForOneStrategy import akka.actor.ActorInitializationException import java.util.concurrent.TimeUnit + import akka.util.Helpers.toRootLowerCase import akka.actor.Cancellable import scala.util.control.NonFatal + import akka.cluster.ddata.Key.KeyId import akka.annotation.InternalApi import scala.collection.immutable.TreeSet + import akka.cluster.MemberStatus import scala.annotation.varargs + +import akka.event.Logging import akka.util.JavaDurationConverters._ import akka.util.ccompat._ @@ -790,10 +800,20 @@ object Replicator { case object DeltaPropagationTick case object RemovedNodePruningTick case object ClockTick - final case class Write(key: KeyId, envelope: DataEnvelope) extends ReplicatorMessage + sealed trait SendingSystemUid { + // FIXME #26566: we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions + def fromNode: Option[UniqueAddress] + } + sealed trait DestinationSystemUid { + // FIXME #26566: we can change from Option to Long after supporting mixed rolling updates for some versions + def toSystemUid: Option[Long] + } + final case class Write(key: KeyId, envelope: DataEnvelope, fromNode: Option[UniqueAddress]) + extends ReplicatorMessage + with SendingSystemUid case object WriteAck extends ReplicatorMessage with DeadLetterSuppression case object WriteNack extends ReplicatorMessage with DeadLetterSuppression - final case class Read(key: KeyId) extends ReplicatorMessage + final case class Read(key: KeyId, fromNode: Option[UniqueAddress]) extends ReplicatorMessage with SendingSystemUid final case class ReadResult(envelope: Option[DataEnvelope]) extends ReplicatorMessage with DeadLetterSuppression final case class ReadRepair(key: KeyId, envelope: DataEnvelope) case object ReadRepairAck @@ -946,7 +966,14 @@ object Replicator { override def merge(that: ReplicatedData): ReplicatedData = DeletedData } - final case class Status(digests: Map[KeyId, Digest], chunk: Int, totChunks: Int) extends ReplicatorMessage { + final case class Status( + digests: Map[KeyId, Digest], + chunk: Int, + totChunks: Int, + toSystemUid: Option[Long], + fromSystemUid: Option[Long]) + extends ReplicatorMessage + with DestinationSystemUid { override def toString: String = (digests .map { @@ -954,11 +981,22 @@ object Replicator { }) .mkString("Status(", ", ", ")") } - final case class 
Gossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean) extends ReplicatorMessage + final case class Gossip( + updatedData: Map[KeyId, DataEnvelope], + sendBack: Boolean, + toSystemUid: Option[Long], + fromSystemUid: Option[Long]) + extends ReplicatorMessage + with DestinationSystemUid final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) - final case class DeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) + final case class DeltaPropagation(_fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage + with SendingSystemUid { + // FIXME we can change from Option to UniqueAddress after supporting mixed rolling updates for some versions, + // i.e. remove this and rename _fromNode + override def fromNode: Option[UniqueAddress] = Some(_fromNode) + } object DeltaPropagation { /** @@ -1186,6 +1224,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val cluster = Cluster(context.system) val selfAddress = cluster.selfAddress val selfUniqueAddress = cluster.selfUniqueAddress + val selfFromSystemUid = Some(selfUniqueAddress.longUid) require(!cluster.isTerminated, "Cluster node must not be terminated") require( @@ -1223,9 +1262,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val deltaPropagationSelector = new DeltaPropagationSelector { override val gossipIntervalDivisor = 5 - override def allNodes: Vector[Address] = { + override def allNodes: Vector[UniqueAddress] = { // TODO optimize, by maintaining a sorted instance variable instead - nodes.union(weaklyUpNodes).diff(unreachable).toVector.sorted + Replicator.this.allNodes.diff(unreachable).toVector.sorted } override def maxDeltaSize: Int = settings.maxDeltaSize @@ -1256,11 +1295,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog .schedule(deltaPropagationInterval, deltaPropagationInterval, self, DeltaPropagationTick)) } else None - // cluster nodes, doesn't contain selfAddress - var nodes: Set[Address] = Set.empty + // cluster nodes, doesn't contain selfAddress, doesn't contain joining and weaklyUp + var nodes: Set[UniqueAddress] = Set.empty // cluster weaklyUp nodes, doesn't contain selfAddress - var weaklyUpNodes: Set[Address] = Set.empty + var weaklyUpNodes: Set[UniqueAddress] = Set.empty + + // cluster joining nodes, doesn't contain selfAddress + var joiningNodes: Set[UniqueAddress] = Set.empty + + // up and weaklyUp nodes, doesn't contain joining and not selfAddress + private def allNodes: Set[UniqueAddress] = nodes.union(weaklyUpNodes) + + private def isKnownNode(node: UniqueAddress): Boolean = + nodes(node) || weaklyUpNodes(node) || joiningNodes(node) || selfUniqueAddress == node var removedNodes: Map[UniqueAddress, Long] = Map.empty // all nodes sorted with the leader first @@ -1271,7 +1319,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // for pruning timeouts are based on clock that is only increased when all nodes are reachable var previousClockTime = System.nanoTime() var allReachableClockTime = 0L - var unreachable = Set.empty[Address] + var unreachable = Set.empty[UniqueAddress] // the actual data var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)] @@ -1397,32 +1445,66 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // MUST use replyTo instead of sender() and forward from normalReceive, because of the stash in load val normalReceive: Receive = 
{ - case Get(key, consistency, req) => receiveGet(key, consistency, req) - case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req) - case Read(key) => receiveRead(key) - case Write(key, envelope) => receiveWrite(key, envelope) - case ReadRepair(key, envelope) => receiveReadRepair(key, envelope) - case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas) - case FlushChanges => receiveFlushChanges() - case DeltaPropagationTick => receiveDeltaPropagationTick() - case GossipTick => receiveGossipTick() - case ClockTick => receiveClockTick() - case Status(otherDigests, chunk, totChunks) => receiveStatus(otherDigests, chunk, totChunks) - case Gossip(updatedData, sendBack) => receiveGossip(updatedData, sendBack) - case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber) - case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber) - case Terminated(ref) => receiveTerminated(ref) - case MemberWeaklyUp(m) => receiveWeaklyUpMemberUp(m) - case MemberUp(m) => receiveMemberUp(m) - case MemberRemoved(m, _) => receiveMemberRemoved(m) - case evt: MemberEvent => receiveOtherMemberEvent(evt.member) - case UnreachableMember(m) => receiveUnreachable(m) - case ReachableMember(m) => receiveReachable(m) - case GetKeyIds => receiveGetKeyIds() - case Delete(key, consistency, req) => receiveDelete(key, consistency, req) - case RemovedNodePruningTick => receiveRemovedNodePruningTick() - case GetReplicaCount => receiveGetReplicaCount() - case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled + case msg: DestinationSystemUid => + msg.toSystemUid match { + case Some(uid) if uid != selfUniqueAddress.longUid => + // When restarting a node with same host:port it is possible that a Replicator on another node + // is sending messages to the restarted node even if it hasn't joined the same cluster. + // Therefore we check that the message was intended for this incarnation and otherwise + // it is discarded. + log.info( + "Ignoring message [{}] from [{}] intended for system uid [{}], self uid is [{}]", + Logging.simpleName(msg), + replyTo, + uid, + selfUniqueAddress.longUid) + case _ => + msg match { + case Status(otherDigests, chunk, totChunks, _, fromSystemUid) => + receiveStatus(otherDigests, chunk, totChunks, fromSystemUid) + case Gossip(updatedData, sendBack, _, fromSystemUid) => + receiveGossip(updatedData, sendBack, fromSystemUid) + } + } + + case msg: SendingSystemUid => + msg.fromNode match { + case Some(fromNode) if !isKnownNode(fromNode) => + // When restarting a node with same host:port it is possible that a Replicator on another node + // is sending messages to the restarted node even if it hasn't joined the same cluster. 
+ // Therefore we check that the message was from a known cluster member + log.info("Ignoring message [{}] from [{}] unknown node [{}]", Logging.simpleName(msg), replyTo, fromNode) + + case _ => + msg match { + case Read(key, _) => receiveRead(key) + case Write(key, envelope, _) => receiveWrite(key, envelope) + case DeltaPropagation(from, reply, deltas) => receiveDeltaPropagation(from, reply, deltas) + } + } + + case Get(key, consistency, req) => receiveGet(key, consistency, req) + case u @ Update(key, writeC, req) => receiveUpdate(key, u.modify, writeC, req) + case ReadRepair(key, envelope) => receiveReadRepair(key, envelope) + case FlushChanges => receiveFlushChanges() + case DeltaPropagationTick => receiveDeltaPropagationTick() + case GossipTick => receiveGossipTick() + case ClockTick => receiveClockTick() + case Subscribe(key, subscriber) => receiveSubscribe(key, subscriber) + case Unsubscribe(key, subscriber) => receiveUnsubscribe(key, subscriber) + case Terminated(ref) => receiveTerminated(ref) + case MemberJoined(m) => receiveMemberJoining(m) + case MemberWeaklyUp(m) => receiveMemberWeaklyUp(m) + case MemberUp(m) => receiveMemberUp(m) + case MemberRemoved(m, _) => receiveMemberRemoved(m) + case evt: MemberEvent => receiveOtherMemberEvent(evt.member) + case UnreachableMember(m) => receiveUnreachable(m) + case ReachableMember(m) => receiveReachable(m) + case GetKeyIds => receiveGetKeyIds() + case Delete(key, consistency, req) => receiveDelete(key, consistency, req) + case RemovedNodePruningTick => receiveRemovedNodePruningTick() + case GetReplicaCount => receiveGetReplicaCount() + case TestFullStateGossip(enabled) => fullStateGossipEnabled = enabled } def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit = { @@ -1438,7 +1520,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } else context.actorOf( ReadAggregator - .props(key, consistency, req, nodes, unreachable, localValue, replyTo) + .props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo) .withDispatcher(context.props.dispatcher)) } @@ -1521,7 +1603,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val writeAggregator = context.actorOf( WriteAggregator - .props(key, writeEnvelope, writeDelta, writeConsistency, req, nodes, unreachable, replyTo, durable) + .props( + key, + writeEnvelope, + writeDelta, + writeConsistency, + req, + selfUniqueAddress, + nodes, + unreachable, + replyTo, + durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! Store( @@ -1630,7 +1722,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val writeAggregator = context.actorOf( WriteAggregator - .props(key, DeletedEnvelope, None, consistency, req, nodes, unreachable, replyTo, durable) + .props( + key, + DeletedEnvelope, + None, + consistency, + req, + selfUniqueAddress, + nodes, + unreachable, + replyTo, + durable) .withDispatcher(context.props.dispatcher)) if (durable) { durableStore ! 
Store( @@ -1811,13 +1913,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def receiveGossipTick(): Unit = { if (fullStateGossipEnabled) - selectRandomNode(nodes.union(weaklyUpNodes).toVector).foreach(gossipTo) + selectRandomNode(allNodes.toVector).foreach(gossipTo) } - def gossipTo(address: Address): Unit = { + def gossipTo(address: UniqueAddress): Unit = { val to = replica(address) + val toSystemUid = Some(address.longUid) if (dataEntries.size <= maxDeltaElements) { - val status = Status(dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) }, chunk = 0, totChunks = 1) + val status = Status( + dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) }, + chunk = 0, + totChunks = 1, + toSystemUid, + selfFromSystemUid) to ! status } else { val totChunks = dataEntries.size / maxDeltaElements @@ -1831,19 +1939,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val chunk = (statusCount % totChunks).toInt val status = Status(dataEntries.collect { case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk => (key, getDigest(key)) - }, chunk, totChunks) + }, chunk, totChunks, toSystemUid, selfFromSystemUid) to ! status } } } - def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = + def selectRandomNode(addresses: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) - def replica(address: Address): ActorSelection = - context.actorSelection(self.path.toStringWithAddress(address)) + def replica(node: UniqueAddress): ActorSelection = + context.actorSelection(self.path.toStringWithAddress(node.address)) - def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int): Unit = { + def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit = { if (log.isDebugEnabled) log.debug( "Received gossip status from [{}], chunk [{}] of [{}] containing [{}]", @@ -1868,7 +1976,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (keys.nonEmpty) { if (log.isDebugEnabled) log.debug("Sending gossip to [{}], containing [{}]", replyTo.path.address, keys.mkString(", ")) - val g = Gossip(keys.iterator.map(k => k -> getData(k).get).toMap, sendBack = otherDifferentKeys.nonEmpty) + val g = Gossip( + keys.iterator.map(k => k -> getData(k).get).toMap, + sendBack = otherDifferentKeys.nonEmpty, + fromSystemUid, + selfFromSystemUid) replyTo ! g } val myMissingKeys = otherKeys.diff(myKeys) @@ -1878,12 +1990,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog "Sending gossip status to [{}], requesting missing [{}]", replyTo.path.address, myMissingKeys.mkString(", ")) - val status = Status(myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap, chunk, totChunks) + val status = Status( + myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap, + chunk, + totChunks, + fromSystemUid, + selfFromSystemUid) replyTo ! 
status } } - def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean): Unit = { + def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, fromSystemUid: Option[Long]): Unit = { if (log.isDebugEnabled) log.debug("Received gossip from [{}], containing [{}]", replyTo.path.address, updatedData.keys.mkString(", ")) var replyData = Map.empty[KeyId, DataEnvelope] @@ -1899,7 +2016,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } if (sendBack && replyData.nonEmpty) - replyTo ! Gossip(replyData, sendBack = false) + replyTo ! Gossip(replyData, sendBack = false, fromSystemUid, selfFromSystemUid) } def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = { @@ -1943,16 +2060,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } } - def receiveWeaklyUpMemberUp(m: Member): Unit = + def receiveMemberJoining(m: Member): Unit = if (matchingRole(m) && m.address != selfAddress) - weaklyUpNodes += m.address + joiningNodes += m.uniqueAddress + + def receiveMemberWeaklyUp(m: Member): Unit = + if (matchingRole(m) && m.address != selfAddress) { + weaklyUpNodes += m.uniqueAddress + joiningNodes -= m.uniqueAddress + } def receiveMemberUp(m: Member): Unit = if (matchingRole(m)) { leader += m if (m.address != selfAddress) { - nodes += m.address - weaklyUpNodes -= m.address + nodes += m.uniqueAddress + weaklyUpNodes -= m.uniqueAddress + joiningNodes -= m.uniqueAddress } } @@ -1960,14 +2084,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (m.address == selfAddress) context.stop(self) else if (matchingRole(m)) { + log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress) // filter, it's possible that the ordering is changed since it based on MemberStatus leader = leader.filterNot(_.uniqueAddress == m.uniqueAddress) - nodes -= m.address - weaklyUpNodes -= m.address - log.debug("adding removed node [{}] from MemberRemoved", m.uniqueAddress) + nodes -= m.uniqueAddress + weaklyUpNodes -= m.uniqueAddress + joiningNodes -= m.uniqueAddress removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime) - unreachable -= m.address - deltaPropagationSelector.cleanupRemovedNode(m.address) + unreachable -= m.uniqueAddress + deltaPropagationSelector.cleanupRemovedNode(m.uniqueAddress) } } @@ -1979,10 +2104,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def receiveUnreachable(m: Member): Unit = - if (matchingRole(m)) unreachable += m.address + if (matchingRole(m)) unreachable += m.uniqueAddress def receiveReachable(m: Member): Unit = - if (matchingRole(m)) unreachable -= m.address + if (matchingRole(m)) unreachable -= m.uniqueAddress def receiveClockTick(): Unit = { val now = System.nanoTime() @@ -2004,11 +2129,11 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def collectRemovedNodes(): Unit = { - val knownNodes = nodes.union(weaklyUpNodes).union(removedNodes.keySet.map(_.address)) + val knownNodes = allNodes.union(removedNodes.keySet) val newRemovedNodes = dataEntries.foldLeft(Set.empty[UniqueAddress]) { case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) => - acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n.address))) + acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress || knownNodes(n))) case (acc, _) => acc } @@ -2023,7 +2148,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor 
with ActorLog // initiate pruning for removed nodes val removedSet: Set[UniqueAddress] = removedNodes.iterator .collect { - case (r, t) if ((allReachableClockTime - t) > maxPruningDisseminationNanos) => r + case (r, t) if (allReachableClockTime - t) > maxPruningDisseminationNanos => r } .to(immutable.Set) @@ -2053,7 +2178,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog def performRemovedNodePruning(): Unit = { // perform pruning when all seen Init - val allNodes = nodes.union(weaklyUpNodes) + val all = allNodes val pruningPerformed = PruningPerformed(System.currentTimeMillis() + pruningMarkerTimeToLive.toMillis) val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() + durablePruningMarkerTimeToLive.toMillis) dataEntries.foreach { @@ -2061,7 +2186,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog pruning.foreach { case (removed, PruningInitialized(owner, seen)) if owner == selfUniqueAddress - && (allNodes.isEmpty || allNodes.forall(seen)) => + && (all.isEmpty || all.forall(n => seen(n.address))) => val newEnvelope = envelope.prune(removed, if (isDurable(key)) durablePruningPerformed else pruningPerformed) log.debug("Perform pruning of [{}] from [{}] to [{}]", key, removed, selfUniqueAddress) setData(key, newEnvelope) @@ -2124,22 +2249,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import ReadWriteAggregator._ def timeout: FiniteDuration - def nodes: Set[Address] - def unreachable: Set[Address] - def reachableNodes: Set[Address] = nodes.diff(unreachable) + def nodes: Set[UniqueAddress] + def unreachable: Set[UniqueAddress] + def reachableNodes: Set[UniqueAddress] = nodes.diff(unreachable) import context.dispatcher var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, self, SendToSecondary) var timeoutSchedule = context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout) - var remaining = nodes + var remaining = nodes.map(_.address) def doneWhenRemainingSize: Int - def primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas: Boolean): (Vector[Address], Vector[Address]) = { + def primaryAndSecondaryNodes( + requiresCausalDeliveryOfDeltas: Boolean): (Vector[UniqueAddress], Vector[UniqueAddress]) = { val primarySize = nodes.size - doneWhenRemainingSize if (primarySize >= nodes.size) - (nodes.toVector, Vector.empty[Address]) + (nodes.toVector, Vector.empty[UniqueAddress]) else { // Prefer to use reachable nodes over the unreachable nodes first. 
// When RequiresCausalDeliveryOfDeltas use deterministic order to so that sequence numbers of subsequent @@ -2157,8 +2283,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog timeoutSchedule.cancel() } - def replica(address: Address): ActorSelection = - context.actorSelection(context.parent.path.toStringWithAddress(address)) + def replica(node: UniqueAddress): ActorSelection = + context.actorSelection(context.parent.path.toStringWithAddress(node.address)) } @@ -2172,12 +2298,23 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], - nodes: Set[Address], - unreachable: Set[Address], + selfUniqueAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], replyTo: ActorRef, durable: Boolean): Props = - Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable)) - .withDeploy(Deploy.local) + Props( + new WriteAggregator( + key, + envelope, + delta, + consistency, + req, + selfUniqueAddress, + nodes, + unreachable, + replyTo, + durable)).withDeploy(Deploy.local) } /** @@ -2189,18 +2326,18 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog delta: Option[Replicator.Internal.Delta], consistency: Replicator.WriteConsistency, req: Option[Any], - override val nodes: Set[Address], - override val unreachable: Set[Address], + selfUniqueAddress: UniqueAddress, + override val nodes: Set[UniqueAddress], + override val unreachable: Set[UniqueAddress], replyTo: ActorRef, durable: Boolean) - extends ReadWriteAggregator { + extends ReadWriteAggregator + with ActorLogging { import Replicator._ import Replicator.Internal._ import ReadWriteAggregator._ - val selfUniqueAddress = Cluster(context.system).selfUniqueAddress - override def timeout: FiniteDuration = consistency.timeout override val doneWhenRemainingSize = consistency match { @@ -2214,7 +2351,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator") } - val writeMsg = Write(key.id, envelope) + val writeMsg = Write(key.id, envelope, Some(selfUniqueAddress)) val deltaMsg = delta match { case None => None case Some(d) => Some(DeltaPropagation(selfUniqueAddress, reply = true, Map(key.id -> d))) @@ -2267,7 +2404,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // Deltas must be applied in order and we can't keep track of ordering of // simultaneous updates so there is a chance that the delta could not be applied. // Try again with the full state to the primary nodes that have not acked. - primaryNodes.toSet.intersect(remaining).foreach { replica(_) ! writeMsg } + primaryNodes.foreach { to => + if (remaining(to.address)) + replica(to) ! writeMsg + } } secondaryNodes.foreach { replica(_) ! 
writeMsg } case ReceiveTimeout => @@ -2309,11 +2449,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog key: KeyR, consistency: Replicator.ReadConsistency, req: Option[Any], - nodes: Set[Address], - unreachable: Set[Address], + selfUniqueAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], localValue: Option[Replicator.Internal.DataEnvelope], replyTo: ActorRef): Props = - Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo)).withDeploy(Deploy.local) + Props(new ReadAggregator(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo)) + .withDeploy(Deploy.local) } @@ -2324,8 +2466,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog key: KeyR, consistency: Replicator.ReadConsistency, req: Option[Any], - override val nodes: Set[Address], - override val unreachable: Set[Address], + selfUniqueAddress: UniqueAddress, + override val nodes: Set[UniqueAddress], + override val unreachable: Set[UniqueAddress], localValue: Option[Replicator.Internal.DataEnvelope], replyTo: ActorRef) extends ReadWriteAggregator { @@ -2348,7 +2491,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator") } - val readMsg = Read(key.id) + val readMsg = Read(key.id, Some(selfUniqueAddress)) private val (primaryNodes, secondaryNodes) = { primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false) diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 0da88c94a6..370343e103 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -50,14 +50,14 @@ private object ReplicatedDataSerializer { @silent private final def compareKeys(t1: Any, t2: Any): Int = (t1, t2) match { case (k1: String, k2: String) => k1.compareTo(k2) - case (k1: String, k2) => -1 - case (k1, k2: String) => 1 + case (_: String, _) => -1 + case (_, _: String) => 1 case (k1: Int, k2: Int) => k1.compareTo(k2) - case (k1: Int, k2) => -1 - case (k1, k2: Int) => 1 + case (_: Int, _) => -1 + case (_, _: Int) => 1 case (k1: Long, k2: Long) => k1.compareTo(k2) - case (k1: Long, k2) => -1 - case (k1, k2: Long) => 1 + case (_: Long, _) => -1 + case (_, _: Long) => 1 case (k1: OtherMessage, k2: OtherMessage) => OtherMessageComparator.compare(k1, k2) case (k1, k2) => throw new IllegalStateException( diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index de4118e4da..0ac0a2ff40 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -267,15 +267,21 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case (key, digest) => b.addEntries(dm.Status.Entry.newBuilder().setKey(key).setDigest(ByteString.copyFrom(digest.toArray))) } + b.setToSystemUid(status.toSystemUid.get) + b.setFromSystemUid(status.fromSystemUid.get) b.build() } private def 
statusFromBinary(bytes: Array[Byte]): Status = { val status = dm.Status.parseFrom(bytes) + val toSystemUid = if (status.hasToSystemUid) Some(status.getToSystemUid) else None + val fromSystemUid = if (status.hasFromSystemUid) Some(status.getFromSystemUid) else None Status( status.getEntriesList.asScala.iterator.map(e => e.getKey -> AkkaByteString(e.getDigest.toByteArray())).toMap, status.getChunk, - status.getTotChunks) + status.getTotChunks, + toSystemUid, + fromSystemUid) } private def gossipToProto(gossip: Gossip): dm.Gossip = { @@ -284,18 +290,24 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) case (key, data) => b.addEntries(dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data))) } + b.setToSystemUid(gossip.toSystemUid.get) + b.setFromSystemUid(gossip.fromSystemUid.get) b.build() } private def gossipFromBinary(bytes: Array[Byte]): Gossip = { val gossip = dm.Gossip.parseFrom(decompress(bytes)) + val toSystemUid = if (gossip.hasToSystemUid) Some(gossip.getToSystemUid) else None + val fromSystemUid = if (gossip.hasFromSystemUid) Some(gossip.getFromSystemUid) else None Gossip( gossip.getEntriesList.asScala.iterator.map(e => e.getKey -> dataEnvelopeFromProto(e.getEnvelope)).toMap, - sendBack = gossip.getSendBack) + sendBack = gossip.getSendBack, + toSystemUid, + fromSystemUid) } private def deltaPropagationToProto(deltaPropagation: DeltaPropagation): dm.DeltaPropagation = { - val b = dm.DeltaPropagation.newBuilder().setFromNode(uniqueAddressToProto(deltaPropagation.fromNode)) + val b = dm.DeltaPropagation.newBuilder().setFromNode(uniqueAddressToProto(deltaPropagation._fromNode)) if (deltaPropagation.reply) b.setReply(deltaPropagation.reply) deltaPropagation.deltas.foreach { @@ -513,18 +525,27 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) } private def writeToProto(write: Write): dm.Write = - dm.Write.newBuilder().setKey(write.key).setEnvelope(dataEnvelopeToProto(write.envelope)).build() + dm.Write + .newBuilder() + .setKey(write.key) + .setEnvelope(dataEnvelopeToProto(write.envelope)) + .setFromNode(uniqueAddressToProto(write.fromNode.get)) + .build() private def writeFromBinary(bytes: Array[Byte]): Write = { val write = dm.Write.parseFrom(bytes) - Write(write.getKey, dataEnvelopeFromProto(write.getEnvelope)) + val fromNode = if (write.hasFromNode) Some(uniqueAddressFromProto(write.getFromNode)) else None + Write(write.getKey, dataEnvelopeFromProto(write.getEnvelope), fromNode) } private def readToProto(read: Read): dm.Read = - dm.Read.newBuilder().setKey(read.key).build() + dm.Read.newBuilder().setKey(read.key).setFromNode(uniqueAddressToProto(read.fromNode.get)).build() - private def readFromBinary(bytes: Array[Byte]): Read = - Read(dm.Read.parseFrom(bytes).getKey) + private def readFromBinary(bytes: Array[Byte]): Read = { + val read = dm.Read.parseFrom(bytes) + val fromNode = if (read.hasFromNode) Some(uniqueAddressFromProto(read.getFromNode)) else None + Read(read.getKey, fromNode) + } private def readResultToProto(readResult: ReadResult): dm.ReadResult = { val b = dm.ReadResult.newBuilder() diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala index ab4f6da0ed..7ca1648efe 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurablePruningSpec.scala @@ -181,6 +181,12 @@ class 
DurablePruningSpec extends MultiNodeSpec(DurablePruningSpec) with STMultiN values should ===(Set(10)) } + // this member must have seen all 4 nodes as Up + awaitAssert({ + cluster3.state.members.size should ===(4) + cluster3.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up)) + }, 10.seconds) + // after merging with others replicator3 ! Get(KeyA, ReadAll(remainingOrDefault)) val counter5 = expectMsgType[GetSuccess[GCounter]].dataValue diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala index 0c4ff9e8b3..851c432ed4 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorChaosSpec.scala @@ -105,6 +105,7 @@ class ReplicatorChaosSpec extends MultiNodeSpec(ReplicatorChaosSpec) with STMult expectMsg(ReplicaCount(5)) } } + enterBarrier("all-joined") runOn(first) { for (_ <- 0 until 5) { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala index dfd3e3d88b..9876e464a3 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala @@ -16,7 +16,7 @@ import org.scalatest.Matchers import org.scalatest.WordSpec object DeltaPropagationSelectorSpec { - class TestSelector(val selfUniqueAddress: UniqueAddress, override val allNodes: Vector[Address]) + class TestSelector(val selfUniqueAddress: UniqueAddress, override val allNodes: Vector[UniqueAddress]) extends DeltaPropagationSelector { override val gossipIntervalDivisor = 5 override def createDeltaPropagation(deltas: Map[KeyId, (ReplicatedData, Long, Long)]): DeltaPropagation = @@ -33,14 +33,14 @@ object DeltaPropagationSelectorSpec { class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheckedTripleEquals { import DeltaPropagationSelectorSpec._ - val selfUniqueAddress = UniqueAddress(Address("akka", "Sys", "localhost", 4999), 1L) - val nodes = (2500 until 2600).map(n => Address("akka", "Sys", "localhost", n)).toVector + val selfUniqueAddress = UniqueAddress(Address("akka", "Sys", "localhost", 4999), 17L) + val nodes = (2500 until 2600).map(n => UniqueAddress(Address("akka", "Sys", "localhost", n), 17L)).toVector "DeltaPropagationSelector" must { "collect none when no nodes" in { val selector = new TestSelector(selfUniqueAddress, Vector.empty) selector.update("A", deltaA) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) + selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(false) } @@ -56,9 +56,9 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) -> expected)) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) + selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation])
selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(false) selector.hasDeltaEntries("B") should ===(false) @@ -72,13 +72,13 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) -> expected)) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) + selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) -> expected)) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(true) selector.hasDeltaEntries("B") should ===(true) selector.collectPropagations() should ===(Map(nodes(2) -> expected)) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) + selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(false) selector.hasDeltaEntries("B") should ===(false) @@ -92,8 +92,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) -> expected1)) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) + selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) -> expected1)) // new update before previous was propagated to all nodes selector.update("C", deltaC) val expected2 = DeltaPropagation( @@ -103,14 +103,15 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck "A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L), "C" -> Delta(DataEnvelope(deltaC), 1L, 1L))) - val expected3 = DeltaPropagation(selfUniqueAddress, false, Map("C" -> Delta(DataEnvelope(deltaC), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) -> expected3)) + val expected3 = + DeltaPropagation(selfUniqueAddress, false, Map("C" -> Delta(DataEnvelope(deltaC), 1L, 1L))) + selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) -> expected3)) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(false) selector.hasDeltaEntries("B") should ===(false) selector.hasDeltaEntries("C") should ===(true) selector.collectPropagations() should ===(Map(nodes(1) -> expected3)) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) + selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) selector.cleanupDeltaEntries() selector.hasDeltaEntries("C") should ===(false) } @@ -129,9 +130,10 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck selector.collectPropagations() should ===(Map(nodes(0) -> expected1)) selector.update("A", delta3) selector.currentVersion("A") should ===(3L) - val expected2 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L))) + val expected2 = + DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) -> expected2)) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) +
selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) } "merge deltas" in { @@ -142,7 +144,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck override def nodesSliceSize(allNodesSize: Int): Int = 1 } selector.update("A", delta1) - val expected1 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta1), 1L, 1L))) + val expected1 = + DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta1), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) -> expected1)) selector.update("A", delta2) @@ -161,10 +164,11 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta2.merge(delta3)), 2L, 3L))) selector.collectPropagations() should ===(Map(nodes(0) -> expected4)) - val expected5 = DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L))) + val expected5 = + DeltaPropagation(selfUniqueAddress, false, Map("A" -> Delta(DataEnvelope(delta3), 3L, 3L))) selector.collectPropagations() should ===(Map(nodes(1) -> expected5)) - selector.collectPropagations() should ===(Map.empty[Address, DeltaPropagation]) + selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) } "discard too large deltas" in { diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 2a4783f78b..4a75e3b1fb 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -5,6 +5,7 @@ package akka.cluster.ddata import scala.concurrent.duration._ + import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSelection @@ -15,9 +16,11 @@ import akka.testkit._ import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.Replicator._ import akka.remote.RARP - import scala.concurrent.Future +import akka.cluster.Cluster +import akka.cluster.UniqueAddress + object WriteAggregatorSpec { val KeyA = GSetKey[String]("A") @@ -26,41 +29,76 @@ object WriteAggregatorSpec { def writeAggregatorProps( data: GSet[String], consistency: Replicator.WriteConsistency, - probes: Map[Address, ActorRef], - nodes: Set[Address], - unreachable: Set[Address], + probes: Map[UniqueAddress, ActorRef], + selfUniqueAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], replyTo: ActorRef, durable: Boolean): Props = - Props(new TestWriteAggregator(KeyA, data, None, consistency, probes, nodes, unreachable, replyTo, durable)) + Props( + new TestWriteAggregator( + KeyA, + data, + None, + consistency, + probes, + selfUniqueAddress, + nodes, + unreachable, + replyTo, + durable)) def writeAggregatorPropsWithDelta( data: ORSet[String], delta: Delta, consistency: Replicator.WriteConsistency, - probes: Map[Address, ActorRef], - nodes: Set[Address], - unreachable: Set[Address], + probes: Map[UniqueAddress, ActorRef], + selfUniqueAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], replyTo: ActorRef, durable: Boolean): Props = - Props(new TestWriteAggregator(KeyB, data, Some(delta), consistency, probes, nodes, unreachable, replyTo, durable)) + Props( + new TestWriteAggregator( + KeyB, + data, + Some(delta), + consistency, + probes, + selfUniqueAddress, + nodes, 
+ unreachable, + replyTo, + durable)) class TestWriteAggregator( key: Key.KeyR, data: ReplicatedData, delta: Option[Delta], consistency: Replicator.WriteConsistency, - probes: Map[Address, ActorRef], - nodes: Set[Address], - unreachable: Set[Address], + probes: Map[UniqueAddress, ActorRef], + selfUniqueAddress: UniqueAddress, + nodes: Set[UniqueAddress], + unreachable: Set[UniqueAddress], replyTo: ActorRef, durable: Boolean) - extends WriteAggregator(key, DataEnvelope(data), delta, consistency, None, nodes, unreachable, replyTo, durable) { + extends WriteAggregator( + key, + DataEnvelope(data), + delta, + consistency, + None, + selfUniqueAddress, + nodes, + unreachable, + replyTo, + durable) { - override def replica(address: Address): ActorSelection = + override def replica(address: UniqueAddress): ActorSelection = context.actorSelection(probes(address).path) override def senderAddress(): Address = - probes.find { case (_, r) => r == sender() }.get._1 + probes.find { case (_, r) => r == sender() }.get._1.address } def writeAckAdapterProps(replica: ActorRef): Props = @@ -105,10 +143,10 @@ class WriteAggregatorSpec extends AkkaSpec(s""" if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp" - val nodeA = Address(protocol, "Sys", "a", 2552) - val nodeB = nodeA.copy(host = Some("b")) - val nodeC = nodeA.copy(host = Some("c")) - val nodeD = nodeA.copy(host = Some("d")) + val nodeA = UniqueAddress(Address(protocol, "Sys", "a", 2552), 17L) + val nodeB = UniqueAddress(Address(protocol, "Sys", "b", 2552), 17L) + val nodeC = UniqueAddress(Address(protocol, "Sys", "c", 2552), 17L) + val nodeD = UniqueAddress(Address(protocol, "Sys", "d", 2552), 17L) // 4 replicas + the local => 5 val nodes = Set(nodeA, nodeB, nodeC, nodeD) @@ -118,13 +156,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" val writeMajority = WriteMajority(timeout) val writeAll = WriteAll(timeout) - def probes(probe: ActorRef): Map[Address, ActorRef] = + val selfUniqueAddress: UniqueAddress = Cluster(system).selfUniqueAddress + + def probes(probe: ActorRef): Map[UniqueAddress, ActorRef] = nodes.toSeq.map(_ -> system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap /** * Create a tuple for each node with the WriteAckAdapter and the TestProbe */ - def probes(): Map[Address, TestMock] = { + def probes(): Map[UniqueAddress, TestMock] = { nodes.toSeq.map(_ -> TestMock()).toMap } @@ -132,8 +172,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "send to at least N/2+1 replicas when WriteMajority" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeMajority, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = false)) probe.expectMsgType[Write] probe.lastSender ! 
WriteAck @@ -147,8 +194,16 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "send to more when no immediate reply" in { val testProbes = probes() val testProbeRefs = testProbes.map { case (a, tm) => a -> tm.writeAckAdapter } - val aggr = system.actorOf(WriteAggregatorSpec - .writeAggregatorProps(data, writeMajority, testProbeRefs, nodes, Set(nodeC, nodeD), testActor, durable = false)) + val aggr = system.actorOf( + WriteAggregatorSpec.writeAggregatorProps( + data, + writeMajority, + testProbeRefs, + selfUniqueAddress, + nodes, + Set(nodeC, nodeD), + testActor, + durable = false)) testProbes(nodeA).expectMsgType[Write] // no reply @@ -173,8 +228,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeMajority, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = false)) probe.expectMsgType[Write] // no reply @@ -221,6 +283,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" delta, writeMajority, probes(probe.ref), + selfUniqueAddress, nodes, Set.empty, testActor, @@ -244,6 +307,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" delta, writeAll, testProbeRefs, + selfUniqueAddress, nodes, Set.empty, testActor, @@ -279,6 +343,7 @@ class WriteAggregatorSpec extends AkkaSpec(s""" delta, writeAll, probes(probe.ref), + selfUniqueAddress, nodes, Set.empty, testActor, @@ -311,8 +376,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "not reply before local confirmation" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeThree, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = true)) probe.expectMsgType[Write] probe.lastSender ! WriteAck @@ -331,8 +403,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "tolerate WriteNack if enough WriteAck" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeThree, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = true)) aggr ! UpdateSuccess(WriteAggregatorSpec.KeyA, None) // the local write probe.expectMsgType[Write] @@ -350,8 +429,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "reply with StoreFailure when too many nacks" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeMajority, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = true)) probe.expectMsgType[Write] probe.lastSender ! 
WriteNack @@ -371,8 +457,15 @@ class WriteAggregatorSpec extends AkkaSpec(s""" "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf( - WriteAggregatorSpec - .writeAggregatorProps(data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) + WriteAggregatorSpec.writeAggregatorProps( + data, + writeMajority, + probes(probe.ref), + selfUniqueAddress, + nodes, + Set.empty, + testActor, + durable = true)) probe.expectMsgType[Write] // no reply diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index df0089fdb7..86675f9f35 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -103,17 +103,26 @@ class ReplicatorMessageSerializerSpec pruning = Map( address1 -> PruningPerformed(System.currentTimeMillis()), address3 -> PruningInitialized(address2, Set(address1.address))))) - checkSerialization(Write("A", DataEnvelope(data1))) + checkSerialization(Write("A", DataEnvelope(data1), Some(address1))) checkSerialization(WriteAck) checkSerialization(WriteNack) checkSerialization(DeltaNack) - checkSerialization(Read("A")) + checkSerialization(Read("A", Some(address1))) checkSerialization(ReadResult(Some(DataEnvelope(data1)))) checkSerialization(ReadResult(None)) checkSerialization( - Status(Map("A" -> ByteString.fromString("a"), "B" -> ByteString.fromString("b")), chunk = 3, totChunks = 10)) + Status( + Map("A" -> ByteString.fromString("a"), "B" -> ByteString.fromString("b")), + chunk = 3, + totChunks = 10, + Some(17), + Some(19))) checkSerialization( - Gossip(Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + "c")), sendBack = true)) + Gossip( + Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + "c")), + sendBack = true, + Some(17), + Some(19))) checkSerialization( DeltaPropagation( address1, @@ -153,10 +162,10 @@ class ReplicatorMessageSerializerSpec "get added element" in { val cache = new SmallCache[Read, String](2, 5.seconds, _ => null) - val a = Read("a") + val a = Read("a", Some(address1)) cache.add(a, "A") cache.get(a) should be("A") - val b = Read("b") + val b = Read("b", Some(address1)) cache.add(b, "B") cache.get(a) should be("A") cache.get(b) should be("B") @@ -164,20 +173,20 @@ class ReplicatorMessageSerializerSpec "return null for non-existing elements" in { val cache = new SmallCache[Read, String](4, 5.seconds, _ => null) - val a = Read("a") + val a = Read("a", Some(address1)) cache.get(a) should be(null) cache.add(a, "A") - val b = Read("b") + val b = Read("b", Some(address1)) cache.get(b) should be(null) } "hold latest added elements" in { val cache = new SmallCache[Read, String](4, 5.seconds, _ => null) - val a = Read("a") - val b = Read("b") - val c = Read("c") - val d = Read("d") - val e = Read("e") + val a = Read("a", Some(address1)) + val b = Read("b", Some(address1)) + val c = Read("c", Some(address1)) + val d = Read("d", Some(address1)) + val e = Read("e", Some(address1)) cache.add(a, "A") cache.get(a) should be("A") cache.add(b, "B") @@ -204,7 +213,7 @@ class ReplicatorMessageSerializerSpec "handle Int wrap around" ignore { // ignored because it takes 20 seconds (but it works) val cache = new SmallCache[Read, String](2, 5.seconds, _ => null) - val a =
Read("a") + val a = Read("a", Some(address1)) val x = a -> "A" var n = 0 while (n <= Int.MaxValue - 3) { @@ -214,8 +223,8 @@ class ReplicatorMessageSerializerSpec cache.get(a) should be("A") - val b = Read("b") - val c = Read("c") + val b = Read("b", Some(address1)) + val c = Read("c", Some(address1)) cache.add(b, "B") cache.get(a) should be("A") cache.get(b) should be("B") @@ -241,7 +250,7 @@ class ReplicatorMessageSerializerSpec } val cache = new SmallCache[Read, AnyRef](4, 5.seconds, a => createValue(a)) - val a = Read("a") + val a = Read("a", Some(address1)) val v1 = cache.getOrAdd(a) v1.toString should be("v1") (cache.getOrAdd(a) should be).theSameInstanceAs(v1) @@ -249,8 +258,8 @@ class ReplicatorMessageSerializerSpec "evict cache after time-to-live" in { val cache = new SmallCache[Read, AnyRef](4, 10.millis, _ => null) - val b = Read("b") - val c = Read("c") + val b = Read("b", Some(address1)) + val c = Read("c", Some(address1)) cache.add(b, "B") cache.add(c, "C") @@ -262,8 +271,8 @@ class ReplicatorMessageSerializerSpec "not evict cache before time-to-live" in { val cache = new SmallCache[Read, AnyRef](4, 5.seconds, _ => null) - val b = Read("b") - val c = Read("c") + val b = Read("b", Some(address1)) + val c = Read("c", Some(address1)) cache.add(b, "B") cache.add(c, "C") cache.evict()