=type Cluster and local Receptionist, #23634

* a Receptionists extension

It's basically an improved copy of the former receptionist pattern which is
removed here as well.

* Cluster implementation using Distributed Data

* =typ make ActorRef.apply work for adapted actor systems
This commit is contained in:
Johannes Rudolph 2017-09-22 11:38:07 +02:00 committed by Patrik Nordwall
parent c31f6b862f
commit c2e45fa6dc
14 changed files with 702 additions and 167 deletions

View file

@ -88,6 +88,17 @@ class TypedMultiMap[T <: AnyRef, K[_ <: T]] private (private val map: Map[T, Set
}
}
def setAll(key: T)(values: Set[K[key.type]]): TypedMultiMap[T, K] =
new TypedMultiMap[T, K](map.updated(key, values.asInstanceOf[Set[Any]]))
/**
* Add all entries from the other map, overwriting existing entries.
*
* FIXME: should it merge, instead?
*/
def ++(other: TypedMultiMap[T, K]): TypedMultiMap[T, K] =
new TypedMultiMap[T, K](map ++ other.map)
override def toString: String = s"TypedMultiMap($map)"
override def equals(other: Any) = other match {
case o: TypedMultiMap[_, _] map == o.map

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.receptionist
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.cluster.Cluster
import akka.serialization.SerializerWithStringManifest
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.TypedSpec
import akka.typed.TypedSpec.Command
import akka.typed.cluster.ActorRefResolver
import akka.typed.internal.adapter.ActorRefAdapter
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.internal.receptionist.ReceptionistImpl
import akka.typed.receptionist.Receptionist
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
object ClusterReceptionistSpec {
val config = ConfigFactory.parseString(
"""
akka.log-level = DEBUG
akka.actor {
provider = cluster
serialize-messages = off
allow-java-serialization = off
serializers {
test = "akka.typed.cluster.receptionist.ClusterReceptionistSpec$PingSerializer"
}
serialization-bindings {
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Ping" = test
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Pong$" = test
"akka.typed.cluster.receptionist.ClusterReceptionistSpec$Perish$" = test
"akka.typed.internal.receptionist.ReceptionistImpl$DefaultServiceKey" = test
"akka.typed.internal.adapter.ActorRefAdapter" = test
}
}
akka.remote.artery.enabled = true
akka.remote.artery.canonical.port = 25552
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
""")
trait PingProtocol
case object Pong
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol
case object Perish extends PingProtocol
val pingPong = Actor.immutable[PingProtocol] { (ctx, msg)
msg match {
case Ping(respondTo)
respondTo ! Pong
Actor.same
case Perish
Actor.stopped
}
}
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 47
def manifest(o: AnyRef): String = o match {
case _: Ping "a"
case Pong "b"
case Perish "c"
case ReceptionistImpl.DefaultServiceKey(id) "d"
case a: ActorRefAdapter[_] "e"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: Ping ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
case Pong Array.emptyByteArray
case Perish Array.emptyByteArray
case ReceptionistImpl.DefaultServiceKey(id) id.getBytes(StandardCharsets.UTF_8)
case a: ActorRefAdapter[_] ActorRefResolver(system.toTyped).toSerializationFormat(a).getBytes(StandardCharsets.UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case "a" Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
case "b" Pong
case "c" Perish
case "d" ReceptionistImpl.DefaultServiceKey[Any](new String(bytes, StandardCharsets.UTF_8))
case "e" ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
}
}
val PingKey = Receptionist.ServiceKey[PingProtocol]("pingy")
}
class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) {
import ClusterReceptionistSpec._
implicit val testSettings = TestKitSettings(adaptedSystem)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem)
val clusterNode1 = Cluster(untypedSystem1)
val system2 = akka.actor.ActorSystem(
adaptedSystem.name,
ConfigFactory.parseString(
"""
akka.remote.artery.canonical.port = 0
"""
).withFallback(adaptedSystem.settings.config))
val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(system2)
clusterNode1.join(clusterNode1.selfAddress)
clusterNode2.join(clusterNode1.selfAddress)
object `The ClusterReceptionist` extends StartSupport {
def system: ActorSystem[Command] = adaptedSystem
import Receptionist._
def `must eventually replicate registrations to the other side`() = new TestSetup {
val regProbe = TestProbe[Any]()(adaptedSystem, testSettings)
val regProbe2 = TestProbe[Any]()(adaptedSystem2, testSettings)
adaptedSystem2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service = start(pingPong)
system.receptionist ! Register(PingKey, service, regProbe.ref)
regProbe.expectMsg(Registered(PingKey, service))
val Listing(PingKey, remoteServiceRefs) = regProbe2.expectMsgType[Listing[PingProtocol]]
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe2.ref)
regProbe2.expectMsg(Pong)
service ! Perish
regProbe2.expectMsg(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
}
}
trait TestSetup {
}
override def afterAll(): Unit = {
super.afterAll()
Await.result(adaptedSystem.terminate(), 3.seconds)
Await.result(system2.terminate(), 3.seconds)
}
}

