Review updates and making the Receptionist API work from Java

This commit is contained in:
Johan Andrén 2018-01-09 17:03:50 +01:00 committed by Konrad `ktoso` Malawski
parent 45c7303d3f
commit 3a9facfe32
9 changed files with 84 additions and 62 deletions

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2009-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor.typed.receptionist;
import akka.actor.typed.ActorRef;
public class ReceptionistApiTest {
public void compileOnlyApiTest() {
ActorRef<String> ref = null;
ActorRef<Receptionist.Registered<String>> listener = null;
ServiceKey<String> key = ServiceKey.create(String.class, "id");
Receptionist.Register register = new Receptionist.Register<>(key, ref, listener);
ActorRef<Receptionist.Listing<String>> listingRecipient = null;
Receptionist.Find find = new Receptionist.Find<>(key, listingRecipient);
Receptionist.Subscribe subscribe = new Receptionist.Subscribe<>(key, listingRecipient);
}
}

View file

@ -16,11 +16,11 @@ import scala.concurrent.Future
class LocalReceptionistSpec extends TestKit with TypedAkkaSpecWithShutdown with Eventually {
trait ServiceA
val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a")
val ServiceKeyA = ServiceKey[ServiceA]("service-a")
val behaviorA = Actor.empty[ServiceA]
trait ServiceB
val ServiceKeyB = Receptionist.ServiceKey[ServiceB]("service-b")
val ServiceKeyB = ServiceKey[ServiceB]("service-b")
val behaviorB = Actor.empty[ServiceB]
case object Stop extends ServiceA with ServiceB

View file

@ -8,6 +8,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Terminated
import akka.actor.typed.receptionist.Receptionist._
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.Actor.immutable
import akka.actor.typed.scaladsl.Actor.same

View file

@ -44,6 +44,33 @@ class Receptionist(system: ActorSystem[_]) extends Extension {
}
}
object ServiceKey {
/**
* Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName)
/**
* Java API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
def create[T](clazz: Class[T], id: String): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, clazz.getName)
}
/**
* A service key is an object that implements this trait for a given protocol
* T, meaning that it signifies that the type T is the entry point into the
* protocol spoken by that service (think of it as the set of first messages
* that a client could send).
*/
abstract class ServiceKey[T] extends Receptionist.AbstractServiceKey {
final type Protocol = T
def id: String
def asServiceKey: ServiceKey[T] = this
}
/**
* A Receptionist is an entry point into an Actor hierarchy where select Actors
* publish their identity together with the protocols that they implement. Other
@ -69,33 +96,6 @@ object Receptionist extends ExtensionId[Receptionist] {
def asServiceKey: ServiceKey[Protocol]
}
/**
* A service key is an object that implements this trait for a given protocol
* T, meaning that it signifies that the type T is the entry point into the
* protocol spoken by that service (think of it as the set of first messages
* that a client could send).
*/
abstract class ServiceKey[T] extends AbstractServiceKey {
final type Protocol = T
def id: String
def asServiceKey: ServiceKey[T] = this
}
object ServiceKey {
/**
* Scala API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, implicitly[ClassTag[T]].runtimeClass.getName)
/**
* Java API: Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
def create[T](clazz: Class[T], id: String): ServiceKey[T] =
ReceptionistImpl.DefaultServiceKey(id, clazz.getName)
}
/** Internal superclass for external and internal commands */
@InternalApi
sealed private[akka] abstract class AllCommands
@ -119,12 +119,6 @@ object Receptionist extends ExtensionId[Receptionist] {
/** Auxiliary constructor to be used with the ask pattern */
def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] Register[T] =
replyTo Register(key, service, replyTo)
/**
* Java API
*/
def create[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) =
Register(key, serviceInstance, replyTo)
}
/**
@ -141,14 +135,6 @@ object Receptionist extends ExtensionId[Receptionist] {
*/
final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command
object Subscribe {
/**
* Java API
*/
def create[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) =
Subscribe(key, subscriber)
}
/**
* Query the Receptionist for a list of all Actors implementing the given
* protocol.

View file

@ -18,7 +18,7 @@ import akka.actor.typed.internal.receptionist.ReceptionistImpl._
import akka.actor.typed.receptionist.Receptionist.AbstractServiceKey
import akka.actor.typed.receptionist.Receptionist.AllCommands
import akka.actor.typed.receptionist.Receptionist.Command
import akka.actor.typed.receptionist.Receptionist.ServiceKey
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.ActorContext
import scala.language.existentials

View file

@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Actor;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
@ -14,8 +15,8 @@ public class ReceptionistExampleTest {
public static class PingPongExample {
//#ping-service
static Receptionist.ServiceKey<Ping> PingServiceKey =
Receptionist.ServiceKey.create(Ping.class, "pingService");
static ServiceKey<Ping> PingServiceKey =
ServiceKey.create(Ping.class, "pingService");
public static class Pong {}
public static class Ping {
@ -28,7 +29,7 @@ public class ReceptionistExampleTest {
static Behavior<Ping> pingService() {
return Actor.deferred((ctx) -> {
ctx.getSystem().receptionist()
.tell(Receptionist.Register.create(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters()));
.tell(new Receptionist.Register<>(PingServiceKey, ctx.getSelf(), ctx.getSystem().deadLetters()));
return Actor.immutable(Ping.class)
.onMessage(Ping.class, (c, msg) -> {
msg.replyTo.tell(new Pong());
@ -55,7 +56,7 @@ public class ReceptionistExampleTest {
static Behavior<Receptionist.Listing<Ping>> guardian() {
return Actor.deferred((ctx) -> {
ctx.getSystem().receptionist()
.tell(Receptionist.Subscribe.create(PingServiceKey, ctx.getSelf()));
.tell(new Receptionist.Subscribe<>(PingServiceKey, ctx.getSelf()));
ActorRef<Ping> ps = ctx.spawnAnonymous(pingService());
ctx.watch(ps);
return Actor.immutable((c, msg) -> {

View file

@ -8,7 +8,7 @@ import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown }
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
@ -82,7 +82,7 @@ object ClusterReceptionistSpec {
}
}
val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy")
val PingKey = ServiceKey[PingProtocol]("pingy")
}
class ClusterReceptionistSpec extends TestKit("ClusterReceptionistSpec", ClusterReceptionistSpec.config)

View file

@ -1,5 +1,6 @@
package docs.akka.cluster.typed
import akka.testkit.SocketUtil
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
//#cluster-imports
@ -41,7 +42,7 @@ akka {
val configSystem2 = ConfigFactory.parseString(
s"""
akka.remote.netty.tcp.port = 2552
akka.remote.netty.tcp.port = 0
"""
).withFallback(configSystem1)
}
@ -52,8 +53,16 @@ class BasicClusterConfigSpec extends WordSpec with ScalaFutures with Eventually
"Cluster API" must {
"init cluster" in {
val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem1)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", configSystem2)
// config is pulled into docs, but we don't want to hardcode ports because that makes for brittle tests
val sys1Port = SocketUtil.temporaryLocalPort()
val sys2Port = SocketUtil.temporaryLocalPort()
def config(port: Int) = ConfigFactory.parseString(s"""
akka.remote.netty.tcp.port = $port
akka.cluster.seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:$sys1Port", "akka.tcp://ClusterSystem@127.0.0.1:$sys2Port" ]
""")
val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", config(sys1Port).withFallback(configSystem1))
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", config(sys2Port).withFallback(configSystem2))
try {
val cluster1 = Cluster(system1)
val cluster2 = Cluster(system2)
@ -75,13 +84,15 @@ akka {
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
port = 2551
}
}
}
#config
""")
val noPort = ConfigFactory.parseString("akka.remote.netty.tcp.port = 0")
}
class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually with Matchers {
@ -93,8 +104,9 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
"Cluster API" must {
"init cluster" in {
val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig))
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig))
try {
//#cluster-create
@ -127,9 +139,9 @@ class BasicClusterManualSpec extends WordSpec with ScalaFutures with Eventually
}
"subscribe to cluster events" in {
implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", clusterConfig)
implicit val system1 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig))
val system2 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig))
val system3 = ActorSystem[Nothing](Actor.empty, "ClusterSystem", noPort.withFallback(clusterConfig))
try {
val cluster1 = Cluster(system1)

View file

@ -4,8 +4,8 @@ import java.util.concurrent.ThreadLocalRandom
import akka.actor.Address
import akka.actor.typed._
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.Receptionist.{ Find, Listing, ServiceKey }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.receptionist.Receptionist.Listing
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.typed.{ Cluster, Join, Subscribe }
@ -87,7 +87,7 @@ object RandomRouter {
object PingPongExample {
//#ping-service
val PingServiceKey = Receptionist.ServiceKey[Ping]("pingService")
val PingServiceKey = ServiceKey[Ping]("pingService")
final case class Ping(replyTo: ActorRef[Pong.type])
final case object Pong