From 3a9facfe32e80b2b105bf1b5bd702091915b1676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 9 Jan 2018 17:03:50 +0100 Subject: [PATCH] Review updates and making the Receptionist API work from Java --- .../receptionist/ReceptionistApiTest.java | 22 ++++++ .../receptionist/LocalReceptionistSpec.scala | 4 +- .../receptionist/ReceptionistImpl.scala | 1 + .../typed/receptionist/Receptionist.scala | 68 ++++++++----------- .../receptionist/ClusterReceptionist.scala | 2 +- .../typed/ReceptionistExampleTest.java | 9 +-- .../ClusterReceptionistSpec.scala | 4 +- .../typed/BasicClusterExampleSpec.scala | 30 +++++--- .../typed/ReceptionistExampleSpec.scala | 6 +- 9 files changed, 84 insertions(+), 62 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/akka/actor/typed/receptionist/ReceptionistApiTest.java diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/receptionist/ReceptionistApiTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/receptionist/ReceptionistApiTest.java new file mode 100644 index 0000000000..088afa26b9 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/receptionist/ReceptionistApiTest.java @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ +package akka.actor.typed.receptionist; + + +import akka.actor.typed.ActorRef; + +public class ReceptionistApiTest { + + public void compileOnlyApiTest() { + ActorRef ref = null; + ActorRef> listener = null; + ServiceKey key = ServiceKey.create(String.class, "id"); + Receptionist.Register register = new Receptionist.Register<>(key, ref, listener); + + ActorRef> listingRecipient = null; + Receptionist.Find find = new Receptionist.Find<>(key, listingRecipient); + + Receptionist.Subscribe subscribe = new Receptionist.Subscribe<>(key, listingRecipient); + } +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala index dd76ef0667..01e8d5a72c 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala @@ -16,11 +16,11 @@ import scala.concurrent.Future class LocalReceptionistSpec extends TestKit with TypedAkkaSpecWithShutdown with Eventually { trait ServiceA - val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a") + val ServiceKeyA = ServiceKey[ServiceA]("service-a") val behaviorA = Actor.empty[ServiceA] trait ServiceB - val ServiceKeyB = Receptionist.ServiceKey[ServiceB]("service-b") + val ServiceKeyB = ServiceKey[ServiceB]("service-b") val behaviorB = Actor.empty[ServiceB] case object Stop extends ServiceA with ServiceB diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala index 3a394bfc04..d13a7eaba9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/receptionist/ReceptionistImpl.scala @@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.Terminated import akka.actor.typed.receptionist.Receptionist._ +import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.Actor.immutable import akka.actor.typed.scaladsl.Actor.same diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala index b8a2967ef5..c771628401 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/receptionist/Receptionist.scala @@ -44,6 +44,33 @@ class Receptionist(system: ActorSystem[_]) extends Extension { } } +object ServiceKey { + /** + * Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol. + */ + def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = + ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName) + + /** + * Java API: Creates a service key. The given ID should uniquely define a service with a given protocol. + */ + def create[T](clazz: Class[T], id: String): ServiceKey[T] = + ReceptionistImpl.DefaultServiceKey(id, clazz.getName) + +} + +/** + * A service key is an object that implements this trait for a given protocol + * T, meaning that it signifies that the type T is the entry point into the + * protocol spoken by that service (think of it as the set of first messages + * that a client could send). + */ +abstract class ServiceKey[T] extends Receptionist.AbstractServiceKey { + final type Protocol = T + def id: String + def asServiceKey: ServiceKey[T] = this +} + /** * A Receptionist is an entry point into an Actor hierarchy where select Actors * publish their identity together with the protocols that they implement. Other @@ -69,33 +96,6 @@ object Receptionist extends ExtensionId[Receptionist] { def asServiceKey: ServiceKey[Protocol] } - /** - * A service key is an object that implements this trait for a given protocol - * T, meaning that it signifies that the type T is the entry point into the - * protocol spoken by that service (think of it as the set of first messages - * that a client could send). - */ - abstract class ServiceKey[T] extends AbstractServiceKey { - final type Protocol = T - def id: String - def asServiceKey: ServiceKey[T] = this - } - - object ServiceKey { - /** - * Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol. - */ - def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = - ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName) - - /** - * Java API: Creates a service key. The given ID should uniquely define a service with a given protocol. - */ - def create[T](clazz: Class[T], id: String): ServiceKey[T] = - ReceptionistImpl.DefaultServiceKey(id, clazz.getName) - - } - /** Internal superclass for external and internal commands */ @InternalApi sealed private[akka] abstract class AllCommands @@ -119,12 +119,6 @@ object Receptionist extends ExtensionId[Receptionist] { /** Auxiliary constructor to be used with the ask pattern */ def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] ⇒ Register[T] = replyTo ⇒ Register(key, service, replyTo) - - /** - * Java API - */ - def create[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) = - Register(key, serviceInstance, replyTo) } /** @@ -141,14 +135,6 @@ object Receptionist extends ExtensionId[Receptionist] { */ final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command - object Subscribe { - /** - * Java API - */ - def create[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) = - Subscribe(key, subscriber) - } - /** * Query the Receptionist for a list of all Actors implementing the given * protocol. diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index a5135c2555..5ae0a316d1 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -18,7 +18,7 @@ import akka.actor.typed.internal.receptionist.ReceptionistImpl._ import akka.actor.typed.receptionist.Receptionist.AbstractServiceKey import akka.actor.typed.receptionist.Receptionist.AllCommands import akka.actor.typed.receptionist.Receptionist.Command -import akka.actor.typed.receptionist.Receptionist.ServiceKey +import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.ActorContext import scala.language.existentials diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java index b3a5edd253..a25f971d41 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/ReceptionistExampleTest.java @@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.Actor; import akka.actor.typed.receptionist.Receptionist; +import akka.actor.typed.receptionist.ServiceKey; import scala.concurrent.Await; import scala.concurrent.duration.Duration; @@ -14,8 +15,8 @@ public class ReceptionistExampleTest { public static class PingPongExample { //#ping-service - static Receptionist.ServiceKey PingServiceKey = - Receptionist.ServiceKey.create(Ping.class, "pingService"); + static ServiceKey PingServiceKey = + ServiceKey.create(Ping.class, "pingService"); public static class Pong {} public static class Ping { @@ -28,7 +29,7 @@ public class ReceptionistExampleTest { static Behavior pingService() { return Actor.deferred((ctx) -> { ctx.getSystem().receptionist() - .tell(Receptionist.Register.create(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters())); + .tell(new Receptionist.Register<>(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters())); return Actor.immutable(Ping.class) .onMessage(Ping.class, (c, msg) -> { msg.replyTo.tell(new Pong()); @@ -55,7 +56,7 @@ public class ReceptionistExampleTest { static Behavior> guardian() { return Actor.deferred((ctx) -> { ctx.getSystem().receptionist() - .tell(Receptionist.Subscribe.create(PingServiceKey, ctx.getSelf())); + .tell(new Receptionist.Subscribe<>(PingServiceKey, ctx.getSelf())); ActorRef ps = ctx.spawnAnonymous(pingService()); ctx.watch(ps); return Actor.immutable((c, msg) -> { diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index b78d653676..78923e45ba 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -8,7 +8,7 @@ import java.nio.charset.StandardCharsets import akka.actor.ExtendedActorSystem import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown } import akka.actor.typed.internal.adapter.ActorSystemAdapter -import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.adapter._ import akka.cluster.Cluster @@ -82,7 +82,7 @@ object ClusterReceptionistSpec { } } - val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy") + val PingKey = ServiceKey[PingProtocol]("pingy") } class ClusterReceptionistSpec extends TestKit("ClusterReceptionistSpec", ClusterReceptionistSpec.config) diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala index 76deb8dad4..cb3167e000 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/BasicClusterExampleSpec.scala @@ -1,5 +1,6 @@ package docs.akka.cluster.typed +import akka.testkit.SocketUtil import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } //#cluster-imports @@ -41,7 +42,7 @@ akka { val configSystem2 = ConfigFactory.parseString( s""" - akka.remote.netty.tcp.port = 2552 + akka.remote.netty.tcp.port = 0 """ ).withFallback(configSystem1) } @@ -52,8 +53,16 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually "Cluster API" must { "init cluster" in { - val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem1) - val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem2) + // config is pulled into docs, but we don't want to hardcode ports because that makes for brittle tests + val sys1Port = SocketUtil.temporaryLocalPort() + val sys2Port = SocketUtil.temporaryLocalPort() + def config(port: Int) = ConfigFactory.parseString(s""" + akka.remote.netty.tcp.port = $port + akka.cluster.seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:$sys1Port", "akka.tcp://ClusterSystem@127.0.0.1:$sys2Port" ] + """) + + val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", config(sys1Port).withFallback(configSystem1)) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", config(sys2Port).withFallback(configSystem2)) try { val cluster1 = Cluster(system1) val cluster2 = Cluster(system2) @@ -75,13 +84,15 @@ akka { remote { netty.tcp { hostname = "127.0.0.1" - port = 0 + port = 2551 } } } #config """) + val noPort = ConfigFactory.parseString("akka.remote.netty.tcp.port = 0") + } class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually with Matchers { @@ -93,8 +104,9 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually "Cluster API" must { "init cluster" in { - val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) - val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + + val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig)) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig)) try { //#cluster-create @@ -127,9 +139,9 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually } "subscribe to cluster events" in { - implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) - val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) - val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig) + implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig)) + val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig)) + val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig)) try { val cluster1 = Cluster(system1) diff --git a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala index 279e239657..7e22fee02b 100644 --- a/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala +++ b/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/ReceptionistExampleSpec.scala @@ -4,8 +4,8 @@ import java.util.concurrent.ThreadLocalRandom import akka.actor.Address import akka.actor.typed._ -import akka.actor.typed.receptionist.Receptionist -import akka.actor.typed.receptionist.Receptionist.{ Find, Listing, ServiceKey } +import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } +import akka.actor.typed.receptionist.Receptionist.Listing import akka.actor.typed.scaladsl._ import akka.cluster.ClusterEvent._ import akka.cluster.typed.{ Cluster, Join, Subscribe } @@ -87,7 +87,7 @@ object RandomRouter { object PingPongExample { //#ping-service - val PingServiceKey = Receptionist.ServiceKey[Ping]("pingService") + val PingServiceKey = ServiceKey[Ping]("pingService") final case class Ping(replyTo: ActorRef[Pong.type]) final case object Pong