View file

@ -58,9 +58,6 @@ private[typed] class ActorSystemStub(val name: String)
override def printTree: String = "no tree for ActorSystemStub"
val receptionistInbox = Inbox[patterns.Receptionist.Command]("receptionist")
override def receptionist: ActorRef[patterns.Receptionist.Command] = receptionistInbox.ref
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(implicit timeout: Timeout): Future[ActorRef[U]] = {
Future.failed(new UnsupportedOperationException("ActorSystemStub cannot create system actors"))
}

View file

@ -26,7 +26,7 @@ class FunctionRefSpec extends TypedSpecSetup {
val ref = ActorRef(f)
ref ! "42"
ref ! "43"
target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil)
target.receiveAll() should ===(Right("42") :: Right("43") :: Nil)
}
def `must forward messages that are received before getting the ActorRef`(): Unit = {

View file

@ -1,36 +1,54 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.patterns
package akka.typed.receptionist
import Receptionist._
import akka.typed._
import akka.typed.receptionist.Receptionist._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.typed.testkit.EffectfulActorContext
import akka.typed.testkit.Inbox
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._
import akka.typed._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor._
import akka.typed.testkit.{ Effect, EffectfulActorContext, Inbox }
class ReceptionistSpec extends TypedSpec {
class LocalReceptionistSpec extends TypedSpec with Eventually {
trait ServiceA
case object ServiceKeyA extends ServiceKey[ServiceA]
val ServiceKeyA = Receptionist.ServiceKey[ServiceA]("service-a")
val behaviorA = Actor.empty[ServiceA]
trait ServiceB
case object ServiceKeyB extends ServiceKey[ServiceB]
val ServiceKeyB = Receptionist.ServiceKey[ServiceB]("service-b")
val behaviorB = Actor.empty[ServiceB]
trait CommonTests {
case object Stop extends ServiceA with ServiceB
val stoppableBehavior = Actor.immutable[Any] { (ctx, msg)
msg match {
case Stop Behavior.stopped
case _ Behavior.same
}
}
import akka.typed.internal.receptionist.ReceptionistImpl.{ localOnlyBehavior behavior }
trait CommonTests extends StartSupport {
implicit def system: ActorSystem[TypedSpec.Command]
implicit val testSettings = TestKitSettings(system)
abstract class TestSetup {
val receptionist = start(behavior)
}
def `must register a service`(): Unit = {
val ctx = new EffectfulActorContext("register", behavior, 1000, system)
val a = Inbox[ServiceA]("a")
val r = Inbox[Registered[_]]("r")
ctx.run(Register(ServiceKeyA, a.ref)(r.ref))
ctx.getAllEffects() should be(Effect.Watched(a.ref) :: Nil)
ctx.getEffect() // watching however that is implemented
r.receiveMsg() should be(Registered(ServiceKeyA, a.ref))
val q = Inbox[Listing[ServiceA]]("q")
ctx.run(Find(ServiceKeyA)(q.ref))
@ -73,38 +91,62 @@ class ReceptionistSpec extends TypedSpec {
assertEmpty(a1, a2, r, q)
}
def `must unregister services when they terminate`(): Unit = {
val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system)
val r = Inbox[Registered[_]]("r")
val a = Inbox[ServiceA]("a")
ctx.run(Register(ServiceKeyA, a.ref)(r.ref))
ctx.getEffect() should be(Effect.Watched(a.ref))
r.receiveMsg() should be(Registered(ServiceKeyA, a.ref))
def `must unregister services when they terminate`(): Unit = new TestSetup {
val regProbe = TestProbe[Any]("regProbe")
val b = Inbox[ServiceB]("b")
ctx.run(Register(ServiceKeyB, b.ref)(r.ref))
ctx.getEffect() should be(Effect.Watched(b.ref))
r.receiveMsg() should be(Registered(ServiceKeyB, b.ref))
val serviceA = start(stoppableBehavior.narrow[ServiceA])
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA))
val c = Inbox[Any]("c")
ctx.run(Register(ServiceKeyA, c.ref)(r.ref))
ctx.run(Register(ServiceKeyB, c.ref)(r.ref))
ctx.getAllEffects() should be(Seq(Effect.Watched(c.ref), Effect.Watched(c.ref)))
r.receiveMsg() should be(Registered(ServiceKeyA, c.ref))
r.receiveMsg() should be(Registered(ServiceKeyB, c.ref))
val serviceB = start(stoppableBehavior.narrow[ServiceB])
receptionist ! Register(ServiceKeyB, serviceB, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyB, serviceB))
val q = Inbox[Listing[_]]("q")
ctx.run(Find(ServiceKeyA)(q.ref))
q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref, c.ref)))
ctx.run(Find(ServiceKeyB)(q.ref))
q.receiveMsg() should be(Listing(ServiceKeyB, Set(b.ref, c.ref)))
val serviceC = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceC, regProbe.ref)
receptionist ! Register(ServiceKeyB, serviceC, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceC))
regProbe.expectMsg(Registered(ServiceKeyB, serviceC))
ctx.signal(Terminated(c.ref)(null))
ctx.run(Find(ServiceKeyA)(q.ref))
q.receiveMsg() should be(Listing(ServiceKeyA, Set(a.ref)))
ctx.run(Find(ServiceKeyB)(q.ref))
q.receiveMsg() should be(Listing(ServiceKeyB, Set(b.ref)))
assertEmpty(a, b, c, r, q)
receptionist ! Find(ServiceKeyA, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceC)))
receptionist ! Find(ServiceKeyB, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB, serviceC)))
serviceC ! Stop
eventually {
receptionist ! Find(ServiceKeyA, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyA, Set(serviceA)))
receptionist ! Find(ServiceKeyB, regProbe.ref)
regProbe.expectMsg(Listing(ServiceKeyB, Set(serviceB)))
}
}
def `must support subscribing to service changes`(): Unit = new TestSetup {
val regProbe = TestProbe[Registered[_]]("regProbe")
val aSubscriber = TestProbe[Listing[ServiceA]]("aUser")
receptionist ! Subscribe(ServiceKeyA, aSubscriber.ref)
aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
val serviceA: ActorRef[ServiceA] = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceA, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA))
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA)))
val serviceA2: ActorRef[ServiceA] = start(stoppableBehavior)
receptionist ! Register(ServiceKeyA, serviceA2, regProbe.ref)
regProbe.expectMsg(Registered(ServiceKeyA, serviceA2))
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA, serviceA2)))
serviceA ! Stop
aSubscriber.expectMsg(Listing(ServiceKeyA, Set(serviceA2)))
serviceA2 ! Stop
aSubscriber.expectMsg(Listing(ServiceKeyA, Set.empty[ActorRef[ServiceA]]))
}
def `must work with ask`(): Unit = sync(runTest("Receptionist") {
@ -132,9 +174,9 @@ class ReceptionistSpec extends TypedSpec {
StepWise[Listing[ServiceA]] { (ctx, startWith)
val self = ctx.self
startWith.withKeepTraces(true) {
ctx.system.receptionist ! Find(ServiceKeyA)(self)
system.receptionist ! Find(ServiceKeyA)(self)
}.expectMessage(1.second) { (msg, _)
msg.addresses should ===(Set())
msg.serviceInstances should ===(Set())
}
}
})

View file

@ -4,9 +4,11 @@
package akka.typed
import akka.{ actor a }
import scala.annotation.unchecked.uncheckedVariance
import language.implicitConversions
import scala.concurrent.Future
import scala.util.Success
/**
* An ActorRef is the identity or address of an Actor instance. It is valid
@ -65,7 +67,13 @@ object ActorRef {
* messages in while the Future is not fulfilled.
*/
def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
new internal.FutureRef(FuturePath, bufferSize, f)
f.value match {
// an AdaptedActorSystem will always create refs eagerly, so it will take this path
case Some(Success(ref)) ref
// for other ActorSystem implementations, this might work, it currently doesn't work
// for the adapted system, because the typed FutureRef cannot be watched from untyped
case x new internal.FutureRef(FuturePath, bufferSize, f)
}
/**
* Create an ActorRef by providing a function that is invoked for sending

View file

@ -17,6 +17,8 @@ import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import java.util.Optional
import akka.typed.receptionist.Receptionist
/**
* An ActorSystem is home to a hierarchy of Actors. It is created using
* [[ActorSystem#apply]] from a [[Behavior]] object that describes the root
@ -146,9 +148,10 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: inter
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]]
/**
* Return a reference to this systems [[akka.typed.patterns.Receptionist$]].
* Return a reference to this systems [[akka.typed.receptionist.Receptionist]].
*/
def receptionist: ActorRef[patterns.Receptionist.Command]
def receptionist: ActorRef[Receptionist.Command] =
Receptionist(this).ref
}
object ActorSystem {

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.internal.receptionist
import akka.annotation.InternalApi
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.ORMultiMap
import akka.cluster.ddata.ORMultiMapKey
import akka.cluster.ddata.Replicator
import akka.cluster.ddata.Replicator.WriteConsistency
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.internal.receptionist.ReceptionistBehaviorProvider
import akka.typed.internal.receptionist.ReceptionistImpl
import akka.typed.internal.receptionist.ReceptionistImpl._
import akka.typed.receptionist.Receptionist.AbstractServiceKey
import akka.typed.receptionist.Receptionist.AllCommands
import akka.typed.receptionist.Receptionist.Command
import akka.typed.receptionist.Receptionist.ServiceKey
import akka.typed.scaladsl.ActorContext
import scala.language.existentials
import scala.language.higherKinds
/** Internal API */
@InternalApi
private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], ActorRef[_]]("ReceptionistKey")
private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], ActorRef[_]]
case class TypedORMultiMap[K[_], V[_]](val map: ORMultiMap[K[_], V[_]]) extends AnyVal {
def getOrElse[T](key: K[T], default: Set[V[T]]): Set[V[T]] =
map.getOrElse(key, default.asInstanceOf[Set[V[_]]]).asInstanceOf[Set[V[T]]]
def getOrEmpty[T](key: K[T]): Set[V[T]] = getOrElse(key, Set.empty)
def addBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] =
TypedORMultiMap[K, V](map.addBinding(key, value))
def removeBinding[T](key: K[T], value: V[T])(implicit cluster: Cluster): TypedORMultiMap[K, V] =
TypedORMultiMap[K, V](map.removeBinding(key, value))
def toORMultiMap: ORMultiMap[K[_], V[_]] = map
}
object TypedORMultiMap {
def empty[K[_], V[_]] = TypedORMultiMap[K, V](ORMultiMap.empty[K[_], V[_]])
}
type ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef]
object ServiceRegistry {
def empty: ServiceRegistry = TypedORMultiMap.empty
def apply(map: ORMultiMap[ServiceKey[_], ActorRef[_]]): ServiceRegistry = TypedORMultiMap[ServiceKey, ActorRef](map)
}
def behavior: Behavior[Command] = clusterBehavior
val clusterBehavior: Behavior[Command] = ReceptionistImpl.init(clusteredReceptionist())
case class ClusterReceptionistSettings(
writeConsistency: WriteConsistency = Replicator.WriteLocal
)
/**
* Returns an ReceptionistImpl.ExternalInterface that synchronizes registered services with
*/
def clusteredReceptionist(settings: ClusterReceptionistSettings = ClusterReceptionistSettings())(ctx: ActorContext[AllCommands]): ReceptionistImpl.ExternalInterface = {
import akka.typed.scaladsl.adapter._
val untypedSystem = ctx.system.toUntyped
val replicator = DistributedData(untypedSystem).replicator
implicit val cluster = Cluster(untypedSystem)
var state = ServiceRegistry.empty
def diff(lastState: ServiceRegistry, newState: ServiceRegistry): Map[AbstractServiceKey, Set[ActorRef[_]]] = {
def changesForKey[T](registry: Map[AbstractServiceKey, Set[ActorRef[_]]], key: ServiceKey[T]): Map[AbstractServiceKey, Set[ActorRef[_]]] = {
val oldValues = lastState.getOrEmpty(key)
val newValues = newState.getOrEmpty(key)
if (oldValues != newValues)
registry + (key newValues.asInstanceOf[Set[ActorRef[_]]])
else
registry
}
val allKeys = lastState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet
allKeys
.foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]])(changesForKey(_, _))
}
val adapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.spawnAdapter[Replicator.ReplicatorMessage] { (x: Replicator.ReplicatorMessage)
x match {
case changed @ Replicator.Changed(ReceptionistKey)
val value = changed.get(ReceptionistKey)
val oldState = state
state = ServiceRegistry(value) // is that thread-safe?
val changes = diff(oldState, state)
RegistrationsChangedExternally(changes)
}
}
replicator ! Replicator.Subscribe(ReceptionistKey, adapter.toUntyped)
new ExternalInterface {
private def updateRegistry(update: ServiceRegistry ServiceRegistry): Unit = {
state = update(state)
replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry
update(ServiceRegistry(registry)).toORMultiMap
}
}
def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit =
updateRegistry(_.addBinding(key, address))
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit =
updateRegistry(_.removeBinding(key, address))
}
}
}

