Merge pull request #26610 from akka/wip-receptionist-snippet-patriknw

cleanup Receptionist example
This commit is contained in:
Patrik Nordwall 2019-03-29 14:54:42 +01:00 committed by GitHub
commit baeaca1ce0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 200 additions and 372 deletions

View file

@ -0,0 +1,106 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.typed;
// #import
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;
// #import
import akka.actor.typed.ActorSystem;
public class ReceptionistExample {
// #ping-service
public static class PingService {
static final ServiceKey<Ping> pingServiceKey = ServiceKey.create(Ping.class, "pingService");
public static class Pong {}
public static class Ping {
private final ActorRef<Pong> replyTo;
public Ping(ActorRef<Pong> replyTo) {
this.replyTo = replyTo;
}
}
static Behavior<Ping> createBehavior() {
return Behaviors.setup(
context -> {
context
.getSystem()
.receptionist()
.tell(Receptionist.register(pingServiceKey, context.getSelf()));
return Behaviors.receive(Ping.class).onMessage(Ping.class, PingService::onPing).build();
});
}
private static Behavior<Ping> onPing(ActorContext<Ping> context, Ping msg) {
context.getLog().info("Pinged by {}", msg.replyTo);
msg.replyTo.tell(new Pong());
return Behaviors.same();
}
}
// #ping-service
// #pinger
public static class Pinger {
static Behavior<PingService.Pong> createBehavior(ActorRef<PingService.Ping> pingService) {
return Behaviors.setup(
(ctx) -> {
pingService.tell(new PingService.Ping(ctx.getSelf()));
return Behaviors.receive(PingService.Pong.class)
.onMessage(PingService.Pong.class, Pinger::onPong)
.build();
});
}
private static Behavior<PingService.Pong> onPong(
ActorContext<PingService.Pong> context, PingService.Pong msg) {
context.getLog().info("{} was ponged!!", context.getSelf());
return Behaviors.stopped();
}
}
// #pinger
// #pinger-guardian
public static Behavior<Void> createGuardianBehavior() {
return Behaviors.setup(
context -> {
context
.getSystem()
.receptionist()
.tell(
Receptionist.subscribe(
PingService.pingServiceKey, context.getSelf().narrow()));
context.spawnAnonymous(PingService.createBehavior());
return Behaviors.receive(Object.class)
.onMessage(
Receptionist.Listing.class,
(c, msg) -> {
msg.getServiceInstances(PingService.pingServiceKey)
.forEach(
pingService ->
context.spawnAnonymous(Pinger.createBehavior(pingService)));
return Behaviors.same();
})
.build();
})
.narrow();
}
// #pinger-guardian
public static void main(String[] args) throws Exception {
ActorSystem<Void> system = ActorSystem.create(createGuardianBehavior(), "ReceptionistExample");
Thread.sleep(10000);
system.terminate();
}
}

View file

