Typed receptionist deregistration (#28406)
* Co-authored-by: tipame <33191778+tipame@users.noreply.github.com>
This commit is contained in:
parent
6b73a2504f
commit
4749b11be8
22 changed files with 916 additions and 249 deletions
|
|
@ -4,9 +4,6 @@
|
|||
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
|
||||
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<level>INFO</level>
|
||||
</filter>
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern>
|
||||
</encoder>
|
||||
|
|
|
|||
|
|
@ -41,72 +41,135 @@ object LocalReceptionistSpec {
|
|||
class LocalReceptionistSpec extends ScalaTestWithActorTestKit with WordSpecLike with LogCapturing {
|
||||
import LocalReceptionistSpec._
|
||||
|
||||
abstract class TestSetup {
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
}
|
||||
|
||||
"A local receptionist" must {
|
||||
|
||||
"unregister services when they terminate" in {
|
||||
new TestSetup {
|
||||
val regProbe = TestProbe[Any]("regProbe")
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
val regProbe = TestProbe[Any]("regProbe")
|
||||
|
||||
val serviceA = spawn(stoppableBehavior.narrow[ServiceA])
|
||||
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
|
||||
val serviceA = spawn(stoppableBehavior.narrow[ServiceA])
|
||||
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
|
||||
|
||||
val serviceB = spawn(stoppableBehavior.narrow[ServiceB])
|
||||
receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyB, serviceB))
|
||||
val serviceB = spawn(stoppableBehavior.narrow[ServiceB])
|
||||
receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyB, serviceB))
|
||||
|
||||
val serviceC = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
|
||||
receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceC))
|
||||
regProbe.expectMessage(Registered(ServiceKeyB, serviceC))
|
||||
val serviceC = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
|
||||
receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceC))
|
||||
regProbe.expectMessage(Registered(ServiceKeyB, serviceC))
|
||||
|
||||
receptionist ! Find(ServiceKeyA, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceC)))
|
||||
receptionist ! Find(ServiceKeyB, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB, serviceC)))
|
||||
|
||||
serviceC ! Stop
|
||||
|
||||
eventually {
|
||||
receptionist ! Find(ServiceKeyA, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceC)))
|
||||
regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
|
||||
receptionist ! Find(ServiceKeyB, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB, serviceC)))
|
||||
|
||||
serviceC ! Stop
|
||||
|
||||
eventually {
|
||||
receptionist ! Find(ServiceKeyA, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
|
||||
receptionist ! Find(ServiceKeyB, regProbe.ref)
|
||||
regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB)))
|
||||
}
|
||||
regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB)))
|
||||
}
|
||||
}
|
||||
|
||||
"unregister programatically" in {
|
||||
val subProbe = TestProbe[Any]()
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
val serviceA = TestProbe[ServiceA]()
|
||||
receptionist ! Register(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
|
||||
receptionist ! Deregister(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
}
|
||||
|
||||
"unregister per service key, not service actor" in {
|
||||
val subProbe = TestProbe[Any]()
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
|
||||
// subscribe to 2 keys
|
||||
receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
receptionist ! Subscribe(ServiceKeyB, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyB, Set.empty[ActorRef[ServiceB]]))
|
||||
|
||||
// register same service for both 2 keys
|
||||
val service = TestProbe[AnyRef]()
|
||||
receptionist ! Register(ServiceKeyA, service.ref)
|
||||
receptionist ! Register(ServiceKeyB, service.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set(service.ref.narrow[ServiceKeyA.Protocol])))
|
||||
subProbe.expectMessage(Listing(ServiceKeyB, Set(service.ref.narrow[ServiceKeyB.Protocol])))
|
||||
|
||||
// unregister one of the service keys for the service
|
||||
receptionist ! Deregister(ServiceKeyA, service.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
receptionist ! Find(ServiceKeyB, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyB, Set(service.ref.narrow[ServiceKeyB.Protocol])))
|
||||
}
|
||||
|
||||
"unregister and re-register same service actor" in {
|
||||
val subProbe = TestProbe[Any]()
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
|
||||
receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
|
||||
val serviceA = TestProbe[ServiceA]()
|
||||
receptionist ! Register(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
|
||||
|
||||
receptionist ! Deregister(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
|
||||
receptionist ! Register(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
|
||||
}
|
||||
|
||||
"support subscribing to service changes" in {
|
||||
new TestSetup {
|
||||
val regProbe = TestProbe[Registered]("regProbe")
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
val regProbe = TestProbe[Registered]("regProbe")
|
||||
|
||||
val aSubscriber = TestProbe[Listing]("aUser")
|
||||
receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
|
||||
val aSubscriber = TestProbe[Listing]("aUser")
|
||||
receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
|
||||
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
|
||||
val serviceA: ActorRef[ServiceA] = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
|
||||
val serviceA: ActorRef[ServiceA] = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
|
||||
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
|
||||
|
||||
val serviceA2: ActorRef[ServiceA] = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA2))
|
||||
val serviceA2: ActorRef[ServiceA] = spawn(stoppableBehavior)
|
||||
receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
|
||||
regProbe.expectMessage(Registered(ServiceKeyA, serviceA2))
|
||||
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
|
||||
|
||||
serviceA ! Stop
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA2)))
|
||||
serviceA2 ! Stop
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
}
|
||||
serviceA ! Stop
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA2)))
|
||||
serviceA2 ! Stop
|
||||
aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
}
|
||||
|
||||
"support subscribing to different services with the same subscriber" in {
|
||||
val subProbe = TestProbe[Any]()
|
||||
val receptionist = spawn(LocalReceptionist.behavior)
|
||||
receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
|
||||
receptionist ! Subscribe(ServiceKeyB, subProbe.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
|
||||
subProbe.expectMessage(Listing(ServiceKeyB, Set.empty[ActorRef[ServiceB]]))
|
||||
val serviceA = TestProbe[ServiceA]()
|
||||
receptionist ! Register(ServiceKeyA, serviceA.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
|
||||
val serviceB = TestProbe[ServiceB]()
|
||||
receptionist ! Register(ServiceKeyB, serviceB.ref)
|
||||
subProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB.ref)))
|
||||
}
|
||||
|
||||
"work with ask" in {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
# internal API and messages changed
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.key")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy$default$1")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy$default$2")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.this")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.apply")
|
||||
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.unapply")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.apply")
|
||||
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.unapply")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.key")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy$default$1")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy$default$2")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.this")
|
||||
|
|
@ -6,7 +6,6 @@ package akka.actor.typed.internal.receptionist
|
|||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Terminated
|
||||
import akka.actor.typed.receptionist.Receptionist._
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
|
|
@ -37,98 +36,206 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
|
|||
|
||||
override val name = "localReceptionist"
|
||||
|
||||
type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol]
|
||||
type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV]
|
||||
type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
|
||||
type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
|
||||
private type Service[K <: AbstractServiceKey] = ActorRef[K#Protocol]
|
||||
private type Subscriber[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
|
||||
|
||||
sealed trait InternalCommand
|
||||
final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
|
||||
final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]])
|
||||
private sealed trait InternalCommand
|
||||
private final case class RegisteredActorTerminated[T](ref: ActorRef[T]) extends InternalCommand
|
||||
private final case class SubscriberTerminated[T](ref: ActorRef[ReceptionistMessages.Listing[T]])
|
||||
extends InternalCommand
|
||||
|
||||
private object State {
|
||||
def empty =
|
||||
State(
|
||||
TypedMultiMap.empty[AbstractServiceKey, Service],
|
||||
Map.empty,
|
||||
TypedMultiMap.empty[AbstractServiceKey, Subscriber],
|
||||
Map.empty)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param services current registered services per key
|
||||
* @param servicesPerActor current registered service keys per actor (needed for unregistration since an actor can implement several services)
|
||||
* @param subscriptions current subscriptions per service key
|
||||
* @param subscriptionsPerActor current subscriptions per subscriber (needed since a subscriber can subscribe to several keys) FIXME is it really needed?
|
||||
*/
|
||||
private final case class State(
|
||||
services: TypedMultiMap[AbstractServiceKey, Service],
|
||||
servicesPerActor: Map[ActorRef[_], Set[AbstractServiceKey]],
|
||||
subscriptions: TypedMultiMap[AbstractServiceKey, Subscriber],
|
||||
subscriptionsPerActor: Map[ActorRef[_], Set[AbstractServiceKey]]) {
|
||||
|
||||
def serviceInstanceAdded[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
|
||||
val newServices = services.inserted(key)(serviceInstance)
|
||||
val newServicePerActor =
|
||||
servicesPerActor.updated(
|
||||
serviceInstance,
|
||||
servicesPerActor.getOrElse(serviceInstance, Set.empty) + key.asServiceKey)
|
||||
copy(services = newServices, servicesPerActor = newServicePerActor)
|
||||
}
|
||||
|
||||
def serviceInstanceRemoved[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
|
||||
val newServices = services.removed(key)(serviceInstance)
|
||||
val newServicePerActor =
|
||||
servicesPerActor.get(serviceInstance) match {
|
||||
case Some(keys) =>
|
||||
val newKeys = keys - key.asServiceKey
|
||||
// only/last service this actor was registered for
|
||||
if (newKeys.isEmpty) {
|
||||
servicesPerActor - serviceInstance
|
||||
} else servicesPerActor.updated(serviceInstance, newKeys)
|
||||
case None =>
|
||||
// no services actually registered for actor
|
||||
servicesPerActor
|
||||
}
|
||||
copy(services = newServices, servicesPerActor = newServicePerActor)
|
||||
}
|
||||
|
||||
def serviceInstanceRemoved(serviceInstance: ActorRef[_]): State = {
|
||||
val keys = servicesPerActor.getOrElse(serviceInstance, Set.empty)
|
||||
val newServices =
|
||||
if (keys.isEmpty) services
|
||||
else
|
||||
keys.foldLeft(services)((acc, key) =>
|
||||
acc.removed(key.asServiceKey)(serviceInstance.asInstanceOf[Service[AbstractServiceKey]]))
|
||||
val newServicesPerActor = servicesPerActor - serviceInstance
|
||||
copy(services = newServices, servicesPerActor = newServicesPerActor)
|
||||
}
|
||||
|
||||
def subscriberAdded[Key <: AbstractServiceKey](key: Key)(subscriber: Subscriber[key.type]): State = {
|
||||
val newSubscriptions = subscriptions.inserted(key)(subscriber)
|
||||
val newSubscriptionsPerActor =
|
||||
subscriptionsPerActor.updated(
|
||||
subscriber,
|
||||
subscriptionsPerActor.getOrElse(subscriber, Set.empty) + key.asServiceKey)
|
||||
|
||||
copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
|
||||
}
|
||||
|
||||
def subscriptionRemoved[Key <: AbstractServiceKey](key: Key)(subscriber: Subscriber[key.type]): State = {
|
||||
val newSubscriptions = subscriptions.removed(key)(subscriber)
|
||||
val newSubscriptionsPerActor =
|
||||
subscriptionsPerActor.get(subscriber) match {
|
||||
case Some(keys) =>
|
||||
val newKeys = keys - key.asServiceKey
|
||||
if (newKeys.isEmpty) {
|
||||
subscriptionsPerActor - subscriber
|
||||
} else {
|
||||
subscriptionsPerActor.updated(subscriber, newKeys)
|
||||
}
|
||||
case None =>
|
||||
// no subscriptions actually exist for actor
|
||||
subscriptionsPerActor
|
||||
}
|
||||
copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
|
||||
}
|
||||
|
||||
def subscriberRemoved(subscriber: ActorRef[_]): State = {
|
||||
val keys = subscriptionsPerActor.getOrElse(subscriber, Set.empty)
|
||||
if (keys.isEmpty) this
|
||||
else {
|
||||
val newSubscriptions = keys.foldLeft(subscriptions) { (subscriptions, key) =>
|
||||
val serviceKey = key.asServiceKey
|
||||
subscriptions.removed(serviceKey)(subscriber.asInstanceOf[Subscriber[serviceKey.type]])
|
||||
}
|
||||
val newSubscriptionsPerActor = subscriptionsPerActor - subscriber
|
||||
copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def behavior: Behavior[Command] = Behaviors.setup { ctx =>
|
||||
ctx.setLoggerName(classOf[LocalReceptionist])
|
||||
behavior(TypedMultiMap.empty[AbstractServiceKey, KV], TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
|
||||
.narrow[Command]
|
||||
behavior(State.empty).narrow[Command]
|
||||
}
|
||||
|
||||
private def behavior(serviceRegistry: LocalServiceRegistry, subscriptions: SubscriptionRegistry): Behavior[Any] = {
|
||||
|
||||
// Helper to create new state
|
||||
def next(
|
||||
newRegistry: LocalServiceRegistry = serviceRegistry,
|
||||
newSubscriptions: SubscriptionRegistry = subscriptions) =
|
||||
behavior(newRegistry, newSubscriptions)
|
||||
|
||||
/*
|
||||
* Hack to allow multiple termination notifications per target
|
||||
* FIXME #26505: replace by simple map in our state
|
||||
*/
|
||||
def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit =
|
||||
ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx =>
|
||||
innerCtx.watch(target)
|
||||
Behaviors.receiveSignal[Nothing] {
|
||||
case (_, Terminated(`target`)) =>
|
||||
ctx.self ! msg
|
||||
Behaviors.stopped
|
||||
}
|
||||
})
|
||||
|
||||
private def behavior(state: State): Behavior[Any] = {
|
||||
// Helper that makes sure that subscribers are notified when an entry is changed
|
||||
def updateRegistry(
|
||||
changedKeysHint: Set[AbstractServiceKey],
|
||||
f: LocalServiceRegistry => LocalServiceRegistry): Behavior[Any] = {
|
||||
val newRegistry = f(serviceRegistry)
|
||||
def updateServices(changedKeysHint: Set[AbstractServiceKey], f: State => State): Behavior[Any] = {
|
||||
val newState = f(state)
|
||||
|
||||
def notifySubscribersFor[T](key: AbstractServiceKey): Unit = {
|
||||
val newListing = newRegistry.get(key)
|
||||
subscriptions
|
||||
.get(key)
|
||||
.foreach(
|
||||
_ ! ReceptionistMessages
|
||||
.Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true))
|
||||
val newListing = newState.services.get(key)
|
||||
val listing =
|
||||
ReceptionistMessages.Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true)
|
||||
newState.subscriptions.get(key).foreach(_ ! listing)
|
||||
}
|
||||
|
||||
changedKeysHint.foreach(notifySubscribersFor)
|
||||
next(newRegistry = newRegistry)
|
||||
behavior(newState)
|
||||
}
|
||||
|
||||
def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = {
|
||||
val listing = serviceRegistry.get(key)
|
||||
val listing = state.services.get(key)
|
||||
replyTo ! ReceptionistMessages.Listing(key, listing, listing, servicesWereAddedOrRemoved = true)
|
||||
}
|
||||
|
||||
def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match {
|
||||
|
||||
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
|
||||
ctx.log.debug2("Actor was registered: {} {}", key, serviceInstance)
|
||||
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
|
||||
if (!state.servicesPerActor.contains(serviceInstance))
|
||||
ctx.watchWith(serviceInstance, RegisteredActorTerminated(serviceInstance))
|
||||
maybeReplyTo match {
|
||||
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
|
||||
case None =>
|
||||
}
|
||||
updateRegistry(Set(key), _.inserted(key)(serviceInstance))
|
||||
updateServices(Set(key), _.serviceInstanceAdded(key)(serviceInstance))
|
||||
|
||||
case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
|
||||
val servicesForActor = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
|
||||
if (servicesForActor.isEmpty) {
|
||||
// actor deregistered but we saw a terminate message before we got the deregistration
|
||||
Behaviors.same
|
||||
} else {
|
||||
ctx.log.debug2("Actor was deregistered: {} {}", key, serviceInstance)
|
||||
if ((servicesForActor - key).isEmpty)
|
||||
ctx.unwatch(serviceInstance)
|
||||
|
||||
maybeReplyTo match {
|
||||
case Some(replyTo) => replyTo ! ReceptionistMessages.Deregistered(key, serviceInstance)
|
||||
case None =>
|
||||
}
|
||||
|
||||
updateServices(Set(key), { state =>
|
||||
val newState = state.serviceInstanceRemoved(key)(serviceInstance)
|
||||
if (state.servicesPerActor.getOrElse(serviceInstance, Set.empty).isEmpty)
|
||||
ctx.unwatch(serviceInstance)
|
||||
newState
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
case ReceptionistMessages.Find(key, replyTo) =>
|
||||
replyWithListing(key, replyTo)
|
||||
Behaviors.same
|
||||
|
||||
case ReceptionistMessages.Subscribe(key, subscriber) =>
|
||||
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
|
||||
if (!state.subscriptionsPerActor.contains(subscriber))
|
||||
ctx.watchWith(subscriber, SubscriberTerminated(subscriber))
|
||||
|
||||
// immediately reply with initial listings to the new subscriber
|
||||
replyWithListing(key, subscriber)
|
||||
|
||||
next(newSubscriptions = subscriptions.inserted(key)(subscriber))
|
||||
behavior(state.subscriberAdded(key)(subscriber))
|
||||
}
|
||||
|
||||
def onInternal(ctx: ActorContext[Any], cmd: InternalCommand): Behavior[Any] = cmd match {
|
||||
case RegisteredActorTerminated(key, serviceInstance) =>
|
||||
ctx.log.debug2("Registered actor terminated: {} {}", key, serviceInstance)
|
||||
updateRegistry(Set(key), _.removed(key)(serviceInstance))
|
||||
|
||||
case SubscriberTerminated(key, subscriber) =>
|
||||
next(newSubscriptions = subscriptions.removed(key)(subscriber))
|
||||
case RegisteredActorTerminated(serviceInstance) =>
|
||||
val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
|
||||
if (keys.isEmpty) {
|
||||
// actor terminated but had deregistered all registrations before we could process the termination
|
||||
Behaviors.same
|
||||
} else {
|
||||
ctx.log.debug2("Registered actor terminated: [{}] {}", keys.mkString(","), serviceInstance)
|
||||
updateServices(keys, _.serviceInstanceRemoved(serviceInstance))
|
||||
}
|
||||
case SubscriberTerminated(subscriber) =>
|
||||
if (ctx.log.isDebugEnabled) {
|
||||
val keys = state.subscriptionsPerActor.getOrElse(subscriber, Set.empty)
|
||||
ctx.log.debug2("Subscribed actor terminated: [{}] {}", keys.mkString(","), subscriber)
|
||||
}
|
||||
behavior(state.subscriberRemoved(subscriber))
|
||||
}
|
||||
|
||||
Behaviors.receive[Any] { (ctx, msg) =>
|
||||
|
|
|
|||
|
|
@ -27,6 +27,12 @@ private[akka] object ReceptionistMessages {
|
|||
replyTo: Option[ActorRef[Receptionist.Registered]])
|
||||
extends Command
|
||||
|
||||
final case class Deregister[T] private[akka] (
|
||||
key: ServiceKey[T],
|
||||
serviceInstance: ActorRef[T],
|
||||
replyTo: Option[ActorRef[Receptionist.Deregistered]])
|
||||
extends Command
|
||||
|
||||
final case class Registered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T])
|
||||
extends Receptionist.Registered {
|
||||
def isForKey(key: ServiceKey[_]): Boolean = key == this.key
|
||||
|
|
@ -40,6 +46,19 @@ private[akka] object ReceptionistMessages {
|
|||
serviceInstance(key)
|
||||
}
|
||||
|
||||
final case class Deregistered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T])
|
||||
extends Receptionist.Deregistered {
|
||||
def isForKey(key: ServiceKey[_]): Boolean = key == this.key
|
||||
def serviceInstance[M](key: ServiceKey[M]): ActorRef[M] = {
|
||||
if (key != this.key)
|
||||
throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]")
|
||||
_serviceInstance.asInstanceOf[ActorRef[M]]
|
||||
}
|
||||
|
||||
def getServiceInstance[M](key: ServiceKey[M]): ActorRef[M] =
|
||||
serviceInstance(key)
|
||||
}
|
||||
|
||||
final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command
|
||||
|
||||
final case class Listing[T] private[akka] (
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ object Routers {
|
|||
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
|
||||
*
|
||||
* Note that there is a delay between a routee stopping and this being detected by the receptionist, and another
|
||||
* before the group detects this, therefore it is best to unregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete to minimize the risk of lost messages.
|
||||
* before the group detects this, therefore it is best to deregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete if you want to minimize the risk of lost messages.
|
||||
*/
|
||||
def group[T](key: ServiceKey[T]): GroupRouter[T] =
|
||||
new GroupRouterBuilder[T](key)
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
* by sending this command to the [[Receptionist.ref]].
|
||||
*
|
||||
* Multiple registrations can be made for the same key. De-registration is implied by
|
||||
* the end of the referenced Actor’s lifecycle.
|
||||
* the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
|
||||
*
|
||||
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor
|
||||
* if there is one.
|
||||
|
|
@ -135,7 +135,7 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
* by sending this command to the [[Receptionist.ref]].
|
||||
*
|
||||
* Multiple registrations can be made for the same key. De-registration is implied by
|
||||
* the end of the referenced Actor’s lifecycle.
|
||||
* the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
|
||||
*/
|
||||
def register[T](key: ServiceKey[T], service: ActorRef[T]): Command = Register(key, service)
|
||||
|
||||
|
|
@ -145,7 +145,7 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
* by sending this command to the [[Receptionist.ref]].
|
||||
*
|
||||
* Multiple registrations can be made for the same key. De-registration is implied by
|
||||
* the end of the referenced Actor’s lifecycle.
|
||||
* the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
|
||||
*
|
||||
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor.
|
||||
*/
|
||||
|
|
@ -200,10 +200,92 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
def registered[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Registered =
|
||||
Registered(key, serviceInstance)
|
||||
|
||||
/**
|
||||
* Remove association between the given [[akka.actor.typed.ActorRef]] and the given [[ServiceKey]].
|
||||
*
|
||||
* Deregistration can be acknowledged with the [[Deregistered]] message if the deregister message is created with a
|
||||
* `replyTo` actor.
|
||||
*
|
||||
* Note that getting the [[Deregistered]] confirmation does not mean all service key subscribers has seen the fact
|
||||
* that the actor has been deregistered yet (especially in a clustered context) so it will be possible that messages
|
||||
* sent to the actor in the role of service provider arrive even after getting the confirmation.
|
||||
*/
|
||||
object Deregister {
|
||||
|
||||
/**
|
||||
* Create a Deregister without Ack that the service was deregistered
|
||||
*/
|
||||
def apply[T](key: ServiceKey[T], service: ActorRef[T]): Command =
|
||||
new ReceptionistMessages.Deregister[T](key, service, None)
|
||||
|
||||
/**
|
||||
* Create a Deregister with an actor that will get an ack that the service was unregistered
|
||||
*/
|
||||
def apply[T](key: ServiceKey[T], service: ActorRef[T], replyTo: ActorRef[Deregistered]): Command =
|
||||
new ReceptionistMessages.Deregister[T](key, service, Some(replyTo))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: A Deregister message without Ack that the service was unregistered
|
||||
*/
|
||||
def deregister[T](key: ServiceKey[T], service: ActorRef[T]): Command = Deregister(key, service)
|
||||
|
||||
/**
|
||||
* Java API: A Deregister message with an actor that will get an ack that the service was unregistered
|
||||
*/
|
||||
def deregister[T](key: ServiceKey[T], service: ActorRef[T], replyTo: ActorRef[Deregistered]): Command =
|
||||
Deregister(key, service, replyTo)
|
||||
|
||||
/**
|
||||
* Confirmation that the given [[akka.actor.typed.ActorRef]] no more associated with the [[ServiceKey]] in the local receptionist.
|
||||
* Note that this does not guarantee that subscribers has yet seen that the service is deregistered.
|
||||
*
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
trait Deregistered {
|
||||
|
||||
def isForKey(key: ServiceKey[_]): Boolean
|
||||
|
||||
/** Scala API */
|
||||
def key: ServiceKey[_]
|
||||
|
||||
/** Java API */
|
||||
def getKey: ServiceKey[_] = key
|
||||
|
||||
/**
|
||||
* Scala API
|
||||
*
|
||||
* Also, see [[ServiceKey.Listing]] for more convenient pattern matching
|
||||
*/
|
||||
def serviceInstance[T](key: ServiceKey[T]): ActorRef[T]
|
||||
|
||||
/** Java API */
|
||||
def getServiceInstance[T](key: ServiceKey[T]): ActorRef[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Sent by the receptionist, available here for easier testing
|
||||
*/
|
||||
object Deregistered {
|
||||
|
||||
/**
|
||||
* Scala API
|
||||
*/
|
||||
def apply[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Deregistered =
|
||||
new ReceptionistMessages.Deregistered(key, serviceInstance)
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Sent by the receptionist, available here for easier testing
|
||||
*/
|
||||
def deregistered[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Deregistered =
|
||||
Deregistered(key, serviceInstance)
|
||||
|
||||
/**
|
||||
* `Subscribe` message. The given actor will subscribe to service updates when this command is sent to
|
||||
* the [[Receptionist.ref]]. When new instances are registered or unregistered to the given key
|
||||
* the given subscriber will be sent a [[Listing]] with the new set of instances for that service.
|
||||
* the [[Receptionist.ref]]. When the set of instances registered for the given key changes
|
||||
* the subscriber will be sent a [[Listing]] with the new set of instances for that service.
|
||||
*
|
||||
* The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends
|
||||
* with the termination of the subscriber.
|
||||
|
|
@ -220,8 +302,8 @@ object Receptionist extends ExtensionId[Receptionist] {
|
|||
|
||||
/**
|
||||
* Java API: `Subscribe` message. The given actor to service updates when this command is sent to
|
||||
* * the [[Receptionist.ref]]. When new instances are registered or unregistered to the given key
|
||||
* the given subscriber will be sent a [[Listing]] with the new set of instances for that service.
|
||||
* the [[Receptionist.ref]]. When the set of instances registered for the given key changes
|
||||
* the subscriber will be sent a [[Listing]] with the new set of instances for that service.
|
||||
*
|
||||
* The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends
|
||||
* with the termination of the subscriber.
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ object Routers {
|
|||
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
|
||||
*
|
||||
* Note that there is a delay between a routee stopping and this being detected by the receptionist and another
|
||||
* before the group detects this. Because of this it is best to unregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete to minimize the risk of lost messages.
|
||||
* before the group detects this. Because of this it is best to deregister routees from the receptionist and not stop
|
||||
* until the deregistration is complete if you want to minimize the risk of lost messages.
|
||||
*/
|
||||
def group[T](key: ServiceKey[T]): GroupRouter[T] =
|
||||
new GroupRouterBuilder[T](key)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util.ccompat
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.util
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.compat
|
||||
|
|
|
|||
|
|
@ -0,0 +1,24 @@
|
|||
# internal messages and apis changed
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist.behavior")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.key")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy$default$1")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy$default$2")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.this")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist$RegisteredActorTerminated")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.apply")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.tombstones")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.activeActorRefsFor")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.addTombstone")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.hasTombstone")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.pruneTombstones")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy$default$2")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy$default$4")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.this")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist$RegisteredActorTerminated$")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist.behavior")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.apply")
|
||||
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.unapply")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.apply")
|
||||
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.unapply")
|
||||
|
|
@ -9,7 +9,7 @@ import akka.actor.typed.receptionist.Receptionist.Command
|
|||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, LoggerOps }
|
||||
import akka.actor.typed.{ ActorRef, Behavior, Terminated }
|
||||
import akka.actor.typed.{ ActorRef, Behavior }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.cluster.ClusterEvent.MemberRemoved
|
||||
import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator }
|
||||
|
|
@ -58,8 +58,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
}
|
||||
|
||||
private sealed trait InternalCommand extends Command
|
||||
private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
|
||||
private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]])
|
||||
private final case class LocalServiceActorTerminated[T](ref: ActorRef[T]) extends InternalCommand
|
||||
private final case class SubscriberTerminated[T](ref: ActorRef[ReceptionistMessages.Listing[T]])
|
||||
extends InternalCommand
|
||||
private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand
|
||||
private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
|
||||
|
|
@ -70,6 +70,108 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
private case object RemoveTick extends InternalCommand
|
||||
private case object PruneTombstonesTick extends InternalCommand
|
||||
|
||||
/**
|
||||
* @param registry The last seen state from the replicator - only updated when we get an update from th replicator
|
||||
* @param servicesPerActor needed since an actor can implement several services
|
||||
* @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
|
||||
* for a key.
|
||||
* @param subscriptions Locally subscriptions, not replicated
|
||||
*/
|
||||
final case class State(
|
||||
registry: ShardedServiceRegistry,
|
||||
servicesPerActor: Map[ActorRef[_], Set[AbstractServiceKey]],
|
||||
tombstones: Map[ActorRef[_], Set[(AbstractServiceKey, Deadline)]],
|
||||
subscriptions: SubscriptionRegistry) {
|
||||
|
||||
/** tombstone all services actor is registered for */
|
||||
def addTombstone(actor: ActorRef[_], deadline: Deadline): State = {
|
||||
servicesPerActor.getOrElse(actor, Set.empty).foldLeft(this) { (state, key) =>
|
||||
state.addTombstone(actor.asInstanceOf[ActorRef[key.Protocol]], key.asServiceKey, deadline)
|
||||
}
|
||||
}
|
||||
|
||||
/** tombstone specific service actor is registered for */
|
||||
def addTombstone[T](actor: ActorRef[T], serviceKey: ServiceKey[T], deadline: Deadline): State = {
|
||||
val newTombsonesForActor = tombstones.getOrElse(actor, Set.empty) + (serviceKey -> deadline)
|
||||
copy(tombstones = tombstones.updated(actor, newTombsonesForActor))
|
||||
}
|
||||
|
||||
def hasTombstone[T](serviceKey: ServiceKey[T])(actorRef: ActorRef[T]): Boolean =
|
||||
tombstones.nonEmpty && tombstones.getOrElse(actorRef, Set.empty).exists { case (key, _) => key == serviceKey }
|
||||
|
||||
def pruneTombstones(): State = {
|
||||
if (tombstones.isEmpty) this
|
||||
else {
|
||||
val newTombstones: Map[ActorRef[_], Set[(AbstractServiceKey, Deadline)]] =
|
||||
tombstones.foldLeft(tombstones) {
|
||||
case (acc, (actorRef, entries)) =>
|
||||
val entriesToKeep = entries.filter {
|
||||
case (_, deadline) => deadline.hasTimeLeft
|
||||
}
|
||||
if (entriesToKeep.size == entries.size) acc
|
||||
else if (entriesToKeep.isEmpty) acc - actorRef
|
||||
else acc.updated(actorRef, entriesToKeep)
|
||||
}
|
||||
if (newTombstones eq tombstones) this
|
||||
else copy(tombstones = newTombstones)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return (reachable-nodes, all)
|
||||
*/
|
||||
def activeActorRefsFor[T](
|
||||
key: ServiceKey[T],
|
||||
selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = {
|
||||
val ddataKey = registry.ddataKeyFor(key)
|
||||
val entries = registry.serviceRegistries(ddataKey).entriesFor(key)
|
||||
val selfAddress = selfUniqueAddress.address
|
||||
val reachable = Set.newBuilder[ActorRef[T]]
|
||||
val all = Set.newBuilder[ActorRef[T]]
|
||||
entries.foreach { entry =>
|
||||
val entryAddress = entry.uniqueAddress(selfAddress)
|
||||
val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]]
|
||||
if (registry.nodes.contains(entryAddress) && !hasTombstone(key)(ref)) {
|
||||
all += ref
|
||||
if (!registry.unreachable.contains(entryAddress)) {
|
||||
reachable += ref
|
||||
}
|
||||
}
|
||||
}
|
||||
(reachable.result(), all.result())
|
||||
}
|
||||
|
||||
def addLocalService[T](serviceInstance: ActorRef[T], key: ServiceKey[T]): State = {
|
||||
val newServicesPerActor =
|
||||
servicesPerActor.updated(serviceInstance, servicesPerActor.getOrElse(serviceInstance, Set.empty) + key)
|
||||
// if the service was previously registered and unregistered we need to remove it from the tombstones
|
||||
val tombstonesForActor = tombstones.getOrElse(serviceInstance, Set.empty)
|
||||
val newTombstones =
|
||||
if (tombstonesForActor.isEmpty) tombstones
|
||||
else tombstones.updated(serviceInstance, tombstonesForActor.filterNot(_._1 == key))
|
||||
copy(servicesPerActor = newServicesPerActor, tombstones = newTombstones)
|
||||
}
|
||||
|
||||
def removeLocalService[T](serviceInstance: ActorRef[T], key: ServiceKey[T], tombstoneDeadline: Deadline): State = {
|
||||
val newServicesForActor = servicesPerActor.get(serviceInstance) match {
|
||||
case Some(keys) =>
|
||||
val newKeys = keys - key
|
||||
if (newKeys.isEmpty)
|
||||
servicesPerActor - serviceInstance
|
||||
else
|
||||
servicesPerActor.updated(serviceInstance, newKeys)
|
||||
case None =>
|
||||
throw new IllegalArgumentException(
|
||||
s"Trying to remove $serviceInstance for $key but that has never been registered")
|
||||
}
|
||||
addTombstone(serviceInstance, key, tombstoneDeadline).copy(servicesPerActor = newServicesForActor)
|
||||
}
|
||||
|
||||
def removeSubscriber(subscriber: ActorRef[ReceptionistMessages.Listing[Any]]): ClusterReceptionist.State =
|
||||
copy(subscriptions = subscriptions.valueRemoved(subscriber))
|
||||
|
||||
}
|
||||
|
||||
// captures setup/dependencies so we can avoid doing it over and over again
|
||||
final class Setup(ctx: ActorContext[Command]) {
|
||||
val classicSystem = ctx.system.toClassic
|
||||
|
|
@ -96,7 +198,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
Behaviors.withTimers { timers =>
|
||||
val setup = new Setup(ctx)
|
||||
// include selfUniqueAddress so that it can be used locally before joining cluster
|
||||
val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress)
|
||||
val initialRegistry =
|
||||
ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress)
|
||||
|
||||
// subscribe to changes from other nodes
|
||||
val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
|
||||
|
|
@ -107,7 +210,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]])
|
||||
}
|
||||
|
||||
registry.allDdataKeys.foreach(key =>
|
||||
initialRegistry.allDdataKeys.foreach(key =>
|
||||
setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toClassic))
|
||||
|
||||
// keep track of cluster members
|
||||
|
|
@ -142,38 +245,19 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
// around isn't very costly so don't prune often
|
||||
timers.startTimerWithFixedDelay(PruneTombstonesTick, setup.keepTombstonesFor / 24)
|
||||
|
||||
behavior(setup, registry, TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
|
||||
val initialState = State(
|
||||
registry = initialRegistry,
|
||||
servicesPerActor = Map.empty,
|
||||
tombstones = Map.empty,
|
||||
subscriptions = TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
|
||||
behavior(setup, initialState)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param registry The last seen state from the replicator - only updated when we get an update from th replicator
|
||||
* @param subscriptions Locally subscriptions, not replicated
|
||||
*/
|
||||
def behavior(setup: Setup, registry: ShardedServiceRegistry, subscriptions: SubscriptionRegistry): Behavior[Command] =
|
||||
def behavior(setup: Setup, state: State): Behavior[Command] =
|
||||
Behaviors.setup { ctx =>
|
||||
import setup._
|
||||
|
||||
// Helper to create new behavior
|
||||
def next(newState: ShardedServiceRegistry = registry, newSubscriptions: SubscriptionRegistry = subscriptions) =
|
||||
behavior(setup, newState, newSubscriptions)
|
||||
|
||||
/*
|
||||
* Hack to allow multiple termination notifications per target
|
||||
* FIXME #26505: replace by simple map in our state
|
||||
*/
|
||||
def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit = {
|
||||
ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx =>
|
||||
innerCtx.watch(target)
|
||||
Behaviors.receiveSignal[Nothing] {
|
||||
case (_, Terminated(`target`)) =>
|
||||
ctx.self ! msg
|
||||
Behaviors.stopped
|
||||
}
|
||||
})
|
||||
()
|
||||
}
|
||||
|
||||
def isLeader = {
|
||||
cluster.state.leader.contains(cluster.selfAddress)
|
||||
}
|
||||
|
|
@ -184,7 +268,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
|
||||
|
||||
val removals = {
|
||||
registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
|
||||
state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
|
||||
case (acc, (key, entries)) =>
|
||||
val removedEntries = entries.filter(isOnRemovedNode)
|
||||
if (removedEntries.isEmpty) acc // no change
|
||||
|
|
@ -205,7 +289,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
.mkString(","))
|
||||
|
||||
// shard changes over the ddata keys they belong to
|
||||
val removalsPerDdataKey = registry.entriesPerDdataKey(removals)
|
||||
val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
|
||||
|
||||
removalsPerDdataKey.foreach {
|
||||
case (ddataKey, removalForKey) =>
|
||||
|
|
@ -219,13 +303,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
}
|
||||
}
|
||||
|
||||
def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newRegistry: ShardedServiceRegistry): Unit = {
|
||||
def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit = {
|
||||
keysForNode.foreach { changedKey =>
|
||||
val serviceKey = changedKey.asServiceKey
|
||||
|
||||
val subscribers = subscriptions.get(changedKey)
|
||||
val subscribers = state.subscriptions.get(changedKey)
|
||||
if (subscribers.nonEmpty) {
|
||||
val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
|
||||
val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
|
||||
val listing =
|
||||
ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false)
|
||||
subscribers.foreach(_ ! listing)
|
||||
|
|
@ -239,81 +323,128 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
val entry = Entry(serviceInstance, setup.selfSystemUid)
|
||||
ctx.log
|
||||
.debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
|
||||
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
|
||||
// actor already watched after one service key registration
|
||||
if (!state.servicesPerActor.contains(serviceInstance))
|
||||
ctx.watchWith(serviceInstance, LocalServiceActorTerminated(serviceInstance))
|
||||
|
||||
maybeReplyTo match {
|
||||
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
|
||||
case None =>
|
||||
}
|
||||
val ddataKey = registry.ddataKeyFor(key)
|
||||
val ddataKey = state.registry.ddataKeyFor(key)
|
||||
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
|
||||
ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
|
||||
}
|
||||
behavior(setup, state.addLocalService(serviceInstance, key))
|
||||
} else {
|
||||
ctx.log.error("ClusterReceptionist [{}] - Register of non-local [{}] is not supported", serviceInstance)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
|
||||
if (serviceInstance.path.address.hasLocalScope) {
|
||||
val entry = Entry(serviceInstance, setup.selfSystemUid)
|
||||
ctx.log.debugN(
|
||||
"ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
|
||||
cluster.selfAddress,
|
||||
key.asServiceKey.id,
|
||||
entry)
|
||||
val newState = state.removeLocalService(serviceInstance, key, setup.newTombstoneDeadline())
|
||||
if (!newState.servicesPerActor.contains(serviceInstance)) {
|
||||
// last service for actor unregistered, stop watching
|
||||
ctx.unwatch(serviceInstance)
|
||||
}
|
||||
maybeReplyTo match {
|
||||
case Some(replyTo) => replyTo ! ReceptionistMessages.Deregistered(key, serviceInstance)
|
||||
case None =>
|
||||
}
|
||||
val ddataKey = state.registry.ddataKeyFor(key)
|
||||
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
|
||||
ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
|
||||
}
|
||||
// tombstone removals so they are not re-added by merging with other concurrent
|
||||
// registrations for the same key
|
||||
behavior(setup, newState)
|
||||
} else {
|
||||
ctx.log.error("ClusterReceptionist [{}] - Unregistering non-local [{}] is not supported", serviceInstance)
|
||||
Behaviors.same
|
||||
}
|
||||
Behaviors.same
|
||||
|
||||
case ReceptionistMessages.Find(key, replyTo) =>
|
||||
val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
|
||||
val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
|
||||
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
|
||||
Behaviors.same
|
||||
|
||||
case ReceptionistMessages.Subscribe(key, subscriber) =>
|
||||
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
|
||||
if (subscriber.path.address.hasLocalScope) {
|
||||
ctx.watchWith(subscriber, SubscriberTerminated(subscriber))
|
||||
|
||||
// immediately reply with initial listings to the new subscriber
|
||||
val listing = {
|
||||
val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
|
||||
ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
|
||||
// immediately reply with initial listings to the new subscriber
|
||||
val listing = {
|
||||
val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
|
||||
ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
|
||||
}
|
||||
subscriber ! listing
|
||||
|
||||
behavior(setup, state.copy(subscriptions = state.subscriptions.inserted(key)(subscriber)))
|
||||
} else {
|
||||
ctx.log.error("ClusterReceptionist [{}] - Subscriptions from non-local [{}] is not supported", subscriber)
|
||||
Behaviors.same
|
||||
}
|
||||
subscriber ! listing
|
||||
|
||||
next(newSubscriptions = subscriptions.inserted(key)(subscriber))
|
||||
}
|
||||
|
||||
def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match {
|
||||
|
||||
case SubscriberTerminated(key, subscriber) =>
|
||||
next(newSubscriptions = subscriptions.removed(key)(subscriber))
|
||||
case SubscriberTerminated(subscriber) =>
|
||||
behavior(setup, state.removeSubscriber(subscriber))
|
||||
|
||||
case RegisteredActorTerminated(key, serviceInstance) =>
|
||||
case LocalServiceActorTerminated(serviceInstance) =>
|
||||
val entry = Entry(serviceInstance, setup.selfSystemUid)
|
||||
|
||||
// could be empty if there was a race between termination and unregistration
|
||||
val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
|
||||
|
||||
ctx.log.debugN(
|
||||
"ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]",
|
||||
cluster.selfAddress,
|
||||
key.asServiceKey.id,
|
||||
keys.map(_.asServiceKey.id).mkString(", "),
|
||||
entry)
|
||||
val ddataKey = registry.ddataKeyFor(key)
|
||||
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
|
||||
ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
|
||||
|
||||
keys.foreach { key =>
|
||||
val ddataKey = state.registry.ddataKeyFor(key.asServiceKey)
|
||||
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
|
||||
ServiceRegistry(registry).removeBinding(key.asServiceKey, entry).toORMultiMap
|
||||
}
|
||||
}
|
||||
// tombstone removals so they are not re-added by merging with other concurrent
|
||||
// registrations for the same key
|
||||
next(newState = registry.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
|
||||
behavior(setup, state.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
|
||||
|
||||
case ChangeFromReplicator(ddataKey, value) =>
|
||||
// every change will come back this way - this is where the local notifications happens
|
||||
val newState = ServiceRegistry(value)
|
||||
val changedKeys = registry.collectChangedKeys(ddataKey, newState)
|
||||
val newRegistry = registry.withServiceRegistry(ddataKey, newState)
|
||||
val newRegistry = ServiceRegistry(value)
|
||||
val changedKeys = state.registry.collectChangedKeys(ddataKey, newRegistry)
|
||||
val newState = state.copy(registry = state.registry.withServiceRegistry(ddataKey, newRegistry))
|
||||
|
||||
if (changedKeys.nonEmpty) {
|
||||
if (ctx.log.isDebugEnabled) {
|
||||
ctx.log.debugN(
|
||||
"ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]",
|
||||
cluster.selfAddress,
|
||||
newState.entries.entries,
|
||||
newRegistry.entries.entries,
|
||||
changedKeys
|
||||
.map(key => key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]"))
|
||||
.map(key => key.asServiceKey.id -> newRegistry.entriesFor(key).mkString("[", ", ", "]"))
|
||||
.mkString(", "),
|
||||
newRegistry.tombstones.mkString(", "))
|
||||
state.tombstones.mkString(", "))
|
||||
}
|
||||
|
||||
changedKeys.foreach { changedKey =>
|
||||
val serviceKey = changedKey.asServiceKey
|
||||
|
||||
val subscribers = subscriptions.get(changedKey)
|
||||
val subscribers = state.subscriptions.get(changedKey)
|
||||
if (subscribers.nonEmpty) {
|
||||
val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
|
||||
val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
|
||||
val listing =
|
||||
ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true)
|
||||
subscribers.foreach(_ ! listing)
|
||||
|
|
@ -321,7 +452,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
|
||||
// because of how ORMultiMap/ORset works, we could have a case where an actor we removed
|
||||
// is re-introduced because of a concurrent update, in that case we need to re-remove it
|
||||
val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone)
|
||||
val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
|
||||
if (tombstonedButReAdded.nonEmpty) {
|
||||
if (ctx.log.isDebugEnabled)
|
||||
ctx.log.debug2(
|
||||
|
|
@ -339,13 +470,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
}
|
||||
}
|
||||
|
||||
next(newRegistry)
|
||||
behavior(setup, newState)
|
||||
} else {
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
case NodeAdded(uniqueAddress) =>
|
||||
next(registry.addNode(uniqueAddress))
|
||||
behavior(setup, state.copy(registry = state.registry.addNode(uniqueAddress)))
|
||||
|
||||
case NodeRemoved(uniqueAddress) =>
|
||||
if (uniqueAddress == selfUniqueAddress) {
|
||||
|
|
@ -363,38 +494,39 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
nodesRemoved(Set(uniqueAddress))
|
||||
}
|
||||
|
||||
next(registry.removeNode(uniqueAddress))
|
||||
behavior(setup, state.copy(registry = state.registry.removeNode(uniqueAddress)))
|
||||
}
|
||||
|
||||
case NodeUnreachable(uniqueAddress) =>
|
||||
val keysForNode = registry.keysFor(uniqueAddress)
|
||||
val newRegistry = registry.addUnreachable(uniqueAddress)
|
||||
val keysForNode = state.registry.keysFor(uniqueAddress)
|
||||
val newState = state.copy(registry = state.registry.addUnreachable(uniqueAddress))
|
||||
if (keysForNode.nonEmpty) {
|
||||
ctx.log.debug2(
|
||||
"ClusterReceptionist [{}] - Node with registered services unreachable [{}]",
|
||||
cluster.selfAddress,
|
||||
uniqueAddress)
|
||||
reachabilityChanged(keysForNode, newRegistry)
|
||||
reachabilityChanged(keysForNode, newState)
|
||||
}
|
||||
next(newRegistry)
|
||||
behavior(setup, newState)
|
||||
|
||||
case NodeReachable(uniqueAddress) =>
|
||||
val keysForNode = registry.keysFor(uniqueAddress)
|
||||
val newRegistry = registry.removeUnreachable(uniqueAddress)
|
||||
val keysForNode = state.registry.keysFor(uniqueAddress)
|
||||
val newState = state.copy(registry = state.registry.removeUnreachable(uniqueAddress))
|
||||
if (keysForNode.nonEmpty) {
|
||||
ctx.log.debug2(
|
||||
"ClusterReceptionist [{}] - Node with registered services reachable again [{}]",
|
||||
cluster.selfAddress,
|
||||
uniqueAddress)
|
||||
reachabilityChanged(keysForNode, newRegistry)
|
||||
reachabilityChanged(keysForNode, newState)
|
||||
}
|
||||
next(newRegistry)
|
||||
behavior(setup, newState)
|
||||
|
||||
case RemoveTick =>
|
||||
// ok to update from several nodes but more efficient to try to do it from one node
|
||||
if (isLeader) {
|
||||
val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress)
|
||||
val notInCluster = allAddressesInState.diff(registry.nodes)
|
||||
val allAddressesInState: Set[UniqueAddress] =
|
||||
state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
|
||||
val notInCluster = allAddressesInState.diff(state.registry.nodes)
|
||||
|
||||
if (notInCluster.isEmpty) Behaviors.same
|
||||
else {
|
||||
|
|
@ -409,11 +541,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
|
|||
Behaviors.same
|
||||
|
||||
case PruneTombstonesTick =>
|
||||
val prunedRegistry = registry.pruneTombstones()
|
||||
if (prunedRegistry eq registry) Behaviors.same
|
||||
val prunedState = state.pruneTombstones()
|
||||
if (prunedState eq state) Behaviors.same
|
||||
else {
|
||||
ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress)
|
||||
next(prunedRegistry)
|
||||
behavior(setup, prunedState)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,8 +12,6 @@ import akka.cluster.UniqueAddress
|
|||
import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, SelfUniqueAddress }
|
||||
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry }
|
||||
|
||||
import scala.concurrent.duration.Deadline
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -23,7 +21,7 @@ import scala.concurrent.duration.Deadline
|
|||
val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
|
||||
key -> new ServiceRegistry(EmptyORMultiMap)
|
||||
}.toMap
|
||||
new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty, Set.empty)
|
||||
new ShardedServiceRegistry(emptyRegistries, Set.empty, Set.empty)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -34,14 +32,11 @@ import scala.concurrent.duration.Deadline
|
|||
* Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not
|
||||
* get too large ddata messages)
|
||||
*
|
||||
* @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
|
||||
* for a key. Since the only way to unregister is to stop, we don't need to keep track of
|
||||
* the service key
|
||||
|
||||
*
|
||||
*/
|
||||
@InternalApi private[akka] final case class ShardedServiceRegistry(
|
||||
serviceRegistries: Map[DDataKey, ServiceRegistry],
|
||||
tombstones: Map[ActorRef[_], Deadline],
|
||||
nodes: Set[UniqueAddress],
|
||||
unreachable: Set[UniqueAddress]) {
|
||||
|
||||
|
|
@ -70,32 +65,8 @@ import scala.concurrent.duration.Deadline
|
|||
def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[AbstractServiceKey] =
|
||||
serviceRegistries.valuesIterator.flatMap(_.keysFor(address)).toSet
|
||||
|
||||
/**
|
||||
* @return (reachable-nodes, all)
|
||||
*/
|
||||
def activeActorRefsFor[T](
|
||||
key: ServiceKey[T],
|
||||
selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = {
|
||||
val ddataKey = ddataKeyFor(key)
|
||||
val entries = serviceRegistries(ddataKey).entriesFor(key)
|
||||
val selfAddress = selfUniqueAddress.address
|
||||
val reachable = Set.newBuilder[ActorRef[T]]
|
||||
val all = Set.newBuilder[ActorRef[T]]
|
||||
entries.foreach { entry =>
|
||||
val entryAddress = entry.uniqueAddress(selfAddress)
|
||||
if (nodes.contains(entryAddress) && !hasTombstone(entry.ref)) {
|
||||
val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]]
|
||||
all += ref
|
||||
if (!unreachable.contains(entryAddress)) {
|
||||
reachable += ref
|
||||
}
|
||||
}
|
||||
}
|
||||
(reachable.result(), all.result())
|
||||
}
|
||||
|
||||
def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
|
||||
copy(serviceRegistries + (ddataKey -> registry), tombstones)
|
||||
copy(serviceRegistries + (ddataKey -> registry))
|
||||
|
||||
def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] =
|
||||
allEntries.collect {
|
||||
|
|
@ -118,18 +89,6 @@ import scala.concurrent.duration.Deadline
|
|||
acc + (ddataKey -> updated)
|
||||
}
|
||||
|
||||
def addTombstone(actorRef: ActorRef[_], deadline: Deadline): ShardedServiceRegistry =
|
||||
copy(tombstones = tombstones + (actorRef -> deadline))
|
||||
|
||||
def hasTombstone(actorRef: ActorRef[_]): Boolean =
|
||||
tombstones.nonEmpty && tombstones.contains(actorRef)
|
||||
|
||||
def pruneTombstones(): ShardedServiceRegistry = {
|
||||
copy(tombstones = tombstones.filter {
|
||||
case (_, deadline) => deadline.hasTimeLeft
|
||||
})
|
||||
}
|
||||
|
||||
def addNode(node: UniqueAddress): ShardedServiceRegistry =
|
||||
copy(nodes = nodes + node)
|
||||
|
||||
|
|
|
|||
|
|
@ -187,6 +187,19 @@ public interface ReceptionistExample {
|
|||
}
|
||||
// #find
|
||||
|
||||
default void deregisterSample() {
|
||||
Behaviors.<PingService.Ping>setup(
|
||||
context -> {
|
||||
// #deregister
|
||||
context
|
||||
.getSystem()
|
||||
.receptionist()
|
||||
.tell(Receptionist.deregister(PingService.pingServiceKey, context.getSelf()));
|
||||
// #deregister
|
||||
return Behaviors.empty();
|
||||
});
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ActorSystem<Void> system = ActorSystem.create(Guardian.create(), "ReceptionistExample");
|
||||
Thread.sleep(10000);
|
||||
|
|
|
|||
|
|
@ -4,9 +4,6 @@
|
|||
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
|
||||
|
||||
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
|
||||
<level>INFO</level>
|
||||
</filter>
|
||||
<encoder>
|
||||
<pattern>%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n</pattern>
|
||||
</encoder>
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ object ClusterReceptionistSpec {
|
|||
}
|
||||
|
||||
val PingKey = ServiceKey[PingProtocol]("pingy")
|
||||
val AnotherKey = ServiceKey[PingProtocol]("pingy-2")
|
||||
}
|
||||
|
||||
class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
|
||||
|
|
@ -525,7 +526,6 @@ class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
|
|||
system2.receptionist ! Subscribe(TheKey, regProbe2.ref)
|
||||
regProbe2.fishForMessage(10.seconds) {
|
||||
case TheKey.Listing(actors) if actors.nonEmpty =>
|
||||
println(actors)
|
||||
FishingOutcomes.complete
|
||||
case _ => FishingOutcomes.continue
|
||||
}
|
||||
|
|
@ -567,5 +567,173 @@ class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
|
|||
}
|
||||
}
|
||||
|
||||
"handle unregistration and re-registration of services" in {
|
||||
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config)
|
||||
val system1 = testKit1.system
|
||||
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
|
||||
val system2 = testKit2.system
|
||||
try {
|
||||
|
||||
val clusterNode1 = Cluster(system1)
|
||||
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
|
||||
val clusterNode2 = Cluster(system2)
|
||||
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
|
||||
|
||||
val regProbe1 = TestProbe[Any]()(system1)
|
||||
val regProbe2 = TestProbe[Any]()(system2)
|
||||
|
||||
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
|
||||
|
||||
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
|
||||
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
|
||||
|
||||
// register and verify seen on remote side
|
||||
val service1 = testKit1.spawn(pingPongBehavior)
|
||||
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(PingKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
|
||||
|
||||
// another service for the same key
|
||||
val service2 = testKit1.spawn(pingPongBehavior)
|
||||
system1.receptionist ! Register(PingKey, service2, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(PingKey, service2))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
|
||||
|
||||
// unregister service1 and verify
|
||||
system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Deregistered(PingKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
|
||||
|
||||
// re-register and verify
|
||||
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(PingKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
|
||||
|
||||
// cover a race between termination and unregistration as well (should lead to only one update)
|
||||
system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
|
||||
service1 ! Perish
|
||||
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
|
||||
regProbe2.expectNoMessage(1.second)
|
||||
|
||||
akka.cluster.Cluster(system1).shutdown()
|
||||
clusterNode2.manager ! Down(clusterNode1.selfMember.address)
|
||||
} finally {
|
||||
testKit1.shutdownTestKit()
|
||||
testKit2.shutdownTestKit()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"handle unregistration per key not per actor" in {
|
||||
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-11", ClusterReceptionistSpec.config)
|
||||
val system1 = testKit1.system
|
||||
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
|
||||
val system2 = testKit2.system
|
||||
try {
|
||||
|
||||
val clusterNode1 = Cluster(system1)
|
||||
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
|
||||
val clusterNode2 = Cluster(system2)
|
||||
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
|
||||
|
||||
val regProbe1 = TestProbe[Any]()(system1)
|
||||
val regProbe2 = TestProbe[Any]()(system2)
|
||||
|
||||
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
|
||||
|
||||
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
|
||||
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
|
||||
system2.receptionist ! Subscribe(AnotherKey, regProbe2.ref)
|
||||
regProbe2.expectMessage(Listing(AnotherKey, Set.empty[ActorRef[PingProtocol]]))
|
||||
|
||||
// register same actor for two service keys and verify seen on remote side
|
||||
val service1 = testKit1.spawn(pingPongBehavior)
|
||||
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(PingKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
|
||||
system1.receptionist ! Register(AnotherKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(AnotherKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
|
||||
|
||||
// unregister service1 for one of the keys and verify
|
||||
system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Deregistered(PingKey, service1))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(0)
|
||||
system2.receptionist ! Find(AnotherKey, regProbe2.ref)
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
|
||||
|
||||
system1.receptionist ! Find(PingKey, regProbe1.ref)
|
||||
regProbe1.expectMessageType[Listing].serviceInstances(PingKey).size should ===(0)
|
||||
system1.receptionist ! Find(AnotherKey, regProbe1.ref)
|
||||
regProbe1.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
|
||||
|
||||
akka.cluster.Cluster(system1).shutdown()
|
||||
clusterNode2.manager ! Down(clusterNode1.selfMember.address)
|
||||
} finally {
|
||||
testKit1.shutdownTestKit()
|
||||
testKit2.shutdownTestKit()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"handle concurrent unregistration and registration on different nodes" in {
|
||||
// this covers the fact that with ddata a removal can be lost
|
||||
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-12", ClusterReceptionistSpec.config)
|
||||
val system1 = testKit1.system
|
||||
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
|
||||
val system2 = testKit2.system
|
||||
try {
|
||||
|
||||
val clusterNode1 = Cluster(system1)
|
||||
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
|
||||
val clusterNode2 = Cluster(system2)
|
||||
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
|
||||
|
||||
val regProbe1 = TestProbe[Any]()(system1)
|
||||
val regProbe2 = TestProbe[Any]()(system2)
|
||||
|
||||
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
|
||||
|
||||
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
|
||||
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
|
||||
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
|
||||
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
|
||||
|
||||
// register an actor on one side and verify seen on both
|
||||
val service1 = testKit1.spawn(pingPongBehavior)
|
||||
val service2 = testKit2.spawn(pingPongBehavior)
|
||||
system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
|
||||
regProbe1.expectMessage(Registered(PingKey, service1))
|
||||
regProbe1.expectMessage(Listing(PingKey, Set(service1)))
|
||||
regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
|
||||
|
||||
// then concurrently register on one node and unregister on the other node for the same key (ofc racy)
|
||||
system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
|
||||
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
|
||||
regProbe1.expectMessage(Deregistered(PingKey, service1))
|
||||
regProbe2.expectMessage(Registered(PingKey, service2))
|
||||
|
||||
regProbe2.fishForMessage(3.seconds) {
|
||||
case PingKey.Listing(actors) if actors == Set(service2) => FishingOutcomes.complete
|
||||
case PingKey.Listing(actors) if actors.size == 2 =>
|
||||
// we may see both actors before we see the removal
|
||||
FishingOutcomes.continueAndIgnore
|
||||
}
|
||||
|
||||
regProbe1.fishForMessage(3.seconds) {
|
||||
case PingKey.Listing(actors) if actors.size == 1 => FishingOutcomes.complete
|
||||
case PingKey.Listing(actors) if actors.isEmpty => FishingOutcomes.continueAndIgnore
|
||||
}
|
||||
|
||||
akka.cluster.Cluster(system1).shutdown()
|
||||
clusterNode2.manager ! Down(clusterNode1.selfMember.address)
|
||||
} finally {
|
||||
testKit1.shutdownTestKit()
|
||||
testKit2.shutdownTestKit()
|
||||
}
|
||||
|
||||
}
|
||||
// Fixme concurrent registration and unregistration
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.typed.internal.receptionist
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.internal.receptionist.AbstractServiceKey
|
||||
import akka.actor.typed.receptionist.ServiceKey
|
||||
import akka.cluster.UniqueAddress
|
||||
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.SubscriptionsKV
|
||||
import akka.util.TypedMultiMap
|
||||
import org.scalatest.Matchers
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
import scala.concurrent.duration.Deadline
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class ClusterReceptionistStateSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers {
|
||||
|
||||
val SomeService = ServiceKey[Int]("boy-oh-boy!")
|
||||
val SomeOtherService = ServiceKey[Int]("disappointing!")
|
||||
|
||||
private def emptyState(
|
||||
distributedKeyCount: Int = 1,
|
||||
self: UniqueAddress = UniqueAddress(Address("127.0.0.1", "MySystem"), 555L)) =
|
||||
ClusterReceptionist.State(
|
||||
registry = ShardedServiceRegistry(distributedKeyCount).addNode(self),
|
||||
servicesPerActor = Map.empty,
|
||||
tombstones = Map.empty,
|
||||
subscriptions = TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
|
||||
|
||||
"The internal ClusterReceptionist State" must {
|
||||
|
||||
"keep track of local keys per service" in {
|
||||
val someRef = createTestProbe[Int]().ref
|
||||
var state = emptyState()
|
||||
state = state.addLocalService(someRef, SomeService)
|
||||
state = state.addLocalService(someRef, SomeOtherService)
|
||||
state.servicesPerActor(someRef) should ===(Set(SomeService, SomeOtherService))
|
||||
state = state.removeLocalService(someRef, SomeService, Deadline.now)
|
||||
state = state.removeLocalService(someRef, SomeOtherService, Deadline.now)
|
||||
state.servicesPerActor.get(someRef) should ===(None)
|
||||
}
|
||||
|
||||
"keep a tombstone for removed services" in {
|
||||
val someRef = createTestProbe[Int]().ref
|
||||
var state = emptyState()
|
||||
state = state.addLocalService(someRef, SomeService)
|
||||
state = state.removeLocalService(someRef, SomeService, Deadline.now)
|
||||
state.hasTombstone(SomeService)(someRef) should ===(true)
|
||||
}
|
||||
|
||||
"prune tombstones" in {
|
||||
val someRef = createTestProbe[Int]().ref
|
||||
var state = emptyState()
|
||||
state = state.addLocalService(someRef, SomeService)
|
||||
state = state.removeLocalService(someRef, SomeService, Deadline.now - (10.seconds))
|
||||
state = state.pruneTombstones()
|
||||
state.tombstones shouldBe empty
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -94,6 +94,12 @@ object PingPongExample {
|
|||
}
|
||||
//#find
|
||||
|
||||
Behaviors.setup[PingService.Ping] { context =>
|
||||
//#deregister
|
||||
context.system.receptionist ! Receptionist.Deregister(PingService.PingServiceKey, context.self)
|
||||
//#deregister
|
||||
Behaviors.empty
|
||||
}
|
||||
}
|
||||
|
||||
object ReceptionistExample {
|
||||
|
|
|
|||
|
|
@ -34,8 +34,9 @@ a `Listing`, which contains a `Set` of actor references that are registered for
|
|||
registered to the same key.
|
||||
|
||||
The registry is dynamic. New actors can be registered during the lifecycle of the system. Entries are removed when
|
||||
registered actors are stopped or a node is removed from the @ref:[Cluster](cluster.md). To facilitate this dynamic aspect you can also subscribe
|
||||
to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed.
|
||||
registered actors are stopped, manually deregistered or the node they live on is removed from the @ref:[Cluster](cluster.md).
|
||||
To facilitate this dynamic aspect you can also subscribe to changes with the `Receptionist.Subscribe` message. It will send
|
||||
`Listing` messages to the subscriber when entries for a key are changed.
|
||||
|
||||
These imports are used in the following example:
|
||||
|
||||
|
|
@ -89,6 +90,18 @@ Java
|
|||
Also note how a `messageAdapter` is used to convert the `Receptionist.Listing` to a message type that
|
||||
the `PingManager` understands.
|
||||
|
||||
If a server no longer wish to be associated with a service key it can deregister using the command `Receptionist.Deregister`
|
||||
which will remove the association and inform all subscribers.
|
||||
|
||||
The command can optionally send an acknowledgement once the local receptionist has removed the registration. The acknowledgement does not guarantee
|
||||
that all subscribers has seen that the instance has been removed, it may still receive messages from subscribers for some time after this.
|
||||
|
||||
Scala
|
||||
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #deregister }
|
||||
|
||||
Java
|
||||
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #deregister }
|
||||
|
||||
## Cluster Receptionist
|
||||
|
||||
The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist
|
||||
|
|
@ -103,3 +116,9 @@ registered actors that are reachable. The full set of actors, including unreacha
|
|||
|
||||
One important difference from local only receptions are the serialization concerns, all messages sent to and back from
|
||||
an actor on another node must be serializable, see @ref:[serialization](../serialization.md).
|
||||
|
||||
## Receptionist Scalability
|
||||
|
||||
The receptionist does not scale up to any number of services or very high turnaround of services.
|
||||
It will likely handle up to thousands or tens of thousands of services. Use cases with higher
|
||||
demands the receptionist for initial contact between actors on the nodes while the actual logic of those is up to the applications own actors.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue