From 574e690330444d4ae082baa5469b5de12ebdb09c Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 19 Jan 2018 12:27:22 +0000 Subject: [PATCH] Remove access of state from spawnAdapter in ClustserReceptionist (#24336) --- .../receptionist/ReceptionistImpl.scala | 20 ++++++---- .../receptionist/ClusterReceptionist.scala | 38 +++++++++++-------- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala index 6cfd928292..4bbf8b2e7f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala @@ -36,13 +36,18 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { /** * Interface to allow plugging of external service discovery infrastructure in to the existing receptionist API. */ - trait ExternalInterface { + trait ExternalInterface[State] { def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit + def onExternalUpdate(update: State) + + final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]], state: State) extends ReceptionistInternalCommand } - object LocalExternalInterface extends ExternalInterface { + + object LocalExternalInterface extends ExternalInterface[LocalServiceRegistry] { def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = () def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = () + def onExternalUpdate(update: LocalServiceRegistry): Unit = () } override def behavior: Behavior[Command] = localOnlyBehavior @@ -57,12 +62,11 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { sealed abstract class ReceptionistInternalCommand extends InternalCommand final case class RegisteredActorTerminated[T](key: ServiceKey[T], address: ActorRef[T]) extends ReceptionistInternalCommand final case class SubscriberTerminated[T](key: ServiceKey[T], address: ActorRef[Listing[T]]) extends ReceptionistInternalCommand - final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]]) extends ReceptionistInternalCommand type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[Listing[K#Protocol]] type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] - private[akka] def init(externalInterfaceFactory: ActorContext[AllCommands] ⇒ ExternalInterface): Behavior[Command] = + private[akka] def init[State](externalInterfaceFactory: ActorContext[AllCommands] ⇒ ExternalInterface[State]): Behavior[Command] = Behaviors.deferred[AllCommands] { ctx ⇒ val externalInterface = externalInterfaceFactory(ctx) behavior( @@ -71,10 +75,10 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { externalInterface) }.narrow[Command] - private def behavior( + private def behavior[State]( serviceRegistry: LocalServiceRegistry, subscriptions: SubscriptionRegistry, - externalInterface: ExternalInterface): Behavior[AllCommands] = { + externalInterface: ExternalInterface[State]): Behavior[AllCommands] = { // Helper to create new state def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) = @@ -126,7 +130,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { same - case RegistrationsChangedExternally(changes) ⇒ + case externalInterface.RegistrationsChangedExternally(changes, state) ⇒ ctx.system.log.debug("[{}] Registration changed: {}", ctx.self, changes) @@ -136,7 +140,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { case (reg, (key, values)) ⇒ reg.setAll(key)(values.asInstanceOf[Set[ActorRef[key.Protocol]]]) } - + externalInterface.onExternalUpdate(state) updateRegistry(changes.keySet, makeChanges) // overwrite all changed keys case RegisteredActorTerminated(key, serviceInstance) ⇒ diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index 5ae0a316d1..d2cf817511 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -63,7 +63,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { /** * Returns an ReceptionistImpl.ExternalInterface that synchronizes registered services with */ - def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface = { + def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface[ServiceRegistry] = { import akka.actor.typed.scaladsl.adapter._ val untypedSystem = ctx.system.toUntyped @@ -87,21 +87,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { .foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]])(changesForKey(_, _)) } - val adapter: ActorRef[Replicator.ReplicatorMessage] = - ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage) ⇒ - x match { - case changed @ Replicator.Changed(ReceptionistKey) ⇒ - val value = changed.get(ReceptionistKey) - val oldState = state - state = ServiceRegistry(value) // is that thread-safe? - val changes = diff(oldState, state) - RegistrationsChangedExternally(changes) - } - } - - replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped) - - new ExternalInterface { + val externalInterface = new ExternalInterface[ServiceRegistry] { private def updateRegistry(update: ServiceRegistry ⇒ ServiceRegistry): Unit = { state = update(state) replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ @@ -114,6 +100,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = updateRegistry(_.removeBinding(key, address)) + + def onExternalUpdate(update: ServiceRegistry): Unit = { + state = update + } } + + val adapter: ActorRef[Replicator.ReplicatorMessage] = + ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage) ⇒ + x match { + case changed @ Replicator.Changed(ReceptionistKey) ⇒ + val value = changed.get(ReceptionistKey) + val oldState = state + val newState = ServiceRegistry(value) + val changes = diff(oldState, newState) + externalInterface.RegistrationsChangedExternally(changes, newState) + } + } + + replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped) + + externalInterface } }