diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala similarity index 92% rename from akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala rename to akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala index 346e19b1fc..9c29fec995 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/LocalReceptionistSpec.scala @@ -2,14 +2,15 @@ * Copyright (C) 2014-2018 Lightbend Inc. */ -package akka.actor.typed.receptionist +package akka.actor.typed.internal.receptionist import akka.actor.typed._ import akka.actor.typed.receptionist.Receptionist._ +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.testkit.typed.TestKitSettings -import akka.testkit.typed.scaladsl.{ BehaviorTestKit, TestInbox, ActorTestKit, TestProbe } +import akka.testkit.typed.scaladsl.{ ActorTestKit, BehaviorTestKit, TestInbox, TestProbe } import org.scalatest.concurrent.Eventually import scala.concurrent.Future @@ -32,18 +33,16 @@ class LocalReceptionistSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } } - import akka.actor.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior ⇒ receptionistBehavior } - implicit val testSettings = TestKitSettings(system) abstract class TestSetup { - val receptionist = spawn(receptionistBehavior) + val receptionist = spawn(LocalReceptionist.behavior) } "A local receptionist" must { "register a service" in { - val testkit = BehaviorTestKit(receptionistBehavior) + val testkit = BehaviorTestKit(LocalReceptionist.behavior) val a = TestInbox[ServiceA]("a") val r = TestInbox[Registered]("r") testkit.run(Register(ServiceKeyA, a.ref, r.ref)) @@ -57,7 +56,7 @@ class LocalReceptionistSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "register two services" in { - val testkit = BehaviorTestKit(receptionistBehavior) + val testkit = BehaviorTestKit(LocalReceptionist.behavior) val a = TestInbox[ServiceA]("a") val r = TestInbox[Registered]("r") testkit.run(Register(ServiceKeyA, a.ref, r.ref)) @@ -74,7 +73,7 @@ class LocalReceptionistSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "register two services with the same key" in { - val testkit = BehaviorTestKit(receptionistBehavior) + val testkit = BehaviorTestKit(LocalReceptionist.behavior) val a1 = TestInbox[ServiceA]("a1") val r = TestInbox[Registered]("r") testkit.run(Register(ServiceKeyA, a1.ref, r.ref)) @@ -153,7 +152,7 @@ class LocalReceptionistSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "work with ask" in { - val receptionist = spawn(receptionistBehavior) + val receptionist = spawn(LocalReceptionist.behavior) val serviceA = spawn(behaviorA) val f: Future[Registered] = receptionist ? (Register(ServiceKeyA, serviceA, _)) f.futureValue should be(Registered(ServiceKeyA, serviceA)) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ServiceKeySerializationSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializationSpec.scala similarity index 90% rename from akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ServiceKeySerializationSpec.scala rename to akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializationSpec.scala index 3dea276f5a..ab51d9aad7 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/ServiceKeySerializationSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializationSpec.scala @@ -2,11 +2,11 @@ * Copyright (C) 2009-2018 Lightbend Inc. */ -package akka.actor.typed.receptionist +package akka.actor.typed.internal.receptionist import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.actor.typed.internal.ActorRefSerializationSpec -import akka.actor.typed.internal.receptionist.ServiceKeySerializer +import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.adapter._ import akka.serialization.SerializationExtension import akka.testkit.typed.scaladsl.ActorTestKit diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index aecb4f1425..782bb22d57 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -36,6 +36,6 @@ akka.actor { serialization-bindings { "akka.actor.typed.ActorRef" = typed-misc "akka.actor.typed.internal.adapter.ActorRefAdapter" = typed-misc - "akka.actor.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = service-key + "akka.actor.typed.internal.receptionist.DefaultServiceKey" = service-key } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala new file mode 100644 index 0000000000..b9b8b04cc4 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/LocalReceptionist.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.actor.typed.internal.receptionist + +import akka.actor.typed.{ ActorRef, Behavior, Terminated } +import akka.actor.typed.receptionist.Receptionist._ +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.actor.typed.scaladsl.Behaviors.{ immutable, same } +import akka.annotation.InternalApi +import akka.util.TypedMultiMap + +/** + * Marker interface to use with dynamic access + * + * INTERNAL API + */ +@InternalApi +private[akka] trait ReceptionistBehaviorProvider { + def behavior: Behavior[Command] +} + +/** INTERNAL API */ +@InternalApi +private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider { + + type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol] + type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV] + type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]] + type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] + + sealed trait InternalCommand + final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand + final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand + + override def behavior: Behavior[Command] = behavior( + TypedMultiMap.empty[AbstractServiceKey, KV], + TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] + ).narrow[Command] + + private def behavior( + serviceRegistry: LocalServiceRegistry, + subscriptions: SubscriptionRegistry): Behavior[Any] = { + + // Helper to create new state + def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) = + behavior(newRegistry, newSubscriptions) + + /* + * Hack to allow multiple termination notifications per target + * FIXME: replace by simple map in our state + */ + def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit = + ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒ + innerCtx.watch(target) + Behaviors.immutable[Nothing]((_, _) ⇒ Behaviors.same) + .onSignal { + case (_, Terminated(`target`)) ⇒ + ctx.self ! msg + Behaviors.stopped + } + }) + + // Helper that makes sure that subscribers are notified when an entry is changed + def updateRegistry(changedKeysHint: Set[AbstractServiceKey], f: LocalServiceRegistry ⇒ LocalServiceRegistry): Behavior[Any] = { + val newRegistry = f(serviceRegistry) + + def notifySubscribersFor[T](key: AbstractServiceKey): Unit = { + val newListing = newRegistry.get(key) + subscriptions.get(key).foreach(_ ! ReceptionistMessages.Listing(key.asServiceKey, newListing)) + } + + changedKeysHint foreach notifySubscribersFor + next(newRegistry = newRegistry) + } + + def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = + replyTo ! ReceptionistMessages.Listing(key, serviceRegistry get key) + + def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match { + case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) ⇒ + ctx.log.debug("Actor was registered: {} {}", key, serviceInstance) + watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) + maybeReplyTo match { + case Some(replyTo) ⇒ replyTo ! ReceptionistMessages.Registered(key, serviceInstance) + case None ⇒ + } + updateRegistry(Set(key), _.inserted(key)(serviceInstance)) + + case ReceptionistMessages.Find(key, replyTo) ⇒ + replyWithListing(key, replyTo) + same + + case ReceptionistMessages.Subscribe(key, subscriber) ⇒ + watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) + + // immediately reply with initial listings to the new subscriber + replyWithListing(key, subscriber) + + next(newSubscriptions = subscriptions.inserted(key)(subscriber)) + } + + def onInternal(ctx: ActorContext[Any], cmd: InternalCommand): Behavior[Any] = cmd match { + case RegisteredActorTerminated(key, serviceInstance) ⇒ + ctx.log.debug("Registered actor terminated: {} {}", key, serviceInstance) + updateRegistry(Set(key), _.removed(key)(serviceInstance)) + + case SubscriberTerminated(key, subscriber) ⇒ + next(newSubscriptions = subscriptions.removed(key)(subscriber)) + } + + immutable[Any] { (ctx, msg) ⇒ + msg match { + case cmd: Command ⇒ onCommand(ctx, cmd) + case cmd: InternalCommand ⇒ onInternal(ctx, cmd) + case _ ⇒ Behaviors.unhandled + } + } + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala deleted file mode 100644 index fd16421bff..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Copyright (C) 2009-2018 Lightbend Inc. - */ - -package akka.actor.typed.internal.receptionist - -import akka.actor.Address -import akka.actor.ExtendedActorSystem -import akka.annotation.InternalApi -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.actor.typed.Terminated -import akka.actor.typed.receptionist.Receptionist._ -import akka.actor.typed.receptionist.ServiceKey -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.Behaviors.immutable -import akka.actor.typed.scaladsl.Behaviors.same -import akka.actor.typed.scaladsl.ActorContext -import akka.util.TypedMultiMap - -/** - * Marker interface to use with dynamic access - * - * Internal API - */ -@InternalApi -private[akka] trait ReceptionistBehaviorProvider { - def behavior: Behavior[Command] -} - -/** Internal API */ -@InternalApi -private[akka] object ReceptionistImpl extends ReceptionistBehaviorProvider { - // FIXME: make sure to provide serializer - final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] { - override def toString: String = s"ServiceKey[$typeName]($id)" - } - - /** - * Interface to allow plugging of external service discovery infrastructure in to the existing receptionist API. - */ - trait ExternalInterface[State] { - def onRegister[T](key: ServiceKey[T], ref: ActorRef[T]): Unit - def onUnregister[T](key: ServiceKey[T], ref: ActorRef[T]): Unit - def onExternalUpdate(update: State) - - final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]], state: State) extends ReceptionistInternalCommand - } - - object LocalExternalInterface extends ExternalInterface[LocalServiceRegistry] { - def onRegister[T](key: ServiceKey[T], ref: ActorRef[T]): Unit = () - def onUnregister[T](key: ServiceKey[T], ref: ActorRef[T]): Unit = () - def onExternalUpdate(update: LocalServiceRegistry): Unit = () - } - - override def behavior: Behavior[Command] = localOnlyBehavior - val localOnlyBehavior: Behavior[Command] = init(_ ⇒ LocalExternalInterface) - - type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol] - type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV] - object LocalServiceRegistry { - val empty: LocalServiceRegistry = TypedMultiMap.empty[AbstractServiceKey, KV] - } - - sealed abstract class ReceptionistInternalCommand extends InternalCommand - final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends ReceptionistInternalCommand - final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[MessageImpls.Listing[T]]) extends ReceptionistInternalCommand - object NodesRemoved { - val empty = NodesRemoved(Set.empty) - } - final case class NodesRemoved(addresses: Set[Address]) extends ReceptionistInternalCommand - - type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[MessageImpls.Listing[K#Protocol]] - type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] - - private[akka] def init[State](externalInterfaceFactory: ActorContext[AllCommands] ⇒ ExternalInterface[State]): Behavior[Command] = - Behaviors.setup[AllCommands] { ctx ⇒ - val externalInterface = externalInterfaceFactory(ctx) - behavior( - TypedMultiMap.empty[AbstractServiceKey, KV], - TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV], - externalInterface) - }.narrow[Command] - - private def behavior[State]( - serviceRegistry: LocalServiceRegistry, - subscriptions: SubscriptionRegistry, - externalInterface: ExternalInterface[State]): Behavior[AllCommands] = { - - // Helper to create new state - def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) = - behavior(newRegistry, newSubscriptions, externalInterface) - - /* - * Hack to allow multiple termination notifications per target - * FIXME: replace by simple map in our state - */ - def watchWith(ctx: ActorContext[AllCommands], target: ActorRef[_], msg: AllCommands): Unit = - ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒ - innerCtx.watch(target) - Behaviors.immutable[Nothing]((_, _) ⇒ Behaviors.same) - .onSignal { - case (_, Terminated(`target`)) ⇒ - ctx.self ! msg - Behaviors.stopped - } - }) - - // Helper that makes sure that subscribers are notified when an entry is changed - def updateRegistry(changedKeysHint: Set[AbstractServiceKey], f: LocalServiceRegistry ⇒ LocalServiceRegistry): Behavior[AllCommands] = { - val newRegistry = f(serviceRegistry) - - def notifySubscribersFor[T](key: AbstractServiceKey): Unit = { - val newListing = newRegistry.get(key) - subscriptions.get(key).foreach(_ ! MessageImpls.Listing(key.asServiceKey, newListing)) - } - - changedKeysHint foreach notifySubscribersFor - next(newRegistry = newRegistry) - } - - def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = - replyTo ! MessageImpls.Listing(key, serviceRegistry get key) - - immutable[AllCommands] { (ctx, msg) ⇒ - msg match { - case MessageImpls.Register(key, serviceInstance, maybeReplyTo) ⇒ - ctx.log.debug("Actor was registered: {} {}", key, serviceInstance) - watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) - maybeReplyTo match { - case Some(replyTo) ⇒ replyTo ! MessageImpls.Registered(key, serviceInstance) - case None ⇒ - } - externalInterface.onRegister(key, serviceInstance) - - updateRegistry(Set(key), _.inserted(key)(serviceInstance)) - - case MessageImpls.Find(key, replyTo) ⇒ - replyWithListing(key, replyTo) - - same - - case externalInterface.RegistrationsChangedExternally(changes, state) ⇒ - - ctx.log.debug("Registration changed: {}", changes) - - // FIXME: get rid of casts - def makeChanges(registry: LocalServiceRegistry): LocalServiceRegistry = - changes.foldLeft(registry) { - case (reg, (key, values)) ⇒ - reg.setAll(key)(values.asInstanceOf[Set[ActorRef[key.Protocol]]]) - } - externalInterface.onExternalUpdate(state) - updateRegistry(changes.keySet, makeChanges) // overwrite all changed keys - - case RegisteredActorTerminated(key, serviceInstance) ⇒ - ctx.log.debug("Registered actor terminated: {} {}", key, serviceInstance) - externalInterface.onUnregister(key, serviceInstance) - updateRegistry(Set(key), _.removed(key)(serviceInstance)) - - case NodesRemoved(addresses) ⇒ - if (addresses.isEmpty) - Behaviors.same - else { - import akka.actor.typed.scaladsl.adapter._ - val localAddress = ctx.system.toUntyped.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - - def isOnRemovedNode(ref: ActorRef[_]): Boolean = { - if (ref.path.address.hasLocalScope) addresses(localAddress) - else addresses(ref.path.address) - } - - var changedKeys = Set.empty[AbstractServiceKey] - val newRegistry: LocalServiceRegistry = { - serviceRegistry.keySet.foldLeft(serviceRegistry) { - case (reg, key) ⇒ - val values = reg.get(key) - val newValues = values.filterNot(isOnRemovedNode) - if (values.size == newValues.size) reg // no change - else { - changedKeys += key - // FIXME: get rid of casts - reg.setAll(key)(newValues.asInstanceOf[Set[ActorRef[key.Protocol]]]) - } - } - } - - if (changedKeys.isEmpty) - Behaviors.same - else { - if (ctx.log.isDebugEnabled) - ctx.log.debug( - "Node(s) [{}] removed, updated keys [{}]", - addresses.mkString(","), changedKeys.map(_.asServiceKey.id).mkString(",")) - updateRegistry(changedKeys, _ ⇒ newRegistry) - } - } - - case MessageImpls.Subscribe(key, subscriber) ⇒ - watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) - - // immediately reply with initial listings to the new subscriber - replyWithListing(key, subscriber) - - next(newSubscriptions = subscriptions.inserted(key)(subscriber)) - - case SubscriberTerminated(key, subscriber) ⇒ - next(newSubscriptions = subscriptions.removed(key)(subscriber)) - - case _: InternalCommand ⇒ - // silence compiler exhaustive check - Behaviors.unhandled - } - } - } -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala new file mode 100644 index 0000000000..3207beb589 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistMessages.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.actor.typed.internal.receptionist + +import akka.actor.typed.ActorRef +import akka.actor.typed.receptionist.Receptionist.Command +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } +import akka.annotation.InternalApi + +import scala.collection.JavaConverters._ + +/** + * Internal API + * + * Shared message implementations for local and cluster receptionist + */ +@InternalApi +private[akka] object ReceptionistMessages { + // some trixery here to provide a nice _and_ safe API in the face + // of type erasure, more type safe factory methods for each message + // is the user API below while still hiding the type parameter so that + // users don't incorrectly match against it + final case class Register[T] private[akka] ( + key: ServiceKey[T], + serviceInstance: ActorRef[T], + replyTo: Option[ActorRef[Receptionist.Registered]]) extends Command + + final case class Registered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T]) extends Receptionist.Registered { + def isForKey(key: ServiceKey[_]): Boolean = key == this.key + def serviceInstance[M](key: ServiceKey[M]): ActorRef[M] = { + if (key != this.key) throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]") + _serviceInstance.asInstanceOf[ActorRef[M]] + } + + def getServiceInstance[M](key: ServiceKey[M]): ActorRef[M] = + serviceInstance(key) + } + + final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command + + final case class Listing[T] private[akka] (key: ServiceKey[T], _serviceInstances: Set[ActorRef[T]]) extends Receptionist.Listing { + + def isForKey(key: ServiceKey[_]): Boolean = key == this.key + + def serviceInstances[M](key: ServiceKey[M]): Set[ActorRef[M]] = { + if (key != this.key) throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]") + _serviceInstances.asInstanceOf[Set[ActorRef[M]]] + } + + def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] = + serviceInstances(key).asJava + } + + final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing]) extends Command + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala new file mode 100644 index 0000000000..33796ccc04 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKey.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.actor.typed.internal.receptionist + +import akka.actor.typed.receptionist.ServiceKey +import akka.annotation.InternalApi + +/** + * Internal representation of [[ServiceKey]] which is needed + * in order to use a TypedMultiMap (using keys with a type parameter does not + * work in Scala 2.x). + * + * Internal API + */ +@InternalApi +private[akka] abstract class AbstractServiceKey { + type Protocol + + /** Type-safe down-cast */ + def asServiceKey: ServiceKey[Protocol] +} + +/** + * This is the only actual concrete service key type + * + * Internal API + */ +final case class DefaultServiceKey[T](id: String, typeName: String) extends ServiceKey[T] { + override def toString: String = s"ServiceKey[$typeName]($id)" +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializer.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializer.scala index c98574f82e..753e6aaa52 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializer.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ServiceKeySerializer.scala @@ -6,13 +6,15 @@ package akka.actor.typed.internal.receptionist import java.nio.charset.StandardCharsets -import akka.actor.typed.internal.receptionist.ReceptionistImpl.DefaultServiceKey import akka.actor.typed.receptionist.ServiceKey import akka.annotation.InternalApi import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } +/** + * Internal API + */ @InternalApi -class ServiceKeySerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { +final class ServiceKeySerializer(val system: akka.actor.ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { def manifest(o: AnyRef): String = o match { case key: DefaultServiceKey[_] ⇒ key.typeName case _ ⇒ diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala index c0acacb3d5..d92ae7085f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala @@ -4,14 +4,9 @@ package akka.actor.typed.receptionist -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.actor.typed.ActorRef -import akka.actor.typed.ActorSystem -import akka.actor.typed.Extension -import akka.actor.typed.ExtensionId -import akka.actor.typed.internal.receptionist.ReceptionistBehaviorProvider -import akka.actor.typed.internal.receptionist.ReceptionistImpl -import akka.actor.typed.receptionist.Receptionist.{ Find, Registered } +import akka.actor.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } +import akka.actor.typed.internal.receptionist._ +import akka.annotation.DoNotInherit import scala.collection.JavaConverters._ import scala.concurrent.duration._ @@ -26,25 +21,20 @@ class Receptionist(system: ActorSystem[_]) extends Extension { } val ref: ActorRef[Receptionist.Command] = { - val behavior = - if (hasCluster) + val provider: ReceptionistBehaviorProvider = + if (hasCluster) { system.dynamicAccess .createInstanceFor[ReceptionistBehaviorProvider]("akka.cluster.typed.internal.receptionist.ClusterReceptionist$", Nil) .recover { case ex ⇒ - system.log.error( - ex, - "ClusterReceptionist could not be loaded dynamically. Make sure you have all required binaries on the classpath.") - ReceptionistImpl - }.get.behavior - - else ReceptionistImpl.localOnlyBehavior + throw new RuntimeException("ClusterReceptionist could not be loaded dynamically. Make sure you have 'akka-cluster-typed' in the classpath.") + }.get + } else LocalReceptionist ActorRef( - system.systemActorOf(behavior, "receptionist")( + system.systemActorOf(provider.behavior, "receptionist")( // FIXME: where should that timeout be configured? Shouldn't there be a better `Extension` // implementation that does this dance for us? - 10.seconds)) } } @@ -54,13 +44,13 @@ object ServiceKey { * Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol. */ def apply[T](id: String)(implicit classTag: ClassTag[T]): ServiceKey[T] = - ReceptionistImpl.DefaultServiceKey(id, classTag.runtimeClass.getName) + DefaultServiceKey(id, classTag.runtimeClass.getName) /** * Java API: Creates a service key. The given ID should uniquely define a service with a given protocol. */ def create[T](clazz: Class[T], id: String): ServiceKey[T] = - ReceptionistImpl.DefaultServiceKey(id, clazz.getName) + DefaultServiceKey(id, clazz.getName) } @@ -69,8 +59,11 @@ object ServiceKey { * T, meaning that it signifies that the type T is the entry point into the * protocol spoken by that service (think of it as the set of first messages * that a client could send). + * + * Not for user extension, see factories in companion object: [[ServiceKey#create]] and [[ServiceKey#apply]] */ -abstract class ServiceKey[T] extends Receptionist.AbstractServiceKey { key ⇒ +@DoNotInherit +abstract class ServiceKey[T] extends AbstractServiceKey { key ⇒ type Protocol = T def id: String def asServiceKey: ServiceKey[T] = this @@ -99,81 +92,20 @@ abstract class ServiceKey[T] extends Receptionist.AbstractServiceKey { key ⇒ * publish their identity together with the protocols that they implement. Other * Actors need only know the Receptionist’s identity in order to be able to use * the services of the registered Actors. + * + * These are the messages (and the extension) for interacting with the receptionist. + * The receptionist is easiest accessed through the system: [[ActorSystem.receptionist]] */ object Receptionist extends ExtensionId[Receptionist] { def createExtension(system: ActorSystem[_]): Receptionist = new Receptionist(system) def get(system: ActorSystem[_]): Receptionist = apply(system) - /** - * Internal representation of [[ServiceKey]] which is needed - * in order to use a TypedMultiMap (using keys with a type parameter does not - * work in Scala 2.x). - * - * Internal API - */ - @InternalApi - private[akka] sealed abstract class AbstractServiceKey { - type Protocol - - /** Type-safe down-cast */ - def asServiceKey: ServiceKey[Protocol] - } - - /** Internal superclass for external and internal commands */ - @InternalApi - sealed private[akka] abstract class AllCommands - /** * The set of commands accepted by a Receptionist. + * + * Not for user Extension */ - sealed abstract class Command extends AllCommands - @InternalApi - private[typed] abstract class InternalCommand extends AllCommands - - /** - * Internal API - */ - @InternalApi - private[akka] object MessageImpls { - // some trixery here to provide a nice _and_ safe API in the face - // of type erasure, more type safe factory methods for each message - // is the user API below while still hiding the type parameter so that - // users don't incorrecly match against it - - final case class Register[T] private[akka] ( - key: ServiceKey[T], - serviceInstance: ActorRef[T], - replyTo: Option[ActorRef[Receptionist.Registered]]) extends Command - - final case class Registered[T] private[akka] (key: ServiceKey[T], _serviceInstance: ActorRef[T]) extends Receptionist.Registered { - def isForKey(key: ServiceKey[_]): Boolean = key == this.key - def serviceInstance[M](key: ServiceKey[M]): ActorRef[M] = { - if (key != this.key) throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]") - _serviceInstance.asInstanceOf[ActorRef[M]] - } - - def getServiceInstance[M](key: ServiceKey[M]): ActorRef[M] = - serviceInstance(key) - } - - final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command - - final case class Listing[T] private[akka] (key: ServiceKey[T], _serviceInstances: Set[ActorRef[T]]) extends Receptionist.Listing { - - def isForKey(key: ServiceKey[_]): Boolean = key == this.key - - def serviceInstances[M](key: ServiceKey[M]): Set[ActorRef[M]] = { - if (key != this.key) throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]") - _serviceInstances.asInstanceOf[Set[ActorRef[M]]] - } - - def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] = - serviceInstances(key).asJava - } - - final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing]) extends Command - - } + @DoNotInherit abstract class Command /** * Associate the given [[akka.actor.typed.ActorRef]] with the given [[ServiceKey]]. Multiple @@ -189,12 +121,12 @@ object Receptionist extends ExtensionId[Receptionist] { * Create a Register without Ack that the service was registered */ def apply[T](key: ServiceKey[T], service: ActorRef[T]): Command = - new MessageImpls.Register[T](key, service, None) + new ReceptionistMessages.Register[T](key, service, None) /** * Create a Register with an actor that will get an ack that the service was registered */ def apply[T](key: ServiceKey[T], service: ActorRef[T], replyTo: ActorRef[Registered]): Command = - new MessageImpls.Register[T](key, service, Some(replyTo)) + new ReceptionistMessages.Register[T](key, service, Some(replyTo)) } /** * Java API: A Register message without Ack that the service was registered @@ -214,7 +146,7 @@ object Receptionist extends ExtensionId[Receptionist] { * Not for user extension */ @DoNotInherit - sealed trait Registered { + trait Registered { def isForKey(key: ServiceKey[_]): Boolean @@ -240,7 +172,7 @@ object Receptionist extends ExtensionId[Receptionist] { * Scala API */ def apply[T](key: ServiceKey[T], serviceInstance: ActorRef[T]): Registered = - new MessageImpls.Registered(key, serviceInstance) + new ReceptionistMessages.Registered(key, serviceInstance) } /** @@ -261,7 +193,7 @@ object Receptionist extends ExtensionId[Receptionist] { * Scala API: */ def apply[T](key: ServiceKey[T], subscriber: ActorRef[Listing]): Command = - new MessageImpls.Subscribe(key, subscriber) + new ReceptionistMessages.Subscribe(key, subscriber) } @@ -281,12 +213,12 @@ object Receptionist extends ExtensionId[Receptionist] { object Find { /** Scala API: */ def apply[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Command = - new MessageImpls.Find(key, replyTo) + new ReceptionistMessages.Find(key, replyTo) /** * Special factory to make using Find with ask easier */ - def apply[T](key: ServiceKey[T]): ActorRef[Listing] ⇒ Command = ref ⇒ new MessageImpls.Find(key, ref) + def apply[T](key: ServiceKey[T]): ActorRef[Listing] ⇒ Command = ref ⇒ new ReceptionistMessages.Find(key, ref) } /** @@ -304,7 +236,7 @@ object Receptionist extends ExtensionId[Receptionist] { * Not for user extension. */ @DoNotInherit - sealed trait Listing { + trait Listing { /** Scala API */ def key: ServiceKey[_] /** Java API */ @@ -330,7 +262,7 @@ object Receptionist extends ExtensionId[Receptionist] { object Listing { /** Scala API: */ def apply[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]): Listing = - new MessageImpls.Listing[T](key, serviceInstances) + new ReceptionistMessages.Listing[T](key, serviceInstances) } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index 45b7e6ffb4..f4bba1bef8 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -4,190 +4,272 @@ package akka.cluster.typed.internal.receptionist -import scala.concurrent.duration._ - -import akka.annotation.InternalApi -import akka.cluster.Cluster -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.ORMultiMap -import akka.cluster.ddata.ORMultiMapKey -import akka.cluster.ddata.Replicator -import akka.cluster.ddata.Replicator.WriteConsistency -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.actor.typed.internal.receptionist.ReceptionistBehaviorProvider -import akka.actor.typed.internal.receptionist.ReceptionistImpl -import akka.actor.typed.internal.receptionist.ReceptionistImpl._ -import akka.actor.typed.receptionist.Receptionist.AbstractServiceKey -import akka.actor.typed.receptionist.Receptionist.AllCommands +import akka.actor.typed.internal.receptionist.{ AbstractServiceKey, ReceptionistBehaviorProvider, ReceptionistMessages } import akka.actor.typed.receptionist.Receptionist.Command import akka.actor.typed.receptionist.ServiceKey -import akka.actor.typed.scaladsl.ActorContext -import scala.language.existentials -import scala.language.higherKinds - -import akka.actor.typed.ActorSystem -import akka.actor.Address -import akka.cluster.ClusterEvent +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.actor.typed.{ ActorRef, Behavior, Terminated } +import akka.actor.{ Address, ExtendedActorSystem } +import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberRemoved -import akka.util.Helpers.toRootLowerCase -import com.typesafe.config.Config +import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator } +import akka.cluster.{ Cluster, ClusterEvent } +import akka.util.TypedMultiMap -/** Internal API */ +import scala.language.{ existentials, higherKinds } +import akka.actor.typed.scaladsl.adapter._ + +/** INTERNAL API */ @InternalApi private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { + + type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]] + type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] + private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], ActorRef[_]]("ReceptionistKey") private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], ActorRef[_]] - case class TypedORMultiMap[K[_], V[_]](map: ORMultiMap[K[_], V[_]]) extends AnyVal { - def getOrElse[T](key: K[T], default: ⇒ Set[V[T]]): Set[V[T]] = - map.getOrElse(key, default.asInstanceOf[Set[V[_]]]).asInstanceOf[Set[V[T]]] + case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends AnyVal { - def getOrEmpty[T](key: K[T]): Set[V[T]] = getOrElse(key, Set.empty) + // let's hide all the ugly casts we can in here + def getOrElse[T](key: AbstractServiceKey, default: ⇒ Set[ActorRef[_]]): Set[ActorRef[key.Protocol]] = + map.getOrElse(key.asServiceKey, default.asInstanceOf[Set[ActorRef[_]]]).asInstanceOf[Set[ActorRef[key.Protocol]]] - def addBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] = - TypedORMultiMap[K, V](map.addBinding(key, value)) + def getOrEmpty[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = getOrElse(key, Set.empty) - def removeBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] = - TypedORMultiMap[K, V](map.removeBinding(key, value)) + def addBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry = + ServiceRegistry(map.addBinding(key, value)) - def toORMultiMap: ORMultiMap[K[_], V[_]] = map - } - object TypedORMultiMap { - def empty[K[_], V[_]] = TypedORMultiMap[K, V](ORMultiMap.empty[K[_], V[_]]) - } - type ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef] - object ServiceRegistry { - def empty: ServiceRegistry = TypedORMultiMap.empty - def apply(map: ORMultiMap[ServiceKey[_], ActorRef[_]]): ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef](map) - } + def removeBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry = + ServiceRegistry(map.removeBinding(key, value)) - private case object RemoveTick - - def behavior: Behavior[Command] = clusterBehavior - val clusterBehavior: Behavior[Command] = ReceptionistImpl.init(ctx ⇒ clusteredReceptionist(ctx)) - - object ClusterReceptionistSettings { - def apply(system: ActorSystem[_]): ClusterReceptionistSettings = - apply(system.settings.config.getConfig("akka.cluster.typed.receptionist")) - - def apply(config: Config): ClusterReceptionistSettings = { - val writeTimeout = 5.seconds // the timeout is not important - val writeConsistency = { - val key = "write-consistency" - toRootLowerCase(config.getString(key)) match { - case "local" ⇒ Replicator.WriteLocal - case "majority" ⇒ Replicator.WriteMajority(writeTimeout) - case "all" ⇒ Replicator.WriteAll(writeTimeout) - case _ ⇒ Replicator.WriteTo(config.getInt(key), writeTimeout) - } + def removeAll(removals: Map[AbstractServiceKey, Set[ActorRef[_]]])(implicit cluster: Cluster): ServiceRegistry = { + removals.foldLeft(this) { + case (acc, (key, actors)) ⇒ + actors.foldLeft(acc) { + case (innerAcc, actor) ⇒ + innerAcc.removeBinding[key.Protocol](key.asServiceKey, actor.asInstanceOf[ActorRef[key.Protocol]]) + } + } + } + + def toORMultiMap: ORMultiMap[ServiceKey[_], ActorRef[_]] = map + } + object ServiceRegistry { + val empty = ServiceRegistry(EmptyORMultiMap) + + def collectChangedKeys(previousState: ServiceRegistry, newState: ServiceRegistry): Set[AbstractServiceKey] = { + val allKeys = previousState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet + allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒ + val oldValues = previousState.getOrEmpty(key) + val newValues = newState.getOrEmpty(key) + if (oldValues != newValues) acc + key + else acc } - ClusterReceptionistSettings( - writeConsistency, - pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis - ) } } - case class ClusterReceptionistSettings( - writeConsistency: WriteConsistency, - pruningInterval: FiniteDuration) + sealed trait InternalCommand + final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand + final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand + final case class NodeRemoved(addresses: Address) extends InternalCommand + final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends InternalCommand + case object RemoveTick extends InternalCommand - /** - * Returns an ReceptionistImpl.ExternalInterface that synchronizes registered services with - */ - def clusteredReceptionist(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface[ServiceRegistry] = { - import akka.actor.typed.scaladsl.adapter._ + // captures setup/dependencies so we can avoid doing it over and over again + private class Setup(ctx: ActorContext[Any]) { val untypedSystem = ctx.system.toUntyped - val settings = ClusterReceptionistSettings(ctx.system) - val replicator = DistributedData(untypedSystem).replicator implicit val cluster = Cluster(untypedSystem) + } - var state = ServiceRegistry.empty + override def behavior: Behavior[Command] = Behaviors.setup[Any] { ctx ⇒ - def diff(lastState: ServiceRegistry, newState: ServiceRegistry): Map[AbstractServiceKey, Set[ActorRef[_]]] = { - def changesForKey[T](registry: Map[AbstractServiceKey, Set[ActorRef[_]]], key: ServiceKey[T]): Map[AbstractServiceKey, Set[ActorRef[_]]] = { - val oldValues = lastState.getOrEmpty(key) - val newValues = newState.getOrEmpty(key) - if (oldValues != newValues) - registry + (key → newValues.asInstanceOf[Set[ActorRef[_]]]) - else - registry - } - - val allKeys = lastState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet - allKeys - .foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]])(changesForKey(_, _)) - } - - val externalInterface = new ExternalInterface[ServiceRegistry] { - private def updateRegistry(update: ServiceRegistry ⇒ ServiceRegistry): Unit = { - state = update(state) - replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ - update(ServiceRegistry(registry)).toORMultiMap - } - } - - def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = - updateRegistry(_.addBinding(key, address)) - - def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = - updateRegistry(_.removeBinding(key, address)) - - def onExternalUpdate(update: ServiceRegistry): Unit = { - state = update - } - } + val setup = new Setup(ctx) + // subscribe to changes from other nodes val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] = ctx.messageAdapter[Replicator.ReplicatorMessage] { - case changed @ Replicator.Changed(ReceptionistKey) ⇒ - val value = changed.get(ReceptionistKey) - val oldState = state - val newState = ServiceRegistry(value) - val changes = diff(oldState, newState) - externalInterface.RegistrationsChangedExternally(changes, newState) + case changed @ Replicator.Changed(ReceptionistKey) ⇒ ChangeFromReplicator(changed.get(ReceptionistKey)) } - - replicator ! Replicator.Subscribe(ReceptionistKey, replicatorMessageAdapter.toUntyped) + setup.replicator ! Replicator.Subscribe(ReceptionistKey, replicatorMessageAdapter.toUntyped) // remove entries when members are removed val clusterEventMessageAdapter: ActorRef[MemberRemoved] = - ctx.messageAdapter[MemberRemoved] { - case MemberRemoved(member, _) ⇒ - // ok to update from several nodes but more efficient to try to do it from one node - if (cluster.state.leader.contains(cluster.selfAddress)) { - if (member.address == cluster.selfAddress) NodesRemoved.empty - else NodesRemoved(Set(member.address)) - } else - NodesRemoved.empty - } - - cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) + ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.address) } + setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, - // which is possible for OR CRDTs - val removeTickMessageAdapter: ActorRef[RemoveTick.type] = - ctx.messageAdapter[RemoveTick.type] { _ ⇒ - // ok to update from several nodes but more efficient to try to do it from one node - if (cluster.state.leader.contains(cluster.selfAddress)) { - val allAddressesInState: Set[Address] = state.map.entries.flatMap { - case (_, values) ⇒ - // don't care about local (empty host:port addresses) - values.collect { case ref if ref.path.address.hasGlobalScope ⇒ ref.path.address } - }(collection.breakOut) - val clusterAddresses = cluster.state.members.map(_.address) - val diff = allAddressesInState diff clusterAddresses - if (diff.isEmpty) NodesRemoved.empty else NodesRemoved(diff) - } else - NodesRemoved.empty + // which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message + ctx.system.scheduler.schedule(setup.settings.pruningInterval, setup.settings.pruningInterval, + ctx.self.toUntyped, RemoveTick)(ctx.system.executionContext) + + behavior( + setup, + ServiceRegistry.empty, + TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] + ) + }.narrow[Command] + + /** + * @param state The last seen state from the replicator - only updated when we get an update from th replicator + * @param subscriptions Locally subscriptions, not replicated + */ + def behavior( + setup: Setup, + state: ServiceRegistry, + subscriptions: SubscriptionRegistry): Behavior[Any] = + Behaviors.setup[Any] { ctx ⇒ + import setup._ + + // Helper to create new behavior + def next( + newState: ServiceRegistry = state, + newSubscriptions: SubscriptionRegistry = subscriptions) = + behavior(setup, newState, newSubscriptions) + + /* + * Hack to allow multiple termination notifications per target + * FIXME: replace by simple map in our state + */ + def watchWith(ctx: ActorContext[Any], target: ActorRef[_], msg: InternalCommand): Unit = + ctx.spawnAnonymous[Nothing](Behaviors.setup[Nothing] { innerCtx ⇒ + innerCtx.watch(target) + Behaviors.immutable[Nothing]((_, _) ⇒ Behaviors.same) + .onSignal { + case (_, Terminated(`target`)) ⇒ + ctx.self ! msg + Behaviors.stopped + } + }) + + def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = { + val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + subscriptions.get(key).foreach(_ ! msg) } - ctx.system.scheduler.schedule(settings.pruningInterval, settings.pruningInterval, - removeTickMessageAdapter.toUntyped, RemoveTick)(ctx.system.executionContext) + def nodesRemoved(addresses: Set[Address]): Behavior[Any] = { + // ok to update from several nodes but more efficient to try to do it from one node + if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) { + import akka.actor.typed.scaladsl.adapter._ + val localAddress = ctx.system.toUntyped.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - externalInterface - } + def isOnRemovedNode(ref: ActorRef[_]): Boolean = { + if (ref.path.address.hasLocalScope) addresses(localAddress) + else addresses(ref.path.address) + } + + val removals = { + state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]]) { + case (acc, (key, values)) ⇒ + val removedActors = values.filter(isOnRemovedNode) + if (removedActors.isEmpty) acc // no change + else acc + (key -> removedActors) + } + } + + if (removals.nonEmpty) { + if (ctx.log.isDebugEnabled) + ctx.log.debug( + "Node(s) [{}] removed, updating registry [{}]", + addresses.mkString(","), + removals.map { case (key, actors) ⇒ key.asServiceKey.id -> actors.mkString("[", ", ", "]") }.mkString(",")) + + replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + ServiceRegistry(registry).removeAll(removals).toORMultiMap + } + } + Behaviors.same + + } else Behaviors.same + } + + def onCommand(cmd: Command): Behavior[Any] = cmd match { + case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) ⇒ + ctx.log.debug("Actor was registered: [{}] [{}]", key, serviceInstance.path) + watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) + maybeReplyTo match { + case Some(replyTo) ⇒ replyTo ! ReceptionistMessages.Registered(key, serviceInstance) + case None ⇒ + } + replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + ServiceRegistry(registry).addBinding(key, serviceInstance).toORMultiMap + } + Behaviors.same + + case ReceptionistMessages.Find(key, replyTo) ⇒ + replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + Behaviors.same + + case ReceptionistMessages.Subscribe(key, subscriber) ⇒ + watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) + + // immediately reply with initial listings to the new subscriber + subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + + next(newSubscriptions = subscriptions.inserted(key)(subscriber)) + } + + def onInternalCommand(cmd: InternalCommand): Behavior[Any] = cmd match { + + case SubscriberTerminated(key, subscriber) ⇒ + next(newSubscriptions = subscriptions.removed(key)(subscriber)) + + case RegisteredActorTerminated(key, serviceInstance) ⇒ + ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, serviceInstance.path) + replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + ServiceRegistry(registry).removeBinding(key, serviceInstance).toORMultiMap + } + Behaviors.same + + case ChangeFromReplicator(value) ⇒ + // every change will come back this way - this is where the local notifications happens + val newState = ServiceRegistry(value) + val changedKeys = ServiceRegistry.collectChangedKeys(state, newState) + if (changedKeys.nonEmpty) { + if (ctx.log.isDebugEnabled) { + ctx.log.debug( + "Registration changed: [{}]", + changedKeys.map(key ⇒ + key.asServiceKey.id -> newState.getOrEmpty(key).map(_.path).mkString("[", ", ", "]") + ).mkString(", ")) + } + changedKeys.foreach(notifySubscribersFor(_, newState)) + next(newState) + } else { + Behaviors.same + } + + case NodeRemoved(address) ⇒ + // ok to update from several nodes but more efficient to try to do it from one node + if (cluster.state.leader.contains(cluster.selfAddress)) { + nodesRemoved(Set(address)) + } else Behaviors.same + + case RemoveTick ⇒ + // ok to update from several nodes but more efficient to try to do it from one node + if (cluster.state.leader.contains(cluster.selfAddress)) { + val allAddressesInState: Set[Address] = state.map.entries.flatMap { + case (_, values) ⇒ + // don't care about local (empty host:port addresses) + values.collect { case ref if ref.path.address.hasGlobalScope ⇒ ref.path.address } + }(collection.breakOut) + val clusterAddresses = cluster.state.members.map(_.address) + val diff = allAddressesInState diff clusterAddresses + if (diff.isEmpty) Behavior.same + else nodesRemoved(diff) + } else + Behavior.same + } + + Behaviors.immutable[Any] { (ctx, msg) ⇒ + msg match { + // support two heterogenous types of messages without union types + case cmd: Command ⇒ onCommand(cmd) + case cmd: InternalCommand ⇒ onInternalCommand(cmd) + case _ ⇒ Behaviors.unhandled + } + } + } } diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala new file mode 100644 index 0000000000..7d571ad30d --- /dev/null +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster.typed.internal.receptionist + +import akka.actor.typed.ActorSystem +import akka.annotation.InternalApi +import akka.cluster.ddata.Replicator +import akka.cluster.ddata.Replicator.WriteConsistency +import akka.util.Helpers.toRootLowerCase +import com.typesafe.config.Config + +import scala.concurrent.duration._ +import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS } + +/** + * Internal API + */ +@InternalApi +private[akka] object ClusterReceptionistSettings { + def apply(system: ActorSystem[_]): ClusterReceptionistSettings = + apply(system.settings.config.getConfig("akka.cluster.typed.receptionist")) + + def apply(config: Config): ClusterReceptionistSettings = { + val writeTimeout = 5.seconds // the timeout is not important + val writeConsistency = { + val key = "write-consistency" + toRootLowerCase(config.getString(key)) match { + case "local" ⇒ Replicator.WriteLocal + case "majority" ⇒ Replicator.WriteMajority(writeTimeout) + case "all" ⇒ Replicator.WriteAll(writeTimeout) + case _ ⇒ Replicator.WriteTo(config.getInt(key), writeTimeout) + } + } + ClusterReceptionistSettings( + writeConsistency, + pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis + ) + } +} + +/** + * Internal API + */ +@InternalApi +private[akka] case class ClusterReceptionistSettings( + writeConsistency: WriteConsistency, + pruningInterval: FiniteDuration) + diff --git a/akka-docs/src/main/paradox/cluster-client.md b/akka-docs/src/main/paradox/cluster-client.md index 188eded6aa..e6e772c958 100644 --- a/akka-docs/src/main/paradox/cluster-client.md +++ b/akka-docs/src/main/paradox/cluster-client.md @@ -186,7 +186,7 @@ Maven ## Configuration -The @unidoc[ClusterClientReceptionist] extension (or @unidoc[ClusterReceptionistSettings]) can be configured +The @unidoc[ClusterClientReceptionist] extension (or @unidoc[akka.cluster.client.ClusterReceptionistSettings]) can be configured with the following properties: @@snip [reference.conf]($akka$/akka-cluster-tools/src/main/resources/reference.conf) { #receptionist-ext-config }