Use multiple ddata keys for the typed receptionist service registry #23704

Johan Andrén 2018-07-11 11:36:00 +02:00 committed by GitHub
parent 90d589ec43
commit 26d251c917
6 changed files with 235 additions and 100 deletions


@@ -27,6 +27,7 @@ private[akka] abstract class AbstractServiceKey {
  *
  * Internal API
  */
+@InternalApi
 final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] {
   override def toString: String = s"ServiceKey[$typeName]($id)"
 }


@@ -13,9 +13,16 @@ akka.cluster.typed.receptionist {
   # Periodic task to remove actor references that are hosted by removed nodes,
   # in case of abrupt termination.
   pruning-interval = 3 s
+
+  # Shard the services over this many Distributed Data keys. With a large number of different
+  # service keys, storing all of them in the same Distributed Data entry would lead to large
+  # updates etc., so the keys are sharded across this number of entries instead. This must be
+  # the same on all nodes in a cluster; changing it requires a full cluster restart
+  # (stopping all nodes before starting them again).
+  distributed-key-count = 5
 }

-akka.actor {
+akka {
+  actor {
     serialization-identifiers {
       "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28
     }
@@ -26,3 +33,7 @@ akka.actor {
       "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster
     }
   }
+  cluster.configuration-compatibility-check.checkers {
+    receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker"
+  }
+}
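
Reviewer note: the sketch below (plain Scala, not part of this commit) illustrates how a service key id is expected to land on one of the distributed-key-count replicated entries, mirroring the modulo-hash scheme of ShardedServiceRegistry.ddataKeyFor further down. It also shows why the setting must match on every node: a node configured with a different count computes a different index for the same service key.

// Standalone sketch, assuming only the modulo-hash scheme visible in this commit.
object KeyShardingSketch extends App {
  // the reference.conf default above
  val distributedKeyCount = 5

  // same scheme as ShardedServiceRegistry.ddataKeyFor below: stable index in [0, count)
  def ddataKeyIndexFor(serviceKeyId: String): Int =
    math.abs(serviceKeyId.hashCode % distributedKeyCount)

  println(s"ponger -> ReceptionistKey_${ddataKeyIndexFor("ponger")}")
  // a node configured with a different count would read and write the same
  // service key under a different ddata entry, hence the compatibility check:
  println(s"with count 7: ReceptionistKey_${math.abs("ponger".hashCode % 7)}")
}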


@@ -25,9 +25,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
   type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
   type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
+  type DDataKey = ORMultiMapKey[ServiceKey[_], Entry]

-  private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], Entry]("ReceptionistKey")
-  private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry]
+  final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry]

   // values contain system uid to make it possible to discern actors at the same
   // path in different incarnations of a cluster node
@@ -38,57 +38,17 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
     override def toString = ref.path.toString + "#" + ref.path.uid
   }

-  final case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal {
-
-    // let's hide all the ugly casts we can in here
-
-    def getActorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] =
-      getEntriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]])
-
-    def getEntriesFor(key: AbstractServiceKey): Set[Entry] =
-      map.getOrElse(key.asServiceKey, Set.empty[Entry])
-
-    def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
-      ServiceRegistry(map.addBinding(key, value))
-
-    def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
-      ServiceRegistry(map.removeBinding(key, value))
-
-    def removeAll(removals: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = {
-      removals.foldLeft(this) {
-        case (acc, (key, entries)) ⇒
-          entries.foldLeft(acc) {
-            case (innerAcc, entry) ⇒
-              innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry)
-          }
-      }
-    }
-
-    def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = map
-  }
-
-  object ServiceRegistry {
-    final val Empty = ServiceRegistry(EmptyORMultiMap)
-
-    def collectChangedKeys(previousState: ServiceRegistry, newState: ServiceRegistry): Set[AbstractServiceKey] = {
-      val allKeys = previousState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet
-      allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒
-        val oldValues = previousState.getEntriesFor(key)
-        val newValues = newState.getEntriesFor(key)
-        if (oldValues != newValues) acc + key
-        else acc
-      }
-    }
-  }
-
-  sealed trait InternalCommand
-  final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
-  final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand
-  final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
-  final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand
-  case object RemoveTick extends InternalCommand
+  private sealed trait InternalCommand extends Command
+  private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
+  private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand
+  private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
+  private final case class ChangeFromReplicator(
+    key: DDataKey,
+    value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand
+  private case object RemoveTick extends InternalCommand

   // captures setup/dependencies so we can avoid doing it over and over again
-  class Setup(ctx: ActorContext[Any]) {
+  final class Setup(ctx: ActorContext[Command]) {
     val untypedSystem = ctx.system.toUntyped
     val settings = ClusterReceptionistSettings(ctx.system)
     val replicator = DistributedData(untypedSystem).replicator
@@ -97,16 +57,23 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
     def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress
   }

-  override def behavior: Behavior[Command] = Behaviors.setup[Any] { ctx ⇒
+  override def behavior: Behavior[Command] = Behaviors.setup { ctx ⇒
     val setup = new Setup(ctx)
+    val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount)

     // subscribe to changes from other nodes
     val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
       ctx.messageAdapter[Replicator.ReplicatorMessage] {
-        case changed @ Replicator.Changed(ReceptionistKey) ⇒ ChangeFromReplicator(changed.get(ReceptionistKey))
+        case changed: Replicator.Changed[_] @unchecked ⇒
+          ChangeFromReplicator(
+            changed.key.asInstanceOf[DDataKey],
+            changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]])
       }

-    setup.replicator ! Replicator.Subscribe(ReceptionistKey, replicatorMessageAdapter.toUntyped)
+    registry.allDdataKeys.foreach(key ⇒
+      setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped)
+    )

     // remove entries when members are removed
     val clusterEventMessageAdapter: ActorRef[MemberRemoved] =
@@ -120,25 +87,25 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
     behavior(
       setup,
-      ServiceRegistry.Empty,
+      registry,
       TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV]
     )
-  }.narrow[Command]
+  }

   /**
-   * @param state The last seen state from the replicator - only updated when we get an update from the replicator
+   * @param registry The last seen state from the replicator - only updated when we get an update from the replicator
    * @param subscriptions Local subscriptions, not replicated
    */
   def behavior(
     setup: Setup,
-    state: ServiceRegistry,
-    subscriptions: SubscriptionRegistry): Behavior[Any] =
-    Behaviors.setup[Any] { ctx ⇒
+    registry: ShardedServiceRegistry,
+    subscriptions: SubscriptionRegistry): Behavior[Command] =
+    Behaviors.setup { ctx ⇒
       import setup._

       // Helper to create new behavior
       def next(
-        newState: ServiceRegistry = state,
+        newState: ShardedServiceRegistry = registry,
         newSubscriptions: SubscriptionRegistry = subscriptions) =
         behavior(setup, newState, newSubscriptions)
@@ -146,7 +113,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
        * Hack to allow multiple termination notifications per target
        * FIXME: replace by simple map in our state
        */
-      def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit =
+      def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit =
         ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒
           innerCtx.watch(target)
           Behaviors.receive[Nothing]((_, _) ⇒ Behaviors.same)
@@ -158,16 +125,16 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
         })

       def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = {
-        val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
+        val msg = ReceptionistMessages.Listing(key.asServiceKey, state.actorRefsFor(key))
         subscriptions.get(key).foreach(_ ! msg)
       }

-      def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Any] = {
+      def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Command] = {
         // ok to update from several nodes but more efficient to try to do it from one node
         if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) {
           def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress))

           val removals = {
-            state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
+            registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
               case (acc, (key, entries)) ⇒
                 val removedEntries = entries.filter(isOnRemovedNode)
                 if (removedEntries.isEmpty) acc // no change
@@ -184,8 +151,14 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
               case (key, entries) ⇒ key.asServiceKey.id -> entries.mkString("[", ", ", "]")
             }.mkString(","))

-          replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
-            ServiceRegistry(registry).removeAll(removals).toORMultiMap
+          // shard changes over the ddata keys they belong to
+          val removalsPerDdataKey = registry.entriesPerDdataKey(removals)
+          removalsPerDdataKey.foreach {
+            case (ddataKey, removalForKey) ⇒
+              replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
+                ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
+              }
           }

           Behaviors.same
@@ -193,7 +166,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
         } else Behaviors.same
       }

-      def onCommand(cmd: Command): Behavior[Any] = cmd match {
+      def onCommand(cmd: Command): Behavior[Command] = cmd match {
         case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) ⇒
           val entry = Entry(serviceInstance, setup.selfSystemUid)
           ctx.log.debug("Actor was registered: [{}] [{}]", key, entry)
@@ -202,25 +175,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
             case Some(replyTo) ⇒ replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
             case None ⇒
           }
-          replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
+          val ddataKey = registry.ddataKeyFor(key)
+          replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
             ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
           }
           Behaviors.same

         case ReceptionistMessages.Find(key, replyTo) ⇒
-          replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
+          replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key))
           Behaviors.same

         case ReceptionistMessages.Subscribe(key, subscriber) ⇒
           watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))

           // immediately reply with initial listings to the new subscriber
-          subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
+          subscriber ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key))

           next(newSubscriptions = subscriptions.inserted(key)(subscriber))
       }

-      def onInternalCommand(cmd: InternalCommand): Behavior[Any] = cmd match {
+      def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match {

         case SubscriberTerminated(key, subscriber) ⇒
           next(newSubscriptions = subscriptions.removed(key)(subscriber))
@@ -228,26 +202,28 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
         case RegisteredActorTerminated(key, serviceInstance) ⇒
           val entry = Entry(serviceInstance, setup.selfSystemUid)
           ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry)
-          replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
+          val ddataKey = registry.ddataKeyFor(key)
+          replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
             ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
           }
           Behaviors.same

-        case ChangeFromReplicator(value) ⇒
+        case ChangeFromReplicator(ddataKey, value) ⇒
           // every change will come back this way - this is where the local notifications happen
           val newState = ServiceRegistry(value)
-          val changedKeys = ServiceRegistry.collectChangedKeys(state, newState)
+          val changedKeys = registry.collectChangedKeys(ddataKey, newState)
+          val newRegistry = registry.withServiceRegistry(ddataKey, newState)
           if (changedKeys.nonEmpty) {
             if (ctx.log.isDebugEnabled) {
               ctx.log.debug(
                 "Change from replicator: [{}], changes: [{}]",
-                newState.map.entries,
+                newState.entries.entries,
                 changedKeys.map(key ⇒
-                  key.asServiceKey.id -> newState.getEntriesFor(key).mkString("[", ", ", "]")
+                  key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]")
                 ).mkString(", "))
             }
             changedKeys.foreach(notifySubscribersFor(_, newState))
-            next(newState)
+            next(newRegistry)
           } else {
             Behaviors.same
           }
@@ -262,14 +238,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
         case RemoveTick ⇒
           // ok to update from several nodes but more efficient to try to do it from one node
          if (cluster.state.leader.contains(cluster.selfAddress)) {
-            val allAddressesInState: Set[UniqueAddress] = state.map.entries.flatMap {
-              case (_, entries) ⇒
-                // don't care about local (empty host:port addresses)
-                entries.collect {
-                  case entry if entry.ref.path.address.hasGlobalScope ⇒
-                    entry.uniqueAddress(setup.selfUniqueAddress)
-                }
-            }(collection.breakOut)
+            val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress)

             val clusterAddresses = cluster.state.members.map(_.uniqueAddress)
             val notInCluster = allAddressesInState diff clusterAddresses
@@ -283,11 +252,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
           Behavior.same
         }

-      Behaviors.receive[Any] { (ctx, msg) ⇒
+      Behaviors.receive[Command] { (ctx, msg) ⇒
         msg match {
           // support two heterogeneous types of messages without union types
-          case cmd: Command ⇒ onCommand(cmd)
           case cmd: InternalCommand ⇒ onInternalCommand(cmd)
+          case cmd: Command ⇒ onCommand(cmd)
           case _ ⇒ Behaviors.unhandled
         }
       }
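
Reviewer note: since InternalCommand now extends Command (so a single Behavior[Command] can carry both), the receive match above has to test the more specific InternalCommand before the general Command. A self-contained illustration with hypothetical types (not the Akka ones) of why the arm order matters:

// With InternalCommand <: Command, a `case cmd: Command` arm listed first
// would also swallow every internal message.
object MatchOrderSketch extends App {
  sealed trait Command
  sealed trait InternalCommand extends Command
  case object Register extends Command
  case object RemoveTick extends InternalCommand

  def dispatch(msg: Command): String = msg match {
    case internal: InternalCommand => s"internal: $internal" // must come first
    case cmd: Command              => s"external: $cmd"
  }

  println(dispatch(Register))   // external: Register
  println(dispatch(RemoveTick)) // internal: RemoveTick
}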


@@ -0,0 +1,24 @@
+/**
+ * Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
+ */
+package akka.cluster.typed.internal.receptionist
+
+import akka.annotation.InternalApi
+import akka.cluster.{ ConfigValidation, JoinConfigCompatChecker }
+import com.typesafe.config.Config
+
+/**
+ * INTERNAL API
+ *
+ * Verifies that the receptionist distributed-key-count is the same across cluster nodes
+ */
+@InternalApi
+final class ClusterReceptionistConfigCompatChecker extends JoinConfigCompatChecker {
+
+  override def requiredKeys = "akka.cluster.typed.receptionist.distributed-key-count" :: Nil
+
+  override def check(toCheck: Config, actualConfig: Config): ConfigValidation =
+    JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
+}
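
Reviewer note: this checker is picked up via the akka.cluster.configuration-compatibility-check.checkers entry added to reference.conf above. As a rough standalone sketch of what a full match over the required keys amounts to (using only the Typesafe Config API; the Valid/Invalid printout is a stand-in for the real ConfigValidation result):

import com.typesafe.config.ConfigFactory

object CompatCheckSketch extends App {
  val requiredKeys = List("akka.cluster.typed.receptionist.distributed-key-count")

  val clusterConfig = ConfigFactory.parseString(
    "akka.cluster.typed.receptionist.distributed-key-count = 5")
  val joiningConfig = ConfigFactory.parseString(
    "akka.cluster.typed.receptionist.distributed-key-count = 7")

  // a key matches only if the joining node defines it with the same value
  val mismatches = requiredKeys.filter { key =>
    !joiningConfig.hasPath(key) ||
      joiningConfig.getValue(key) != clusterConfig.getValue(key)
  }

  if (mismatches.isEmpty) println("Valid: node may join")
  else println(s"Invalid: incompatible keys ${mismatches.mkString(", ")}")
}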


@@ -35,7 +35,8 @@ private[akka] object ClusterReceptionistSettings {
     }

     ClusterReceptionistSettings(
       writeConsistency,
-      pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis
+      pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
+      config.getInt("distributed-key-count")
     )
   }
 }
@@ -46,5 +47,6 @@ private[akka] object ClusterReceptionistSettings {
 @InternalApi
 private[akka] case class ClusterReceptionistSettings(
   writeConsistency: WriteConsistency,
-  pruningInterval: FiniteDuration)
+  pruningInterval: FiniteDuration,
+  distributedKeyCount: Int)


@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
+ */
+package akka.cluster.typed.internal.receptionist
+
+import akka.actor.typed.ActorRef
+import akka.actor.typed.internal.receptionist.AbstractServiceKey
+import akka.actor.typed.receptionist.ServiceKey
+import akka.annotation.InternalApi
+import akka.cluster.{ Cluster, UniqueAddress }
+import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey }
+import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry }
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] object ShardedServiceRegistry {
+
+  def apply(numberOfKeys: Int): ShardedServiceRegistry = {
+    val emptyRegistries = (0 until numberOfKeys).map { n ⇒
+      val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
+      key -> new ServiceRegistry(EmptyORMultiMap)
+    }.toMap
+    new ShardedServiceRegistry(emptyRegistries)
+  }
+
+}
+
+/**
+ * Two-level structure for keeping the service registry, so that entries can be sharded
+ * over multiple ddata keys (to avoid too large ddata messages)
+ *
+ * INTERNAL API
+ */
+@InternalApi private[akka] final class ShardedServiceRegistry(serviceRegistries: Map[DDataKey, ServiceRegistry]) {
+
+  private val keys = serviceRegistries.keySet.toArray
+
+  def registryFor(ddataKey: DDataKey): ServiceRegistry = serviceRegistries(ddataKey)
+
+  def allDdataKeys: Iterable[DDataKey] = keys
+
+  def ddataKeyFor(serviceKey: ServiceKey[_]): DDataKey =
+    keys(math.abs(serviceKey.id.hashCode() % serviceRegistries.size))
+
+  def allServices: Iterator[(ServiceKey[_], Set[Entry])] =
+    serviceRegistries.valuesIterator.flatMap(_.entries.entries)
+
+  def allEntries: Iterator[Entry] = allServices.flatMap(_._2)
+
+  def actorRefsFor[T](key: ServiceKey[T]): Set[ActorRef[T]] = {
+    val dDataKey = ddataKeyFor(key)
+    serviceRegistries(dDataKey).actorRefsFor(key)
+  }
+
+  def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
+    new ShardedServiceRegistry(serviceRegistries + (dDataKey -> registry))
+
+  def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] =
+    allEntries.collect {
+      // we don't care about local (empty host:port) addresses
+      case entry if entry.ref.path.address.hasGlobalScope ⇒
+        entry.uniqueAddress(selfUniqueAddress)
+    }.toSet
+
+  def collectChangedKeys(dDataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = {
+    val previousRegistry = registryFor(dDataKey)
+    ServiceRegistry.collectChangedKeys(previousRegistry, newRegistry)
+  }
+
+  def entriesPerDdataKey(entries: Map[AbstractServiceKey, Set[Entry]]): Map[DDataKey, Map[AbstractServiceKey, Set[Entry]]] =
+    entries.foldLeft(Map.empty[DDataKey, Map[AbstractServiceKey, Set[Entry]]]) {
+      case (acc, (key, entries)) ⇒
+        val ddataKey = ddataKeyFor(key.asServiceKey)
+        val updated = acc.getOrElse(ddataKey, Map.empty) + (key -> entries)
+        acc + (ddataKey -> updated)
+    }
+
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] final case class ServiceRegistry(entries: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal {
+
+  // let's hide all the ugly casts we can in here
+
+  def actorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] =
+    entriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]])
+
+  def entriesFor(key: AbstractServiceKey): Set[Entry] =
+    entries.getOrElse(key.asServiceKey, Set.empty[Entry])
+
+  def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
+    ServiceRegistry(entries.addBinding(key, value))
+
+  def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
+    ServiceRegistry(entries.removeBinding(key, value))
+
+  def removeAll(removals: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = {
+    removals.foldLeft(this) {
+      case (acc, (key, entries)) ⇒
+        entries.foldLeft(acc) {
+          case (innerAcc, entry) ⇒
+            innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry)
+        }
+    }
+  }
+
+  def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = entries
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] object ServiceRegistry {
+  final val Empty = ServiceRegistry(EmptyORMultiMap)
+
+  def collectChangedKeys(previousRegistry: ServiceRegistry, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = {
+    val allKeys = previousRegistry.toORMultiMap.entries.keySet ++ newRegistry.toORMultiMap.entries.keySet
+    allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒
+      val oldValues = previousRegistry.entriesFor(key)
+      val newValues = newRegistry.entriesFor(key)
+      if (oldValues != newValues) acc + key
+      else acc
+    }
+  }
+}
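
Reviewer note: to make the two-level structure concrete, here is a self-contained sketch (plain collections and String stand-ins for the internal DDataKey/Entry types) of the bucketing that ddataKeyFor and entriesPerDdataKey perform, so that nodesRemoved in ClusterReceptionist sends exactly one Replicator.Update per affected ddata key instead of one per service key:

object ShardGroupingSketch extends App {
  val numberOfKeys = 5

  // same modulo-hash mapping as ddataKeyFor above
  def ddataKeyFor(serviceKeyId: String): Int =
    math.abs(serviceKeyId.hashCode % numberOfKeys)

  // removals collected per service key, as in nodesRemoved
  val removals: Map[String, Set[String]] = Map(
    "ponger" -> Set("actor-1"),
    "pinger" -> Set("actor-2", "actor-3"))

  // bucket per ddata key, mirroring entriesPerDdataKey
  val perDdataKey: Map[Int, Map[String, Set[String]]] =
    removals.foldLeft(Map.empty[Int, Map[String, Set[String]]]) {
      case (acc, (serviceKey, entries)) =>
        val ddataKey = ddataKeyFor(serviceKey)
        val updated = acc.getOrElse(ddataKey, Map.empty[String, Set[String]]) + (serviceKey -> entries)
        acc + (ddataKey -> updated)
    }

  perDdataKey.foreach { case (ddataKey, removalsForKey) =>
    println(s"one Replicator.Update for ReceptionistKey_$ddataKey: $removalsForKey")
  }
}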