Remove access of state from spawnAdapter in ClustserReceptionist (#24336)

This commit is contained in:
Christopher Batey 2018-01-19 12:27:22 +00:00 committed by GitHub
parent 1d5b913f7f
commit 574e690330
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 24 deletions

View file

@ -36,13 +36,18 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
/**
* Interface to allow plugging of external service discovery infrastructure in to the existing receptionist API.
*/
trait ExternalInterface {
trait ExternalInterface[State] {
def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit
def onExternalUpdate(update: State)
final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]], state: State) extends ReceptionistInternalCommand
}
object LocalExternalInterface extends ExternalInterface {
object LocalExternalInterface extends ExternalInterface[LocalServiceRegistry] {
def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = ()
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = ()
def onExternalUpdate(update: LocalServiceRegistry): Unit = ()
}
override def behavior: Behavior[Command] = localOnlyBehavior
@ -57,12 +62,11 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
sealed abstract class ReceptionistInternalCommand extends InternalCommand
final case class RegisteredActorTerminated[T](key: ServiceKey[T], address: ActorRef[T]) extends ReceptionistInternalCommand
final case class SubscriberTerminated[T](key: ServiceKey[T], address: ActorRef[Listing[T]]) extends ReceptionistInternalCommand
final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]]) extends ReceptionistInternalCommand
type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[Listing[K#Protocol]]
type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
private[akka] def init(externalInterfaceFactory: ActorContext[AllCommands] ExternalInterface): Behavior[Command] =
private[akka] def init[State](externalInterfaceFactory: ActorContext[AllCommands] ExternalInterface[State]): Behavior[Command] =
Behaviors.deferred[AllCommands] { ctx
val externalInterface = externalInterfaceFactory(ctx)
behavior(
@ -71,10 +75,10 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
externalInterface)
}.narrow[Command]
private def behavior(
private def behavior[State](
serviceRegistry: LocalServiceRegistry,
subscriptions: SubscriptionRegistry,
externalInterface: ExternalInterface): Behavior[AllCommands] = {
externalInterface: ExternalInterface[State]): Behavior[AllCommands] = {
// Helper to create new state
def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) =
@ -126,7 +130,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
same
case RegistrationsChangedExternally(changes)
case externalInterface.RegistrationsChangedExternally(changes, state)
ctx.system.log.debug("[{}] Registration changed: {}", ctx.self, changes)
@ -136,7 +140,7 @@ private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider {
case (reg, (key, values))
reg.setAll(key)(values.asInstanceOf[Set[ActorRef[key.Protocol]]])
}
externalInterface.onExternalUpdate(state)
updateRegistry(changes.keySet, makeChanges) // overwrite all changed keys
case RegisteredActorTerminated(key, serviceInstance)

View file

@ -63,7 +63,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
/**
* Returns an ReceptionistImpl.ExternalInterface that synchronizes registered services with
*/
def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface = {
def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface[ServiceRegistry] = {
import akka.actor.typed.scaladsl.adapter._
val untypedSystem = ctx.system.toUntyped
@ -87,21 +87,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
.foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]])(changesForKey(_, _))
}
val adapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage)
x match {
case changed @ Replicator.Changed(ReceptionistKey)
val value = changed.get(ReceptionistKey)
val oldState = state
state = ServiceRegistry(value) // is that thread-safe?
val changes = diff(oldState, state)
RegistrationsChangedExternally(changes)
}
}
replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped)
new ExternalInterface {
val externalInterface = new ExternalInterface[ServiceRegistry] {
private def updateRegistry(update: ServiceRegistry ServiceRegistry): Unit = {
state = update(state)
replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry
@ -114,6 +100,26 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit =
updateRegistry(_.removeBinding(key, address))
def onExternalUpdate(update: ServiceRegistry): Unit = {
state = update
}
}
val adapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage)
x match {
case changed @ Replicator.Changed(ReceptionistKey)
val value = changed.get(ReceptionistKey)
val oldState = state
val newState = ServiceRegistry(value)
val changes = diff(oldState, newState)
externalInterface.RegistrationsChangedExternally(changes, newState)
}
}
replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped)
externalInterface
}
}