diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala index 33796ccc04..765c5c8a62 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala @@ -27,6 +27,7 @@ private[akka] abstract class AbstractServiceKey { * * Internal API */ +@InternalApi final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] { override def toString: String = s"ServiceKey[$typeName]($id)" } diff --git a/akka-cluster-typed/src/main/resources/reference.conf b/akka-cluster-typed/src/main/resources/reference.conf index 6fa18b7439..fcb900b678 100644 --- a/akka-cluster-typed/src/main/resources/reference.conf +++ b/akka-cluster-typed/src/main/resources/reference.conf @@ -13,16 +13,27 @@ akka.cluster.typed.receptionist { # Period task to remove actor references that are hosted by removed nodes, # in case of abrupt termination. pruning-interval = 3 s + + # Shard the services over this many Distributed Data keys, with large amounts of different + # service keys storing all of them in the same Distributed Data entry would lead to large updates + # etc. instead the keys are sharded across this number of keys. This must be the same on all nodes + # in a cluster, changing it requires a full cluster restart (stopping all nodes before starting them again) + distributed-key-count = 5 } -akka.actor { - serialization-identifiers { - "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28 +akka { + actor { + serialization-identifiers { + "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28 + } + serializers { + typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer" + } + serialization-bindings { + "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + } } - serializers { - typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer" - } - serialization-bindings { - "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + cluster.configuration-compatibility-check.checkers { + receptionist = "akka.cluster.typed.internal.receptionist.ClusterReceptionistConfigCompatChecker" } } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index f7d4adef85..a98ad677a1 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -25,9 +25,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]] type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] + type DDataKey = ORMultiMapKey[ServiceKey[_], Entry] - private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], Entry]("ReceptionistKey") - private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry] + final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry] // values contain system uid to make it possible to discern actors at the same // path in different incarnations of a cluster node @@ -38,57 +38,17 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { override def toString = ref.path.toString + "#" + ref.path.uid } - final case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal { - - // let's hide all the ugly casts we can in here - def getActorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = - getEntriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]]) - - def getEntriesFor(key: AbstractServiceKey): Set[Entry] = - map.getOrElse(key.asServiceKey, Set.empty[Entry]) - - def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - ServiceRegistry(map.addBinding(key, value)) - - def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - ServiceRegistry(map.removeBinding(key, value)) - - def removeAll(removals: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = { - removals.foldLeft(this) { - case (acc, (key, entries)) ⇒ - entries.foldLeft(acc) { - case (innerAcc, entry) ⇒ - innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry) - } - } - } - - def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = map - - } - object ServiceRegistry { - final val Empty = ServiceRegistry(EmptyORMultiMap) - - def collectChangedKeys(previousState: ServiceRegistry, newState: ServiceRegistry): Set[AbstractServiceKey] = { - val allKeys = previousState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet - allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒ - val oldValues = previousState.getEntriesFor(key) - val newValues = newState.getEntriesFor(key) - if (oldValues != newValues) acc + key - else acc - } - } - } - - sealed trait InternalCommand - final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand - final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand - final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand - final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand - case object RemoveTick extends InternalCommand + private sealed trait InternalCommand extends Command + private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand + private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand + private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand + private final case class ChangeFromReplicator( + key: DDataKey, + value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand + private case object RemoveTick extends InternalCommand // captures setup/dependencies so we can avoid doing it over and over again - class Setup(ctx: ActorContext[Any]) { + final class Setup(ctx: ActorContext[Command]) { val untypedSystem = ctx.system.toUntyped val settings = ClusterReceptionistSettings(ctx.system) val replicator = DistributedData(untypedSystem).replicator @@ -97,16 +57,23 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress } - override def behavior: Behavior[Command] = Behaviors.setup[Any] { ctx ⇒ + override def behavior: Behavior[Command] = Behaviors.setup { ctx ⇒ val setup = new Setup(ctx) + val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount) // subscribe to changes from other nodes val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] = ctx.messageAdapter[Replicator.ReplicatorMessage] { - case changed @ Replicator.Changed(ReceptionistKey) ⇒ ChangeFromReplicator(changed.get(ReceptionistKey)) + case changed: Replicator.Changed[_] @unchecked ⇒ + ChangeFromReplicator( + changed.key.asInstanceOf[DDataKey], + changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]]) } - setup.replicator ! Replicator.Subscribe(ReceptionistKey, replicatorMessageAdapter.toUntyped) + + registry.allDdataKeys.foreach(key ⇒ + setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped) + ) // remove entries when members are removed val clusterEventMessageAdapter: ActorRef[MemberRemoved] = @@ -120,33 +87,33 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { behavior( setup, - ServiceRegistry.Empty, + registry, TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] ) - }.narrow[Command] + } /** - * @param state The last seen state from the replicator - only updated when we get an update from th replicator + * @param registry The last seen state from the replicator - only updated when we get an update from th replicator * @param subscriptions Locally subscriptions, not replicated */ def behavior( setup: Setup, - state: ServiceRegistry, - subscriptions: SubscriptionRegistry): Behavior[Any] = - Behaviors.setup[Any] { ctx ⇒ + registry: ShardedServiceRegistry, + subscriptions: SubscriptionRegistry): Behavior[Command] = + Behaviors.setup { ctx ⇒ import setup._ // Helper to create new behavior def next( - newState: ServiceRegistry = state, - newSubscriptions: SubscriptionRegistry = subscriptions) = + newState: ShardedServiceRegistry = registry, + newSubscriptions: SubscriptionRegistry = subscriptions) = behavior(setup, newState, newSubscriptions) /* * Hack to allow multiple termination notifications per target * FIXME: replace by simple map in our state */ - def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit = + def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit = ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒ innerCtx.watch(target) Behaviors.receive[Nothing]((_, _) ⇒ Behaviors.same) @@ -158,16 +125,16 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { }) def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = { - val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) + val msg = ReceptionistMessages.Listing(key.asServiceKey, state.actorRefsFor(key)) subscriptions.get(key).foreach(_ ! msg) } - def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Any] = { + def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Command] = { // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) { def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress)) val removals = { - state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) { + registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) { case (acc, (key, entries)) ⇒ val removedEntries = entries.filter(isOnRemovedNode) if (removedEntries.isEmpty) acc // no change @@ -184,8 +151,14 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { case (key, entries) ⇒ key.asServiceKey.id -> entries.mkString("[", ", ", "]") }.mkString(",")) - replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ - ServiceRegistry(registry).removeAll(removals).toORMultiMap + // shard changes over the ddata keys they belong to + val removalsPerDdataKey = registry.entriesPerDdataKey(removals) + + removalsPerDdataKey.foreach { + case (ddataKey, removalForKey) ⇒ + replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap + } } } Behaviors.same @@ -193,7 +166,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } else Behaviors.same } - def onCommand(cmd: Command): Behavior[Any] = cmd match { + def onCommand(cmd: Command): Behavior[Command] = cmd match { case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) ⇒ val entry = Entry(serviceInstance, setup.selfSystemUid) ctx.log.debug("Actor was registered: [{}] [{}]", key, entry) @@ -202,25 +175,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { case Some(replyTo) ⇒ replyTo ! ReceptionistMessages.Registered(key, serviceInstance) case None ⇒ } - replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + val ddataKey = registry.ddataKeyFor(key) + replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ ServiceRegistry(registry).addBinding(key, entry).toORMultiMap } Behaviors.same case ReceptionistMessages.Find(key, replyTo) ⇒ - replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) + replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key)) Behaviors.same case ReceptionistMessages.Subscribe(key, subscriber) ⇒ watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) // immediately reply with initial listings to the new subscriber - subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) + subscriber ! ReceptionistMessages.Listing(key.asServiceKey, registry.actorRefsFor(key)) next(newSubscriptions = subscriptions.inserted(key)(subscriber)) } - def onInternalCommand(cmd: InternalCommand): Behavior[Any] = cmd match { + def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match { case SubscriberTerminated(key, subscriber) ⇒ next(newSubscriptions = subscriptions.removed(key)(subscriber)) @@ -228,26 +202,28 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { case RegisteredActorTerminated(key, serviceInstance) ⇒ val entry = Entry(serviceInstance, setup.selfSystemUid) ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry) - replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + val ddataKey = registry.ddataKeyFor(key) + replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap } Behaviors.same - case ChangeFromReplicator(value) ⇒ + case ChangeFromReplicator(ddataKey, value) ⇒ // every change will come back this way - this is where the local notifications happens val newState = ServiceRegistry(value) - val changedKeys = ServiceRegistry.collectChangedKeys(state, newState) + val changedKeys = registry.collectChangedKeys(ddataKey, newState) + val newRegistry = registry.withServiceRegistry(ddataKey, newState) if (changedKeys.nonEmpty) { if (ctx.log.isDebugEnabled) { ctx.log.debug( "Change from replicator: [{}], changes: [{}]", - newState.map.entries, + newState.entries.entries, changedKeys.map(key ⇒ - key.asServiceKey.id -> newState.getEntriesFor(key).mkString("[", ", ", "]") + key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]") ).mkString(", ")) } changedKeys.foreach(notifySubscribersFor(_, newState)) - next(newState) + next(newRegistry) } else { Behaviors.same } @@ -262,14 +238,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { case RemoveTick ⇒ // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress)) { - val allAddressesInState: Set[UniqueAddress] = state.map.entries.flatMap { - case (_, entries) ⇒ - // don't care about local (empty host:port addresses) - entries.collect { - case entry if entry.ref.path.address.hasGlobalScope ⇒ - entry.uniqueAddress(setup.selfUniqueAddress) - } - }(collection.breakOut) + val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress) val clusterAddresses = cluster.state.members.map(_.uniqueAddress) val notInCluster = allAddressesInState diff clusterAddresses @@ -283,11 +252,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { Behavior.same } - Behaviors.receive[Any] { (ctx, msg) ⇒ + Behaviors.receive[Command] { (ctx, msg) ⇒ msg match { // support two heterogenous types of messages without union types - case cmd: Command ⇒ onCommand(cmd) case cmd: InternalCommand ⇒ onInternalCommand(cmd) + case cmd: Command ⇒ onCommand(cmd) case _ ⇒ Behaviors.unhandled } } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistConfigCompatChecker.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistConfigCompatChecker.scala new file mode 100644 index 0000000000..4693e54371 --- /dev/null +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistConfigCompatChecker.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster.typed.internal.receptionist + +import akka.annotation.InternalApi +import akka.cluster.{ ConfigValidation, JoinConfigCompatChecker } +import com.typesafe.config.Config + +/** + * INTERNAL API + * + * Verifies that receptionist distributed-key-count are the same across cluster nodes + */ +@InternalApi +final class ClusterReceptionistConfigCompatChecker extends JoinConfigCompatChecker { + + override def requiredKeys = "akka.cluster.typed.receptionist.distributed-key-count" :: Nil + + override def check(toCheck: Config, actualConfig: Config): ConfigValidation = + JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig) +} + diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala index 7d571ad30d..3a794cd48a 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala @@ -35,7 +35,8 @@ private[akka] object ClusterReceptionistSettings { } ClusterReceptionistSettings( writeConsistency, - pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis + pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis, + config.getInt("distributed-key-count") ) } } @@ -45,6 +46,7 @@ private[akka] object ClusterReceptionistSettings { */ @InternalApi private[akka] case class ClusterReceptionistSettings( - writeConsistency: WriteConsistency, - pruningInterval: FiniteDuration) + writeConsistency: WriteConsistency, + pruningInterval: FiniteDuration, + distributedKeyCount: Int) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala new file mode 100644 index 0000000000..b7114b71d4 --- /dev/null +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster.typed.internal.receptionist + +import akka.actor.typed.ActorRef +import akka.actor.typed.internal.receptionist.AbstractServiceKey +import akka.actor.typed.receptionist.ServiceKey +import akka.annotation.InternalApi +import akka.cluster.{ Cluster, UniqueAddress } +import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey } +import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry } + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ShardedServiceRegistry { + def apply(numberOfKeys: Int): ShardedServiceRegistry = { + val emptyRegistries = (0 until numberOfKeys).map { n ⇒ + val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n") + key -> new ServiceRegistry(EmptyORMultiMap) + }.toMap + new ShardedServiceRegistry(emptyRegistries) + } + +} + +/** + * Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not + * get too large ddata messages) + * + * INTERNAL API + */ +@InternalApi private[akka] final class ShardedServiceRegistry(serviceRegistries: Map[DDataKey, ServiceRegistry]) { + + private val keys = serviceRegistries.keySet.toArray + + def registryFor(ddataKey: DDataKey): ServiceRegistry = serviceRegistries(ddataKey) + + def allDdataKeys: Iterable[DDataKey] = keys + + def ddataKeyFor(serviceKey: ServiceKey[_]): DDataKey = + keys(math.abs(serviceKey.id.hashCode() % serviceRegistries.size)) + + def allServices: Iterator[(ServiceKey[_], Set[Entry])] = + serviceRegistries.valuesIterator.flatMap(_.entries.entries) + + def allEntries: Iterator[Entry] = allServices.flatMap(_._2) + + def actorRefsFor[T](key: ServiceKey[T]): Set[ActorRef[T]] = { + val dDataKey = ddataKeyFor(key) + serviceRegistries(dDataKey).actorRefsFor(key) + } + + def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = + new ShardedServiceRegistry(serviceRegistries + (dDataKey -> registry)) + + def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] = + allEntries.collect { + // we don't care about local (empty host:port addresses) + case entry if entry.ref.path.address.hasGlobalScope ⇒ + entry.uniqueAddress(selfUniqueAddress) + }.toSet + + def collectChangedKeys(dDataKey: DDataKey, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = { + val previousRegistry = registryFor(dDataKey) + ServiceRegistry.collectChangedKeys(previousRegistry, newRegistry) + } + + def entriesPerDdataKey(entries: Map[AbstractServiceKey, Set[Entry]]): Map[DDataKey, Map[AbstractServiceKey, Set[Entry]]] = + entries.foldLeft(Map.empty[DDataKey, Map[AbstractServiceKey, Set[Entry]]]) { + case (acc, (key, entries)) ⇒ + val ddataKey = ddataKeyFor(key.asServiceKey) + val updated = acc.getOrElse(ddataKey, Map.empty) + (key -> entries) + acc + (ddataKey -> updated) + } + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class ServiceRegistry(entries: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal { + + // let's hide all the ugly casts we can in here + def actorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = + entriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]]) + + def entriesFor(key: AbstractServiceKey): Set[Entry] = + entries.getOrElse(key.asServiceKey, Set.empty[Entry]) + + def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = + ServiceRegistry(entries.addBinding(key, value)) + + def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = + ServiceRegistry(entries.removeBinding(key, value)) + + def removeAll(entries: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = { + entries.foldLeft(this) { + case (acc, (key, entries)) ⇒ + entries.foldLeft(acc) { + case (innerAcc, entry) ⇒ + innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry) + } + } + } + + def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = entries + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ServiceRegistry { + final val Empty = ServiceRegistry(EmptyORMultiMap) + + def collectChangedKeys(previousRegistry: ServiceRegistry, newRegistry: ServiceRegistry): Set[AbstractServiceKey] = { + val allKeys = previousRegistry.toORMultiMap.entries.keySet ++ newRegistry.toORMultiMap.entries.keySet + allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒ + val oldValues = previousRegistry.entriesFor(key) + val newValues = newRegistry.entriesFor(key) + if (oldValues != newValues) acc + key + else acc + } + } +}