View file

@ -5,22 +5,29 @@ package akka.typed
package internal
import com.typesafe.config.Config
import scala.concurrent.ExecutionContext
import java.util.concurrent.ThreadFactory
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.{ actor a, dispatch d, event e }
import scala.util.control.NonFatal
import scala.util.control.ControlThrowable
import scala.collection.immutable
import akka.typed.Dispatchers
import scala.concurrent.Promise
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.util.Success
import akka.util.Timeout
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import akka.typed.receptionist.Receptionist
import akka.typed.scaladsl.AskPattern
object ActorSystemImpl {
@ -227,9 +234,6 @@ private[typed] class ActorSystemImpl[-T](
private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyProps)
override val receptionist: ActorRef[patterns.Receptionist.Command] =
ActorRef(systemActorOf(patterns.Receptionist.behavior, "receptionist")(settings.untyped.CreationTimeout))
private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianProps)
// now we can start up the loggers

View file

@ -25,7 +25,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
/**
* Hook for ActorSystem to load extensions on startup
*/
protected final def loadExtensions() {
protected final def loadExtensions(): Unit = {
/**
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
*/

View file

@ -5,17 +5,13 @@ package akka.typed
package internal
package adapter
import akka.{ actor a, dispatch d }
import akka.dispatch.sysmsg
import akka.{ actor a }
import scala.concurrent.ExecutionContextExecutor
import akka.util.Timeout
import scala.concurrent.Future
import akka.annotation.InternalApi
import akka.typed.scaladsl.adapter.AdapterExtension
import scala.annotation.unchecked.uncheckedVariance
/**
* INTERNAL API. Lightweight wrapper for presenting an untyped ActorSystem to a Behavior (via the context).
@ -66,9 +62,6 @@ import scala.annotation.unchecked.uncheckedVariance
override def uptime: Long = untyped.uptime
override def printTree: String = untyped.printTree
override def receptionist: ActorRef[patterns.Receptionist.Command] =
ReceptionistExtension(untyped).receptionist
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
override def terminate(): scala.concurrent.Future[akka.typed.Terminated] =
@ -103,19 +96,5 @@ private[typed] object ActorSystemAdapter {
case _ throw new UnsupportedOperationException("only adapted untyped ActorSystem permissible " +
s"($sys of class ${sys.getClass.getName})")
}
object ReceptionistExtension extends a.ExtensionId[ReceptionistExtension] with a.ExtensionIdProvider {
override def get(system: a.ActorSystem): ReceptionistExtension = super.get(system)
override def lookup = ReceptionistExtension
override def createExtension(system: a.ExtendedActorSystem): ReceptionistExtension =
new ReceptionistExtension(system)
}
class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension {
val receptionist: ActorRef[patterns.Receptionist.Command] =
ActorRefAdapter(system.systemActorOf(
PropsAdapter(() patterns.Receptionist.behavior, EmptyProps),
"receptionist"))
}
}

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.internal.receptionist
import akka.annotation.InternalApi
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.Terminated
import akka.typed.receptionist.Receptionist._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.immutable
import akka.typed.scaladsl.Actor.same
import akka.typed.scaladsl.ActorContext
import akka.util.TypedMultiMap
import scala.reflect.ClassTag
/**
* Marker interface to use with dynamic access
*
* Internal API
*/
@InternalApi
private[typed] trait ReceptionistBehaviorProvider {
def behavior: Behavior[Command]
}
/** Internal API */
@InternalApi
private[typed] object ReceptionistImpl extends ReceptionistBehaviorProvider {
// FIXME: make sure to provide serializer
case class DefaultServiceKey[T](id: String)(implicit tTag: ClassTag[T]) extends ServiceKey[T] {
override def toString: String = s"ServiceKey[$tTag]($id)"
}
/**
* Interface to allow plugging of external service discovery infrastructure in to the existing receptionist API.
*/
trait ExternalInterface {
def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit
}
object LocalExternalInterface extends ExternalInterface {
def onRegister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = ()
def onUnregister[T](key: ServiceKey[T], address: ActorRef[T]): Unit = ()
}
override def behavior: Behavior[Command] = localOnlyBehavior
val localOnlyBehavior: Behavior[Command] = init(_ LocalExternalInterface)
type KV[K <: AbstractServiceKey] = ActorRef[K#Protocol]
type LocalServiceRegistry = TypedMultiMap[AbstractServiceKey, KV]
object LocalServiceRegistry {
val empty: LocalServiceRegistry = TypedMultiMap.empty[AbstractServiceKey, KV]
}
sealed abstract class ReceptionistInternalCommand extends InternalCommand
final case class RegisteredActorTerminated[T](key: ServiceKey[T], address: ActorRef[T]) extends ReceptionistInternalCommand
final case class SubscriberTerminated[T](key: ServiceKey[T], address: ActorRef[Listing[T]]) extends ReceptionistInternalCommand
final case class RegistrationsChangedExternally(changes: Map[AbstractServiceKey, Set[ActorRef[_]]]) extends ReceptionistInternalCommand
type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[Listing[K#Protocol]]
type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
private[typed] def init(externalInterfaceFactory: ActorContext[AllCommands] ExternalInterface): Behavior[Command] =
Actor.deferred[AllCommands] { ctx
val externalInterface = externalInterfaceFactory(ctx)
behavior(
TypedMultiMap.empty[AbstractServiceKey, KV],
TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV],
externalInterface)
}.narrow[Command]
private def behavior(
serviceRegistry: LocalServiceRegistry,
subscriptions: SubscriptionRegistry,
externalInterface: ExternalInterface): Behavior[AllCommands] = {
/** Helper to create new state */
def next(newRegistry: LocalServiceRegistry = serviceRegistry, newSubscriptions: SubscriptionRegistry = subscriptions) =
behavior(newRegistry, newSubscriptions, externalInterface)
/**
* Hack to allow multiple termination notifications per target
* FIXME: replace by simple map in our state
*/
def watchWith(ctx: ActorContext[AllCommands], target: ActorRef[_], msg: AllCommands): Unit =
ctx.spawnAnonymous[Nothing](Actor.deferred[Nothing] { innerCtx
innerCtx.watch(target)
Actor.immutable[Nothing]((_, _) Actor.same)
.onSignal {
case (_, Terminated(`target`))
ctx.self ! msg
Actor.stopped
}
})
/** Helper that makes sure that subscribers are notified when an entry is changed */
def updateRegistry(changedKeysHint: Set[AbstractServiceKey], f: LocalServiceRegistry LocalServiceRegistry): Behavior[AllCommands] = {
val newRegistry = f(serviceRegistry)
def notifySubscribersFor[T](key: AbstractServiceKey): Unit = {
val newListing = newRegistry.get(key)
subscriptions.get(key).foreach(_ ! Listing(key.asServiceKey, newListing))
}
changedKeysHint foreach notifySubscribersFor
next(newRegistry = newRegistry)
}
def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing[T]]): Unit =
replyTo ! Listing(key, serviceRegistry get key)
immutable[AllCommands] { (ctx, msg)
msg match {
case Register(key, serviceInstance, replyTo)
println(s"[${ctx.self}] Actor was registered: $key $serviceInstance")
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
replyTo ! Registered(key, serviceInstance)
externalInterface.onRegister(key, serviceInstance)
updateRegistry(Set(key), _.inserted(key)(serviceInstance))
case Find(key, replyTo)
replyWithListing(key, replyTo)
same
case RegistrationsChangedExternally(changes)
println(s"[${ctx.self}] Registration changed: $changes")
// FIXME: get rid of casts
def makeChanges(registry: LocalServiceRegistry): LocalServiceRegistry =
changes.foldLeft(registry) {
case (reg, (key, values))
reg.setAll(key)(values.asInstanceOf[Set[ActorRef[key.Protocol]]])
}
updateRegistry(changes.keySet, makeChanges) // overwrite all changed keys
case RegisteredActorTerminated(key, serviceInstance)
println(s"[${ctx.self}] Registered actor terminated: $key $serviceInstance")
externalInterface.onUnregister(key, serviceInstance)
updateRegistry(Set(key), _.removed(key)(serviceInstance))
case Subscribe(key, subscriber)
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
// immediately reply with initial listings to the new subscriber
replyWithListing(key, subscriber)
next(newSubscriptions = subscriptions.inserted(key)(subscriber))
case SubscriberTerminated(key, subscriber)
next(newSubscriptions = subscriptions.removed(key)(subscriber))
}
}
}
}

View file

@ -1,93 +0,0 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.patterns
import akka.typed.ActorRef
import akka.typed.Behavior
import akka.typed.Terminated
import akka.util.TypedMultiMap
import akka.typed.scaladsl.Actor._
/**
* A Receptionist is an entry point into an Actor hierarchy where select Actors
* publish their identity together with the protocols that they implement. Other
* Actors need only know the Receptionists identity in order to be able to use
* the services of the registered Actors.
*/
object Receptionist {
/**
* Internal representation of [[Receptionist.ServiceKey]] which is needed
* in order to use a TypedMultiMap (using keys with a type parameter does not
* work in Scala 2.x).
*/
trait AbstractServiceKey {
type Type
}
/**
* A service key is an object that implements this trait for a given protocol
* T, meaning that it signifies that the type T is the entry point into the
* protocol spoken by that service (think of it as the set of first messages
* that a client could send).
*/
trait ServiceKey[T] extends AbstractServiceKey {
final override type Type = T
}
/**
* The set of commands accepted by a Receptionist.
*/
sealed trait Command
/**
* Associate the given [[akka.typed.ActorRef]] with the given [[ServiceKey]]. Multiple
* registrations can be made for the same key. Unregistration is implied by
* the end of the referenced Actors lifecycle.
*/
final case class Register[T](key: ServiceKey[T], address: ActorRef[T])(val replyTo: ActorRef[Registered[T]]) extends Command
/**
* Query the Receptionist for a list of all Actors implementing the given
* protocol.
*/
final case class Find[T](key: ServiceKey[T])(val replyTo: ActorRef[Listing[T]]) extends Command
/**
* Confirmation that the given [[akka.typed.ActorRef]] has been associated with the [[ServiceKey]].
*/
final case class Registered[T](key: ServiceKey[T], address: ActorRef[T])
/**
* Current listing of all Actors that implement the protocol given by the [[ServiceKey]].
*/
final case class Listing[T](key: ServiceKey[T], addresses: Set[ActorRef[T]])
/**
* Initial behavior of a receptionist, used to create a new receptionist like in the following:
*
* {{{
* val receptionist: ActorRef[Receptionist.Command] = ctx.spawn(Props(Receptionist.behavior), "receptionist")
* }}}
*/
val behavior: Behavior[Command] = behavior(TypedMultiMap.empty[AbstractServiceKey, KV])
private type KV[K <: AbstractServiceKey] = ActorRef[K#Type]
private def behavior(map: TypedMultiMap[AbstractServiceKey, KV]): Behavior[Command] = immutable[Command] { (ctx, msg)
msg match {
case r: Register[t]
ctx.watch(r.address)
r.replyTo ! Registered(r.key, r.address)
behavior(map.inserted(r.key)(r.address))
case f: Find[t]
val set = map get f.key
f.replyTo ! Listing(f.key, set)
same
}
} onSignal {
case (ctx, Terminated(ref))
behavior(map valueRemoved ref)
case x unhandled
}
}
abstract class Receptionist

View file

@ -0,0 +1,150 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.receptionist
import akka.annotation.InternalApi
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.Extension
import akka.typed.ExtensionId
import akka.typed.internal.receptionist.ReceptionistBehaviorProvider
import akka.typed.internal.receptionist.ReceptionistImpl
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.reflect.ClassTag
class Receptionist(system: ActorSystem[_]) extends Extension {
private def hasCluster: Boolean =
// FIXME: replace with better indicator that cluster is enabled
system.settings.config.getString("akka.actor.provider") == "cluster"
val ref: ActorRef[Receptionist.Command] = {
val behavior =
if (hasCluster)
system.dynamicAccess
.createInstanceFor[ReceptionistBehaviorProvider]("akka.typed.cluster.internal.receptionist.ClusterReceptionist$", Nil)
.recover {
case ex
system.log.error(
ex,
"ClusterReceptionist could not be loaded dynamically. Make sure you have all required binaries on the classpath.")
ReceptionistImpl
}.get.behavior
else ReceptionistImpl.localOnlyBehavior
ActorRef(
system.systemActorOf(behavior, "receptionist")(
// FIXME: where should that timeout be configured? Shouldn't there be a better `Extension`
// implementation that does this dance for us?
10.seconds
))
}
}
/**
* A Receptionist is an entry point into an Actor hierarchy where select Actors
* publish their identity together with the protocols that they implement. Other
* Actors need only know the Receptionists identity in order to be able to use
* the services of the registered Actors.
*/
object Receptionist extends ExtensionId[Receptionist] {
def createExtension(system: ActorSystem[_]): Receptionist = new Receptionist(system)
def get(system: ActorSystem[_]): Receptionist = apply(system)
/**
* Internal representation of [[ServiceKey]] which is needed
* in order to use a TypedMultiMap (using keys with a type parameter does not
* work in Scala 2.x).
*
* Internal API
*/
@InternalApi
private[typed] sealed abstract class AbstractServiceKey {
type Protocol
/** Type-safe down-cast */
def asServiceKey: ServiceKey[Protocol]
}
/**
* A service key is an object that implements this trait for a given protocol
* T, meaning that it signifies that the type T is the entry point into the
* protocol spoken by that service (think of it as the set of first messages
* that a client could send).
*/
abstract class ServiceKey[T] extends AbstractServiceKey {
final type Protocol = T
def id: String
def asServiceKey: ServiceKey[T] = this
}
object ServiceKey {
/**
* Creates a service key. The given ID should uniquely define a service with a given protocol.
*/
// FIXME: not sure if the ClassTag pulls its weight. It's only used in toString currently.
def apply[T](id: String)(implicit tTag: ClassTag[T]): ServiceKey[T] = ReceptionistImpl.DefaultServiceKey(id)
}
/** Internal superclass for external and internal commands */
@InternalApi
sealed private[typed] abstract class AllCommands
/**
* The set of commands accepted by a Receptionist.
*/
sealed abstract class Command extends AllCommands
@InternalApi
private[typed] abstract class InternalCommand extends AllCommands
/**
* Associate the given [[akka.typed.ActorRef]] with the given [[ServiceKey]]. Multiple
* registrations can be made for the same key. Unregistration is implied by
* the end of the referenced Actors lifecycle.
*
* Registration will be acknowledged with the [[Registered]] message to the given replyTo actor.
*/
final case class Register[T](key: ServiceKey[T], serviceInstance: ActorRef[T], replyTo: ActorRef[Registered[T]]) extends Command
object Register {
/** Auxiliary constructor to be used with the ask pattern */
def apply[T](key: ServiceKey[T], service: ActorRef[T]): ActorRef[Registered[T]] Register[T] =
replyTo Register(key, service, replyTo)
}
/**
* Confirmation that the given [[akka.typed.ActorRef]] has been associated with the [[ServiceKey]].
*/
final case class Registered[T](key: ServiceKey[T], serviceInstance: ActorRef[T])
/**
* Subscribe the given actor to service updates. When new instances are registered or unregistered to the given key
* the given subscriber will be sent a [[Listing]] with the new set of instances for that service.
*
* The subscription will be acknowledged by sending out a first [[Listing]]. The subscription automatically ends
* with the termination of the subscriber.
*/
final case class Subscribe[T](key: ServiceKey[T], subscriber: ActorRef[Listing[T]]) extends Command
/**
* Query the Receptionist for a list of all Actors implementing the given
* protocol.
*/
final case class Find[T](key: ServiceKey[T], replyTo: ActorRef[Listing[T]]) extends Command
object Find {
/** Auxiliary constructor to use with the ask pattern */
def apply[T](key: ServiceKey[T]): ActorRef[Listing[T]] Find[T] =
replyTo Find(key, replyTo)
}
/**
* Current listing of all Actors that implement the protocol given by the [[ServiceKey]].
*/
final case class Listing[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]) {
/** Java API */
def getServiceInstances: java.util.Set[ActorRef[T]] = serviceInstances.asJava
}
}