@ -1,256 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.typed;
import akka.actor.Address;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class ReceptionistExampleTest extends JUnitSuite {
static class RandomRouter {
private static class RouterBehavior<T> extends AbstractBehavior<Object> {
private final Class<T> messageClass;
private final ServiceKey<T> serviceKey;
private final List<ActorRef<T>> routees = new ArrayList<>();
public RouterBehavior(
ActorContext<Object> ctx, Class<T> messageClass, ServiceKey<T> serviceKey) {
this.messageClass = messageClass;
this.serviceKey = serviceKey;
ctx.getSystem()
.receptionist()
.tell(Receptionist.subscribe(serviceKey, ctx.getSelf().narrow()));
}
@Override
public Receive<Object> createReceive() {
return newReceiveBuilder()
.onMessage(
Receptionist.Listing.class,
listing -> listing.isForKey(serviceKey),
(listing) -> {
routees.clear();
routees.addAll(listing.getServiceInstances(serviceKey));
return this;
})
.onMessage(
messageClass,
(msg) -> {
int i = ThreadLocalRandom.current().nextInt(routees.size());
routees.get(i).tell(msg);
return this;
})
.build();
}
}
public static <T> Behavior<T> router(ServiceKey<T> serviceKey, Class<T> messageClass) {
return Behaviors.setup(ctx -> new RouterBehavior<T>(ctx, messageClass, serviceKey)).narrow();
}
}
// same as above, but also subscribes to cluster reachability events and
// avoids routees that are unreachable
static class ClusterRouter {
private static class WrappedReachabilityEvent {
final ClusterEvent.ReachabilityEvent event;
public WrappedReachabilityEvent(ClusterEvent.ReachabilityEvent event) {
this.event = event;
}
}
private static class ClusterRouterBehavior<T> extends AbstractBehavior<Object> {
private final Class<T> messageClass;
private final ServiceKey<T> serviceKey;
private final List<ActorRef<T>> routees = new ArrayList<>();
private final Set<Address> unreachable = new HashSet<>();
private final List<ActorRef<T>> reachable = new ArrayList<>();
public ClusterRouterBehavior(
ActorContext<Object> ctx, Class<T> messageClass, ServiceKey<T> serviceKey) {
this.messageClass = messageClass;
this.serviceKey = serviceKey;
ctx.getSystem()
.receptionist()
.tell(Receptionist.subscribe(serviceKey, ctx.getSelf().narrow()));
Cluster cluster = Cluster.get(ctx.getSystem());
// typically you have to map such external messages into this
// actor's protocol with a message adapter
/* this is how it is done in the scala sample, but that is awkward with Java
ActorRef<ClusterEvent.ReachabilityEvent> reachabilityAdapter =
ctx.messageAdapter(
ClusterEvent.ReachabilityEvent.class,
(event) -> new WrappedReachabilityEvent(event));
cluster.subscriptions().tell(Subscribe.create(reachabilityAdapter, ClusterEvent.ReachabilityEvent.class));
*/
cluster
.subscriptions()
.tell(Subscribe.create(ctx.getSelf().narrow(), ClusterEvent.UnreachableMember.class));
cluster
.subscriptions()
.tell(Subscribe.create(ctx.getSelf().narrow(), ClusterEvent.ReachableMember.class));
}
private void updateReachable() {
reachable.clear();
for (ActorRef<T> routee : routees) {
if (!unreachable.contains(routee.path().address())) {
reachable.add(routee);
}
}
}
@Override
public Receive<Object> createReceive() {
return newReceiveBuilder()
.onMessage(
Receptionist.Listing.class,
listing -> listing.isForKey(serviceKey),
listing -> {
routees.clear();
routees.addAll(listing.getServiceInstances(serviceKey));
updateReachable();
return this;
})
.onMessage(
ClusterEvent.ReachableMember.class,
reachableMember -> {
unreachable.remove(reachableMember.member().address());
updateReachable();
return this;
})
.onMessage(
ClusterEvent.UnreachableMember.class,
unreachableMember -> {
unreachable.add(unreachableMember.member().address());
updateReachable();
return this;
})
.onMessage(
messageClass,
msg -> {
int i = ThreadLocalRandom.current().nextInt(reachable.size());
reachable.get(i).tell(msg);
return this;
})
.build();
}
}
public static <T> Behavior<T> clusterRouter(ServiceKey<T> serviceKey, Class<T> messageClass) {
return Behaviors.setup((ctx) -> new ClusterRouterBehavior<T>(ctx, messageClass, serviceKey))
.narrow();
}
}
public static class PingPongExample {
// #ping-service
static final ServiceKey<Ping> PingServiceKey = ServiceKey.create(Ping.class, "pingService");
public static class Pong {}
public static class Ping {
private final ActorRef<Pong> replyTo;
Ping(ActorRef<Pong> replyTo) {
this.replyTo = replyTo;
}
}
static Behavior<Ping> pingService() {
return Behaviors.setup(
(ctx) -> {
ctx.getSystem()
.receptionist()
.tell(Receptionist.register(PingServiceKey, ctx.getSelf()));
return Behaviors.receive(Ping.class)
.onMessage(
Ping.class,
(c, msg) -> {
msg.replyTo.tell(new Pong());
return Behaviors.same();
})
.build();
});
}
// #ping-service
// #pinger
static Behavior<Pong> pinger(ActorRef<Ping> pingService) {
return Behaviors.setup(
(ctx) -> {
pingService.tell(new Ping(ctx.getSelf()));
return Behaviors.receive(Pong.class)
.onMessage(
Pong.class,
(c, msg) -> {
System.out.println("I was ponged! " + msg);
return Behaviors.same();
})
.build();
});
}
// #pinger
// #pinger-guardian
static Behavior<Void> guardian() {
return Behaviors.setup(
(ctx) -> {
ctx.getSystem()
.receptionist()
.tell(Receptionist.subscribe(PingServiceKey, ctx.getSelf().narrow()));
ActorRef<Ping> ps = ctx.spawnAnonymous(pingService());
ctx.watch(ps);
return Behaviors.receive(Object.class)
.onMessage(
Receptionist.Listing.class,
listing -> listing.isForKey(PingServiceKey),
(c, msg) -> {
msg.getServiceInstances(PingServiceKey)
.forEach(ar -> ctx.spawnAnonymous(pinger(ar)));
return Behaviors.same();
})
.build();
})
.narrow();
}
// #pinger-guardian
}
@Test
public void workPlease() throws Exception {
ActorSystem<Void> system =
ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample");
Await.ready(system.terminate(), Duration.create(2, TimeUnit.SECONDS));
}
}

