Receptionist docs and examples

This commit is contained in:
Christopher Batey 2018-01-05 15:21:52 +00:00 committed by Konrad `ktoso` Malawski
parent cf455f3c11
commit f17dc5c7f7
24 changed files with 758 additions and 168 deletions

View file

@ -13,9 +13,6 @@ import org.junit.Test;
//#test-header
public class BasicAsyncTestingTest extends TestKit {
public BasicAsyncTestingTest() {
super("BasicAsyncTestingTest");
}
//#test-header
//#under-test
@ -52,7 +49,7 @@ public class BasicAsyncTestingTest extends TestKit {
@Test
public void testVerifyingAResponse() {
//#test-spawn
TestProbe<Pong> probe = new TestProbe<>(system(), testkitSettings());
TestProbe<Pong> probe = new TestProbe<>(system());
ActorRef<Ping> pinger = spawn(echoActor, "ping");
pinger.tell(new Ping("hello", probe.ref()));
probe.expectMsg(new Pong("hello"));
@ -62,7 +59,7 @@ public class BasicAsyncTestingTest extends TestKit {
@Test
public void testVerifyingAResponseAnonymous() {
//#test-spawn-anonymous
TestProbe<Pong> probe = new TestProbe<>(system(), testkitSettings());
TestProbe<Pong> probe = new TestProbe<>(system());
ActorRef<Ping> pinger = spawn(echoActor);
pinger.tell(new Ping("hello", probe.ref()));
probe.expectMsg(new Pong("hello"));

View file

@ -19,3 +19,8 @@ dispatcher-8 {
parallelism-max=8
}
}
akka.typed {
# for the akka.actor.ExtensionSpec
library-extensions += "akka.actor.typed.InstanceCountingExtension"
}

View file

@ -1,4 +0,0 @@
akka.typed {
# for the akka.actor.ExtensionSpec
library-extensions += "akka.actor.typed.InstanceCountingExtension"
}

View file

@ -170,6 +170,14 @@ object ActorSystem {
val appConfig = config.getOrElse(ConfigFactory.load(cl))
createInternal(name, guardianBehavior, guardianProps, Some(appConfig), classLoader, executionContext)
}
/**
* Scala API: Create an ActorSystem
*/
def apply[T](
guardianBehavior: Behavior[T],
name: String,
config: Config
): ActorSystem[T] = apply(guardianBehavior, name, config = Some(config))
/**
* Java API: Create an ActorSystem
@ -191,6 +199,12 @@ object ActorSystem {
def create[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
apply(guardianBehavior, name)
/**
* Java API: Create an ActorSystem
*/
def create[T](guardianBehavior: Behavior[T], name: String, config: Config): ActorSystem[T] =
apply(guardianBehavior, name, config = Some(config))
/**
* Create an ActorSystem based on the untyped [[akka.actor.ActorSystem]]
* which runs Akka Typed [[Behavior]] on an emulation layer. In this

View file

@ -57,7 +57,7 @@ case object PostStop extends PostStop {
/**
* Lifecycle signal that is fired when an Actor that was watched has terminated.
* Watching is performed by invoking the
* [[akka.actor.typed.ActorContext]] `watch` method. The DeathWatch service is
* [[akka.actor.typed.scaladsl.ActorContext.watch]] . The DeathWatch service is
* idempotent, meaning that registering twice has the same effect as registering
* once. Registration does not need to happen before the Actor terminates, a
* notification is guaranteed to arrive after both registration and termination

View file

@ -109,7 +109,7 @@ object Receptionist extends ExtensionId[Receptionist] {
/**
* Associate the given [[akka.actor.typed.ActorRef]] with the given [[ServiceKey]]. Multiple
* registrations can be made for the same key. Unregistration is implied by
* registrations can be made for the same key. De-registration is implied by
* the end of the referenced Actors lifecycle.
*
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor.
@ -119,6 +119,12 @@ object Receptionist extends ExtensionId[Receptionist] {
/** Auxiliary constructor to be used with the ask pattern */
def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] Register[T] =
replyTo Register(key, service, replyTo)
/**
* Java API
*/
def create[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) =
Register(key, serviceInstance, replyTo)
}
/**
@ -135,6 +141,14 @@ object Receptionist extends ExtensionId[Receptionist] {
*/
final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command
object Subscribe {
/**
* Java API
*/
def create[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) =
Subscribe(key, subscriber)
}
/**
* Query the Receptionist for a list of all Actors implementing the given
* protocol.

View file

@ -13,8 +13,6 @@ import scala.concurrent.duration.Duration;
public class ClusterApiTest extends JUnitSuite {
@Test
public void joinLeaveAndObserve() throws Exception {
Config config = ConfigFactory.parseString(
@ -35,18 +33,16 @@ public class ClusterApiTest extends JUnitSuite {
ActorSystem<?> system2 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config));
try {
TestKitSettings testKitSettings = new TestKitSettings(system1.settings().config());
Cluster cluster1 = Cluster.get(system1);
Cluster cluster2 = Cluster.get(system2);
TestProbe<ClusterEvent.ClusterDomainEvent> probe1 = new TestProbe<>(system1, testKitSettings);
TestProbe<ClusterEvent.ClusterDomainEvent> probe1 = new TestProbe<>(system1);
cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class));
cluster1.manager().tell(new Join(cluster1.selfMember().address()));
probe1.expectMsgType(SelfUp.class);
TestProbe<ClusterEvent.ClusterDomainEvent> probe2 = new TestProbe<>(system2, testKitSettings);
TestProbe<ClusterEvent.ClusterDomainEvent> probe2 = new TestProbe<>(system2);
cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class));
cluster2.manager().tell(new Join(cluster1.selfMember().address()));
probe2.expectMsgType(SelfUp.class);

View file

@ -0,0 +1,78 @@
package jdocs.akka.cluster.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Actor;
import akka.actor.typed.receptionist.Receptionist;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
public class ReceptionistExampleTest {
public static class PingPongExample {
//#ping-service
static Receptionist.ServiceKey<Ping> PingServiceKey =
Receptionist.ServiceKey.create(Ping.class, "pingService");
public static class Pong {}
public static class Ping {
private final ActorRef<Pong> replyTo;
Ping(ActorRef<Pong> replyTo) {
this.replyTo = replyTo;
}
}
static Behavior<Ping> pingService() {
return Actor.deferred((ctx) -> {
ctx.getSystem().receptionist()
.tell(Receptionist.Register.create(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters()));
return Actor.immutable(Ping.class)
.onMessage(Ping.class, (c, msg) -> {
msg.replyTo.tell(new Pong());
return Actor.same();
}).build();
});
}
//#ping-service
//#pinger
static Behavior<Pong> pinger(ActorRef<Ping> pingService) {
return Actor.deferred((ctx) -> {
pingService.tell(new Ping(ctx.getSelf()));
return Actor.immutable(Pong.class)
.onMessage(Pong.class, (c, msg) -> {
System.out.println("I was ponged! " + msg);
return Actor.same();
}).build();
});
}
//#pinger
//#pinger-guardian
static Behavior<Receptionist.Listing<Ping>> guardian() {
return Actor.deferred((ctx) -> {
ctx.getSystem().receptionist()
.tell(Receptionist.Subscribe.create(PingServiceKey, ctx.getSelf()));
ActorRef<Ping> ps = ctx.spawnAnonymous(pingService());
ctx.watch(ps);
return Actor.immutable((c, msg) -> {
msg.getServiceInstances().forEach(ar -> ctx.spawnAnonymous(pinger(ar)));
return Actor.same();
});
});
}
//#pinger-guardian
}
@Test
public void workPlease() throws Exception {
ActorSystem<Receptionist.Listing<PingPongExample.Ping>> system =
ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample");
Await.ready(system.terminate(), Duration.create(2, TimeUnit.SECONDS));
}
}

View file

@ -48,8 +48,8 @@ class ClusterApiSpec extends TestKit("ClusterApiSpec", ClusterApiSpec.config) wi
try {
val clusterNode2 = Cluster(adaptedSystem2)
val node1Probe = TestProbe[AnyRef]()(system, testSettings)
val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings)
val node1Probe = TestProbe[AnyRef]()(system)
val node2Probe = TestProbe[AnyRef]()(adaptedSystem2)
// initial cached selfMember
clusterNode1.selfMember.status should ===(MemberStatus.Removed)

View file

@ -102,10 +102,10 @@ class ClusterSingletonApiSpec extends TestKit("ClusterSingletonApiSpec", Cluster
"A typed cluster singleton" must {
"be accessible from two nodes in a cluster" in {
val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings])
val node1UpProbe = TestProbe[SelfUp]()(system)
clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp])
val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings])
val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2)
clusterNode1.subscriptions ! Subscribe(node2UpProbe.ref, classOf[SelfUp])
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
@ -125,8 +125,8 @@ class ClusterSingletonApiSpec extends TestKit("ClusterSingletonApiSpec", Cluster
cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref)
cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref)
val node1PongProbe = TestProbe[Pong.type]()(system, implicitly[TestKitSettings])
val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings])
val node1PongProbe = TestProbe[Pong.type]()(system)
val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2)
node1PongProbe.awaitAssert({
node1ref ! Ping(node1PongProbe.ref)

View file

@ -6,14 +6,14 @@ package akka.cluster.typed.internal.receptionist
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown }
import akka.actor.typed.{ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown}
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
import akka.serialization.SerializerWithStringManifest
import akka.testkit.typed.{ TestKit, TestKitSettings }
import akka.testkit.typed.{TestKit, TestKitSettings}
import akka.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
@ -45,14 +45,12 @@ object ClusterReceptionistSpec {
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
""")
trait PingProtocol
case object Pong
trait PingProtocol
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol
case object Perish extends PingProtocol
val pingPong = Actor.immutable[PingProtocol] { (_, msg)
val pingPongBehavior = Actor.immutable[PingProtocol] { (_, msg)
msg match {
case Ping(respondTo)
respondTo ! Pong
@ -67,14 +65,14 @@ object ClusterReceptionistSpec {
def identifier: Int = 47
def manifest(o: AnyRef): String = o match {
case _: Ping "a"
case Pong "b"
case Perish "c"
case Pong "b"
case Perish "c"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: Ping ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
case Pong Array.emptyByteArray
case Perish Array.emptyByteArray
case Pong Array.emptyByteArray
case Perish Array.emptyByteArray
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
@ -110,31 +108,26 @@ class ClusterReceptionistSpec extends TestKit("ClusterReceptionistSpec", Cluster
"The cluster receptionist" must {
"must eventually replicate registrations to the other side" in {
new TestSetup {
val regProbe = TestProbe[Any]()(system, testSettings)
val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings)
val regProbe = TestProbe[Any]()(system)
val regProbe2 = TestProbe[Any]()(adaptedSystem2)
adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service = spawn(pingPong)
system.receptionist ! Register(PingKey, service, regProbe.ref)
regProbe.expectMsg(Registered(PingKey, service))
val service = spawn(pingPongBehavior)
system.receptionist ! Register(PingKey, service, regProbe.ref)
regProbe.expectMsg(Registered(PingKey, service))
val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]]
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe2.ref)
regProbe2.expectMsg(Pong)
val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]]
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe2.ref)
regProbe2.expectMsg(Pong)
service ! Perish
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
}
service ! Perish
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
}
}
trait TestSetup {
}
override def afterAll(): Unit = {
super.afterAll()
Await.result(system.terminate(), 3.seconds)

View file

@ -0,0 +1,191 @@
package docs.akka.cluster.typed
import com.typesafe.config.ConfigFactory
import org.scalatest.{Matchers, WordSpec}
//#cluster-imports
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.cluster.typed._
//#cluster-imports
import akka.testkit.typed.scaladsl.TestProbe
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.time.{Millis, Seconds, Span}
import scala.concurrent.duration._
object BasicClusterExampleSpec {
val configSystem1 = ConfigFactory.parseString(
s"""
#config-seeds
akka {
actor {
provider = "cluster"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
}
}
#config-seeds
""")
val configSystem2 = ConfigFactory.parseString(
s"""
akka.remote.netty.tcp.port = 2552
"""
).withFallback(configSystem1)
}
class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually {
import BasicClusterExampleSpec._
"Cluster API" must {
"init cluster" in {
val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem1)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem2)
try {
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
} finally {
system1.terminate().futureValue
system2.terminate().futureValue
}
}
}
}
object BasicClusterManualSpec {
val clusterConfig = ConfigFactory.parseString(
s"""
#config
akka {
actor.provider = "cluster"
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
#config
""")
}
class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually with Matchers {
import BasicClusterManualSpec._
implicit override val patienceConfig =
PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis)))
"Cluster API" must {
"init cluster" in {
val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
try {
//#cluster-create
val cluster1 = Cluster(system)
//#cluster-create
val cluster2 = Cluster(system2)
//#cluster-join
cluster1.manager ! Join(cluster1.selfMember.address)
//#cluster-join
cluster2.manager ! Join(cluster1.selfMember.address)
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
}
//#cluster-leave
cluster2.manager ! Leave(cluster2.selfMember.address)
//#cluster-leave
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up)
cluster2.isTerminated shouldEqual true
}
} finally {
system.terminate().futureValue
system2.terminate().futureValue
}
}
"subscribe to cluster events" in {
implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
try {
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
val cluster3 = Cluster(system3)
//#cluster-subscribe
val testProbe = TestProbe[MemberEvent]()
cluster1.subscriptions ! Subscribe(testProbe.ref, classOf[MemberEvent])
//#cluster-subscribe
cluster1.manager ! Join(cluster1.selfMember.address)
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up)
}
testProbe.expectMsg(MemberUp(cluster1.selfMember))
cluster2.manager ! Join(cluster1.selfMember.address)
testProbe.expectMsgType[MemberJoined].member.address shouldEqual cluster2.selfMember.address
testProbe.expectMsgType[MemberUp].member.address shouldEqual cluster2.selfMember.address
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
}
cluster3.manager ! Join(cluster1.selfMember.address)
testProbe.expectMsgType[MemberJoined].member.address shouldEqual cluster3.selfMember.address
testProbe.expectMsgType[MemberUp].member.address shouldEqual cluster3.selfMember.address
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up)
cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up)
cluster3.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up)
}
//#cluster-leave-example
cluster1.manager ! Leave(cluster2.selfMember.address)
testProbe.expectMsgType[MemberLeft].member.address shouldEqual cluster2.selfMember.address
testProbe.expectMsgType[MemberExited].member.address shouldEqual cluster2.selfMember.address
testProbe.expectMsgType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address
//#cluster-leave-example
eventually {
cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
cluster3.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up)
}
system1.log.info("Downing node 3")
cluster1.manager ! Down(cluster3.selfMember.address)
testProbe.expectMsgType[MemberRemoved](10.seconds).member.address shouldEqual cluster3.selfMember.address
testProbe.expectNoMsg(1000.millis)
} finally {
system1.terminate().futureValue
system2.terminate().futureValue
system3.terminate().futureValue
}
}
}
}

View file

@ -0,0 +1,44 @@
package docs.akka.cluster.typed
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.SerializerWithStringManifest
import docs.akka.cluster.typed.PingPongExample._
//#serializer
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
private val actorRefResolver = ActorRefResolver(system.toTyped)
private val PingManifest = "a"
private val PongManifest = "b"
override def identifier = 41
override def manifest(msg: AnyRef) = msg match {
case _: Ping PingManifest
case Pong PongManifest
}
override def toBinary(msg: AnyRef) = msg match {
case Ping(who)
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
case Pong
Array.emptyByteArray
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
manifest match {
case PingManifest
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = actorRefResolver.resolveActorRef[Pong.type](str)
Ping(ref)
case PongManifest
Pong
}
}
}
//#serializer

View file

@ -0,0 +1,209 @@
package docs.akka.cluster.typed
import java.util.concurrent.ThreadLocalRandom
import akka.actor.Address
import akka.actor.typed._
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.Receptionist.{Find, Listing, ServiceKey}
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.typed.{Cluster, Join, Subscribe}
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpec
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable.Set
object RandomRouter {
def router[T](serviceKey: ServiceKey[T]): Behavior[T] =
Actor.deferred[Any] { ctx
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
def routingBehavior(routees: Vector[ActorRef[T]]): Behavior[Any] =
Actor.immutable { (_, msg)
msg match {
case Listing(_, services: Set[ActorRef[T]])
routingBehavior(services.toVector)
case other: T@unchecked
if (routees.isEmpty)
Actor.unhandled
else {
val i = ThreadLocalRandom.current.nextInt(routees.size)
routees(i) ! other
Actor.same
}
}
}
routingBehavior(Vector.empty)
}.narrow[T]
private final case class WrappedReachabilityEvent(event: ReachabilityEvent)
// same as above, but also subscribes to cluster reachability events and
// avoids routees that are unreachable
def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
Actor.deferred[Any] { ctx
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)
val cluster = Cluster(ctx.system)
// typically you have to map such external messages into this
// actor's protocol with a message adapter
val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.spawnAdapter(WrappedReachabilityEvent.apply)
cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])
def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
Actor.immutable { (_, msg)
msg match {
case Listing(_, services: Set[ActorRef[T]])
routingBehavior(services.toVector, unreachable)
case WrappedReachabilityEvent(event) => event match {
case UnreachableMember(m) =>
routingBehavior(routees, unreachable + m.address)
case ReachableMember(m) =>
routingBehavior(routees, unreachable - m.address)
}
case other: T@unchecked
if (routees.isEmpty)
Actor.unhandled
else {
val reachableRoutes =
if (unreachable.isEmpty) routees
else routees.filterNot { r => unreachable(r.path.address) }
val i = ThreadLocalRandom.current.nextInt(reachableRoutes.size)
reachableRoutes(i) ! other
Actor.same
}
}
}
routingBehavior(Vector.empty, Set.empty)
}.narrow[T]
}
object PingPongExample {
//#ping-service
val PingServiceKey = Receptionist.ServiceKey[Ping]("pingService")
final case class Ping(replyTo: ActorRef[Pong.type])
final case object Pong
val pingService: Behavior[Ping] =
Actor.deferred { ctx
ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self, ctx.system.deadLetters)
Actor.immutable[Ping] { (_, msg)
msg match {
case Ping(replyTo)
replyTo ! Pong
Actor.stopped
}
}
}
//#ping-service
//#pinger
def pinger(pingService: ActorRef[Ping]) = Actor.deferred[Pong.type] { ctx =>
pingService ! Ping(ctx.self)
Actor.immutable { (_, msg) =>
println("I was ponged!!" + msg)
Actor.same
}
}
//#pinger
//#pinger-guardian
val guardian: Behavior[Listing[Ping]] = Actor.deferred { ctx =>
ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self)
val ps = ctx.spawnAnonymous(pingService)
ctx.watch(ps)
Actor.immutablePartial[Listing[Ping]] {
case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty =>
listings.foreach(ps => ctx.spawnAnonymous(pinger(ps)))
Actor.same
} onSignal {
case (_, Terminated(`ps`)) =>
println("Ping service has shut down")
Actor.stopped
}
}
//#pinger-guardian
//#pinger-guardian-pinger-service
val guardianJustPingService: Behavior[Listing[Ping]] = Actor.deferred { ctx =>
val ps = ctx.spawnAnonymous(pingService)
ctx.watch(ps)
Actor.immutablePartial[Listing[Ping]] {
case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty =>
listings.foreach(ps => ctx.spawnAnonymous(pinger(ps)))
Actor.same
} onSignal {
case (_, Terminated(`ps`)) =>
println("Ping service has shut down")
Actor.stopped
}
}
//#pinger-guardian-pinger-service
//#pinger-guardian-just-pinger
val guardianJustPinger: Behavior[Listing[Ping]] = Actor.deferred { ctx =>
ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self)
Actor.immutablePartial[Listing[Ping]] {
case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty =>
listings.foreach(ps => ctx.spawnAnonymous(pinger(ps)))
Actor.same
}
}
//#pinger-guardian-just-pinger
}
object ReceptionistExampleSpec {
val clusterConfig = ConfigFactory.parseString(
s"""
#config
akka {
actor {
provider = "cluster"
}
cluster.jmx.multi-mbeans-in-same-jvm = on
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
}
#config
""")
}
class ReceptionistExampleSpec extends WordSpec with ScalaFutures {
import ReceptionistExampleSpec._
import PingPongExample._
"A local basic example" must {
"show register" in {
val system = ActorSystem(guardian, "PingPongExample")
system.whenTerminated.futureValue
}
}
"A remote basic example" must {
"show register" in {
val system1 = ActorSystem(guardianJustPingService, "PingPongExample", clusterConfig)
val system2 = ActorSystem(guardianJustPinger, "PingPongExample", clusterConfig)
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
cluster1.manager ! Join(cluster1.selfMember.address)
cluster1.manager ! Join(cluster2.selfMember.address)
}
}
}

View file

@ -0,0 +1,56 @@
# Actor discovery
With @ref:[untyped actors](general/addressing.md) you would use `ActorSelection` to "lookup" actors. Given an actor path with
address information you can get hold of an `ActorRef` to any actor. `ActorSelection` does not exist in Akka Typed,
so how do you get the actor references? You can send refs in messages but you need something to bootstrap the interaction.
## Receptionist
For this purpose there is an actor called the `Receptionist`. You register the specific actors that should be discoverable
from other nodes in the local `Receptionist` instance. The API of the receptionist is also based on actor messages.
This registry of actor references is then automatically distributed to all other nodes in the cluster.
You can lookup such actors with the key that was used when they were registered. The reply to such a `Find` request is
a `Listing`, which contains a `Set` of actor references that are registered for the key. Note that several actors can be
registered to the same key.
The registry is dynamic. New actors can be registered during the lifecycle of the system. Entries are removed when
registered actors are stopped or a node is removed from the cluster. To facilitate this dynamic aspect you can also subscribe
to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed.
The first scenario is an actor running that needs to be discovered by another actor but you are unable
to put a reference to it in an incoming message.
First we create a `PingService` actor and register it with the `Receptionist` against a
`ServiceKey` that will later be used to lookup the reference:
Scala
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #ping-service }
Java
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #ping-service }
Then we have another actor that requires a `PingService` to be constructed:
Scala
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger }
Java
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger }
Finally in the guardian actor we spawn the service as well as subscribing to any actors registering
against the `ServiceKey`. Subscribing means that the guardian actor will be informed of any
new registrations via a `Listing` message:
Scala
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger-guardian }
Java
: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger-guardian }
Each time a new (which is just a single time in this example) `PingService` is registered the
guardian actor spawns a pinger to ping it.
## Cluster Receptionist
The `Receptionist` also works in a cluster, the state for the receptionist is propagated via @ref:[distributed data](distributed-data.md).
The only difference is the serialisation concerns, see @ref:[clustering](cluster-typed.md).

View file

@ -20,7 +20,7 @@ To use Akka Typed add the following dependency:
@@dependency [sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-actor-typed_2.12
version=$version$
version=$akka.version$
}
## Introduction

View file

@ -1,9 +1,21 @@
# Sharding
@@@ warning
This module is currently marked as @ref:[may change](common/may-change.md) in the sense
of being the subject of active research. This means that API or semantics can
change without warning or deprecation period and it is not recommended to use
this module in production just yet—you have been warned.
@@@
To use the testkit add the following dependency:
@@dependency [sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster-sharding-typed_2.12
version=$version$
version=$akka.version$
}
TODO
For an introduction to Akka Cluster concepts see [Cluster Specification]. This documentation shows how to use the typed
Cluster API.

View file

@ -1,30 +1,88 @@
# Cluster
sbt
: @@@vars
```
"com.typesafe.akka" %% "akka-cluster-typed" % "$akka.version$"
```
@@@
@@@ warning
Gradle
: @@@vars
```
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.11', version: '$akka.version$'
}
```
@@@
This module is currently marked as @ref:[may change](common/may-change.md) in the sense
of being the subject of active research. This means that API or semantics can
change without warning or deprecation period and it is not recommended to use
this module in production just yet—you have been warned.
@@@
To use the testkit add the following dependency:
@@dependency [sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster-typed_2.12
version=$akka.version$
}
For an introduction to Akka Cluster concepts see @ref:[Cluster Specification](common/cluster.md). This documentation shows how to use the typed
Cluster API. All of the examples below assume the following imports:
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-imports }
And the minimum configuration required is to set a host/port for remoting and the `cluster`
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #config }
## Cluster API extension
The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription of
cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc). Those are exposed as two different actor
references, i.e. its a message based API.
The references are on the `Cluster` extension:
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-create }
The Cluster extensions gives you access to:
* manager: An `ActorRef[ClusterCommand]` where a `ClusterCommand` is a command such as: `Join`, `Leave` and `Down`
* subscriptions: An `ActorRef[ClusterStateSubscription]` where a `ClusterStateSubscription` is one of `GetCurrentState` or `Subscribe` and `Unsubscribe` to cluster events like `MemberRemoved`
* state: The current `CurrentClusterState`
### Cluster Management
If not using configuration to specify seeds joining the cluster can be done programmatically via the `manager`.
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-join }
Leaving and downing are similar e.g.
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-leave }
### Cluster subscriptions
Cluster `subscriptions` can be used to receive messages when cluster state changes. For example, registering
for all `MemberEvent`s, then using the `manager` to have a node leave the cluster will result in events
for the node going through the lifecycle described in @ref:[Cluster Specification](common/cluster.md).
This example subscribes with a `TestProbe` but in a real application it would be an Actor:
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-subscribe }
Then asking a node to leave:
Scala
: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-leave-example }
## Serialization
See [serialization](https://doc.akka.io/docs/akka/current/scala/serialization.html) for how messages are sent between
ActorSystems. Actor references are typically included in the messages,
since there is no `sender`. To serialize actor references to/from string representation you will use the `ActorRefResolver`.
For example here's how a serializer could look for the `Ping` and `Pong` messages above:
Scala
: @@snip [PingSerializer.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala) { #serializer }
Maven
: @@@vars
```
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-typed_$scala.binary_version$</artifactId>
<version>$akka.version$</version>
</dependency>
```
@@@
TODO

View file

@ -5,8 +5,9 @@
@@@ index
* [actors](actors-typed.md)
* [fault-tolerance-typed.md](fault-tolerance-typed.md)
* [coexisting](coexisting.md)
* [fault-tolerance](fault-tolerance-typed.md)
* [actor-discovery](actor-discovery-typed.md)
* [cluster](cluster-typed.md)
* [cluster-sharding](cluster-sharding-typed.md)
* [persistence](persistence-typed.md)

View file

@ -15,7 +15,7 @@ To use the testkit add the following dependency:
@@dependency [sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-testkit-typed_2.12
version=$version$
version=$akka.version$
scope=test
}

View file

@ -5,104 +5,31 @@ import akka.{ actor ⇒ untyped }
import akka.actor.typed._
import akka.util.Helpers
import akka.{ actor a }
import akka.util.Unsafe.{ instance unsafe }
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.InternalApi
import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl, ActorSystemStub }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl, ActorSystemStub, SystemMessage }
/**
* A local synchronous ActorRef that invokes the given function for every message send.
* This reference can be watched and will do the right thing when it receives a [[akka.actor.typed.internal.DeathWatchNotification]].
* This reference cannot watch other references.
*/
private[akka] final class FunctionRef[-T](
_path: a.ActorPath,
send: (T, FunctionRef[T]) Unit,
_terminate: FunctionRef[T] Unit)
extends WatchableRef[T](_path) {
extends ActorRef[T] with ActorRefImpl[T] {
override def tell(msg: T): Unit = {
if (msg == null) throw InvalidMessageException("[null] is not an allowed message")
if (isAlive)
try send(msg, this) catch {
case NonFatal(_) // nothing we can do here
}
else () // we dont have deadLetters available
}
import internal._
override def sendSystem(signal: SystemMessage): Unit = signal match {
case internal.Create() // nothing to do
case internal.DeathWatchNotification(_, _) // were not watching, and were not a parent either
case internal.Terminate() doTerminate()
case internal.Watch(watchee, watcher) if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing)
case internal.Unwatch(watchee, watcher) if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing)
case NoMessage // nothing to do
send(msg, this)
}
override def path = _path
override def sendSystem(signal: SystemMessage): Unit = {}
override def isLocal = true
override def terminate(): Unit = _terminate(this)
}
/**
* The mechanics for synthetic ActorRefs that have a lifecycle and support being watched.
*/
private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] {
import WatchableRef._
/**
* Callback that is invoked when this ref has terminated. Even if doTerminate() is
* called multiple times, this callback is invoked only once.
*/
protected def terminate(): Unit
type S = Set[ActorRefImpl[Nothing]]
@volatile private[this] var _watchedBy: S = Set.empty
protected def isAlive: Boolean = _watchedBy != null
protected def doTerminate(): Unit = {
val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S]
if (watchedBy != null) {
try terminate() catch { case NonFatal(ex) }
if (watchedBy.nonEmpty) watchedBy foreach sendTerminated
}
}
private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit =
watcher.sendSystem(internal.DeathWatchNotification(this, null))
@tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit =
_watchedBy match {
case null sendTerminated(watcher)
case watchedBy
if (!watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher))
addWatcher(watcher) // try again
}
@tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = {
_watchedBy match {
case null // do nothing...
case watchedBy
if (watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher))
remWatcher(watcher) // try again
}
}
}
private[typed] object WatchableRef {
val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy"))
}
/**

View file

@ -14,9 +14,8 @@ import scala.collection.immutable
/**
* Utility for use as an [[ActorRef]] when synchronously testing [[akka.actor.typed.Behavior]]
* to be used along with [[BehaviorTestkit]].
*
* See [[akka.testkit.typed.scaladsl.TestProbe]] for asynchronous testing.
* to be used along with [[BehaviorTestkit]]. If you plan to use a real [[akka.actor.typed.ActorSystem]]
* then use [[akka.testkit.typed.scaladsl.TestProbe]] for asynchronous testing.
*/
@ApiMayChange
class TestInbox[T](name: String) {

View file

@ -4,14 +4,13 @@
package akka.testkit.typed.javadsl
import akka.actor.typed.ActorSystem
import akka.testkit.typed.TestKitSettings
/**
* Java API:
*/
class TestProbe[M](name: String, system: ActorSystem[_], settings: TestKitSettings) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system, settings) {
class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system) {
def this(system: ActorSystem[_], settings: TestKitSettings) = this("testProbe", system, settings)
def this(system: ActorSystem[_]) = this("testProbe", system)
/**
* Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor.

View file

@ -28,10 +28,10 @@ import scala.util.control.NonFatal
object TestProbe {
private val testActorId = new AtomicInteger(0)
def apply[M]()(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] =
def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M] =
apply(name = "testProbe")
def apply[M](name: String)(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] =
def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] =
new TestProbe(name)
private def testActor[M](queue: BlockingDeque[M]): Behavior[M] = Actor.immutable { (ctx, msg)
@ -40,9 +40,10 @@ object TestProbe {
}
}
class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settings: TestKitSettings) {
class TestProbe[M](name: String)(implicit system: ActorSystem[_]) {
import TestProbe._
private implicit val settings = TestKitSettings(system)
private val queue = new LinkedBlockingDeque[M]
private var end: Duration = Duration.Undefined