diff --git a/akka-actor-typed-tests/src/test/resources/logback-test.xml b/akka-actor-typed-tests/src/test/resources/logback-test.xml
index 068d2bc81c..6f34278437 100644
--- a/akka-actor-typed-tests/src/test/resources/logback-test.xml
+++ b/akka-actor-typed-tests/src/test/resources/logback-test.xml
@@ -4,9 +4,6 @@
-
- INFO
-
%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n
diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala
index a2d1a9865d..697d74131f 100644
--- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala
+++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala
@@ -41,72 +41,135 @@ object LocalReceptionistSpec {
class LocalReceptionistSpec extends ScalaTestWithActorTestKit with WordSpecLike with LogCapturing {
import LocalReceptionistSpec._
- abstract class TestSetup {
- val receptionist = spawn(LocalReceptionist.behavior)
- }
-
"A local receptionist" must {
"unregister services when they terminate" in {
- new TestSetup {
- val regProbe = TestProbe[Any]("regProbe")
+ val receptionist = spawn(LocalReceptionist.behavior)
+ val regProbe = TestProbe[Any]("regProbe")
- val serviceA = spawn(stoppableBehavior.narrow[ServiceA])
- receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
- regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
+ val serviceA = spawn(stoppableBehavior.narrow[ServiceA])
+ receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
+ regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
- val serviceB = spawn(stoppableBehavior.narrow[ServiceB])
- receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
- regProbe.expectMessage(Registered(ServiceKeyB, serviceB))
+ val serviceB = spawn(stoppableBehavior.narrow[ServiceB])
+ receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
+ regProbe.expectMessage(Registered(ServiceKeyB, serviceB))
- val serviceC = spawn(stoppableBehavior)
- receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
- receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
- regProbe.expectMessage(Registered(ServiceKeyA, serviceC))
- regProbe.expectMessage(Registered(ServiceKeyB, serviceC))
+ val serviceC = spawn(stoppableBehavior)
+ receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
+ receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
+ regProbe.expectMessage(Registered(ServiceKeyA, serviceC))
+ regProbe.expectMessage(Registered(ServiceKeyB, serviceC))
+ receptionist ! Find(ServiceKeyA, regProbe.ref)
+ regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceC)))
+ receptionist ! Find(ServiceKeyB, regProbe.ref)
+ regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB, serviceC)))
+
+ serviceC ! Stop
+
+ eventually {
receptionist ! Find(ServiceKeyA, regProbe.ref)
- regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceC)))
+ regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
receptionist ! Find(ServiceKeyB, regProbe.ref)
- regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB, serviceC)))
-
- serviceC ! Stop
-
- eventually {
- receptionist ! Find(ServiceKeyA, regProbe.ref)
- regProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
- receptionist ! Find(ServiceKeyB, regProbe.ref)
- regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB)))
- }
+ regProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB)))
}
}
+ "unregister programatically" in {
+ val subProbe = TestProbe[Any]()
+ val receptionist = spawn(LocalReceptionist.behavior)
+ receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ val serviceA = TestProbe[ServiceA]()
+ receptionist ! Register(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
+ receptionist ! Deregister(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ }
+
+ "unregister per service key, not service actor" in {
+ val subProbe = TestProbe[Any]()
+ val receptionist = spawn(LocalReceptionist.behavior)
+
+ // subscribe to 2 keys
+ receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ receptionist ! Subscribe(ServiceKeyB, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyB, Set.empty[ActorRef[ServiceB]]))
+
+ // register same service for both 2 keys
+ val service = TestProbe[AnyRef]()
+ receptionist ! Register(ServiceKeyA, service.ref)
+ receptionist ! Register(ServiceKeyB, service.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set(service.ref.narrow[ServiceKeyA.Protocol])))
+ subProbe.expectMessage(Listing(ServiceKeyB, Set(service.ref.narrow[ServiceKeyB.Protocol])))
+
+ // unregister one of the service keys for the service
+ receptionist ! Deregister(ServiceKeyA, service.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ receptionist ! Find(ServiceKeyB, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyB, Set(service.ref.narrow[ServiceKeyB.Protocol])))
+ }
+
+ "unregister and re-register same service actor" in {
+ val subProbe = TestProbe[Any]()
+ val receptionist = spawn(LocalReceptionist.behavior)
+
+ receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+
+ val serviceA = TestProbe[ServiceA]()
+ receptionist ! Register(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
+
+ receptionist ! Deregister(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+
+ receptionist ! Register(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
+ }
+
"support subscribing to service changes" in {
- new TestSetup {
- val regProbe = TestProbe[Registered]("regProbe")
+ val receptionist = spawn(LocalReceptionist.behavior)
+ val regProbe = TestProbe[Registered]("regProbe")
- val aSubscriber = TestProbe[Listing]("aUser")
- receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
+ val aSubscriber = TestProbe[Listing]("aUser")
+ receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
- aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
- val serviceA: ActorRef[ServiceA] = spawn(stoppableBehavior)
- receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
- regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
+ val serviceA: ActorRef[ServiceA] = spawn(stoppableBehavior)
+ receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
+ regProbe.expectMessage(Registered(ServiceKeyA, serviceA))
- aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
+ aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA)))
- val serviceA2: ActorRef[ServiceA] = spawn(stoppableBehavior)
- receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
- regProbe.expectMessage(Registered(ServiceKeyA, serviceA2))
+ val serviceA2: ActorRef[ServiceA] = spawn(stoppableBehavior)
+ receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
+ regProbe.expectMessage(Registered(ServiceKeyA, serviceA2))
- aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
+ aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
- serviceA ! Stop
- aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA2)))
- serviceA2 ! Stop
- aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
- }
+ serviceA ! Stop
+ aSubscriber.expectMessage(Listing(ServiceKeyA, Set(serviceA2)))
+ serviceA2 ! Stop
+ aSubscriber.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ }
+
+ "support subscribing to different services with the same subscriber" in {
+ val subProbe = TestProbe[Any]()
+ val receptionist = spawn(LocalReceptionist.behavior)
+ receptionist ! Subscribe(ServiceKeyA, subProbe.ref)
+ receptionist ! Subscribe(ServiceKeyB, subProbe.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
+ subProbe.expectMessage(Listing(ServiceKeyB, Set.empty[ActorRef[ServiceB]]))
+ val serviceA = TestProbe[ServiceA]()
+ receptionist ! Register(ServiceKeyA, serviceA.ref)
+ subProbe.expectMessage(Listing(ServiceKeyA, Set(serviceA.ref)))
+ val serviceB = TestProbe[ServiceB]()
+ receptionist ! Register(ServiceKeyB, serviceB.ref)
+ subProbe.expectMessage(Listing(ServiceKeyB, Set(serviceB.ref)))
}
"work with ask" in {
diff --git a/akka-actor-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes b/akka-actor-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes
new file mode 100644
index 0000000000..fd95e15587
--- /dev/null
+++ b/akka-actor-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes
@@ -0,0 +1,15 @@
+# internal API and messages changed
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.key")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy$default$1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#RegisteredActorTerminated.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.key")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy$default$1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#SubscriberTerminated.this")
\ No newline at end of file
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala
index c9049cb646..e006691a05 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala
@@ -6,7 +6,6 @@ package akka.actor.typed.internal.receptionist
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
-import akka.actor.typed.Terminated
import akka.actor.typed.receptionist.Receptionist._
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.ActorContext
@@ -37,98 +36,206 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
override val name = "localReceptionist"
- type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol]
- type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV]
- type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
- type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
+ private type Service[K <: AbstractServiceKey] = ActorRef[K#Protocol]
+ private type Subscriber[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
- sealed trait InternalCommand
- final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
- final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]])
+ private sealed trait InternalCommand
+ private final case class RegisteredActorTerminated[T](ref: ActorRef[T]) extends InternalCommand
+ private final case class SubscriberTerminated[T](ref: ActorRef[ReceptionistMessages.Listing[T]])
extends InternalCommand
+ private object State {
+ def empty =
+ State(
+ TypedMultiMap.empty[AbstractServiceKey, Service],
+ Map.empty,
+ TypedMultiMap.empty[AbstractServiceKey, Subscriber],
+ Map.empty)
+ }
+
+ /**
+ * @param services current registered services per key
+ * @param servicesPerActor current registered service keys per actor (needed for unregistration since an actor can implement several services)
+ * @param subscriptions current subscriptions per service key
+ * @param subscriptionsPerActor current subscriptions per subscriber (needed since a subscriber can subscribe to several keys) FIXME is it really needed?
+ */
+ private final case class State(
+ services: TypedMultiMap[AbstractServiceKey, Service],
+ servicesPerActor: Map[ActorRef[_], Set[AbstractServiceKey]],
+ subscriptions: TypedMultiMap[AbstractServiceKey, Subscriber],
+ subscriptionsPerActor: Map[ActorRef[_], Set[AbstractServiceKey]]) {
+
+ def serviceInstanceAdded[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
+ val newServices = services.inserted(key)(serviceInstance)
+ val newServicePerActor =
+ servicesPerActor.updated(
+ serviceInstance,
+ servicesPerActor.getOrElse(serviceInstance, Set.empty) + key.asServiceKey)
+ copy(services = newServices, servicesPerActor = newServicePerActor)
+ }
+
+ def serviceInstanceRemoved[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
+ val newServices = services.removed(key)(serviceInstance)
+ val newServicePerActor =
+ servicesPerActor.get(serviceInstance) match {
+ case Some(keys) =>
+ val newKeys = keys - key.asServiceKey
+ // only/last service this actor was registered for
+ if (newKeys.isEmpty) {
+ servicesPerActor - serviceInstance
+ } else servicesPerActor.updated(serviceInstance, newKeys)
+ case None =>
+ // no services actually registered for actor
+ servicesPerActor
+ }
+ copy(services = newServices, servicesPerActor = newServicePerActor)
+ }
+
+ def serviceInstanceRemoved(serviceInstance: ActorRef[_]): State = {
+ val keys = servicesPerActor.getOrElse(serviceInstance, Set.empty)
+ val newServices =
+ if (keys.isEmpty) services
+ else
+ keys.foldLeft(services)((acc, key) =>
+ acc.removed(key.asServiceKey)(serviceInstance.asInstanceOf[Service[AbstractServiceKey]]))
+ val newServicesPerActor = servicesPerActor - serviceInstance
+ copy(services = newServices, servicesPerActor = newServicesPerActor)
+ }
+
+ def subscriberAdded[Key <: AbstractServiceKey](key: Key)(subscriber: Subscriber[key.type]): State = {
+ val newSubscriptions = subscriptions.inserted(key)(subscriber)
+ val newSubscriptionsPerActor =
+ subscriptionsPerActor.updated(
+ subscriber,
+ subscriptionsPerActor.getOrElse(subscriber, Set.empty) + key.asServiceKey)
+
+ copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
+ }
+
+ def subscriptionRemoved[Key <: AbstractServiceKey](key: Key)(subscriber: Subscriber[key.type]): State = {
+ val newSubscriptions = subscriptions.removed(key)(subscriber)
+ val newSubscriptionsPerActor =
+ subscriptionsPerActor.get(subscriber) match {
+ case Some(keys) =>
+ val newKeys = keys - key.asServiceKey
+ if (newKeys.isEmpty) {
+ subscriptionsPerActor - subscriber
+ } else {
+ subscriptionsPerActor.updated(subscriber, newKeys)
+ }
+ case None =>
+ // no subscriptions actually exist for actor
+ subscriptionsPerActor
+ }
+ copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
+ }
+
+ def subscriberRemoved(subscriber: ActorRef[_]): State = {
+ val keys = subscriptionsPerActor.getOrElse(subscriber, Set.empty)
+ if (keys.isEmpty) this
+ else {
+ val newSubscriptions = keys.foldLeft(subscriptions) { (subscriptions, key) =>
+ val serviceKey = key.asServiceKey
+ subscriptions.removed(serviceKey)(subscriber.asInstanceOf[Subscriber[serviceKey.type]])
+ }
+ val newSubscriptionsPerActor = subscriptionsPerActor - subscriber
+ copy(subscriptions = newSubscriptions, subscriptionsPerActor = newSubscriptionsPerActor)
+ }
+ }
+ }
+
override def behavior: Behavior[Command] = Behaviors.setup { ctx =>
ctx.setLoggerName(classOf[LocalReceptionist])
- behavior(TypedMultiMap.empty[AbstractServiceKey, KV], TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
- .narrow[Command]
+ behavior(State.empty).narrow[Command]
}
- private def behavior(serviceRegistry: LocalServiceRegistry, subscriptions: SubscriptionRegistry): Behavior[Any] = {
-
- // Helper to create new state
- def next(
- newRegistry: LocalServiceRegistry = serviceRegistry,
- newSubscriptions: SubscriptionRegistry = subscriptions) =
- behavior(newRegistry, newSubscriptions)
-
- /*
- * Hack to allow multiple termination notifications per target
- * FIXME #26505: replace by simple map in our state
- */
- def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit =
- ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx =>
- innerCtx.watch(target)
- Behaviors.receiveSignal[Nothing] {
- case (_, Terminated(`target`)) =>
- ctx.self ! msg
- Behaviors.stopped
- }
- })
-
+ private def behavior(state: State): Behavior[Any] = {
// Helper that makes sure that subscribers are notified when an entry is changed
- def updateRegistry(
- changedKeysHint: Set[AbstractServiceKey],
- f: LocalServiceRegistry => LocalServiceRegistry): Behavior[Any] = {
- val newRegistry = f(serviceRegistry)
+ def updateServices(changedKeysHint: Set[AbstractServiceKey], f: State => State): Behavior[Any] = {
+ val newState = f(state)
def notifySubscribersFor[T](key: AbstractServiceKey): Unit = {
- val newListing = newRegistry.get(key)
- subscriptions
- .get(key)
- .foreach(
- _ ! ReceptionistMessages
- .Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true))
+ val newListing = newState.services.get(key)
+ val listing =
+ ReceptionistMessages.Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true)
+ newState.subscriptions.get(key).foreach(_ ! listing)
}
changedKeysHint.foreach(notifySubscribersFor)
- next(newRegistry = newRegistry)
+ behavior(newState)
}
def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = {
- val listing = serviceRegistry.get(key)
+ val listing = state.services.get(key)
replyTo ! ReceptionistMessages.Listing(key, listing, listing, servicesWereAddedOrRemoved = true)
}
def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match {
+
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
ctx.log.debug2("Actor was registered: {} {}", key, serviceInstance)
- watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
+ if (!state.servicesPerActor.contains(serviceInstance))
+ ctx.watchWith(serviceInstance, RegisteredActorTerminated(serviceInstance))
maybeReplyTo match {
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
case None =>
}
- updateRegistry(Set(key), _.inserted(key)(serviceInstance))
+ updateServices(Set(key), _.serviceInstanceAdded(key)(serviceInstance))
+
+ case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
+ val servicesForActor = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
+ if (servicesForActor.isEmpty) {
+ // actor deregistered but we saw a terminate message before we got the deregistration
+ Behaviors.same
+ } else {
+ ctx.log.debug2("Actor was deregistered: {} {}", key, serviceInstance)
+ if ((servicesForActor - key).isEmpty)
+ ctx.unwatch(serviceInstance)
+
+ maybeReplyTo match {
+ case Some(replyTo) => replyTo ! ReceptionistMessages.Deregistered(key, serviceInstance)
+ case None =>
+ }
+
+ updateServices(Set(key), { state =>
+ val newState = state.serviceInstanceRemoved(key)(serviceInstance)
+ if (state.servicesPerActor.getOrElse(serviceInstance, Set.empty).isEmpty)
+ ctx.unwatch(serviceInstance)
+ newState
+ })
+
+ }
case ReceptionistMessages.Find(key, replyTo) =>
replyWithListing(key, replyTo)
Behaviors.same
case ReceptionistMessages.Subscribe(key, subscriber) =>
- watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
+ if (!state.subscriptionsPerActor.contains(subscriber))
+ ctx.watchWith(subscriber, SubscriberTerminated(subscriber))
// immediately reply with initial listings to the new subscriber
replyWithListing(key, subscriber)
- next(newSubscriptions = subscriptions.inserted(key)(subscriber))
+ behavior(state.subscriberAdded(key)(subscriber))
}
def onInternal(ctx: ActorContext[Any], cmd: InternalCommand): Behavior[Any] = cmd match {
- case RegisteredActorTerminated(key, serviceInstance) =>
- ctx.log.debug2("Registered actor terminated: {} {}", key, serviceInstance)
- updateRegistry(Set(key), _.removed(key)(serviceInstance))
-
- case SubscriberTerminated(key, subscriber) =>
- next(newSubscriptions = subscriptions.removed(key)(subscriber))
+ case RegisteredActorTerminated(serviceInstance) =>
+ val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
+ if (keys.isEmpty) {
+ // actor terminated but had deregistered all registrations before we could process the termination
+ Behaviors.same
+ } else {
+ ctx.log.debug2("Registered actor terminated: [{}] {}", keys.mkString(","), serviceInstance)
+ updateServices(keys, _.serviceInstanceRemoved(serviceInstance))
+ }
+ case SubscriberTerminated(subscriber) =>
+ if (ctx.log.isDebugEnabled) {
+ val keys = state.subscriptionsPerActor.getOrElse(subscriber, Set.empty)
+ ctx.log.debug2("Subscribed actor terminated: [{}] {}", keys.mkString(","), subscriber)
+ }
+ behavior(state.subscriberRemoved(subscriber))
}
Behaviors.receive[Any] { (ctx, msg) =>
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala
index 166be4ff02..76d9613963 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala
@@ -27,6 +27,12 @@ private[akka] object ReceptionistMessages {
replyTo: Option[ActorRef[Receptionist.Registered]])
extends Command
+ final case class Deregister[T] private[akka] (
+ key: ServiceKey[T],
+ serviceInstance: ActorRef[T],
+ replyTo: Option[ActorRef[Receptionist.Deregistered]])
+ extends Command
+
final case class Registered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T])
extends Receptionist.Registered {
def isForKey(key: ServiceKey[_]): Boolean = key == this.key
@@ -40,6 +46,19 @@ private[akka] object ReceptionistMessages {
serviceInstance(key)
}
+ final case class Deregistered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T])
+ extends Receptionist.Deregistered {
+ def isForKey(key: ServiceKey[_]): Boolean = key == this.key
+ def serviceInstance[M](key: ServiceKey[M]): ActorRef[M] = {
+ if (key != this.key)
+ throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]")
+ _serviceInstance.asInstanceOf[ActorRef[M]]
+ }
+
+ def getServiceInstance[M](key: ServiceKey[M]): ActorRef[M] =
+ serviceInstance(key)
+ }
+
final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command
final case class Listing[T] private[akka] (
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala
index 079afc2b73..f99378a584 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Routers.scala
@@ -20,8 +20,8 @@ object Routers {
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
*
* Note that there is a delay between a routee stopping and this being detected by the receptionist, and another
- * before the group detects this, therefore it is best to unregister routees from the receptionist and not stop
- * until the deregistration is complete to minimize the risk of lost messages.
+ * before the group detects this, therefore it is best to deregister routees from the receptionist and not stop
+ * until the deregistration is complete if you want to minimize the risk of lost messages.
*/
def group[T](key: ServiceKey[T]): GroupRouter[T] =
new GroupRouterBuilder[T](key)
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala
index 145b7c753a..34794597fc 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala
@@ -109,7 +109,7 @@ object Receptionist extends ExtensionId[Receptionist] {
* by sending this command to the [[Receptionist.ref]].
*
* Multiple registrations can be made for the same key. De-registration is implied by
- * the end of the referenced Actor’s lifecycle.
+ * the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
*
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor
* if there is one.
@@ -135,7 +135,7 @@ object Receptionist extends ExtensionId[Receptionist] {
* by sending this command to the [[Receptionist.ref]].
*
* Multiple registrations can be made for the same key. De-registration is implied by
- * the end of the referenced Actor’s lifecycle.
+ * the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
*/
def register[T](key: ServiceKey[T], service: ActorRef[T]): Command = Register(key, service)
@@ -145,7 +145,7 @@ object Receptionist extends ExtensionId[Receptionist] {
* by sending this command to the [[Receptionist.ref]].
*
* Multiple registrations can be made for the same key. De-registration is implied by
- * the end of the referenced Actor’s lifecycle.
+ * the end of the referenced Actor’s lifecycle, but it can also be explicitly deregistered before termination.
*
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor.
*/
@@ -200,10 +200,92 @@ object Receptionist extends ExtensionId[Receptionist] {
def registered[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Registered =
Registered(key, serviceInstance)
+ /**
+ * Remove association between the given [[akka.actor.typed.ActorRef]] and the given [[ServiceKey]].
+ *
+ * Deregistration can be acknowledged with the [[Deregistered]] message if the deregister message is created with a
+ * `replyTo` actor.
+ *
+ * Note that getting the [[Deregistered]] confirmation does not mean all service key subscribers has seen the fact
+ * that the actor has been deregistered yet (especially in a clustered context) so it will be possible that messages
+ * sent to the actor in the role of service provider arrive even after getting the confirmation.
+ */
+ object Deregister {
+
+ /**
+ * Create a Deregister without Ack that the service was deregistered
+ */
+ def apply[T](key: ServiceKey[T], service: ActorRef[T]): Command =
+ new ReceptionistMessages.Deregister[T](key, service, None)
+
+ /**
+ * Create a Deregister with an actor that will get an ack that the service was unregistered
+ */
+ def apply[T](key: ServiceKey[T], service: ActorRef[T], replyTo: ActorRef[Deregistered]): Command =
+ new ReceptionistMessages.Deregister[T](key, service, Some(replyTo))
+ }
+
+ /**
+ * Java API: A Deregister message without Ack that the service was unregistered
+ */
+ def deregister[T](key: ServiceKey[T], service: ActorRef[T]): Command = Deregister(key, service)
+
+ /**
+ * Java API: A Deregister message with an actor that will get an ack that the service was unregistered
+ */
+ def deregister[T](key: ServiceKey[T], service: ActorRef[T], replyTo: ActorRef[Deregistered]): Command =
+ Deregister(key, service, replyTo)
+
+ /**
+ * Confirmation that the given [[akka.actor.typed.ActorRef]] no more associated with the [[ServiceKey]] in the local receptionist.
+ * Note that this does not guarantee that subscribers has yet seen that the service is deregistered.
+ *
+ * Not for user extension
+ */
+ @DoNotInherit
+ trait Deregistered {
+
+ def isForKey(key: ServiceKey[_]): Boolean
+
+ /** Scala API */
+ def key: ServiceKey[_]
+
+ /** Java API */
+ def getKey: ServiceKey[_] = key
+
+ /**
+ * Scala API
+ *
+ * Also, see [[ServiceKey.Listing]] for more convenient pattern matching
+ */
+ def serviceInstance[T](key: ServiceKey[T]): ActorRef[T]
+
+ /** Java API */
+ def getServiceInstance[T](key: ServiceKey[T]): ActorRef[T]
+ }
+
+ /**
+ * Sent by the receptionist, available here for easier testing
+ */
+ object Deregistered {
+
+ /**
+ * Scala API
+ */
+ def apply[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Deregistered =
+ new ReceptionistMessages.Deregistered(key, serviceInstance)
+ }
+
+ /**
+ * Java API: Sent by the receptionist, available here for easier testing
+ */
+ def deregistered[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Deregistered =
+ Deregistered(key, serviceInstance)
+
/**
* `Subscribe` message. The given actor will subscribe to service updates when this command is sent to
- * the [[Receptionist.ref]]. When new instances are registered or unregistered to the given key
- * the given subscriber will be sent a [[Listing]] with the new set of instances for that service.
+ * the [[Receptionist.ref]]. When the set of instances registered for the given key changes
+ * the subscriber will be sent a [[Listing]] with the new set of instances for that service.
*
* The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends
* with the termination of the subscriber.
@@ -220,8 +302,8 @@ object Receptionist extends ExtensionId[Receptionist] {
/**
* Java API: `Subscribe` message. The given actor to service updates when this command is sent to
- * * the [[Receptionist.ref]]. When new instances are registered or unregistered to the given key
- * the given subscriber will be sent a [[Listing]] with the new set of instances for that service.
+ * the [[Receptionist.ref]]. When the set of instances registered for the given key changes
+ * the subscriber will be sent a [[Listing]] with the new set of instances for that service.
*
* The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends
* with the termination of the subscriber.
diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala
index ae4d34aa45..21fa9d65e8 100644
--- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala
+++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Routers.scala
@@ -18,8 +18,8 @@ object Routers {
* The current impl does not try to avoid sending messages to unreachable cluster nodes.
*
* Note that there is a delay between a routee stopping and this being detected by the receptionist and another
- * before the group detects this. Because of this it is best to unregister routees from the receptionist and not stop
- * until the deregistration is complete to minimize the risk of lost messages.
+ * before the group detects this. Because of this it is best to deregister routees from the receptionist and not stop
+ * until the deregistration is complete if you want to minimize the risk of lost messages.
*/
def group[T](key: ServiceKey[T]): GroupRouter[T] =
new GroupRouterBuilder[T](key)
diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala b/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala
index 0942dd9973..cd63f960f6 100644
--- a/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala
+++ b/akka-actor/src/main/scala-2.13+/akka/util/ByteIterator.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009-2019 Lightbend Inc.
+ * Copyright (C) 2009-2020 Lightbend Inc.
*/
package akka.util
diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala b/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala
index 938c1eb428..8c2eac8e57 100644
--- a/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala
+++ b/akka-actor/src/main/scala-2.13+/akka/util/ByteString.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009-2019 Lightbend Inc.
+ * Copyright (C) 2009-2020 Lightbend Inc.
*/
package akka.util
diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ccompat/ccompatUsedUntil213.scala b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/ccompatUsedUntil213.scala
index bbdc1114ac..9476875411 100644
--- a/akka-actor/src/main/scala-2.13+/akka/util/ccompat/ccompatUsedUntil213.scala
+++ b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/ccompatUsedUntil213.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2019 Lightbend Inc.
+ * Copyright (C) 2019-2020 Lightbend Inc.
*/
package akka.util.ccompat
diff --git a/akka-actor/src/main/scala-2.13+/akka/util/ccompat/package.scala b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/package.scala
index 62725a62bf..796717f86c 100644
--- a/akka-actor/src/main/scala-2.13+/akka/util/ccompat/package.scala
+++ b/akka-actor/src/main/scala-2.13+/akka/util/ccompat/package.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2018-2019 Lightbend Inc.
+ * Copyright (C) 2018-2020 Lightbend Inc.
*/
package akka.util
diff --git a/akka-actor/src/main/scala-2.13/akka/compat/Future.scala b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala
index 996b37a951..baf9ddd557 100644
--- a/akka-actor/src/main/scala-2.13/akka/compat/Future.scala
+++ b/akka-actor/src/main/scala-2.13/akka/compat/Future.scala
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009-2019 Lightbend Inc.
+ * Copyright (C) 2009-2020 Lightbend Inc.
*/
package akka.compat
diff --git a/akka-cluster-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes b/akka-cluster-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes
new file mode 100644
index 0000000000..2e9e7fa8c2
--- /dev/null
+++ b/akka-cluster-typed/src/main/mima-filters/2.6.1.backwards.excludes/issue-28123-typed-receptionist-unregistration.excludes
@@ -0,0 +1,24 @@
+# internal messages and apis changed
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist.behavior")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.key")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy$default$1")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.this")
+ProblemFilters.exclude[MissingClassProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist$RegisteredActorTerminated")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.apply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.tombstones")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.activeActorRefsFor")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.addTombstone")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.hasTombstone")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.pruneTombstones")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.copy$default$4")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.this")
+ProblemFilters.exclude[MissingClassProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist$RegisteredActorTerminated$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist.behavior")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.cluster.typed.internal.receptionist.ClusterReceptionist#SubscriberTerminated.unapply")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.cluster.typed.internal.receptionist.ShardedServiceRegistry.unapply")
\ No newline at end of file
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
index d6c5677874..8f9a70f225 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
@@ -9,7 +9,7 @@ import akka.actor.typed.receptionist.Receptionist.Command
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, LoggerOps }
-import akka.actor.typed.{ ActorRef, Behavior, Terminated }
+import akka.actor.typed.{ ActorRef, Behavior }
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator }
@@ -58,8 +58,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
private sealed trait InternalCommand extends Command
- private final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
- private final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]])
+ private final case class LocalServiceActorTerminated[T](ref: ActorRef[T]) extends InternalCommand
+ private final case class SubscriberTerminated[T](ref: ActorRef[ReceptionistMessages.Listing[T]])
extends InternalCommand
private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand
private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
@@ -70,6 +70,108 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
private case object RemoveTick extends InternalCommand
private case object PruneTombstonesTick extends InternalCommand
+ /**
+ * @param registry The last seen state from the replicator - only updated when we get an update from th replicator
+ * @param servicesPerActor needed since an actor can implement several services
+ * @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
+ * for a key.
+ * @param subscriptions Locally subscriptions, not replicated
+ */
+ final case class State(
+ registry: ShardedServiceRegistry,
+ servicesPerActor: Map[ActorRef[_], Set[AbstractServiceKey]],
+ tombstones: Map[ActorRef[_], Set[(AbstractServiceKey, Deadline)]],
+ subscriptions: SubscriptionRegistry) {
+
+ /** tombstone all services actor is registered for */
+ def addTombstone(actor: ActorRef[_], deadline: Deadline): State = {
+ servicesPerActor.getOrElse(actor, Set.empty).foldLeft(this) { (state, key) =>
+ state.addTombstone(actor.asInstanceOf[ActorRef[key.Protocol]], key.asServiceKey, deadline)
+ }
+ }
+
+ /** tombstone specific service actor is registered for */
+ def addTombstone[T](actor: ActorRef[T], serviceKey: ServiceKey[T], deadline: Deadline): State = {
+ val newTombsonesForActor = tombstones.getOrElse(actor, Set.empty) + (serviceKey -> deadline)
+ copy(tombstones = tombstones.updated(actor, newTombsonesForActor))
+ }
+
+ def hasTombstone[T](serviceKey: ServiceKey[T])(actorRef: ActorRef[T]): Boolean =
+ tombstones.nonEmpty && tombstones.getOrElse(actorRef, Set.empty).exists { case (key, _) => key == serviceKey }
+
+ def pruneTombstones(): State = {
+ if (tombstones.isEmpty) this
+ else {
+ val newTombstones: Map[ActorRef[_], Set[(AbstractServiceKey, Deadline)]] =
+ tombstones.foldLeft(tombstones) {
+ case (acc, (actorRef, entries)) =>
+ val entriesToKeep = entries.filter {
+ case (_, deadline) => deadline.hasTimeLeft
+ }
+ if (entriesToKeep.size == entries.size) acc
+ else if (entriesToKeep.isEmpty) acc - actorRef
+ else acc.updated(actorRef, entriesToKeep)
+ }
+ if (newTombstones eq tombstones) this
+ else copy(tombstones = newTombstones)
+ }
+ }
+
+ /**
+ * @return (reachable-nodes, all)
+ */
+ def activeActorRefsFor[T](
+ key: ServiceKey[T],
+ selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = {
+ val ddataKey = registry.ddataKeyFor(key)
+ val entries = registry.serviceRegistries(ddataKey).entriesFor(key)
+ val selfAddress = selfUniqueAddress.address
+ val reachable = Set.newBuilder[ActorRef[T]]
+ val all = Set.newBuilder[ActorRef[T]]
+ entries.foreach { entry =>
+ val entryAddress = entry.uniqueAddress(selfAddress)
+ val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]]
+ if (registry.nodes.contains(entryAddress) && !hasTombstone(key)(ref)) {
+ all += ref
+ if (!registry.unreachable.contains(entryAddress)) {
+ reachable += ref
+ }
+ }
+ }
+ (reachable.result(), all.result())
+ }
+
+ def addLocalService[T](serviceInstance: ActorRef[T], key: ServiceKey[T]): State = {
+ val newServicesPerActor =
+ servicesPerActor.updated(serviceInstance, servicesPerActor.getOrElse(serviceInstance, Set.empty) + key)
+ // if the service was previously registered and unregistered we need to remove it from the tombstones
+ val tombstonesForActor = tombstones.getOrElse(serviceInstance, Set.empty)
+ val newTombstones =
+ if (tombstonesForActor.isEmpty) tombstones
+ else tombstones.updated(serviceInstance, tombstonesForActor.filterNot(_._1 == key))
+ copy(servicesPerActor = newServicesPerActor, tombstones = newTombstones)
+ }
+
+ def removeLocalService[T](serviceInstance: ActorRef[T], key: ServiceKey[T], tombstoneDeadline: Deadline): State = {
+ val newServicesForActor = servicesPerActor.get(serviceInstance) match {
+ case Some(keys) =>
+ val newKeys = keys - key
+ if (newKeys.isEmpty)
+ servicesPerActor - serviceInstance
+ else
+ servicesPerActor.updated(serviceInstance, newKeys)
+ case None =>
+ throw new IllegalArgumentException(
+ s"Trying to remove $serviceInstance for $key but that has never been registered")
+ }
+ addTombstone(serviceInstance, key, tombstoneDeadline).copy(servicesPerActor = newServicesForActor)
+ }
+
+ def removeSubscriber(subscriber: ActorRef[ReceptionistMessages.Listing[Any]]): ClusterReceptionist.State =
+ copy(subscriptions = subscriptions.valueRemoved(subscriber))
+
+ }
+
// captures setup/dependencies so we can avoid doing it over and over again
final class Setup(ctx: ActorContext[Command]) {
val classicSystem = ctx.system.toClassic
@@ -96,7 +198,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
Behaviors.withTimers { timers =>
val setup = new Setup(ctx)
// include selfUniqueAddress so that it can be used locally before joining cluster
- val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress)
+ val initialRegistry =
+ ShardedServiceRegistry(setup.settings.distributedKeyCount).addNode(setup.selfUniqueAddress)
// subscribe to changes from other nodes
val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
@@ -107,7 +210,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]])
}
- registry.allDdataKeys.foreach(key =>
+ initialRegistry.allDdataKeys.foreach(key =>
setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toClassic))
// keep track of cluster members
@@ -142,38 +245,19 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// around isn't very costly so don't prune often
timers.startTimerWithFixedDelay(PruneTombstonesTick, setup.keepTombstonesFor / 24)
- behavior(setup, registry, TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
+ val initialState = State(
+ registry = initialRegistry,
+ servicesPerActor = Map.empty,
+ tombstones = Map.empty,
+ subscriptions = TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
+ behavior(setup, initialState)
}
}
- /**
- * @param registry The last seen state from the replicator - only updated when we get an update from th replicator
- * @param subscriptions Locally subscriptions, not replicated
- */
- def behavior(setup: Setup, registry: ShardedServiceRegistry, subscriptions: SubscriptionRegistry): Behavior[Command] =
+ def behavior(setup: Setup, state: State): Behavior[Command] =
Behaviors.setup { ctx =>
import setup._
- // Helper to create new behavior
- def next(newState: ShardedServiceRegistry = registry, newSubscriptions: SubscriptionRegistry = subscriptions) =
- behavior(setup, newState, newSubscriptions)
-
- /*
- * Hack to allow multiple termination notifications per target
- * FIXME #26505: replace by simple map in our state
- */
- def watchWith(ctx: ActorContext[Command], target: ActorRef[_], msg: InternalCommand): Unit = {
- ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx =>
- innerCtx.watch(target)
- Behaviors.receiveSignal[Nothing] {
- case (_, Terminated(`target`)) =>
- ctx.self ! msg
- Behaviors.stopped
- }
- })
- ()
- }
-
def isLeader = {
cluster.state.leader.contains(cluster.selfAddress)
}
@@ -184,7 +268,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
val removals = {
- registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
+ state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
case (acc, (key, entries)) =>
val removedEntries = entries.filter(isOnRemovedNode)
if (removedEntries.isEmpty) acc // no change
@@ -205,7 +289,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
.mkString(","))
// shard changes over the ddata keys they belong to
- val removalsPerDdataKey = registry.entriesPerDdataKey(removals)
+ val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
removalsPerDdataKey.foreach {
case (ddataKey, removalForKey) =>
@@ -219,13 +303,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
}
- def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newRegistry: ShardedServiceRegistry): Unit = {
+ def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit = {
keysForNode.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
- val subscribers = subscriptions.get(changedKey)
+ val subscribers = state.subscriptions.get(changedKey)
if (subscribers.nonEmpty) {
- val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
+ val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
val listing =
ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false)
subscribers.foreach(_ ! listing)
@@ -239,81 +323,128 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log
.debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
- watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
+ // actor already watched after one service key registration
+ if (!state.servicesPerActor.contains(serviceInstance))
+ ctx.watchWith(serviceInstance, LocalServiceActorTerminated(serviceInstance))
+
maybeReplyTo match {
case Some(replyTo) => replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
case None =>
}
- val ddataKey = registry.ddataKeyFor(key)
+ val ddataKey = state.registry.ddataKeyFor(key)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
}
+ behavior(setup, state.addLocalService(serviceInstance, key))
} else {
ctx.log.error("ClusterReceptionist [{}] - Register of non-local [{}] is not supported", serviceInstance)
+ Behaviors.same
+ }
+
+ case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
+ if (serviceInstance.path.address.hasLocalScope) {
+ val entry = Entry(serviceInstance, setup.selfSystemUid)
+ ctx.log.debugN(
+ "ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
+ cluster.selfAddress,
+ key.asServiceKey.id,
+ entry)
+ val newState = state.removeLocalService(serviceInstance, key, setup.newTombstoneDeadline())
+ if (!newState.servicesPerActor.contains(serviceInstance)) {
+ // last service for actor unregistered, stop watching
+ ctx.unwatch(serviceInstance)
+ }
+ maybeReplyTo match {
+ case Some(replyTo) => replyTo ! ReceptionistMessages.Deregistered(key, serviceInstance)
+ case None =>
+ }
+ val ddataKey = state.registry.ddataKeyFor(key)
+ replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
+ ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
+ }
+ // tombstone removals so they are not re-added by merging with other concurrent
+ // registrations for the same key
+ behavior(setup, newState)
+ } else {
+ ctx.log.error("ClusterReceptionist [{}] - Unregistering non-local [{}] is not supported", serviceInstance)
+ Behaviors.same
}
- Behaviors.same
case ReceptionistMessages.Find(key, replyTo) =>
- val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
+ val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
Behaviors.same
case ReceptionistMessages.Subscribe(key, subscriber) =>
- watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
+ if (subscriber.path.address.hasLocalScope) {
+ ctx.watchWith(subscriber, SubscriberTerminated(subscriber))
- // immediately reply with initial listings to the new subscriber
- val listing = {
- val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
- ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
+ // immediately reply with initial listings to the new subscriber
+ val listing = {
+ val (reachable, all) = state.activeActorRefsFor(key, selfUniqueAddress)
+ ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
+ }
+ subscriber ! listing
+
+ behavior(setup, state.copy(subscriptions = state.subscriptions.inserted(key)(subscriber)))
+ } else {
+ ctx.log.error("ClusterReceptionist [{}] - Subscriptions from non-local [{}] is not supported", subscriber)
+ Behaviors.same
}
- subscriber ! listing
- next(newSubscriptions = subscriptions.inserted(key)(subscriber))
}
def onInternalCommand(cmd: InternalCommand): Behavior[Command] = cmd match {
- case SubscriberTerminated(key, subscriber) =>
- next(newSubscriptions = subscriptions.removed(key)(subscriber))
+ case SubscriberTerminated(subscriber) =>
+ behavior(setup, state.removeSubscriber(subscriber))
- case RegisteredActorTerminated(key, serviceInstance) =>
+ case LocalServiceActorTerminated(serviceInstance) =>
val entry = Entry(serviceInstance, setup.selfSystemUid)
+
+ // could be empty if there was a race between termination and unregistration
+ val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
+
ctx.log.debugN(
"ClusterReceptionist [{}] - Registered actor terminated: [{}] [{}]",
cluster.selfAddress,
- key.asServiceKey.id,
+ keys.map(_.asServiceKey.id).mkString(", "),
entry)
- val ddataKey = registry.ddataKeyFor(key)
- replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
- ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
+
+ keys.foreach { key =>
+ val ddataKey = state.registry.ddataKeyFor(key.asServiceKey)
+ replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
+ ServiceRegistry(registry).removeBinding(key.asServiceKey, entry).toORMultiMap
+ }
}
// tombstone removals so they are not re-added by merging with other concurrent
// registrations for the same key
- next(newState = registry.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
+ behavior(setup, state.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
case ChangeFromReplicator(ddataKey, value) =>
// every change will come back this way - this is where the local notifications happens
- val newState = ServiceRegistry(value)
- val changedKeys = registry.collectChangedKeys(ddataKey, newState)
- val newRegistry = registry.withServiceRegistry(ddataKey, newState)
+ val newRegistry = ServiceRegistry(value)
+ val changedKeys = state.registry.collectChangedKeys(ddataKey, newRegistry)
+ val newState = state.copy(registry = state.registry.withServiceRegistry(ddataKey, newRegistry))
+
if (changedKeys.nonEmpty) {
if (ctx.log.isDebugEnabled) {
ctx.log.debugN(
"ClusterReceptionist [{}] - Change from replicator: [{}], changes: [{}], tombstones [{}]",
cluster.selfAddress,
- newState.entries.entries,
+ newRegistry.entries.entries,
changedKeys
- .map(key => key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]"))
+ .map(key => key.asServiceKey.id -> newRegistry.entriesFor(key).mkString("[", ", ", "]"))
.mkString(", "),
- newRegistry.tombstones.mkString(", "))
+ state.tombstones.mkString(", "))
}
changedKeys.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
- val subscribers = subscriptions.get(changedKey)
+ val subscribers = state.subscriptions.get(changedKey)
if (subscribers.nonEmpty) {
- val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
+ val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
val listing =
ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true)
subscribers.foreach(_ ! listing)
@@ -321,7 +452,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// because of how ORMultiMap/ORset works, we could have a case where an actor we removed
// is re-introduced because of a concurrent update, in that case we need to re-remove it
- val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone)
+ val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
if (tombstonedButReAdded.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug2(
@@ -339,13 +470,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
}
- next(newRegistry)
+ behavior(setup, newState)
} else {
Behaviors.same
}
case NodeAdded(uniqueAddress) =>
- next(registry.addNode(uniqueAddress))
+ behavior(setup, state.copy(registry = state.registry.addNode(uniqueAddress)))
case NodeRemoved(uniqueAddress) =>
if (uniqueAddress == selfUniqueAddress) {
@@ -363,38 +494,39 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
nodesRemoved(Set(uniqueAddress))
}
- next(registry.removeNode(uniqueAddress))
+ behavior(setup, state.copy(registry = state.registry.removeNode(uniqueAddress)))
}
case NodeUnreachable(uniqueAddress) =>
- val keysForNode = registry.keysFor(uniqueAddress)
- val newRegistry = registry.addUnreachable(uniqueAddress)
+ val keysForNode = state.registry.keysFor(uniqueAddress)
+ val newState = state.copy(registry = state.registry.addUnreachable(uniqueAddress))
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services unreachable [{}]",
cluster.selfAddress,
uniqueAddress)
- reachabilityChanged(keysForNode, newRegistry)
+ reachabilityChanged(keysForNode, newState)
}
- next(newRegistry)
+ behavior(setup, newState)
case NodeReachable(uniqueAddress) =>
- val keysForNode = registry.keysFor(uniqueAddress)
- val newRegistry = registry.removeUnreachable(uniqueAddress)
+ val keysForNode = state.registry.keysFor(uniqueAddress)
+ val newState = state.copy(registry = state.registry.removeUnreachable(uniqueAddress))
if (keysForNode.nonEmpty) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Node with registered services reachable again [{}]",
cluster.selfAddress,
uniqueAddress)
- reachabilityChanged(keysForNode, newRegistry)
+ reachabilityChanged(keysForNode, newState)
}
- next(newRegistry)
+ behavior(setup, newState)
case RemoveTick =>
// ok to update from several nodes but more efficient to try to do it from one node
if (isLeader) {
- val allAddressesInState: Set[UniqueAddress] = registry.allUniqueAddressesInState(setup.selfUniqueAddress)
- val notInCluster = allAddressesInState.diff(registry.nodes)
+ val allAddressesInState: Set[UniqueAddress] =
+ state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
+ val notInCluster = allAddressesInState.diff(state.registry.nodes)
if (notInCluster.isEmpty) Behaviors.same
else {
@@ -409,11 +541,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
Behaviors.same
case PruneTombstonesTick =>
- val prunedRegistry = registry.pruneTombstones()
- if (prunedRegistry eq registry) Behaviors.same
+ val prunedState = state.pruneTombstones()
+ if (prunedState eq state) Behaviors.same
else {
ctx.log.debug("ClusterReceptionist [{}] - Pruning tombstones", cluster.selfAddress)
- next(prunedRegistry)
+ behavior(setup, prunedState)
}
}
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala
index e13a9f547b..ca68d56620 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala
@@ -12,8 +12,6 @@ import akka.cluster.UniqueAddress
import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, SelfUniqueAddress }
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry }
-import scala.concurrent.duration.Deadline
-
/**
* INTERNAL API
*/
@@ -23,7 +21,7 @@ import scala.concurrent.duration.Deadline
val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
key -> new ServiceRegistry(EmptyORMultiMap)
}.toMap
- new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty, Set.empty)
+ new ShardedServiceRegistry(emptyRegistries, Set.empty, Set.empty)
}
}
@@ -34,14 +32,11 @@ import scala.concurrent.duration.Deadline
* Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not
* get too large ddata messages)
*
- * @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
- * for a key. Since the only way to unregister is to stop, we don't need to keep track of
- * the service key
+
*
*/
@InternalApi private[akka] final case class ShardedServiceRegistry(
serviceRegistries: Map[DDataKey, ServiceRegistry],
- tombstones: Map[ActorRef[_], Deadline],
nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress]) {
@@ -70,32 +65,8 @@ import scala.concurrent.duration.Deadline
def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[AbstractServiceKey] =
serviceRegistries.valuesIterator.flatMap(_.keysFor(address)).toSet
- /**
- * @return (reachable-nodes, all)
- */
- def activeActorRefsFor[T](
- key: ServiceKey[T],
- selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = {
- val ddataKey = ddataKeyFor(key)
- val entries = serviceRegistries(ddataKey).entriesFor(key)
- val selfAddress = selfUniqueAddress.address
- val reachable = Set.newBuilder[ActorRef[T]]
- val all = Set.newBuilder[ActorRef[T]]
- entries.foreach { entry =>
- val entryAddress = entry.uniqueAddress(selfAddress)
- if (nodes.contains(entryAddress) && !hasTombstone(entry.ref)) {
- val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]]
- all += ref
- if (!unreachable.contains(entryAddress)) {
- reachable += ref
- }
- }
- }
- (reachable.result(), all.result())
- }
-
def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
- copy(serviceRegistries + (ddataKey -> registry), tombstones)
+ copy(serviceRegistries + (ddataKey -> registry))
def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] =
allEntries.collect {
@@ -118,18 +89,6 @@ import scala.concurrent.duration.Deadline
acc + (ddataKey -> updated)
}
- def addTombstone(actorRef: ActorRef[_], deadline: Deadline): ShardedServiceRegistry =
- copy(tombstones = tombstones + (actorRef -> deadline))
-
- def hasTombstone(actorRef: ActorRef[_]): Boolean =
- tombstones.nonEmpty && tombstones.contains(actorRef)
-
- def pruneTombstones(): ShardedServiceRegistry = {
- copy(tombstones = tombstones.filter {
- case (_, deadline) => deadline.hasTimeLeft
- })
- }
-
def addNode(node: UniqueAddress): ShardedServiceRegistry =
copy(nodes = nodes + node)
diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java
index 7e3c35a656..8b52125aa9 100644
--- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java
+++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java
@@ -187,6 +187,19 @@ public interface ReceptionistExample {
}
// #find
+ default void deregisterSample() {
+ Behaviors.setup(
+ context -> {
+ // #deregister
+ context
+ .getSystem()
+ .receptionist()
+ .tell(Receptionist.deregister(PingService.pingServiceKey, context.getSelf()));
+ // #deregister
+ return Behaviors.empty();
+ });
+ }
+
public static void main(String[] args) throws Exception {
ActorSystem system = ActorSystem.create(Guardian.create(), "ReceptionistExample");
Thread.sleep(10000);
diff --git a/akka-cluster-typed/src/test/resources/logback-test.xml b/akka-cluster-typed/src/test/resources/logback-test.xml
index 41ea808109..ad3473919e 100644
--- a/akka-cluster-typed/src/test/resources/logback-test.xml
+++ b/akka-cluster-typed/src/test/resources/logback-test.xml
@@ -4,9 +4,6 @@
-
- INFO
-
%date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n
diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
index a28726da6e..33d5138cba 100644
--- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
+++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
@@ -66,6 +66,7 @@ object ClusterReceptionistSpec {
}
val PingKey = ServiceKey[PingProtocol]("pingy")
+ val AnotherKey = ServiceKey[PingProtocol]("pingy-2")
}
class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
@@ -525,7 +526,6 @@ class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
system2.receptionist ! Subscribe(TheKey, regProbe2.ref)
regProbe2.fishForMessage(10.seconds) {
case TheKey.Listing(actors) if actors.nonEmpty =>
- println(actors)
FishingOutcomes.complete
case _ => FishingOutcomes.continue
}
@@ -567,5 +567,173 @@ class ClusterReceptionistSpec extends WordSpec with Matchers with LogCapturing {
}
}
+ "handle unregistration and re-registration of services" in {
+ val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config)
+ val system1 = testKit1.system
+ val testKit2 = ActorTestKit(system1.name, system1.settings.config)
+ val system2 = testKit2.system
+ try {
+
+ val clusterNode1 = Cluster(system1)
+ clusterNode1.manager ! Join(clusterNode1.selfMember.address)
+ val clusterNode2 = Cluster(system2)
+ clusterNode2.manager ! Join(clusterNode1.selfMember.address)
+
+ val regProbe1 = TestProbe[Any]()(system1)
+ val regProbe2 = TestProbe[Any]()(system2)
+
+ regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
+
+ system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
+ regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
+
+ // register and verify seen on remote side
+ val service1 = testKit1.spawn(pingPongBehavior)
+ system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
+
+ // another service for the same key
+ val service2 = testKit1.spawn(pingPongBehavior)
+ system1.receptionist ! Register(PingKey, service2, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service2))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
+
+ // unregister service1 and verify
+ system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Deregistered(PingKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
+
+ // re-register and verify
+ system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(2)
+
+ // cover a race between termination and unregistration as well (should lead to only one update)
+ system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
+ service1 ! Perish
+
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
+ regProbe2.expectNoMessage(1.second)
+
+ akka.cluster.Cluster(system1).shutdown()
+ clusterNode2.manager ! Down(clusterNode1.selfMember.address)
+ } finally {
+ testKit1.shutdownTestKit()
+ testKit2.shutdownTestKit()
+ }
+
+ }
+
+ "handle unregistration per key not per actor" in {
+ val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-11", ClusterReceptionistSpec.config)
+ val system1 = testKit1.system
+ val testKit2 = ActorTestKit(system1.name, system1.settings.config)
+ val system2 = testKit2.system
+ try {
+
+ val clusterNode1 = Cluster(system1)
+ clusterNode1.manager ! Join(clusterNode1.selfMember.address)
+ val clusterNode2 = Cluster(system2)
+ clusterNode2.manager ! Join(clusterNode1.selfMember.address)
+
+ val regProbe1 = TestProbe[Any]()(system1)
+ val regProbe2 = TestProbe[Any]()(system2)
+
+ regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
+
+ system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
+ regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
+ system2.receptionist ! Subscribe(AnotherKey, regProbe2.ref)
+ regProbe2.expectMessage(Listing(AnotherKey, Set.empty[ActorRef[PingProtocol]]))
+
+ // register same actor for two service keys and verify seen on remote side
+ val service1 = testKit1.spawn(pingPongBehavior)
+ system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
+ system1.receptionist ! Register(AnotherKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Registered(AnotherKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
+
+ // unregister service1 for one of the keys and verify
+ system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Deregistered(PingKey, service1))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(0)
+ system2.receptionist ! Find(AnotherKey, regProbe2.ref)
+ regProbe2.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
+
+ system1.receptionist ! Find(PingKey, regProbe1.ref)
+ regProbe1.expectMessageType[Listing].serviceInstances(PingKey).size should ===(0)
+ system1.receptionist ! Find(AnotherKey, regProbe1.ref)
+ regProbe1.expectMessageType[Listing].serviceInstances(AnotherKey).size should ===(1)
+
+ akka.cluster.Cluster(system1).shutdown()
+ clusterNode2.manager ! Down(clusterNode1.selfMember.address)
+ } finally {
+ testKit1.shutdownTestKit()
+ testKit2.shutdownTestKit()
+ }
+
+ }
+
+ "handle concurrent unregistration and registration on different nodes" in {
+ // this covers the fact that with ddata a removal can be lost
+ val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-12", ClusterReceptionistSpec.config)
+ val system1 = testKit1.system
+ val testKit2 = ActorTestKit(system1.name, system1.settings.config)
+ val system2 = testKit2.system
+ try {
+
+ val clusterNode1 = Cluster(system1)
+ clusterNode1.manager ! Join(clusterNode1.selfMember.address)
+ val clusterNode2 = Cluster(system2)
+ clusterNode2.manager ! Join(clusterNode1.selfMember.address)
+
+ val regProbe1 = TestProbe[Any]()(system1)
+ val regProbe2 = TestProbe[Any]()(system2)
+
+ regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2), 10.seconds)
+
+ system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
+ regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
+ system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
+ regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
+
+ // register an actor on one side and verify seen on both
+ val service1 = testKit1.spawn(pingPongBehavior)
+ val service2 = testKit2.spawn(pingPongBehavior)
+ system1.receptionist ! Register(PingKey, service1, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service1))
+ regProbe1.expectMessage(Listing(PingKey, Set(service1)))
+ regProbe2.expectMessageType[Listing].serviceInstances(PingKey).size should ===(1)
+
+ // then concurrently register on one node and unregister on the other node for the same key (ofc racy)
+ system1.receptionist ! Deregister(PingKey, service1, regProbe1.ref)
+ system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
+ regProbe1.expectMessage(Deregistered(PingKey, service1))
+ regProbe2.expectMessage(Registered(PingKey, service2))
+
+ regProbe2.fishForMessage(3.seconds) {
+ case PingKey.Listing(actors) if actors == Set(service2) => FishingOutcomes.complete
+ case PingKey.Listing(actors) if actors.size == 2 =>
+ // we may see both actors before we see the removal
+ FishingOutcomes.continueAndIgnore
+ }
+
+ regProbe1.fishForMessage(3.seconds) {
+ case PingKey.Listing(actors) if actors.size == 1 => FishingOutcomes.complete
+ case PingKey.Listing(actors) if actors.isEmpty => FishingOutcomes.continueAndIgnore
+ }
+
+ akka.cluster.Cluster(system1).shutdown()
+ clusterNode2.manager ! Down(clusterNode1.selfMember.address)
+ } finally {
+ testKit1.shutdownTestKit()
+ testKit2.shutdownTestKit()
+ }
+
+ }
+ // Fixme concurrent registration and unregistration
}
}
diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistStateSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistStateSpec.scala
new file mode 100644
index 0000000000..aef11096f7
--- /dev/null
+++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistStateSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.typed.internal.receptionist
+
+import akka.actor.Address
+import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import akka.actor.typed.internal.receptionist.AbstractServiceKey
+import akka.actor.typed.receptionist.ServiceKey
+import akka.cluster.UniqueAddress
+import akka.cluster.typed.internal.receptionist.ClusterReceptionist.SubscriptionsKV
+import akka.util.TypedMultiMap
+import org.scalatest.Matchers
+import org.scalatest.WordSpecLike
+
+import scala.concurrent.duration.Deadline
+import scala.concurrent.duration._
+
+class ClusterReceptionistStateSpec extends ScalaTestWithActorTestKit with WordSpecLike with Matchers {
+
+ val SomeService = ServiceKey[Int]("boy-oh-boy!")
+ val SomeOtherService = ServiceKey[Int]("disappointing!")
+
+ private def emptyState(
+ distributedKeyCount: Int = 1,
+ self: UniqueAddress = UniqueAddress(Address("127.0.0.1", "MySystem"), 555L)) =
+ ClusterReceptionist.State(
+ registry = ShardedServiceRegistry(distributedKeyCount).addNode(self),
+ servicesPerActor = Map.empty,
+ tombstones = Map.empty,
+ subscriptions = TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV])
+
+ "The internal ClusterReceptionist State" must {
+
+ "keep track of local keys per service" in {
+ val someRef = createTestProbe[Int]().ref
+ var state = emptyState()
+ state = state.addLocalService(someRef, SomeService)
+ state = state.addLocalService(someRef, SomeOtherService)
+ state.servicesPerActor(someRef) should ===(Set(SomeService, SomeOtherService))
+ state = state.removeLocalService(someRef, SomeService, Deadline.now)
+ state = state.removeLocalService(someRef, SomeOtherService, Deadline.now)
+ state.servicesPerActor.get(someRef) should ===(None)
+ }
+
+ "keep a tombstone for removed services" in {
+ val someRef = createTestProbe[Int]().ref
+ var state = emptyState()
+ state = state.addLocalService(someRef, SomeService)
+ state = state.removeLocalService(someRef, SomeService, Deadline.now)
+ state.hasTombstone(SomeService)(someRef) should ===(true)
+ }
+
+ "prune tombstones" in {
+ val someRef = createTestProbe[Int]().ref
+ var state = emptyState()
+ state = state.addLocalService(someRef, SomeService)
+ state = state.removeLocalService(someRef, SomeService, Deadline.now - (10.seconds))
+ state = state.pruneTombstones()
+ state.tombstones shouldBe empty
+ }
+
+ }
+
+}
diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala
index 176864a5ea..8b1e3343e9 100644
--- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala
+++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala
@@ -94,6 +94,12 @@ object PingPongExample {
}
//#find
+ Behaviors.setup[PingService.Ping] { context =>
+ //#deregister
+ context.system.receptionist ! Receptionist.Deregister(PingService.PingServiceKey, context.self)
+ //#deregister
+ Behaviors.empty
+ }
}
object ReceptionistExample {
diff --git a/akka-docs/src/main/paradox/typed/actor-discovery.md b/akka-docs/src/main/paradox/typed/actor-discovery.md
index 7555ae3de7..902939d803 100644
--- a/akka-docs/src/main/paradox/typed/actor-discovery.md
+++ b/akka-docs/src/main/paradox/typed/actor-discovery.md
@@ -34,8 +34,9 @@ a `Listing`, which contains a `Set` of actor references that are registered for
registered to the same key.
The registry is dynamic. New actors can be registered during the lifecycle of the system. Entries are removed when
-registered actors are stopped or a node is removed from the @ref:[Cluster](cluster.md). To facilitate this dynamic aspect you can also subscribe
-to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed.
+registered actors are stopped, manually deregistered or the node they live on is removed from the @ref:[Cluster](cluster.md).
+To facilitate this dynamic aspect you can also subscribe to changes with the `Receptionist.Subscribe` message. It will send
+`Listing` messages to the subscriber when entries for a key are changed.
These imports are used in the following example:
@@ -89,6 +90,18 @@ Java
Also note how a `messageAdapter` is used to convert the `Receptionist.Listing` to a message type that
the `PingManager` understands.
+If a server no longer wish to be associated with a service key it can deregister using the command `Receptionist.Deregister`
+which will remove the association and inform all subscribers.
+
+The command can optionally send an acknowledgement once the local receptionist has removed the registration. The acknowledgement does not guarantee
+that all subscribers has seen that the instance has been removed, it may still receive messages from subscribers for some time after this.
+
+Scala
+: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #deregister }
+
+Java
+: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #deregister }
+
## Cluster Receptionist
The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist
@@ -103,3 +116,9 @@ registered actors that are reachable. The full set of actors, including unreacha
One important difference from local only receptions are the serialization concerns, all messages sent to and back from
an actor on another node must be serializable, see @ref:[serialization](../serialization.md).
+
+## Receptionist Scalability
+
+The receptionist does not scale up to any number of services or very high turnaround of services.
+It will likely handle up to thousands or tens of thousands of services. Use cases with higher
+demands the receptionist for initial contact between actors on the nodes while the actual logic of those is up to the applications own actors.