From f17dc5c7f70c3f35d5548020163f6b9c02c88b3b Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 5 Jan 2018 15:21:52 +0000 Subject: [PATCH] Receptionist docs and examples --- .../testing/async/BasicAsyncTestingTest.java | 7 +- .../src/test/resources/application.conf | 5 + .../src/test/resources/reference.conf | 4 - .../scala/akka/actor/typed/ActorSystem.scala | 14 ++ .../akka/actor/typed/MessageAndSignals.scala | 2 +- .../typed/receptionist/Receptionist.scala | 16 +- .../akka/cluster/typed/ClusterApiTest.java | 8 +- .../typed/ReceptionistExampleTest.java | 78 +++++++ .../akka/cluster/typed/ClusterApiSpec.scala | 4 +- .../typed/ClusterSingletonApiSpec.scala | 8 +- .../ClusterReceptionistSpec.scala | 49 ++-- .../typed/BasicClusterExampleSpec.scala | 191 ++++++++++++++++ .../akka/cluster/typed/PingSerializer.scala | 44 ++++ .../typed/ReceptionistExampleSpec.scala | 209 ++++++++++++++++++ .../src/main/paradox/actor-discovery-typed.md | 56 +++++ akka-docs/src/main/paradox/actors-typed.md | 2 +- .../main/paradox/cluster-sharding-typed.md | 16 +- akka-docs/src/main/paradox/cluster-typed.md | 108 ++++++--- akka-docs/src/main/paradox/index-typed.md | 3 +- akka-docs/src/main/paradox/testing-typed.md | 2 +- .../testkit/typed/StubbedActorContext.scala | 83 +------ .../scala/akka/testkit/typed/TestInbox.scala | 5 +- .../testkit/typed/javadsl/TestProbe.scala | 5 +- .../testkit/typed/scaladsl/TestProbe.scala | 7 +- 24 files changed, 758 insertions(+), 168 deletions(-) delete mode 100644 akka-actor-typed-tests/src/test/resources/reference.conf create mode 100644 akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java create mode 100644 akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala create mode 100644 akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala create mode 100644 akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala create mode 100644 akka-docs/src/main/paradox/actor-discovery-typed.md diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java index c447b02372..7177c1ac05 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java @@ -13,9 +13,6 @@ import org.junit.Test; //#test-header public class BasicAsyncTestingTest extends TestKit { - public BasicAsyncTestingTest() { - super("BasicAsyncTestingTest"); - } //#test-header //#under-test @@ -52,7 +49,7 @@ public class BasicAsyncTestingTest extends TestKit { @Test public void testVerifyingAResponse() { //#test-spawn - TestProbe probe = new TestProbe<>(system(), testkitSettings()); + TestProbe probe = new TestProbe<>(system()); ActorRef pinger = spawn(echoActor, "ping"); pinger.tell(new Ping("hello", probe.ref())); probe.expectMsg(new Pong("hello")); @@ -62,7 +59,7 @@ public class BasicAsyncTestingTest extends TestKit { @Test public void testVerifyingAResponseAnonymous() { //#test-spawn-anonymous - TestProbe probe = new TestProbe<>(system(), testkitSettings()); + TestProbe probe = new TestProbe<>(system()); ActorRef pinger = spawn(echoActor); pinger.tell(new Ping("hello", probe.ref())); probe.expectMsg(new Pong("hello")); diff --git a/akka-actor-typed-tests/src/test/resources/application.conf b/akka-actor-typed-tests/src/test/resources/application.conf index e053d9d956..02ef9c204b 100644 --- a/akka-actor-typed-tests/src/test/resources/application.conf +++ b/akka-actor-typed-tests/src/test/resources/application.conf @@ -19,3 +19,8 @@ dispatcher-8 { parallelism-max=8 } } + +akka.typed { + # for the akka.actor.ExtensionSpec + library-extensions += "akka.actor.typed.InstanceCountingExtension" +} diff --git a/akka-actor-typed-tests/src/test/resources/reference.conf b/akka-actor-typed-tests/src/test/resources/reference.conf deleted file mode 100644 index 5bbd174c10..0000000000 --- a/akka-actor-typed-tests/src/test/resources/reference.conf +++ /dev/null @@ -1,4 +0,0 @@ -akka.typed { - # for the akka.actor.ExtensionSpec - library-extensions += "akka.actor.typed.InstanceCountingExtension" -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 5ee9b8f8bd..20b691e0be 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -170,6 +170,14 @@ object ActorSystem { val appConfig = config.getOrElse(ConfigFactory.load(cl)) createInternal(name, guardianBehavior, guardianProps, Some(appConfig), classLoader, executionContext) } + /** + * Scala API: Create an ActorSystem + */ + def apply[T]( + guardianBehavior: Behavior[T], + name: String, + config: Config + ): ActorSystem[T] = apply(guardianBehavior, name, config = Some(config)) /** * Java API: Create an ActorSystem @@ -191,6 +199,12 @@ object ActorSystem { def create[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] = apply(guardianBehavior, name) + /** + * Java API: Create an ActorSystem + */ + def create[T](guardianBehavior: Behavior[T], name: String, config: Config): ActorSystem[T] = + apply(guardianBehavior, name, config = Some(config)) + /** * Create an ActorSystem based on the untyped [[akka.actor.ActorSystem]] * which runs Akka Typed [[Behavior]] on an emulation layer. In this diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala index a16161e413..d8f6ea678c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala @@ -57,7 +57,7 @@ case object PostStop extends PostStop { /** * Lifecycle signal that is fired when an Actor that was watched has terminated. * Watching is performed by invoking the - * [[akka.actor.typed.ActorContext]] `watch` method. The DeathWatch service is + * [[akka.actor.typed.scaladsl.ActorContext.watch]] . The DeathWatch service is * idempotent, meaning that registering twice has the same effect as registering * once. Registration does not need to happen before the Actor terminates, a * notification is guaranteed to arrive after both registration and termination diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala index 4f83e12e2d..b8a2967ef5 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala @@ -109,7 +109,7 @@ object Receptionist extends ExtensionId[Receptionist] { /** * Associate the given [[akka.actor.typed.ActorRef]] with the given [[ServiceKey]]. Multiple - * registrations can be made for the same key. Unregistration is implied by + * registrations can be made for the same key. De-registration is implied by * the end of the referenced Actor’s lifecycle. * * Registration will be acknowledged with the [[Registered]] message to the given replyTo actor. @@ -119,6 +119,12 @@ object Receptionist extends ExtensionId[Receptionist] { /** Auxiliary constructor to be used with the ask pattern */ def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] ⇒ Register[T] = replyTo ⇒ Register(key, service, replyTo) + + /** + * Java API + */ + def create[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) = + Register(key, serviceInstance, replyTo) } /** @@ -135,6 +141,14 @@ object Receptionist extends ExtensionId[Receptionist] { */ final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command + object Subscribe { + /** + * Java API + */ + def create[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) = + Subscribe(key, subscriber) + } + /** * Query the Receptionist for a list of all Actors implementing the given * protocol. diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index 07efea4852..29fdff81a7 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -13,8 +13,6 @@ import scala.concurrent.duration.Duration; public class ClusterApiTest extends JUnitSuite { - - @Test public void joinLeaveAndObserve() throws Exception { Config config = ConfigFactory.parseString( @@ -35,18 +33,16 @@ public class ClusterApiTest extends JUnitSuite { ActorSystem system2 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config)); try { - TestKitSettings testKitSettings = new TestKitSettings(system1.settings().config()); - Cluster cluster1 = Cluster.get(system1); Cluster cluster2 = Cluster.get(system2); - TestProbe probe1 = new TestProbe<>(system1, testKitSettings); + TestProbe probe1 = new TestProbe<>(system1); cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class)); cluster1.manager().tell(new Join(cluster1.selfMember().address())); probe1.expectMsgType(SelfUp.class); - TestProbe probe2 = new TestProbe<>(system2, testKitSettings); + TestProbe probe2 = new TestProbe<>(system2); cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class)); cluster2.manager().tell(new Join(cluster1.selfMember().address())); probe2.expectMsgType(SelfUp.class); diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java new file mode 100644 index 0000000000..54b8718f04 --- /dev/null +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java @@ -0,0 +1,78 @@ +package jdocs.akka.cluster.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Actor; +import akka.actor.typed.receptionist.Receptionist; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + +public class ReceptionistExampleTest { + + public static class PingPongExample { + //#ping-service + static Receptionist.ServiceKey PingServiceKey = + Receptionist.ServiceKey.create(Ping.class, "pingService"); + + public static class Pong {} + public static class Ping { + private final ActorRef replyTo; + Ping(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + static Behavior pingService() { + return Actor.deferred((ctx) -> { + ctx.getSystem().receptionist() + .tell(Receptionist.Register.create(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters())); + return Actor.immutable(Ping.class) + .onMessage(Ping.class, (c, msg) -> { + msg.replyTo.tell(new Pong()); + return Actor.same(); + }).build(); + }); + } + //#ping-service + + //#pinger + static Behavior pinger(ActorRef pingService) { + return Actor.deferred((ctx) -> { + pingService.tell(new Ping(ctx.getSelf())); + return Actor.immutable(Pong.class) + .onMessage(Pong.class, (c, msg) -> { + System.out.println("I was ponged! " + msg); + return Actor.same(); + }).build(); + }); + } + //#pinger + + //#pinger-guardian + static Behavior> guardian() { + return Actor.deferred((ctx) -> { + ctx.getSystem().receptionist() + .tell(Receptionist.Subscribe.create(PingServiceKey, ctx.getSelf())); + ActorRef ps = ctx.spawnAnonymous(pingService()); + ctx.watch(ps); + return Actor.immutable((c, msg) -> { + msg.getServiceInstances().forEach(ar -> ctx.spawnAnonymous(pinger(ar))); + return Actor.same(); + }); + }); + } + //#pinger-guardian + } + + @Test + public void workPlease() throws Exception { + ActorSystem> system = + ActorSystem.create(PingPongExample.guardian(), "ReceptionistExample"); + + Await.ready(system.terminate(), Duration.create(2, TimeUnit.SECONDS)); + } +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala index dfca1b5efb..0f254bef57 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala @@ -48,8 +48,8 @@ class ClusterApiSpec extends TestKit("ClusterApiSpec", ClusterApiSpec.config) wi try { val clusterNode2 = Cluster(adaptedSystem2) - val node1Probe = TestProbe[AnyRef]()(system, testSettings) - val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings) + val node1Probe = TestProbe[AnyRef]()(system) + val node2Probe = TestProbe[AnyRef]()(adaptedSystem2) // initial cached selfMember clusterNode1.selfMember.status should ===(MemberStatus.Removed) diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala index 9c78d5b4bd..4a3f58c634 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala @@ -102,10 +102,10 @@ class ClusterSingletonApiSpec extends TestKit("ClusterSingletonApiSpec", Cluster "A typed cluster singleton" must { "be accessible from two nodes in a cluster" in { - val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings]) + val node1UpProbe = TestProbe[SelfUp]()(system) clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp]) - val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings]) + val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2) clusterNode1.subscriptions ! Subscribe(node2UpProbe.ref, classOf[SelfUp]) clusterNode1.manager ! Join(clusterNode1.selfMember.address) @@ -125,8 +125,8 @@ class ClusterSingletonApiSpec extends TestKit("ClusterSingletonApiSpec", Cluster cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref) cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref) - val node1PongProbe = TestProbe[Pong.type]()(system, implicitly[TestKitSettings]) - val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings]) + val node1PongProbe = TestProbe[Pong.type]()(system) + val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2) node1PongProbe.awaitAssert({ node1ref ! Ping(node1PongProbe.ref) diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 22bb13d6ec..8692354e9b 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -6,14 +6,14 @@ package akka.cluster.typed.internal.receptionist import java.nio.charset.StandardCharsets import akka.actor.ExtendedActorSystem -import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown } +import akka.actor.typed.{ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown} import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ import akka.cluster.Cluster import akka.serialization.SerializerWithStringManifest -import akka.testkit.typed.{ TestKit, TestKitSettings } +import akka.testkit.typed.{TestKit, TestKitSettings} import akka.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory @@ -45,14 +45,12 @@ object ClusterReceptionistSpec { akka.cluster.jmx.multi-mbeans-in-same-jvm = on """) - trait PingProtocol case object Pong + trait PingProtocol case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol - case object Perish extends PingProtocol - val pingPong = Actor.immutable[PingProtocol] { (_, msg) ⇒ - + val pingPongBehavior = Actor.immutable[PingProtocol] { (_, msg) ⇒ msg match { case Ping(respondTo) ⇒ respondTo ! Pong @@ -67,14 +65,14 @@ object ClusterReceptionistSpec { def identifier: Int = 47 def manifest(o: AnyRef): String = o match { case _: Ping ⇒ "a" - case Pong ⇒ "b" - case Perish ⇒ "c" + case Pong ⇒ "b" + case Perish ⇒ "c" } def toBinary(o: AnyRef): Array[Byte] = o match { case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8) - case Pong ⇒ Array.emptyByteArray - case Perish ⇒ Array.emptyByteArray + case Pong ⇒ Array.emptyByteArray + case Perish ⇒ Array.emptyByteArray } def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { @@ -110,31 +108,26 @@ class ClusterReceptionistSpec extends TestKit("ClusterReceptionistSpec", Cluster "The cluster receptionist" must { "must eventually replicate registrations to the other side" in { - new TestSetup { - val regProbe = TestProbe[Any]()(system, testSettings) - val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings) + val regProbe = TestProbe[Any]()(system) + val regProbe2 = TestProbe[Any]()(adaptedSystem2) - adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref) - regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref) + regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - val service = spawn(pingPong) - system.receptionist ! Register(PingKey, service, regProbe.ref) - regProbe.expectMsg(Registered(PingKey, service)) + val service = spawn(pingPongBehavior) + system.receptionist ! Register(PingKey, service, regProbe.ref) + regProbe.expectMsg(Registered(PingKey, service)) - val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]] - val theRef = remoteServiceRefs.head - theRef ! Ping(regProbe2.ref) - regProbe2.expectMsg(Pong) + val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]] + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe2.ref) + regProbe2.expectMsg(Pong) - service ! Perish - regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - } + service ! Perish + regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) } } - trait TestSetup { - } - override def afterAll(): Unit = { super.afterAll() Await.result(system.terminate(), 3.seconds) diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala new file mode 100644 index 0000000000..c8d1ba717f --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -0,0 +1,191 @@ +package docs.akka.cluster.typed + +import com.typesafe.config.ConfigFactory +import org.scalatest.{Matchers, WordSpec} +//#cluster-imports +import akka.actor.typed._ +import akka.actor.typed.scaladsl._ +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus +import akka.cluster.typed._ +//#cluster-imports +import akka.testkit.typed.scaladsl.TestProbe +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.time.{Millis, Seconds, Span} + +import scala.concurrent.duration._ + +object BasicClusterExampleSpec { + val configSystem1 = ConfigFactory.parseString( + s""" +#config-seeds +akka { + actor { + provider = "cluster" + } + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 2551 + } + } + + cluster { + seed-nodes = [ + "akka.tcp://ClusterSystem@127.0.0.1:2551", + "akka.tcp://ClusterSystem@127.0.0.1:2552"] + } +} +#config-seeds + """) + + val configSystem2 = ConfigFactory.parseString( + s""" + akka.remote.netty.tcp.port = 2552 + """ + ).withFallback(configSystem1) +} + +class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually { + + import BasicClusterExampleSpec._ + + "Cluster API" must { + "init cluster" in { + val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem1) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem2) + try { + val cluster1 = Cluster(system1) + val cluster2 = Cluster(system2) + } finally { + system1.terminate().futureValue + system2.terminate().futureValue + } + + } + } +} + +object BasicClusterManualSpec { + val clusterConfig = ConfigFactory.parseString( + s""" +#config +akka { + actor.provider = "cluster" + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } +} +#config + """) + +} + +class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually with Matchers { + + import BasicClusterManualSpec._ + + implicit override val patienceConfig = + PatienceConfig(timeout = scaled(Span(10, Seconds)), interval = scaled(Span(100, Millis))) + + "Cluster API" must { + "init cluster" in { + val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + + try { + //#cluster-create + val cluster1 = Cluster(system) + //#cluster-create + val cluster2 = Cluster(system2) + + //#cluster-join + cluster1.manager ! Join(cluster1.selfMember.address) + //#cluster-join + cluster2.manager ! Join(cluster1.selfMember.address) + + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + } + + //#cluster-leave + cluster2.manager ! Leave(cluster2.selfMember.address) + //#cluster-leave + + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up) + cluster2.isTerminated shouldEqual true + } + } finally { + system.terminate().futureValue + system2.terminate().futureValue + } + } + + "subscribe to cluster events" in { + implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + + try { + val cluster1 = Cluster(system1) + val cluster2 = Cluster(system2) + val cluster3 = Cluster(system3) + + //#cluster-subscribe + val testProbe = TestProbe[MemberEvent]() + cluster1.subscriptions ! Subscribe(testProbe.ref, classOf[MemberEvent]) + //#cluster-subscribe + + cluster1.manager ! Join(cluster1.selfMember.address) + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up) + } + testProbe.expectMsg(MemberUp(cluster1.selfMember)) + + cluster2.manager ! Join(cluster1.selfMember.address) + testProbe.expectMsgType[MemberJoined].member.address shouldEqual cluster2.selfMember.address + testProbe.expectMsgType[MemberUp].member.address shouldEqual cluster2.selfMember.address + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + } + + cluster3.manager ! Join(cluster1.selfMember.address) + testProbe.expectMsgType[MemberJoined].member.address shouldEqual cluster3.selfMember.address + testProbe.expectMsgType[MemberUp].member.address shouldEqual cluster3.selfMember.address + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up) + cluster2.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up) + cluster3.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up, MemberStatus.up) + } + + //#cluster-leave-example + cluster1.manager ! Leave(cluster2.selfMember.address) + testProbe.expectMsgType[MemberLeft].member.address shouldEqual cluster2.selfMember.address + testProbe.expectMsgType[MemberExited].member.address shouldEqual cluster2.selfMember.address + testProbe.expectMsgType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address + //#cluster-leave-example + + eventually { + cluster1.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + cluster3.state.members.toList.map(_.status) shouldEqual List(MemberStatus.up, MemberStatus.up) + } + + system1.log.info("Downing node 3") + cluster1.manager ! Down(cluster3.selfMember.address) + testProbe.expectMsgType[MemberRemoved](10.seconds).member.address shouldEqual cluster3.selfMember.address + + testProbe.expectNoMsg(1000.millis) + } finally { + system1.terminate().futureValue + system2.terminate().futureValue + system3.terminate().futureValue + } + } + } +} diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala new file mode 100644 index 0000000000..46657a010f --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala @@ -0,0 +1,44 @@ +package docs.akka.cluster.typed + +import java.nio.charset.StandardCharsets + +import akka.actor.ExtendedActorSystem +import akka.actor.typed.ActorRefResolver +import akka.actor.typed.scaladsl.adapter._ +import akka.serialization.SerializerWithStringManifest +import docs.akka.cluster.typed.PingPongExample._ + + +//#serializer +class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + private val actorRefResolver = ActorRefResolver(system.toTyped) + + private val PingManifest = "a" + private val PongManifest = "b" + + override def identifier = 41 + + override def manifest(msg: AnyRef) = msg match { + case _: Ping ⇒ PingManifest + case Pong ⇒ PongManifest + } + + override def toBinary(msg: AnyRef) = msg match { + case Ping(who) ⇒ + ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) + case Pong ⇒ + Array.emptyByteArray + } + + override def fromBinary(bytes: Array[Byte], manifest: String) = { + manifest match { + case PingManifest ⇒ + val str = new String(bytes, StandardCharsets.UTF_8) + val ref = actorRefResolver.resolveActorRef[Pong.type](str) + Ping(ref) + case PongManifest ⇒ + Pong + } + } +} +//#serializer diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala new file mode 100644 index 0000000000..15c1722d46 --- /dev/null +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala @@ -0,0 +1,209 @@ +package docs.akka.cluster.typed + +import java.util.concurrent.ThreadLocalRandom + +import akka.actor.Address +import akka.actor.typed._ +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.Receptionist.{Find, Listing, ServiceKey} +import akka.actor.typed.scaladsl._ +import akka.cluster.ClusterEvent._ +import akka.cluster.typed.{Cluster, Join, Subscribe} +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpec +import org.scalatest.concurrent.ScalaFutures + +import scala.collection.immutable.Set + +object RandomRouter { + + def router[T](serviceKey: ServiceKey[T]): Behavior[T] = + Actor.deferred[Any] { ctx ⇒ + ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self) + + def routingBehavior(routees: Vector[ActorRef[T]]): Behavior[Any] = + Actor.immutable { (_, msg) ⇒ + msg match { + case Listing(_, services: Set[ActorRef[T]]) ⇒ + routingBehavior(services.toVector) + case other: T@unchecked ⇒ + if (routees.isEmpty) + Actor.unhandled + else { + val i = ThreadLocalRandom.current.nextInt(routees.size) + routees(i) ! other + Actor.same + } + } + } + + routingBehavior(Vector.empty) + }.narrow[T] + + private final case class WrappedReachabilityEvent(event: ReachabilityEvent) + + // same as above, but also subscribes to cluster reachability events and + // avoids routees that are unreachable + def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] = + Actor.deferred[Any] { ctx ⇒ + ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self) + + val cluster = Cluster(ctx.system) + // typically you have to map such external messages into this + // actor's protocol with a message adapter + val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.spawnAdapter(WrappedReachabilityEvent.apply) + cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent]) + + def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] = + Actor.immutable { (_, msg) ⇒ + msg match { + case Listing(_, services: Set[ActorRef[T]]) ⇒ + routingBehavior(services.toVector, unreachable) + case WrappedReachabilityEvent(event) => event match { + case UnreachableMember(m) => + routingBehavior(routees, unreachable + m.address) + case ReachableMember(m) => + routingBehavior(routees, unreachable - m.address) + } + + case other: T@unchecked ⇒ + if (routees.isEmpty) + Actor.unhandled + else { + val reachableRoutes = + if (unreachable.isEmpty) routees + else routees.filterNot { r => unreachable(r.path.address) } + + val i = ThreadLocalRandom.current.nextInt(reachableRoutes.size) + reachableRoutes(i) ! other + Actor.same + } + } + } + + routingBehavior(Vector.empty, Set.empty) + }.narrow[T] +} + +object PingPongExample { + //#ping-service + val PingServiceKey = Receptionist.ServiceKey[Ping]("pingService") + + final case class Ping(replyTo: ActorRef[Pong.type]) + final case object Pong + + val pingService: Behavior[Ping] = + Actor.deferred { ctx ⇒ + ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self, ctx.system.deadLetters) + Actor.immutable[Ping] { (_, msg) ⇒ + msg match { + case Ping(replyTo) ⇒ + replyTo ! Pong + Actor.stopped + } + } + } + //#ping-service + + //#pinger + def pinger(pingService: ActorRef[Ping]) = Actor.deferred[Pong.type] { ctx => + pingService ! Ping(ctx.self) + Actor.immutable { (_, msg) => + println("I was ponged!!" + msg) + Actor.same + } + } + //#pinger + + //#pinger-guardian + val guardian: Behavior[Listing[Ping]] = Actor.deferred { ctx => + ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self) + val ps = ctx.spawnAnonymous(pingService) + ctx.watch(ps) + Actor.immutablePartial[Listing[Ping]] { + case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty => + listings.foreach(ps => ctx.spawnAnonymous(pinger(ps))) + Actor.same + } onSignal { + case (_, Terminated(`ps`)) => + println("Ping service has shut down") + Actor.stopped + } + } + //#pinger-guardian + + //#pinger-guardian-pinger-service + val guardianJustPingService: Behavior[Listing[Ping]] = Actor.deferred { ctx => + val ps = ctx.spawnAnonymous(pingService) + ctx.watch(ps) + Actor.immutablePartial[Listing[Ping]] { + case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty => + listings.foreach(ps => ctx.spawnAnonymous(pinger(ps))) + Actor.same + } onSignal { + case (_, Terminated(`ps`)) => + println("Ping service has shut down") + Actor.stopped + } + } + //#pinger-guardian-pinger-service + + //#pinger-guardian-just-pinger + val guardianJustPinger: Behavior[Listing[Ping]] = Actor.deferred { ctx => + ctx.system.receptionist ! Receptionist.Subscribe(PingServiceKey, ctx.self) + Actor.immutablePartial[Listing[Ping]] { + case (c, Listing(PingServiceKey, listings)) if listings.nonEmpty => + listings.foreach(ps => ctx.spawnAnonymous(pinger(ps))) + Actor.same + } + } + //#pinger-guardian-just-pinger + +} + +object ReceptionistExampleSpec { + val clusterConfig = ConfigFactory.parseString( + s""" +#config +akka { + actor { + provider = "cluster" + } + cluster.jmx.multi-mbeans-in-same-jvm = on + remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } +} +#config + """) + +} + +class ReceptionistExampleSpec extends WordSpec with ScalaFutures { + + import ReceptionistExampleSpec._ + import PingPongExample._ + + "A local basic example" must { + "show register" in { + val system = ActorSystem(guardian, "PingPongExample") + system.whenTerminated.futureValue + } + } + + "A remote basic example" must { + "show register" in { + val system1 = ActorSystem(guardianJustPingService, "PingPongExample", clusterConfig) + val system2 = ActorSystem(guardianJustPinger, "PingPongExample", clusterConfig) + + val cluster1 = Cluster(system1) + val cluster2 = Cluster(system2) + + cluster1.manager ! Join(cluster1.selfMember.address) + cluster1.manager ! Join(cluster2.selfMember.address) + } + } +} diff --git a/akka-docs/src/main/paradox/actor-discovery-typed.md b/akka-docs/src/main/paradox/actor-discovery-typed.md new file mode 100644 index 0000000000..35e39163d5 --- /dev/null +++ b/akka-docs/src/main/paradox/actor-discovery-typed.md @@ -0,0 +1,56 @@ +# Actor discovery + +With @ref:[untyped actors](general/addressing.md) you would use `ActorSelection` to "lookup" actors. Given an actor path with +address information you can get hold of an `ActorRef` to any actor. `ActorSelection` does not exist in Akka Typed, +so how do you get the actor references? You can send refs in messages but you need something to bootstrap the interaction. + +## Receptionist + +For this purpose there is an actor called the `Receptionist`. You register the specific actors that should be discoverable +from other nodes in the local `Receptionist` instance. The API of the receptionist is also based on actor messages. +This registry of actor references is then automatically distributed to all other nodes in the cluster. +You can lookup such actors with the key that was used when they were registered. The reply to such a `Find` request is +a `Listing`, which contains a `Set` of actor references that are registered for the key. Note that several actors can be +registered to the same key. + +The registry is dynamic. New actors can be registered during the lifecycle of the system. Entries are removed when +registered actors are stopped or a node is removed from the cluster. To facilitate this dynamic aspect you can also subscribe +to changes with the `Receptionist.Subscribe` message. It will send `Listing` messages to the subscriber when entries for a key are changed. + +The first scenario is an actor running that needs to be discovered by another actor but you are unable +to put a reference to it in an incoming message. + +First we create a `PingService` actor and register it with the `Receptionist` against a +`ServiceKey` that will later be used to lookup the reference: + +Scala +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #ping-service } + +Java +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #ping-service } + +Then we have another actor that requires a `PingService` to be constructed: + +Scala +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger } + +Java +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger } + +Finally in the guardian actor we spawn the service as well as subscribing to any actors registering +against the `ServiceKey`. Subscribing means that the guardian actor will be informed of any +new registrations via a `Listing` message: + +Scala +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala) { #pinger-guardian } + +Java +: @@snip [ReceptionistExample]($akka$/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java) { #pinger-guardian } + +Each time a new (which is just a single time in this example) `PingService` is registered the +guardian actor spawns a pinger to ping it. + +## Cluster Receptionist + +The `Receptionist` also works in a cluster, the state for the receptionist is propagated via @ref:[distributed data](distributed-data.md). +The only difference is the serialisation concerns, see @ref:[clustering](cluster-typed.md). diff --git a/akka-docs/src/main/paradox/actors-typed.md b/akka-docs/src/main/paradox/actors-typed.md index 5979c7b78e..568541cdcc 100644 --- a/akka-docs/src/main/paradox/actors-typed.md +++ b/akka-docs/src/main/paradox/actors-typed.md @@ -20,7 +20,7 @@ To use Akka Typed add the following dependency: @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka artifact=akka-actor-typed_2.12 - version=$version$ + version=$akka.version$ } ## Introduction diff --git a/akka-docs/src/main/paradox/cluster-sharding-typed.md b/akka-docs/src/main/paradox/cluster-sharding-typed.md index 3c45dc7576..b8f7e11a23 100644 --- a/akka-docs/src/main/paradox/cluster-sharding-typed.md +++ b/akka-docs/src/main/paradox/cluster-sharding-typed.md @@ -1,9 +1,21 @@ # Sharding +@@@ warning + +This module is currently marked as @ref:[may change](common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + +@@@ + +To use the testkit add the following dependency: + @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka artifact=akka-cluster-sharding-typed_2.12 - version=$version$ + version=$akka.version$ } -TODO +For an introduction to Akka Cluster concepts see [Cluster Specification]. This documentation shows how to use the typed +Cluster API. \ No newline at end of file diff --git a/akka-docs/src/main/paradox/cluster-typed.md b/akka-docs/src/main/paradox/cluster-typed.md index 466cc34fda..ae80856963 100644 --- a/akka-docs/src/main/paradox/cluster-typed.md +++ b/akka-docs/src/main/paradox/cluster-typed.md @@ -1,30 +1,88 @@ # Cluster -sbt -: @@@vars - ``` - "com.typesafe.akka" %% "akka-cluster-typed" % "$akka.version$" - ``` - @@@ +@@@ warning -Gradle -: @@@vars - ``` - dependencies { - compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.11', version: '$akka.version$' - } - ``` - @@@ +This module is currently marked as @ref:[may change](common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + +@@@ + +To use the testkit add the following dependency: + +@@dependency [sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-cluster-typed_2.12 + version=$akka.version$ +} + +For an introduction to Akka Cluster concepts see @ref:[Cluster Specification](common/cluster.md). This documentation shows how to use the typed +Cluster API. All of the examples below assume the following imports: + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-imports } + +And the minimum configuration required is to set a host/port for remoting and the `cluster` + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #config } + +## Cluster API extension + +The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription of +cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc). Those are exposed as two different actor +references, i.e. it’s a message based API. + +The references are on the `Cluster` extension: + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-create } + + +The Cluster extensions gives you access to: + +* manager: An `ActorRef[ClusterCommand]` where a `ClusterCommand` is a command such as: `Join`, `Leave` and `Down` +* subscriptions: An `ActorRef[ClusterStateSubscription]` where a `ClusterStateSubscription` is one of `GetCurrentState` or `Subscribe` and `Unsubscribe` to cluster events like `MemberRemoved` +* state: The current `CurrentClusterState` + + +### Cluster Management + +If not using configuration to specify seeds joining the cluster can be done programmatically via the `manager`. + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-join } + +Leaving and downing are similar e.g. + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-leave } + +### Cluster subscriptions + +Cluster `subscriptions` can be used to receive messages when cluster state changes. For example, registering +for all `MemberEvent`s, then using the `manager` to have a node leave the cluster will result in events +for the node going through the lifecycle described in @ref:[Cluster Specification](common/cluster.md). + +This example subscribes with a `TestProbe` but in a real application it would be an Actor: + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-subscribe } + +Then asking a node to leave: + +Scala +: @@snip [BasicClusterExampleSpec.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala) { #cluster-leave-example } + +## Serialization + +See [serialization](https://doc.akka.io/docs/akka/current/scala/serialization.html) for how messages are sent between +ActorSystems. Actor references are typically included in the messages, +since there is no `sender`. To serialize actor references to/from string representation you will use the `ActorRefResolver`. +For example here's how a serializer could look for the `Ping` and `Pong` messages above: + +Scala +: @@snip [PingSerializer.scala]($akka$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/PingSerializer.scala) { #serializer } -Maven -: @@@vars - ``` - - com.typesafe.akka - akka-cluster-typed_$scala.binary_version$ - $akka.version$ - - ``` - @@@ -TODO diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index 130faec70d..364484c68c 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -5,8 +5,9 @@ @@@ index * [actors](actors-typed.md) -* [fault-tolerance-typed.md](fault-tolerance-typed.md) * [coexisting](coexisting.md) +* [fault-tolerance](fault-tolerance-typed.md) +* [actor-discovery](actor-discovery-typed.md) * [cluster](cluster-typed.md) * [cluster-sharding](cluster-sharding-typed.md) * [persistence](persistence-typed.md) diff --git a/akka-docs/src/main/paradox/testing-typed.md b/akka-docs/src/main/paradox/testing-typed.md index 9367d159db..a5be16d660 100644 --- a/akka-docs/src/main/paradox/testing-typed.md +++ b/akka-docs/src/main/paradox/testing-typed.md @@ -15,7 +15,7 @@ To use the testkit add the following dependency: @@dependency [sbt,Maven,Gradle] { group=com.typesafe.akka artifact=akka-testkit-typed_2.12 - version=$version$ + version=$akka.version$ scope=test } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala index d07915c5b8..b7285382bb 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/StubbedActorContext.scala @@ -5,104 +5,31 @@ import akka.{ actor ⇒ untyped } import akka.actor.typed._ import akka.util.Helpers import akka.{ actor ⇒ a } -import akka.util.Unsafe.{ instance ⇒ unsafe } import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration import akka.annotation.InternalApi -import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl, ActorSystemStub } - -import scala.annotation.tailrec -import scala.util.control.NonFatal +import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl, ActorSystemStub, SystemMessage } /** * A local synchronous ActorRef that invokes the given function for every message send. - * This reference can be watched and will do the right thing when it receives a [[akka.actor.typed.internal.DeathWatchNotification]]. * This reference cannot watch other references. */ private[akka] final class FunctionRef[-T]( _path: a.ActorPath, send: (T, FunctionRef[T]) ⇒ Unit, _terminate: FunctionRef[T] ⇒ Unit) - extends WatchableRef[T](_path) { + extends ActorRef[T] with ActorRefImpl[T] { override def tell(msg: T): Unit = { if (msg == null) throw InvalidMessageException("[null] is not an allowed message") - if (isAlive) - try send(msg, this) catch { - case NonFatal(_) ⇒ // nothing we can do here - } - else () // we don’t have deadLetters available - } - - import internal._ - - override def sendSystem(signal: SystemMessage): Unit = signal match { - case internal.Create() ⇒ // nothing to do - case internal.DeathWatchNotification(_, _) ⇒ // we’re not watching, and we’re not a parent either - case internal.Terminate() ⇒ doTerminate() - case internal.Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing) - case internal.Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing) - case NoMessage ⇒ // nothing to do + send(msg, this) } + override def path = _path + override def sendSystem(signal: SystemMessage): Unit = {} override def isLocal = true - - override def terminate(): Unit = _terminate(this) -} - -/** - * The mechanics for synthetic ActorRefs that have a lifecycle and support being watched. - */ -private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] { - import WatchableRef._ - - /** - * Callback that is invoked when this ref has terminated. Even if doTerminate() is - * called multiple times, this callback is invoked only once. - */ - protected def terminate(): Unit - - type S = Set[ActorRefImpl[Nothing]] - - @volatile private[this] var _watchedBy: S = Set.empty - - protected def isAlive: Boolean = _watchedBy != null - - protected def doTerminate(): Unit = { - val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S] - if (watchedBy != null) { - try terminate() catch { case NonFatal(ex) ⇒ } - if (watchedBy.nonEmpty) watchedBy foreach sendTerminated - } - } - - private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit = - watcher.sendSystem(internal.DeathWatchNotification(this, null)) - - @tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit = - _watchedBy match { - case null ⇒ sendTerminated(watcher) - case watchedBy ⇒ - if (!watchedBy.contains(watcher)) - if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher)) - addWatcher(watcher) // try again - } - - @tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = { - _watchedBy match { - case null ⇒ // do nothing... - case watchedBy ⇒ - if (watchedBy.contains(watcher)) - if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher)) - remWatcher(watcher) // try again - } - } -} - -private[typed] object WatchableRef { - val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy")) } /** diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestInbox.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestInbox.scala index 7cf277a3d4..5cfbebb7bf 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestInbox.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestInbox.scala @@ -14,9 +14,8 @@ import scala.collection.immutable /** * Utility for use as an [[ActorRef]] when synchronously testing [[akka.actor.typed.Behavior]] - * to be used along with [[BehaviorTestkit]]. - * - * See [[akka.testkit.typed.scaladsl.TestProbe]] for asynchronous testing. + * to be used along with [[BehaviorTestkit]]. If you plan to use a real [[akka.actor.typed.ActorSystem]] + * then use [[akka.testkit.typed.scaladsl.TestProbe]] for asynchronous testing. */ @ApiMayChange class TestInbox[T](name: String) { diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala index 149954ddbf..89ce65c959 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/javadsl/TestProbe.scala @@ -4,14 +4,13 @@ package akka.testkit.typed.javadsl import akka.actor.typed.ActorSystem -import akka.testkit.typed.TestKitSettings /** * Java API: */ -class TestProbe[M](name: String, system: ActorSystem[_], settings: TestKitSettings) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system, settings) { +class TestProbe[M](name: String, system: ActorSystem[_]) extends akka.testkit.typed.scaladsl.TestProbe[M](name)(system) { - def this(system: ActorSystem[_], settings: TestKitSettings) = this("testProbe", system, settings) + def this(system: ActorSystem[_]) = this("testProbe", system) /** * Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor. diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala index 097cda34df..a0d4645922 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/scaladsl/TestProbe.scala @@ -28,10 +28,10 @@ import scala.util.control.NonFatal object TestProbe { private val testActorId = new AtomicInteger(0) - def apply[M]()(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] = + def apply[M]()(implicit system: ActorSystem[_]): TestProbe[M] = apply(name = "testProbe") - def apply[M](name: String)(implicit system: ActorSystem[_], settings: TestKitSettings): TestProbe[M] = + def apply[M](name: String)(implicit system: ActorSystem[_]): TestProbe[M] = new TestProbe(name) private def testActor[M](queue: BlockingDeque[M]): Behavior[M] = Actor.immutable { (ctx, msg) ⇒ @@ -40,9 +40,10 @@ object TestProbe { } } -class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settings: TestKitSettings) { +class TestProbe[M](name: String)(implicit system: ActorSystem[_]) { import TestProbe._ + private implicit val settings = TestKitSettings(system) private val queue = new LinkedBlockingDeque[M] private var end: Duration = Duration.Undefined