diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java new file mode 100644 index 0000000000..e086f28767 --- /dev/null +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package jdocs.akka.cluster.typed; + +// #import +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.receptionist.Receptionist; +import akka.actor.typed.receptionist.ServiceKey; +// #import +import akka.actor.typed.ActorSystem; + +public class ReceptionistExample { + + // #ping-service + public static class PingService { + + static final ServiceKey pingServiceKey = ServiceKey.create(Ping.class, "pingService"); + + public static class Pong {} + + public static class Ping { + private final ActorRef replyTo; + + public Ping(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static Behavior createBehavior() { + return Behaviors.setup( + context -> { + context + .getSystem() + .receptionist() + .tell(Receptionist.register(pingServiceKey, context.getSelf())); + + return Behaviors.receive(Ping.class).onMessage(Ping.class, PingService::onPing).build(); + }); + } + + private static Behavior onPing(ActorContext context, Ping msg) { + context.getLog().info("Pinged by {}", msg.replyTo); + msg.replyTo.tell(new Pong()); + return Behaviors.same(); + } + } + // #ping-service + + // #pinger + public static class Pinger { + static Behavior createBehavior(ActorRef pingService) { + return Behaviors.setup( + (ctx) -> { + pingService.tell(new PingService.Ping(ctx.getSelf())); + return Behaviors.receive(PingService.Pong.class) + .onMessage(PingService.Pong.class, Pinger::onPong) + .build(); + }); + } + + private static Behavior onPong( + ActorContext context, PingService.Pong msg) { + context.getLog().info("{} was ponged!!", context.getSelf()); + return Behaviors.stopped(); + } + } + // #pinger + + // #pinger-guardian + public static Behavior createGuardianBehavior() { + return Behaviors.setup( + context -> { + context + .getSystem() + .receptionist() + .tell( + Receptionist.subscribe( + PingService.pingServiceKey, context.getSelf().narrow())); + context.spawnAnonymous(PingService.createBehavior()); + return Behaviors.receive(Object.class) + .onMessage( + Receptionist.Listing.class, + (c, msg) -> { + msg.getServiceInstances(PingService.pingServiceKey) + .forEach( + pingService -> + context.spawnAnonymous(Pinger.createBehavior(pingService))); + return Behaviors.same(); + }) + .build(); + }) + .narrow(); + } + // #pinger-guardian + + public static void main(String[] args) throws Exception { + ActorSystem system = ActorSystem.create(createGuardianBehavior(), "ReceptionistExample"); + Thread.sleep(10000); + system.terminate(); + } +} diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java deleted file mode 100644 index 0789b9a828..0000000000 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package jdocs.akka.cluster.typed; - -import akka.actor.Address; -import akka.actor.typed.ActorRef; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.Receive; -import akka.actor.typed.receptionist.Receptionist; -import akka.actor.typed.receptionist.ServiceKey; -import akka.cluster.ClusterEvent; -import akka.cluster.typed.Cluster; -import akka.cluster.typed.Subscribe; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -public class ReceptionistExampleTest extends JUnitSuite { - - static class RandomRouter { - - private static class RouterBehavior extends AbstractBehavior { - private final Class messageClass; - private final ServiceKey serviceKey; - private final List> routees = new ArrayList<>(); - - public RouterBehavior( - ActorContext ctx, Class messageClass, ServiceKey serviceKey) { - this.messageClass = messageClass; - this.serviceKey = serviceKey; - - ctx.getSystem() - .receptionist() - .tell(Receptionist.subscribe(serviceKey, ctx.getSelf().narrow())); - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage( - Receptionist.Listing.class, - listing -> listing.isForKey(serviceKey), - (listing) -> { - routees.clear(); - routees.addAll(listing.getServiceInstances(serviceKey)); - return this; - }) - .onMessage( - messageClass, - (msg) -> { - int i = ThreadLocalRandom.current().nextInt(routees.size()); - routees.get(i).tell(msg); - return this; - }) - .build(); - } - } - - public static Behavior router(ServiceKey serviceKey, Class messageClass) { - return Behaviors.setup(ctx -> new RouterBehavior(ctx, messageClass, serviceKey)).narrow(); - } - } - - // same as above, but also subscribes to cluster reachability events and - // avoids routees that are unreachable - static class ClusterRouter { - - private static class WrappedReachabilityEvent { - final ClusterEvent.ReachabilityEvent event; - - public WrappedReachabilityEvent(ClusterEvent.ReachabilityEvent event) { - this.event = event; - } - } - - private static class ClusterRouterBehavior extends AbstractBehavior { - private final Class messageClass; - private final ServiceKey serviceKey; - private final List> routees = new ArrayList<>(); - private final Set
unreachable = new HashSet<>(); - private final List> reachable = new ArrayList<>(); - - public ClusterRouterBehavior( - ActorContext ctx, Class messageClass, ServiceKey serviceKey) { - this.messageClass = messageClass; - this.serviceKey = serviceKey; - ctx.getSystem() - .receptionist() - .tell(Receptionist.subscribe(serviceKey, ctx.getSelf().narrow())); - - Cluster cluster = Cluster.get(ctx.getSystem()); - // typically you have to map such external messages into this - // actor's protocol with a message adapter - /* this is how it is done in the scala sample, but that is awkward with Java - ActorRef reachabilityAdapter = - ctx.messageAdapter( - ClusterEvent.ReachabilityEvent.class, - (event) -> new WrappedReachabilityEvent(event)); - cluster.subscriptions().tell(Subscribe.create(reachabilityAdapter, ClusterEvent.ReachabilityEvent.class)); - */ - cluster - .subscriptions() - .tell(Subscribe.create(ctx.getSelf().narrow(), ClusterEvent.UnreachableMember.class)); - cluster - .subscriptions() - .tell(Subscribe.create(ctx.getSelf().narrow(), ClusterEvent.ReachableMember.class)); - } - - private void updateReachable() { - reachable.clear(); - for (ActorRef routee : routees) { - if (!unreachable.contains(routee.path().address())) { - reachable.add(routee); - } - } - } - - @Override - public Receive createReceive() { - return newReceiveBuilder() - .onMessage( - Receptionist.Listing.class, - listing -> listing.isForKey(serviceKey), - listing -> { - routees.clear(); - routees.addAll(listing.getServiceInstances(serviceKey)); - updateReachable(); - return this; - }) - .onMessage( - ClusterEvent.ReachableMember.class, - reachableMember -> { - unreachable.remove(reachableMember.member().address()); - updateReachable(); - return this; - }) - .onMessage( - ClusterEvent.UnreachableMember.class, - unreachableMember -> { - unreachable.add(unreachableMember.member().address()); - updateReachable(); - return this; - }) - .onMessage( - messageClass, - msg -> { - int i = ThreadLocalRandom.current().nextInt(reachable.size()); - reachable.get(i).tell(msg); - return this; - }) - .build(); - } - } - - public static Behavior clusterRouter(ServiceKey serviceKey, Class messageClass) { - return Behaviors.setup((ctx) -> new ClusterRouterBehavior(ctx, messageClass, serviceKey)) - .narrow(); - } - } - - public static class PingPongExample { - // #ping-service - static final ServiceKey PingServiceKey = ServiceKey.create(Ping.class, "pingService"); - - public static class Pong {} - - public static class Ping { - private final ActorRef replyTo; - - Ping(ActorRef replyTo) { - this.replyTo = replyTo; - } - } - - static Behavior pingService() { - return Behaviors.setup( - (ctx) -> { - ctx.getSystem() - .receptionist() - .tell(Receptionist.register(PingServiceKey, ctx.getSelf())); - return Behaviors.receive(Ping.class) - .onMessage( - Ping.class, - (c, msg) -> { - msg.replyTo.tell(new Pong()); - return Behaviors.same(); - }) - .build(); - }); - } - // #ping-service - - // #pinger - static Behavior pinger(ActorRef pingService) { - return Behaviors.setup( - (ctx) -> { - pingService.tell(new Ping(ctx.getSelf())); - return Behaviors.receive(Pong.class) - .onMessage( - Pong.class, - (c, msg) -> { - System.out.println("I was ponged! " + msg); - return Behaviors.same(); - }) - .build(); - }); - } - // #pinger - - // #pinger-guardian - static Behavior guardian() { - return Behaviors.setup( - (ctx) -> { - ctx.getSystem() - .receptionist() - .tell(Receptionist.subscribe(PingServiceKey, ctx.getSelf().narrow())); - ActorRef ps = ctx.spawnAnonymous(pingService()); - ctx.watch(ps); - return Behaviors.receive(Object.class) - .onMessage( - Receptionist.Listing.class, - listing -> listing.isForKey(PingServiceKey), - (c, msg) -> { - msg.getServiceInstances(PingServiceKey) - .forEach(ar -> ctx.spawnAnonymous(pinger(ar))); - return Behaviors.same(); - }) - .build(); - }) - .narrow(); - } - // #pinger-guardian - } - - @Test - public void workPlease() throws Exception { - ActorSystem system = - ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample"); - - Await.ready(system.terminate(), Duration.create(2, TimeUnit.SECONDS)); - } -} diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala new file mode 100644 index 0000000000..e55c5db637 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package docs.akka.cluster.typed + +//#import +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.Receptionist.Listing +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.Behaviors +//#import + +object PingPongExample { + //#ping-service + val PingServiceKey = ServiceKey[Ping]("pingService") + + final case class Ping(replyTo: ActorRef[Pong.type]) + final case object Pong + + val pingService: Behavior[Ping] = + Behaviors.setup { ctx => + ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self) + Behaviors.receive { (context, msg) => + msg match { + case Ping(replyTo) => + context.log.info("Pinged by {}", replyTo) + replyTo ! Pong + Behaviors.same + } + } + } + //#ping-service + + //#pinger + def pinger(pingService: ActorRef[Ping]): Behavior[Pong.type] = + Behaviors.setup[Pong.type] { ctx => + pingService ! Ping(ctx.self) + Behaviors.receive { (context, msg) => + context.log.info("{} was ponged!!", context.self) + Behaviors.stopped + } + } + //#pinger + + //#pinger-guardian + val guardian: Behavior[Nothing] = + Behaviors + .setup[Listing] { context => + context.spawnAnonymous(pingService) + context.system.receptionist ! Receptionist.Subscribe(PingServiceKey, context.self) + Behaviors.receiveMessagePartial[Listing] { + case PingServiceKey.Listing(listings) => + listings.foreach(ps => context.spawnAnonymous(pinger(ps))) + Behaviors.same + } + } + .narrow + //#pinger-guardian + +} + +object ReceptionistExample { + import akka.actor.typed.ActorSystem + + def main(args: Array[String]): Unit = { + val system = ActorSystem[Nothing](PingPongExample.guardian, "PingPongExample") + Thread.sleep(10000) + system.terminate() + } + +} diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala deleted file mode 100644 index 6cad9dfa67..0000000000 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package docs.akka.cluster.typed - -import akka.actor.typed._ -import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } -import akka.actor.typed.receptionist.Receptionist.Listing -import akka.actor.typed.scaladsl._ -import com.typesafe.config.ConfigFactory -import org.scalatest.WordSpec -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{ Millis, Seconds, Span } - -object PingPongExample { - //#ping-service - val PingServiceKey = ServiceKey[Ping]("pingService") - - final case class Ping(replyTo: ActorRef[Pong.type]) - final case object Pong - - val pingService: Behavior[Ping] = - Behaviors.setup { ctx => - ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self) - Behaviors.receive[Ping] { (_, msg) => - msg match { - case Ping(replyTo) => - println("Pinged, now stopping") - replyTo ! Pong - Behaviors.stopped - } - } - } - //#ping-service - - //#pinger - def pinger(pingService: ActorRef[Ping]) = Behaviors.setup[Pong.type] { ctx => - pingService ! Ping(ctx.self) - Behaviors.receive { (_, msg) => - println("I was ponged!!" + msg) - Behaviors.same - } - } - //#pinger - - //#pinger-guardian - val guardian: Behavior[Nothing] = Behaviors - .setup[Listing] { ctx => - ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self) - val ps = ctx.spawnAnonymous(pingService) - ctx.watch(ps) - Behaviors - .receiveMessagePartial[Listing] { - case PingServiceKey.Listing(listings) if listings.nonEmpty => - listings.foreach(ps => ctx.spawnAnonymous(pinger(ps))) - Behaviors.same - } - .receiveSignal { - case (_, Terminated(`ps`)) => - println("Ping service has shut down") - Behaviors.stopped - } - } - .narrow - //#pinger-guardian - -} - -object ReceptionistExampleSpec { - val clusterConfig = ConfigFactory.parseString(s""" -#config -akka { - actor { - provider = "cluster" - } - cluster.jmx.multi-mbeans-in-same-jvm = on - remote { - netty.tcp { - hostname = "127.0.0.1" - port = 2551 - } - } -} -#config -akka.remote.netty.tcp.port = 0 -akka.remote.artery.canonical.port = 0 - """) - -} - -class ReceptionistExampleSpec extends WordSpec with ScalaFutures { - - import PingPongExample._ - - implicit override val patienceConfig = - PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis))) - - "A local basic example" must { - "show register" in { - val system = ActorSystem[Nothing](guardian, "PingPongExample") - system.whenTerminated.futureValue - } - } -} diff --git a/akka-docs/src/main/paradox/typed/actor-discovery.md b/akka-docs/src/main/paradox/typed/actor-discovery.md index 20100cafb3..a70bebb44f 100644 --- a/akka-docs/src/main/paradox/typed/actor-discovery.md +++ b/akka-docs/src/main/paradox/typed/actor-discovery.md @@ -29,38 +29,47 @@ The registry is dynamic. New actors can be registered during the lifecycle of th registered actors are stopped or a node is removed from the cluster. To facilitate this dynamic aspect you can also subscribe to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed. -The first scenario is an actor running that needs to be discovered by another actor but you are unable +The primary scenario for using the receptionist is when an actor needs to be discovered by another actor but you are unable to put a reference to it in an incoming message. -First we create a `PingService` actor and register it with the `Receptionist` against a +These imports are used in the following example: + +Scala +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #import } + +Java +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #import } + +First we create a @scala[`pingService`]@java[`PingService`] actor and register it with the `Receptionist` against a `ServiceKey` that will later be used to lookup the reference: Scala -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #ping-service } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #ping-service } Java -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #ping-service } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #ping-service } -Then we have another actor that requires a `PingService` to be constructed: +Then we have another actor that requires a @scala[`pingService`]@java[`PingService`] to be constructed: Scala -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #pinger } Java -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #pinger } Finally in the guardian actor we spawn the service as well as subscribing to any actors registering against the `ServiceKey`. Subscribing means that the guardian actor will be informed of any new registrations via a `Listing` message: Scala -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger-guardian } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #pinger-guardian } Java -: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger-guardian } +: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #pinger-guardian } -Each time a new (which is just a single time in this example) `PingService` is registered the -guardian actor spawns a pinger to ping it. +Each time a new (which is just a single time in this example) @scala[`pingService`]@java[`PingService`] is registered the +guardian actor spawns a @scala[`pinger`]@java[`Pinger`] for each currently known `PingService`. The @scala[`pinger`]@java[`Pinger`] +sends a `Ping` message and when receiving the `Pong` reply it stops. ## Cluster Receptionist