From c2e45fa6dcc24d0b5851bab4da057f3506bcc903 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 22 Sep 2017 11:38:07 +0200 Subject: [PATCH] =type Cluster and local Receptionist, #23634 * a Receptionists extension It's basically an improved copy of the former receptionist pattern which is removed here as well. * Cluster implementation using Distributed Data * =typ make ActorRef.apply work for adapted actor systems --- .../main/scala/akka/util/TypedMultiMap.scala | 11 ++ .../ClusterReceptionistSpec.scala | 155 +++++++++++++++++ .../akka/typed/internal/ActorSystemStub.scala | 3 - .../akka/typed/internal/FunctionRefSpec.scala | 2 +- .../LocalReceptionistSpec.scala} | 124 +++++++++----- .../src/main/scala/akka/typed/ActorRef.scala | 10 +- .../main/scala/akka/typed/ActorSystem.scala | 7 +- .../receptionist/ClusterReceptionist.scala | 119 +++++++++++++ .../akka/typed/internal/ActorSystemImpl.scala | 10 +- .../akka/typed/internal/ExtensionsImpl.scala | 2 +- .../internal/adapter/ActorSystemAdapter.scala | 23 +-- .../receptionist/ReceptionistImpl.scala | 160 ++++++++++++++++++ .../akka/typed/patterns/Receptionist.scala | 93 ---------- .../typed/receptionist/Receptionist.scala | 150 ++++++++++++++++ 14 files changed, 702 insertions(+), 167 deletions(-) create mode 100644 akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala rename akka-typed-tests/src/test/scala/akka/typed/{patterns/ReceptionistSpec.scala => receptionist/LocalReceptionistSpec.scala} (51%) create mode 100644 akka-typed/src/main/scala/akka/typed/cluster/internal/receptionist/ClusterReceptionist.scala create mode 100644 akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala delete mode 100644 akka-typed/src/main/scala/akka/typed/patterns/Receptionist.scala create mode 100644 akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala diff --git a/akka-actor/src/main/scala/akka/util/TypedMultiMap.scala b/akka-actor/src/main/scala/akka/util/TypedMultiMap.scala index e0b0ecb04c..9c7760ce10 100644 --- a/akka-actor/src/main/scala/akka/util/TypedMultiMap.scala +++ b/akka-actor/src/main/scala/akka/util/TypedMultiMap.scala @@ -88,6 +88,17 @@ class TypedMultiMap[T <: AnyRef, K[_ <: T]] private (private val map: Map[T, Set } } + def setAll(key: T)(values: Set[K[key.type]]): TypedMultiMap[T, K] = + new TypedMultiMap[T, K](map.updated(key, values.asInstanceOf[Set[Any]])) + + /** + * Add all entries from the other map, overwriting existing entries. + * + * FIXME: should it merge, instead? + */ + def ++(other: TypedMultiMap[T, K]): TypedMultiMap[T, K] = + new TypedMultiMap[T, K](map ++ other.map) + override def toString: String = s"TypedMultiMap($map)" override def equals(other: Any) = other match { case o: TypedMultiMap[_, _] ⇒ map == o.map diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala new file mode 100644 index 0000000000..a0518a1e11 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.receptionist + +import java.nio.charset.StandardCharsets + +import akka.actor.ExtendedActorSystem +import akka.cluster.Cluster +import akka.serialization.SerializerWithStringManifest +import akka.typed.ActorRef +import akka.typed.ActorSystem +import akka.typed.TypedSpec +import akka.typed.TypedSpec.Command +import akka.typed.cluster.ActorRefResolver +import akka.typed.internal.adapter.ActorRefAdapter +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.internal.receptionist.ReceptionistImpl +import akka.typed.receptionist.Receptionist +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object ClusterReceptionistSpec { + val config = ConfigFactory.parseString( + """ + akka.log-level = DEBUG + akka.actor { + provider = cluster + serialize-messages = off + allow-java-serialization = off + serializers { + test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$PingSerializer" + } + serialization-bindings { + "akka.typed.cluster.receptionist.ClusterReceptionistSpec$Ping" = test + "akka.typed.cluster.receptionist.ClusterReceptionistSpec$Pong$" = test + "akka.typed.cluster.receptionist.ClusterReceptionistSpec$Perish$" = test + "akka.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = test + "akka.typed.internal.adapter.ActorRefAdapter" = test + } + } + akka.remote.artery.enabled = true + akka.remote.artery.canonical.port = 25552 + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + """) + + trait PingProtocol + case object Pong + case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol + + case object Perish extends PingProtocol + + val pingPong = Actor.immutable[PingProtocol] { (ctx, msg) ⇒ + + msg match { + case Ping(respondTo) ⇒ + respondTo ! Pong + Actor.same + + case Perish ⇒ + Actor.stopped + } + + } + + class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 47 + def manifest(o: AnyRef): String = o match { + case _: Ping ⇒ "a" + case Pong ⇒ "b" + case Perish ⇒ "c" + case ReceptionistImpl.DefaultServiceKey(id) ⇒ "d" + case a: ActorRefAdapter[_] ⇒ "e" + } + + def toBinary(o: AnyRef): Array[Byte] = o match { + case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8) + case Pong ⇒ Array.emptyByteArray + case Perish ⇒ Array.emptyByteArray + case ReceptionistImpl.DefaultServiceKey(id) ⇒ id.getBytes(StandardCharsets.UTF_8) + case a: ActorRefAdapter[_] ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(a).getBytes(StandardCharsets.UTF_8) + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))) + case "b" ⇒ Pong + case "c" ⇒ Perish + case "d" ⇒ ReceptionistImpl.DefaultServiceKey[Any](new String(bytes, StandardCharsets.UTF_8)) + case "e" ⇒ ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)) + } + } + + val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy") +} + +class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) { + import ClusterReceptionistSpec._ + + implicit val testSettings = TestKitSettings(adaptedSystem) + val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) + val clusterNode1 = Cluster(untypedSystem1) + + val system2 = akka.actor.ActorSystem( + adaptedSystem.name, + ConfigFactory.parseString( + """ + akka.remote.artery.canonical.port = 0 + """ + ).withFallback(adaptedSystem.settings.config)) + val adaptedSystem2 = system2.toTyped + val clusterNode2 = Cluster(system2) + + clusterNode1.join(clusterNode1.selfAddress) + clusterNode2.join(clusterNode1.selfAddress) + + object `The ClusterReceptionist` extends StartSupport { + def system: ActorSystem[Command] = adaptedSystem + import Receptionist._ + + def `must eventually replicate registrations to the other side`() = new TestSetup { + val regProbe = TestProbe[Any]()(adaptedSystem, testSettings) + val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings) + + adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref) + regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service = start(pingPong) + system.receptionist ! Register(PingKey, service, regProbe.ref) + regProbe.expectMsg(Registered(PingKey, service)) + + val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]] + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe2.ref) + regProbe2.expectMsg(Pong) + + service ! Perish + regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + } + } + + trait TestSetup { + } + + override def afterAll(): Unit = { + super.afterAll() + Await.result(adaptedSystem.terminate(), 3.seconds) + Await.result(system2.terminate(), 3.seconds) + } +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/internal/ActorSystemStub.scala b/akka-typed-tests/src/test/scala/akka/typed/internal/ActorSystemStub.scala index ae080526c5..61190dcc45 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/internal/ActorSystemStub.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/internal/ActorSystemStub.scala @@ -58,9 +58,6 @@ private[typed] class ActorSystemStub(val name: String) override def printTree: String = "no tree for ActorSystemStub" - val receptionistInbox = Inbox[patterns.Receptionist.Command]("receptionist") - override def receptionist: ActorRef[patterns.Receptionist.Command] = receptionistInbox.ref - def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(implicit timeout: Timeout): Future[ActorRef[U]] = { Future.failed(new UnsupportedOperationException("ActorSystemStub cannot create system actors")) } diff --git a/akka-typed-tests/src/test/scala/akka/typed/internal/FunctionRefSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/internal/FunctionRefSpec.scala index 92391041af..f9f0d83aa6 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/internal/FunctionRefSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/internal/FunctionRefSpec.scala @@ -26,7 +26,7 @@ class FunctionRefSpec extends TypedSpecSetup { val ref = ActorRef(f) ref ! "42" ref ! "43" - target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil) + target.receiveAll() should ===(Right("42") :: Right("43") :: Nil) } def `must forward messages that are received before getting the ActorRef`(): Unit = { diff --git a/akka-typed-tests/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/receptionist/LocalReceptionistSpec.scala similarity index 51% rename from akka-typed-tests/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala rename to akka-typed-tests/src/test/scala/akka/typed/receptionist/LocalReceptionistSpec.scala index e7d54af006..fcb59a70ff 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/receptionist/LocalReceptionistSpec.scala @@ -1,36 +1,54 @@ /** * Copyright (C) 2014-2017 Lightbend Inc. */ -package akka.typed.patterns +package akka.typed.receptionist -import Receptionist._ +import akka.typed._ +import akka.typed.receptionist.Receptionist._ +import akka.typed.scaladsl.Actor import akka.typed.scaladsl.AskPattern._ +import akka.typed.testkit.EffectfulActorContext +import akka.typed.testkit.Inbox +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import org.scalatest.concurrent.Eventually import scala.concurrent.duration._ -import akka.typed._ -import akka.typed.scaladsl.Actor -import akka.typed.scaladsl.Actor._ -import akka.typed.testkit.{ Effect, EffectfulActorContext, Inbox } -class ReceptionistSpec extends TypedSpec { +class LocalReceptionistSpec extends TypedSpec with Eventually { trait ServiceA - case object ServiceKeyA extends ServiceKey[ServiceA] + val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a") val behaviorA = Actor.empty[ServiceA] trait ServiceB - case object ServiceKeyB extends ServiceKey[ServiceB] + val ServiceKeyB = Receptionist.ServiceKey[ServiceB]("service-b") val behaviorB = Actor.empty[ServiceB] - trait CommonTests { + case object Stop extends ServiceA with ServiceB + val stoppableBehavior = Actor.immutable[Any] { (ctx, msg) ⇒ + msg match { + case Stop ⇒ Behavior.stopped + case _ ⇒ Behavior.same + } + } + + import akka.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior ⇒ behavior } + + trait CommonTests extends StartSupport { implicit def system: ActorSystem[TypedSpec.Command] + implicit val testSettings = TestKitSettings(system) + + abstract class TestSetup { + val receptionist = start(behavior) + } def `must register a service`(): Unit = { val ctx = new EffectfulActorContext("register", behavior, 1000, system) val a = Inbox[ServiceA]("a") val r = Inbox[Registered[_]]("r") ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) - ctx.getAllEffects() should be(Effect.Watched(a.ref) :: Nil) + ctx.getEffect() // watching however that is implemented r.receiveMsg() should be(Registered(ServiceKeyA, a.ref)) val q = Inbox[Listing[ServiceA]]("q") ctx.run(Find(ServiceKeyA)(q.ref)) @@ -73,38 +91,62 @@ class ReceptionistSpec extends TypedSpec { assertEmpty(a1, a2, r, q) } - def `must unregister services when they terminate`(): Unit = { - val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system) - val r = Inbox[Registered[_]]("r") - val a = Inbox[ServiceA]("a") - ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) - ctx.getEffect() should be(Effect.Watched(a.ref)) - r.receiveMsg() should be(Registered(ServiceKeyA, a.ref)) + def `must unregister services when they terminate`(): Unit = new TestSetup { + val regProbe = TestProbe[Any]("regProbe") - val b = Inbox[ServiceB]("b") - ctx.run(Register(ServiceKeyB, b.ref)(r.ref)) - ctx.getEffect() should be(Effect.Watched(b.ref)) - r.receiveMsg() should be(Registered(ServiceKeyB, b.ref)) + val serviceA = start(stoppableBehavior.narrow[ServiceA]) + receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) + regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) - val c = Inbox[Any]("c") - ctx.run(Register(ServiceKeyA, c.ref)(r.ref)) - ctx.run(Register(ServiceKeyB, c.ref)(r.ref)) - ctx.getAllEffects() should be(Seq(Effect.Watched(c.ref), Effect.Watched(c.ref))) - r.receiveMsg() should be(Registered(ServiceKeyA, c.ref)) - r.receiveMsg() should be(Registered(ServiceKeyB, c.ref)) + val serviceB = start(stoppableBehavior.narrow[ServiceB]) + receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref) + regProbe.expectMsg(Registered(ServiceKeyB, serviceB)) - val q = Inbox[Listing[_]]("q") - ctx.run(Find(ServiceKeyA)(q.ref)) - q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref, c.ref))) - ctx.run(Find(ServiceKeyB)(q.ref)) - q.receiveMsg() should be(Listing(ServiceKeyB, Set(b.ref, c.ref))) + val serviceC = start(stoppableBehavior) + receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref) + receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref) + regProbe.expectMsg(Registered(ServiceKeyA, serviceC)) + regProbe.expectMsg(Registered(ServiceKeyB, serviceC)) - ctx.signal(Terminated(c.ref)(null)) - ctx.run(Find(ServiceKeyA)(q.ref)) - q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref))) - ctx.run(Find(ServiceKeyB)(q.ref)) - q.receiveMsg() should be(Listing(ServiceKeyB, Set(b.ref))) - assertEmpty(a, b, c, r, q) + receptionist ! Find(ServiceKeyA, regProbe.ref) + regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceC))) + receptionist ! Find(ServiceKeyB, regProbe.ref) + regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB, serviceC))) + + serviceC ! Stop + + eventually { + receptionist ! Find(ServiceKeyA, regProbe.ref) + regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA))) + receptionist ! Find(ServiceKeyB, regProbe.ref) + regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB))) + } + } + + def `must support subscribing to service changes`(): Unit = new TestSetup { + val regProbe = TestProbe[Registered[_]]("regProbe") + + val aSubscriber = TestProbe[Listing[ServiceA]]("aUser") + receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref) + + aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]])) + + val serviceA: ActorRef[ServiceA] = start(stoppableBehavior) + receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref) + regProbe.expectMsg(Registered(ServiceKeyA, serviceA)) + + aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA))) + + val serviceA2: ActorRef[ServiceA] = start(stoppableBehavior) + receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref) + regProbe.expectMsg(Registered(ServiceKeyA, serviceA2)) + + aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceA2))) + + serviceA ! Stop + aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA2))) + serviceA2 ! Stop + aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]])) } def `must work with ask`(): Unit = sync(runTest("Receptionist") { @@ -132,9 +174,9 @@ class ReceptionistSpec extends TypedSpec { StepWise[Listing[ServiceA]] { (ctx, startWith) ⇒ val self = ctx.self startWith.withKeepTraces(true) { - ctx.system.receptionist ! Find(ServiceKeyA)(self) + system.receptionist ! Find(ServiceKeyA)(self) }.expectMessage(1.second) { (msg, _) ⇒ - msg.addresses should ===(Set()) + msg.serviceInstances should ===(Set()) } } }) diff --git a/akka-typed/src/main/scala/akka/typed/ActorRef.scala b/akka-typed/src/main/scala/akka/typed/ActorRef.scala index 7eb3c1ec82..cd1b3daaf2 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorRef.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorRef.scala @@ -4,9 +4,11 @@ package akka.typed import akka.{ actor ⇒ a } + import scala.annotation.unchecked.uncheckedVariance import language.implicitConversions import scala.concurrent.Future +import scala.util.Success /** * An ActorRef is the identity or address of an Actor instance. It is valid @@ -65,7 +67,13 @@ object ActorRef { * messages in while the Future is not fulfilled. */ def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = - new internal.FutureRef(FuturePath, bufferSize, f) + f.value match { + // an AdaptedActorSystem will always create refs eagerly, so it will take this path + case Some(Success(ref)) ⇒ ref + // for other ActorSystem implementations, this might work, it currently doesn't work + // for the adapted system, because the typed FutureRef cannot be watched from untyped + case x ⇒ new internal.FutureRef(FuturePath, bufferSize, f) + } /** * Create an ActorRef by providing a function that is invoked for sending diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index f80cfbc70f..133fcf5a81 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -17,6 +17,8 @@ import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange import java.util.Optional +import akka.typed.receptionist.Receptionist + /** * An ActorSystem is home to a hierarchy of Actors. It is created using * [[ActorSystem#apply]] from a [[Behavior]] object that describes the root @@ -146,9 +148,10 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: inter def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]] /** - * Return a reference to this system’s [[akka.typed.patterns.Receptionist$]]. + * Return a reference to this system’s [[akka.typed.receptionist.Receptionist]]. */ - def receptionist: ActorRef[patterns.Receptionist.Command] + def receptionist: ActorRef[Receptionist.Command] = + Receptionist(this).ref } object ActorSystem { diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/receptionist/ClusterReceptionist.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/receptionist/ClusterReceptionist.scala new file mode 100644 index 0000000000..bd1e19e46d --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/receptionist/ClusterReceptionist.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.internal.receptionist + +import akka.annotation.InternalApi +import akka.cluster.Cluster +import akka.cluster.ddata.DistributedData +import akka.cluster.ddata.ORMultiMap +import akka.cluster.ddata.ORMultiMapKey +import akka.cluster.ddata.Replicator +import akka.cluster.ddata.Replicator.WriteConsistency +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.internal.receptionist.ReceptionistBehaviorProvider +import akka.typed.internal.receptionist.ReceptionistImpl +import akka.typed.internal.receptionist.ReceptionistImpl._ +import akka.typed.receptionist.Receptionist.AbstractServiceKey +import akka.typed.receptionist.Receptionist.AllCommands +import akka.typed.receptionist.Receptionist.Command +import akka.typed.receptionist.Receptionist.ServiceKey +import akka.typed.scaladsl.ActorContext + +import scala.language.existentials +import scala.language.higherKinds + +/** Internal API */ +@InternalApi +private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { + private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], ActorRef[_]]("ReceptionistKey") + private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], ActorRef[_]] + + case class TypedORMultiMap[K[_], V[_]](val map: ORMultiMap[K[_], V[_]]) extends AnyVal { + def getOrElse[T](key: K[T], default: ⇒ Set[V[T]]): Set[V[T]] = + map.getOrElse(key, default.asInstanceOf[Set[V[_]]]).asInstanceOf[Set[V[T]]] + + def getOrEmpty[T](key: K[T]): Set[V[T]] = getOrElse(key, Set.empty) + + def addBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] = + TypedORMultiMap[K, V](map.addBinding(key, value)) + + def removeBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] = + TypedORMultiMap[K, V](map.removeBinding(key, value)) + + def toORMultiMap: ORMultiMap[K[_], V[_]] = map + } + object TypedORMultiMap { + def empty[K[_], V[_]] = TypedORMultiMap[K, V](ORMultiMap.empty[K[_], V[_]]) + } + type ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef] + object ServiceRegistry { + def empty: ServiceRegistry = TypedORMultiMap.empty + def apply(map: ORMultiMap[ServiceKey[_], ActorRef[_]]): ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef](map) + } + + def behavior: Behavior[Command] = clusterBehavior + val clusterBehavior: Behavior[Command] = ReceptionistImpl.init(clusteredReceptionist()) + + case class ClusterReceptionistSettings( + writeConsistency: WriteConsistency = Replicator.WriteLocal + ) + + /** + * Returns an ReceptionistImpl.ExternalInterface that synchronizes registered services with + */ + def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface = { + import akka.typed.scaladsl.adapter._ + val untypedSystem = ctx.system.toUntyped + + val replicator = DistributedData(untypedSystem).replicator + implicit val cluster = Cluster(untypedSystem) + + var state = ServiceRegistry.empty + + def diff(lastState: ServiceRegistry, newState: ServiceRegistry): Map[AbstractServiceKey, Set[ActorRef[_]]] = { + def changesForKey[T](registry: Map[AbstractServiceKey, Set[ActorRef[_]]], key: ServiceKey[T]): Map[AbstractServiceKey, Set[ActorRef[_]]] = { + val oldValues = lastState.getOrEmpty(key) + val newValues = newState.getOrEmpty(key) + if (oldValues != newValues) + registry + (key → newValues.asInstanceOf[Set[ActorRef[_]]]) + else + registry + } + + val allKeys = lastState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet + allKeys + .foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]])(changesForKey(_, _)) + } + + val adapter: ActorRef[Replicator.ReplicatorMessage] = + ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage) ⇒ + x match { + case changed @ Replicator.Changed(ReceptionistKey) ⇒ + val value = changed.get(ReceptionistKey) + val oldState = state + state = ServiceRegistry(value) // is that thread-safe? + val changes = diff(oldState, state) + RegistrationsChangedExternally(changes) + } + } + + replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped) + + new ExternalInterface { + private def updateRegistry(update: ServiceRegistry ⇒ ServiceRegistry): Unit = { + state = update(state) + replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + update(ServiceRegistry(registry)).toORMultiMap + } + } + + def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = + updateRegistry(_.addBinding(key, address)) + + def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = + updateRegistry(_.removeBinding(key, address)) + } + } +} diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala index ee425d16c0..62e5d0b57a 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala @@ -5,22 +5,29 @@ package akka.typed package internal import com.typesafe.config.Config + import scala.concurrent.ExecutionContext import java.util.concurrent.ThreadFactory + import scala.concurrent.{ ExecutionContextExecutor, Future } import akka.{ actor ⇒ a, dispatch ⇒ d, event ⇒ e } + import scala.util.control.NonFatal import scala.util.control.ControlThrowable import scala.collection.immutable import akka.typed.Dispatchers + import scala.concurrent.Promise import java.util.concurrent.ConcurrentSkipListSet import java.util.concurrent.atomic.AtomicBoolean + import scala.collection.JavaConverters._ import scala.util.Success import akka.util.Timeout import java.io.Closeable import java.util.concurrent.atomic.AtomicInteger + +import akka.typed.receptionist.Receptionist import akka.typed.scaladsl.AskPattern object ActorSystemImpl { @@ -227,9 +234,6 @@ private[typed] class ActorSystemImpl[-T]( private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyProps) - override val receptionist: ActorRef[patterns.Receptionist.Command] = - ActorRef(systemActorOf(patterns.Receptionist.behavior, "receptionist")(settings.untyped.CreationTimeout)) - private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianProps) // now we can start up the loggers diff --git a/akka-typed/src/main/scala/akka/typed/internal/ExtensionsImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ExtensionsImpl.scala index 9688b2d222..1f69dc096e 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ExtensionsImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ExtensionsImpl.scala @@ -25,7 +25,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ /** * Hook for ActorSystem to load extensions on startup */ - protected final def loadExtensions() { + protected final def loadExtensions(): Unit = { /** * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala index 6ed6ad7360..a63c3c2cd6 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala @@ -5,17 +5,13 @@ package akka.typed package internal package adapter -import akka.{ actor ⇒ a, dispatch ⇒ d } -import akka.dispatch.sysmsg +import akka.{ actor ⇒ a } import scala.concurrent.ExecutionContextExecutor import akka.util.Timeout import scala.concurrent.Future import akka.annotation.InternalApi -import akka.typed.scaladsl.adapter.AdapterExtension - -import scala.annotation.unchecked.uncheckedVariance /** * INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context). @@ -66,9 +62,6 @@ import scala.annotation.unchecked.uncheckedVariance override def uptime: Long = untyped.uptime override def printTree: String = untyped.printTree - override def receptionist: ActorRef[patterns.Receptionist.Command] = - ReceptionistExtension(untyped).receptionist - import akka.dispatch.ExecutionContexts.sameThreadExecutionContext override def terminate(): scala.concurrent.Future[akka.typed.Terminated] = @@ -103,19 +96,5 @@ private[typed] object ActorSystemAdapter { case _ ⇒ throw new UnsupportedOperationException("only adapted untyped ActorSystem permissible " + s"($sys of class ${sys.getClass.getName})") } - - object ReceptionistExtension extends a.ExtensionId[ReceptionistExtension] with a.ExtensionIdProvider { - override def get(system: a.ActorSystem): ReceptionistExtension = super.get(system) - override def lookup = ReceptionistExtension - override def createExtension(system: a.ExtendedActorSystem): ReceptionistExtension = - new ReceptionistExtension(system) - } - - class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension { - val receptionist: ActorRef[patterns.Receptionist.Command] = - ActorRefAdapter(system.systemActorOf( - PropsAdapter(() ⇒ patterns.Receptionist.behavior, EmptyProps), - "receptionist")) - } } diff --git a/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala new file mode 100644 index 0000000000..37350926ce --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/receptionist/ReceptionistImpl.scala @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.internal.receptionist + +import akka.annotation.InternalApi +import akka.typed.ActorRef +import akka.typed.Behavior +import akka.typed.Terminated +import akka.typed.receptionist.Receptionist._ +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.Actor.immutable +import akka.typed.scaladsl.Actor.same +import akka.typed.scaladsl.ActorContext +import akka.util.TypedMultiMap + +import scala.reflect.ClassTag + +/** + * Marker interface to use with dynamic access + * + * Internal API + */ +@InternalApi +private[typed] trait ReceptionistBehaviorProvider { + def behavior: Behavior[Command] +} + +/** Internal API */ +@InternalApi +private[typed] object ReceptionistImpl extends ReceptionistBehaviorProvider { + // FIXME: make sure to provide serializer + case class DefaultServiceKey[T](id: String)(implicit tTag: ClassTag[T]) extends ServiceKey[T] { + override def toString: String = s"ServiceKey[$tTag]($id)" + } + + /** + * Interface to allow plugging of external service discovery infrastructure in to the existing receptionist API. + */ + trait ExternalInterface { + def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit + def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit + } + object LocalExternalInterface extends ExternalInterface { + def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = () + def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = () + } + + override def behavior: Behavior[Command] = localOnlyBehavior + val localOnlyBehavior: Behavior[Command] = init(_ ⇒ LocalExternalInterface) + + type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol] + type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV] + object LocalServiceRegistry { + val empty: LocalServiceRegistry = TypedMultiMap.empty[AbstractServiceKey, KV] + } + + sealed abstract class ReceptionistInternalCommand extends InternalCommand + final case class RegisteredActorTerminated[T](key: ServiceKey[T], address: ActorRef[T]) extends ReceptionistInternalCommand + final case class SubscriberTerminated[T](key: ServiceKey[T], address: ActorRef[Listing[T]]) extends ReceptionistInternalCommand + final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]]) extends ReceptionistInternalCommand + + type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[Listing[K#Protocol]] + type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] + + private[typed] def init(externalInterfaceFactory: ActorContext[AllCommands] ⇒ ExternalInterface): Behavior[Command] = + Actor.deferred[AllCommands] { ctx ⇒ + val externalInterface = externalInterfaceFactory(ctx) + behavior( + TypedMultiMap.empty[AbstractServiceKey, KV], + TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV], + externalInterface) + }.narrow[Command] + + private def behavior( + serviceRegistry: LocalServiceRegistry, + subscriptions: SubscriptionRegistry, + externalInterface: ExternalInterface): Behavior[AllCommands] = { + + /** Helper to create new state */ + def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) = + behavior(newRegistry, newSubscriptions, externalInterface) + + /** + * Hack to allow multiple termination notifications per target + * FIXME: replace by simple map in our state + */ + def watchWith(ctx: ActorContext[AllCommands], target: ActorRef[_], msg: AllCommands): Unit = + ctx.spawnAnonymous[Nothing](Actor.deferred[Nothing] { innerCtx ⇒ + innerCtx.watch(target) + Actor.immutable[Nothing]((_, _) ⇒ Actor.same) + .onSignal { + case (_, Terminated(`target`)) ⇒ + ctx.self ! msg + Actor.stopped + } + }) + + /** Helper that makes sure that subscribers are notified when an entry is changed */ + def updateRegistry(changedKeysHint: Set[AbstractServiceKey], f: LocalServiceRegistry ⇒ LocalServiceRegistry): Behavior[AllCommands] = { + val newRegistry = f(serviceRegistry) + + def notifySubscribersFor[T](key: AbstractServiceKey): Unit = { + val newListing = newRegistry.get(key) + subscriptions.get(key).foreach(_ ! Listing(key.asServiceKey, newListing)) + } + + changedKeysHint foreach notifySubscribersFor + next(newRegistry = newRegistry) + } + + def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing[T]]): Unit = + replyTo ! Listing(key, serviceRegistry get key) + + immutable[AllCommands] { (ctx, msg) ⇒ + msg match { + case Register(key, serviceInstance, replyTo) ⇒ + println(s"[${ctx.self}] Actor was registered: $key $serviceInstance") + watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) + replyTo ! Registered(key, serviceInstance) + externalInterface.onRegister(key, serviceInstance) + + updateRegistry(Set(key), _.inserted(key)(serviceInstance)) + + case Find(key, replyTo) ⇒ + replyWithListing(key, replyTo) + + same + + case RegistrationsChangedExternally(changes) ⇒ + println(s"[${ctx.self}] Registration changed: $changes") + + // FIXME: get rid of casts + def makeChanges(registry: LocalServiceRegistry): LocalServiceRegistry = + changes.foldLeft(registry) { + case (reg, (key, values)) ⇒ + reg.setAll(key)(values.asInstanceOf[Set[ActorRef[key.Protocol]]]) + } + + updateRegistry(changes.keySet, makeChanges) // overwrite all changed keys + + case RegisteredActorTerminated(key, serviceInstance) ⇒ + println(s"[${ctx.self}] Registered actor terminated: $key $serviceInstance") + externalInterface.onUnregister(key, serviceInstance) + updateRegistry(Set(key), _.removed(key)(serviceInstance)) + + case Subscribe(key, subscriber) ⇒ + watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) + + // immediately reply with initial listings to the new subscriber + replyWithListing(key, subscriber) + + next(newSubscriptions = subscriptions.inserted(key)(subscriber)) + + case SubscriberTerminated(key, subscriber) ⇒ + next(newSubscriptions = subscriptions.removed(key)(subscriber)) + } + } + } +} diff --git a/akka-typed/src/main/scala/akka/typed/patterns/Receptionist.scala b/akka-typed/src/main/scala/akka/typed/patterns/Receptionist.scala deleted file mode 100644 index 029dd487ee..0000000000 --- a/akka-typed/src/main/scala/akka/typed/patterns/Receptionist.scala +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Copyright (C) 2014-2017 Lightbend Inc. - */ -package akka.typed.patterns - -import akka.typed.ActorRef -import akka.typed.Behavior -import akka.typed.Terminated -import akka.util.TypedMultiMap -import akka.typed.scaladsl.Actor._ - -/** - * A Receptionist is an entry point into an Actor hierarchy where select Actors - * publish their identity together with the protocols that they implement. Other - * Actors need only know the Receptionist’s identity in order to be able to use - * the services of the registered Actors. - */ -object Receptionist { - - /** - * Internal representation of [[Receptionist.ServiceKey]] which is needed - * in order to use a TypedMultiMap (using keys with a type parameter does not - * work in Scala 2.x). - */ - trait AbstractServiceKey { - type Type - } - - /** - * A service key is an object that implements this trait for a given protocol - * T, meaning that it signifies that the type T is the entry point into the - * protocol spoken by that service (think of it as the set of first messages - * that a client could send). - */ - trait ServiceKey[T] extends AbstractServiceKey { - final override type Type = T - } - - /** - * The set of commands accepted by a Receptionist. - */ - sealed trait Command - /** - * Associate the given [[akka.typed.ActorRef]] with the given [[ServiceKey]]. Multiple - * registrations can be made for the same key. Unregistration is implied by - * the end of the referenced Actor’s lifecycle. - */ - final case class Register[T](key: ServiceKey[T], address: ActorRef[T])(val replyTo: ActorRef[Registered[T]]) extends Command - /** - * Query the Receptionist for a list of all Actors implementing the given - * protocol. - */ - final case class Find[T](key: ServiceKey[T])(val replyTo: ActorRef[Listing[T]]) extends Command - - /** - * Confirmation that the given [[akka.typed.ActorRef]] has been associated with the [[ServiceKey]]. - */ - final case class Registered[T](key: ServiceKey[T], address: ActorRef[T]) - /** - * Current listing of all Actors that implement the protocol given by the [[ServiceKey]]. - */ - final case class Listing[T](key: ServiceKey[T], addresses: Set[ActorRef[T]]) - - /** - * Initial behavior of a receptionist, used to create a new receptionist like in the following: - * - * {{{ - * val receptionist: ActorRef[Receptionist.Command] = ctx.spawn(Props(Receptionist.behavior), "receptionist") - * }}} - */ - val behavior: Behavior[Command] = behavior(TypedMultiMap.empty[AbstractServiceKey, KV]) - - private type KV[K <: AbstractServiceKey] = ActorRef[K#Type] - - private def behavior(map: TypedMultiMap[AbstractServiceKey, KV]): Behavior[Command] = immutable[Command] { (ctx, msg) ⇒ - msg match { - case r: Register[t] ⇒ - ctx.watch(r.address) - r.replyTo ! Registered(r.key, r.address) - behavior(map.inserted(r.key)(r.address)) - case f: Find[t] ⇒ - val set = map get f.key - f.replyTo ! Listing(f.key, set) - same - } - } onSignal { - case (ctx, Terminated(ref)) ⇒ - behavior(map valueRemoved ref) - case x ⇒ unhandled - } -} - -abstract class Receptionist diff --git a/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala b/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala new file mode 100644 index 0000000000..14bdc25c93 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/receptionist/Receptionist.scala @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.receptionist + +import akka.annotation.InternalApi +import akka.typed.ActorRef +import akka.typed.ActorSystem +import akka.typed.Extension +import akka.typed.ExtensionId +import akka.typed.internal.receptionist.ReceptionistBehaviorProvider +import akka.typed.internal.receptionist.ReceptionistImpl + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.reflect.ClassTag + +class Receptionist(system: ActorSystem[_]) extends Extension { + private def hasCluster: Boolean = + // FIXME: replace with better indicator that cluster is enabled + system.settings.config.getString("akka.actor.provider") == "cluster" + + val ref: ActorRef[Receptionist.Command] = { + val behavior = + if (hasCluster) + system.dynamicAccess + .createInstanceFor[ReceptionistBehaviorProvider]("akka.typed.cluster.internal.receptionist.ClusterReceptionist$", Nil) + .recover { + case ex ⇒ + system.log.error( + ex, + "ClusterReceptionist could not be loaded dynamically. Make sure you have all required binaries on the classpath.") + ReceptionistImpl + }.get.behavior + + else ReceptionistImpl.localOnlyBehavior + + ActorRef( + system.systemActorOf(behavior, "receptionist")( + // FIXME: where should that timeout be configured? Shouldn't there be a better `Extension` + // implementation that does this dance for us? + + 10.seconds + )) + } +} + +/** + * A Receptionist is an entry point into an Actor hierarchy where select Actors + * publish their identity together with the protocols that they implement. Other + * Actors need only know the Receptionist’s identity in order to be able to use + * the services of the registered Actors. + */ +object Receptionist extends ExtensionId[Receptionist] { + def createExtension(system: ActorSystem[_]): Receptionist = new Receptionist(system) + def get(system: ActorSystem[_]): Receptionist = apply(system) + + /** + * Internal representation of [[ServiceKey]] which is needed + * in order to use a TypedMultiMap (using keys with a type parameter does not + * work in Scala 2.x). + * + * Internal API + */ + @InternalApi + private[typed] sealed abstract class AbstractServiceKey { + type Protocol + + /** Type-safe down-cast */ + def asServiceKey: ServiceKey[Protocol] + } + + /** + * A service key is an object that implements this trait for a given protocol + * T, meaning that it signifies that the type T is the entry point into the + * protocol spoken by that service (think of it as the set of first messages + * that a client could send). + */ + abstract class ServiceKey[T] extends AbstractServiceKey { + final type Protocol = T + def id: String + def asServiceKey: ServiceKey[T] = this + } + + object ServiceKey { + /** + * Creates a service key. The given ID should uniquely define a service with a given protocol. + */ + // FIXME: not sure if the ClassTag pulls its weight. It's only used in toString currently. + def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = ReceptionistImpl.DefaultServiceKey(id) + } + + /** Internal superclass for external and internal commands */ + @InternalApi + sealed private[typed] abstract class AllCommands + + /** + * The set of commands accepted by a Receptionist. + */ + sealed abstract class Command extends AllCommands + @InternalApi + private[typed] abstract class InternalCommand extends AllCommands + + /** + * Associate the given [[akka.typed.ActorRef]] with the given [[ServiceKey]]. Multiple + * registrations can be made for the same key. Unregistration is implied by + * the end of the referenced Actor’s lifecycle. + * + * Registration will be acknowledged with the [[Registered]] message to the given replyTo actor. + */ + final case class Register[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) extends Command + object Register { + /** Auxiliary constructor to be used with the ask pattern */ + def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] ⇒ Register[T] = + replyTo ⇒ Register(key, service, replyTo) + } + + /** + * Confirmation that the given [[akka.typed.ActorRef]] has been associated with the [[ServiceKey]]. + */ + final case class Registered[T](key: ServiceKey[T], serviceInstance: ActorRef[T]) + + /** + * Subscribe the given actor to service updates. When new instances are registered or unregistered to the given key + * the given subscriber will be sent a [[Listing]] with the new set of instances for that service. + * + * The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends + * with the termination of the subscriber. + */ + final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command + + /** + * Query the Receptionist for a list of all Actors implementing the given + * protocol. + */ + final case class Find[T](key: ServiceKey[T], replyTo: ActorRef[Listing[T]]) extends Command + object Find { + /** Auxiliary constructor to use with the ask pattern */ + def apply[T](key: ServiceKey[T]): ActorRef[Listing[T]] ⇒ Find[T] = + replyTo ⇒ Find(key, replyTo) + } + + /** + * Current listing of all Actors that implement the protocol given by the [[ServiceKey]]. + */ + final case class Listing[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]) { + /** Java API */ + def getServiceInstances: java.util.Set[ActorRef[T]] = serviceInstances.asJava + } +}