View file

@ -0,0 +1,74 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.typed
//#import
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.Receptionist.Listing
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
//#import
object PingPongExample {
//#ping-service
val PingServiceKey = ServiceKey[Ping]("pingService")
final case class Ping(replyTo: ActorRef[Pong.type])
final case object Pong
val pingService: Behavior[Ping] =
Behaviors.setup { ctx =>
ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self)
Behaviors.receive { (context, msg) =>
msg match {
case Ping(replyTo) =>
context.log.info("Pinged by {}", replyTo)
replyTo ! Pong
Behaviors.same
}
}
}
//#ping-service
//#pinger
def pinger(pingService: ActorRef[Ping]): Behavior[Pong.type] =
Behaviors.setup[Pong.type] { ctx =>
pingService ! Ping(ctx.self)
Behaviors.receive { (context, msg) =>
context.log.info("{} was ponged!!", context.self)
Behaviors.stopped
}
}
//#pinger
//#pinger-guardian
val guardian: Behavior[Nothing] =
Behaviors
.setup[Listing] { context =>
context.spawnAnonymous(pingService)
context.system.receptionist ! Receptionist.Subscribe(PingServiceKey, context.self)
Behaviors.receiveMessagePartial[Listing] {
case PingServiceKey.Listing(listings) =>
listings.foreach(ps => context.spawnAnonymous(pinger(ps)))
Behaviors.same
}
}
.narrow
//#pinger-guardian
}
object ReceptionistExample {
import akka.actor.typed.ActorSystem
def main(args: Array[String]): Unit = {
val system = ActorSystem[Nothing](PingPongExample.guardian, "PingPongExample")
Thread.sleep(10000)
system.terminate()
}
}

View file

@ -1,105 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.typed
import akka.actor.typed._
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.receptionist.Receptionist.Listing
import akka.actor.typed.scaladsl._
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpec
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }
object PingPongExample {
//#ping-service
val PingServiceKey = ServiceKey[Ping]("pingService")
final case class Ping(replyTo: ActorRef[Pong.type])
final case object Pong
val pingService: Behavior[Ping] =
Behaviors.setup { ctx =>
ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self)
Behaviors.receive[Ping] { (_, msg) =>
msg match {
case Ping(replyTo) =>
println("Pinged, now stopping")
replyTo ! Pong
Behaviors.stopped
}
}
}
//#ping-service
//#pinger
def pinger(pingService: ActorRef[Ping]) = Behaviors.setup[Pong.type] { ctx =>
pingService ! Ping(ctx.self)
Behaviors.receive { (_, msg) =>
println("I was ponged!!" + msg)
Behaviors.same
}
}
//#pinger
//#pinger-guardian
val guardian: Behavior[Nothing] = Behaviors
.setup[Listing] { ctx =>
ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self)
val ps = ctx.spawnAnonymous(pingService)
ctx.watch(ps)
Behaviors
.receiveMessagePartial[Listing] {
case PingServiceKey.Listing(listings) if listings.nonEmpty =>
listings.foreach(ps => ctx.spawnAnonymous(pinger(ps)))
Behaviors.same
}
.receiveSignal {
case (_, Terminated(`ps`)) =>
println("Ping service has shut down")
Behaviors.stopped
}
}
.narrow
//#pinger-guardian
}
object ReceptionistExampleSpec {
val clusterConfig = ConfigFactory.parseString(s"""
#config
akka {
actor {
provider = "cluster"
}
cluster.jmx.multi-mbeans-in-same-jvm = on
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
}
#config
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""")
}
class ReceptionistExampleSpec extends WordSpec with ScalaFutures {
import PingPongExample._
implicit override val patienceConfig =
PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
"A local basic example" must {
"show register" in {
val system = ActorSystem[Nothing](guardian, "PingPongExample")
system.whenTerminated.futureValue
}
}
}

View file

@ -29,38 +29,47 @@ The registry is dynamic. New actors can be registered during the lifecycle of th
registered actors are stopped or a node is removed from the cluster. To facilitate this dynamic aspect you can also subscribe registered actors are stopped or a node is removed from the cluster. To facilitate this dynamic aspect you can also subscribe
to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed. to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed.
The first scenario is an actor running that needs to be discovered by another actor but you are unable The primary scenario for using the receptionist is when an actor needs to be discovered by another actor but you are unable
to put a reference to it in an incoming message. to put a reference to it in an incoming message.
First we create a `PingService` actor and register it with the `Receptionist` against a These imports are used in the following example:
Scala
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #import }
Java
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #import }
First we create a @scala[`pingService`]@java[`PingService`] actor and register it with the `Receptionist` against a
`ServiceKey` that will later be used to lookup the reference: `ServiceKey` that will later be used to lookup the reference:
Scala Scala
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #ping-service } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #ping-service }
Java Java
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #ping-service } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #ping-service }
Then we have another actor that requires a `PingService` to be constructed: Then we have another actor that requires a @scala[`pingService`]@java[`PingService`] to be constructed:
Scala Scala
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #pinger }
Java Java
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #pinger }
Finally in the guardian actor we spawn the service as well as subscribing to any actors registering Finally in the guardian actor we spawn the service as well as subscribing to any actors registering
against the `ServiceKey`. Subscribing means that the guardian actor will be informed of any against the `ServiceKey`. Subscribing means that the guardian actor will be informed of any
new registrations via a `Listing` message: new registrations via a `Listing` message:
Scala Scala
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger-guardian } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExample.scala) { #pinger-guardian }
Java Java
: @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger-guardian } : @@snip [ReceptionistExample](/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExample.java) { #pinger-guardian }
Each time a new (which is just a single time in this example) `PingService` is registered the Each time a new (which is just a single time in this example) @scala[`pingService`]@java[`PingService`] is registered the
guardian actor spawns a pinger to ping it. guardian actor spawns a @scala[`pinger`]@java[`Pinger`] for each currently known `PingService`. The @scala[`pinger`]@java[`Pinger`]
sends a `Ping` message and when receiving the `Pong` reply it stops.
## Cluster Receptionist ## Cluster Receptionist