allow named adapters and add receptionist to system

This commit is contained in:
Roland Kuhn 2017-02-09 17:48:43 +01:00
parent 873e87fb33
commit 5766cd701d
16 changed files with 126 additions and 40 deletions

View file

@ -59,8 +59,10 @@ private[akka] trait Children { this: ActorCell ⇒
other
}
private[akka] def addFunctionRef(f: (ActorRef, Any) Unit): FunctionRef = {
val childPath = new ChildActorPath(self.path, randomName(new java.lang.StringBuilder("$$")), ActorCell.newUid())
private[akka] def addFunctionRef(f: (ActorRef, Any) Unit, name: String = ""): FunctionRef = {
val r = randomName(new java.lang.StringBuilder("$$"))
val n = if (name != "") s"$r-$name" else r
val childPath = new ChildActorPath(self.path, n, ActorCell.newUid())
val ref = new FunctionRef(childPath, provider, system.eventStream, f)
@tailrec def rec(): Unit = {

View file

@ -123,8 +123,22 @@ trait ActorContext[T] {
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T): ActorRef[U]
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
}
/**
@ -139,8 +153,8 @@ class StubbedActorContext[T](
override val mailboxCapacity: Int,
override val system: ActorSystem[Nothing]) extends ActorContext[T] {
val inbox = Inbox[T](name)
override val self = inbox.ref
val selfInbox = Inbox[T](name)
override val self = selfInbox.ref
private var _children = TreeMap.empty[String, Inbox[_]]
private val childName = Iterator from 1 map (Helpers.base64(_))
@ -172,37 +186,48 @@ class StubbedActorContext[T](
case Some(inbox) inbox.ref == child
}
}
def watch[U](other: ActorRef[U]): ActorRef[U] = other
def watch(other: akka.actor.ActorRef): other.type = other
def unwatch[U](other: ActorRef[U]): ActorRef[U] = other
def unwatch(other: akka.actor.ActorRef): other.type = other
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = ()
def cancelReceiveTimeout(): Unit = ()
override def watch[U](other: ActorRef[U]): ActorRef[U] = other
override def unwatch[U](other: ActorRef[U]): ActorRef[U] = other
override def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = ()
override def cancelReceiveTimeout(): Unit = ()
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable = new untyped.Cancellable {
def cancel() = false
def isCancelled = true
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable = new untyped.Cancellable {
override def cancel() = false
override def isCancelled = true
}
def executionContext: ExecutionContextExecutor = system.executionContext
override def executionContext: ExecutionContextExecutor = system.executionContext
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAnonymous[Any](Behavior.emptyBehavior)
override def spawnAdapter[U](f: U T, name: String = ""): ActorRef[U] = {
val n = if (name != "") s"${childName.next()}-$name" else childName.next()
val i = Inbox[U](n)
_children += i.ref.path.name i
new internal.FunctionRef[U](
self.path / i.ref.path.name,
(msg, _) { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } },
(self) selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null)))
}
/**
* Retrieve the named inbox. The passed ActorRef must be one that was returned
* Retrieve the inbox representing the given child actor. The passed ActorRef must be one that was returned
* by one of the spawn methods earlier.
*/
def getInbox[U](child: ActorRef[U]): Inbox[U] = {
def childInbox[U](child: ActorRef[U]): Inbox[U] = {
val inbox = _children(child.path.name).asInstanceOf[Inbox[U]]
if (inbox.ref != child) throw new IllegalArgumentException(s"$child is not a child of $this")
inbox
}
/**
* Retrieve the inbox representing the child actor with the given name.
*/
def childInbox[U](name: String): Inbox[U] = _children(name).asInstanceOf[Inbox[U]]
/**
* Remove the given inbox from the list of children, for example after
* having simulated its termination.
*/
def removeInbox(child: ActorRef[Nothing]): Unit = _children -= child.path.name
def removeChildInbox(child: ActorRef[Nothing]): Unit = _children -= child.path.name
override def toString: String = s"Inbox($self)"
}

View file

@ -18,6 +18,7 @@ import scala.concurrent.Future
* (i.e. this delivery is not reliable).
*/
abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[Nothing]] { this: internal.ActorRefImpl[T]
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.

View file

@ -139,6 +139,11 @@ trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒
* method.
*/
def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]]
/**
* Return a reference to this systems [[akka.typed.patterns.Receptionist$]].
*/
def receptionist: ActorRef[patterns.Receptionist.Command]
}
object ActorSystem {

View file

@ -111,7 +111,9 @@ object Behavior {
@SerialVersionUID(1L)
private[akka] object stoppedBehavior extends Behavior[Nothing] {
override def management(ctx: ActorContext[Nothing], msg: Signal): Behavior[Nothing] = {
assert(msg == PostStop, s"stoppedBehavior received $msg (only PostStop is expected)")
assert(
msg == PostStop || msg.isInstanceOf[Terminated],
s"stoppedBehavior received $msg (only PostStop or Terminated expected)")
this
}
override def message(ctx: ActorContext[Nothing], msg: Nothing): Behavior[Nothing] = throw new UnsupportedOperationException("Not Implemented")

View file

@ -54,6 +54,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
if (Behavior.isAlive(current)) signal(PreStart)
def currentBehavior: Behavior[T] = current
def isAlive: Boolean = Behavior.isAlive(current)
def run(msg: T): Unit = current = Behavior.canonicalize(current.message(this, msg), current)
def signal(signal: Signal): Unit = current = Behavior.canonicalize(current.management(this, signal), current)
@ -63,6 +64,11 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
effectQueue.offer(Spawned(ref.path.name))
ref
}
override def spawnAdapter[U](f: U T, name: String = ""): ActorRef[U] = {
val ref = super.spawnAdapter(f, name)
effectQueue.offer(Spawned(ref.path.name))
ref
}
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = {
effectQueue.offer(Spawned(name))
super.spawn(behavior, name)

View file

@ -52,9 +52,9 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo
import ctx.dispatcher
ctx.system.scheduler.scheduleOnce(delay, toUntyped(target), msg)
}
override def spawnAdapter[U](f: U T): ActorRef[U] = {
override def spawnAdapter[U](f: U T, name: String = ""): ActorRef[U] = {
val cell = ctx.asInstanceOf[akka.actor.ActorCell]
val ref = cell.addFunctionRef((_, msg) ctx.self ! f(msg.asInstanceOf[U]))
val ref = cell.addFunctionRef((_, msg) ctx.self ! f(msg.asInstanceOf[U]), name)
ActorRefAdapter[U](ref)
}

View file

@ -21,6 +21,8 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
extends ActorRef[T](a.RootActorPath(a.Address("akka", untyped.name)) / "user")
with ActorSystem[T] with internal.ActorRefImpl[T] {
import ActorSystemAdapter._
// Members declared in akka.typed.ActorRef
override def tell(msg: T): Unit = untyped.guardian ! msg
override def isLocal: Boolean = true
@ -52,6 +54,9 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
override def uptime: Long = untyped.uptime
override def printTree: String = untyped.printTree
override def receptionist: ActorRef[patterns.Receptionist.Command] =
ReceptionistExtension(untyped).receptionist
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
override def terminate(): scala.concurrent.Future[akka.typed.Terminated] =
@ -68,4 +73,10 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
private[typed] object ActorSystemAdapter {
def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl])
object ReceptionistExtension extends a.ExtensionKey[ReceptionistExtension]
class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension {
val receptionist: ActorRef[patterns.Receptionist.Command] =
ActorRefAdapter(system.systemActorOf(PropsAdapter(patterns.Receptionist.behavior, EmptyDeploymentConfig), "receptionist"))
}
}

View file

@ -144,12 +144,13 @@ private[typed] class ActorCell[T](
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable =
system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext)
override def spawnAdapter[U](f: U T): ActorRef[U] = {
val name = Helpers.base64(nextName, new java.lang.StringBuilder("$!"))
override def spawnAdapter[U](f: U T, _name: String = ""): ActorRef[U] = {
val baseName = Helpers.base64(nextName, new java.lang.StringBuilder("$!"))
nextName += 1
val name = if (_name != "") s"$baseName-${_name}" else baseName
val ref = new FunctionRef[U](
self.path / name,
(msg, _) send(f(msg)),
(msg, _) { val m = f(msg); if (m != null) send(m) },
(self) sendSystem(DeathWatchNotification(self, null)))
childrenMap = childrenMap.updated(name, ref)
ref

View file

@ -136,7 +136,7 @@ private[typed] object WatchableRef {
* promises are made about delivery delays: as long as the Future is not ready
* messages will be queued, afterwards they get sent without waiting.
*/
private[typed] class FutureRef[-T](_p: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_p) {
private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_path) {
import FutureRef._
// Keep in synch with `targetOffset` in companion (could also change on mixing in a trait).

View file

@ -219,6 +219,10 @@ private[typed] class ActorSystemImpl[-T](
}
private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyDeploymentConfig)
override val receptionist: ActorRef[patterns.Receptionist.Command] =
ActorRef(systemActorOf(patterns.Receptionist.behavior, "receptionist")(settings.untyped.CreationTimeout))
private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianDeployment)
// now we can start up the loggers

View file

@ -70,7 +70,7 @@ object ActorContextSpec {
final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command
case object BecameCareless extends Event
final case class GetAdapter(replyTo: ActorRef[Adapter]) extends Command
final case class GetAdapter(replyTo: ActorRef[Adapter], name: String = "") extends Command
final case class Adapter(a: ActorRef[Command]) extends Event
def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
@ -151,14 +151,13 @@ object ActorContextSpec {
monitor ! GotSignal(sig)
Same
}
case GetAdapter(replyTo)
replyTo ! Adapter(ctx.spawnAdapter(identity))
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
Same
}
}
}
/* Kept failing on CI-server, disabled until someone has time to investigate more closely
class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
"""|akka {
| loglevel = WARNING
@ -523,6 +522,15 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
msgs.toSet should ===(Set(Left(Terminated(adapter)(null)), Right(GotSignal(PostStop))))
}
})
def `41 must create a named adapter`(): Unit = sync(setup("ctx41") { (ctx, startWith)
startWith.keep { subj
subj ! GetAdapter(ctx.self, "named")
}.expectMessage(expectTimeout) { (msg, subj)
val Adapter(adapter) = msg
adapter.path.name should include("named")
}
})
}
trait Normal extends Tests {
@ -603,4 +611,3 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
object `An ActorContext with Or (right, adapted)` extends OrRight with AdaptedSystem
}
*/

View file

@ -3,7 +3,7 @@
*/
package akka.typed
import org.scalatest.Spec
import org.scalatest.refspec.RefSpec
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit.AkkaSpec
@ -19,17 +19,18 @@ import language.existentials
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import org.scalatest.concurrent.ScalaFutures
import org.scalactic.ConversionCheckedTripleEquals
import org.scalactic.Constraint
import org.scalactic.TypeCheckedTripleEquals
import org.scalactic.CanEqual
import org.junit.runner.RunWith
import scala.util.control.NonFatal
import org.scalatest.exceptions.TestFailedException
import akka.util.TypedMultiMap
/**
* Helper class for writing tests for typed Actors with ScalaTest.
*/
@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TypedSpecSetup extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals
class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals
/**
* Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter.
@ -116,13 +117,13 @@ class TypedSpec(val config: Config) extends TypedSpecSetup {
}
// for ScalaTest === compare of Class objects
implicit def classEqualityConstraint[A, B]: Constraint[Class[A], Class[B]] =
new Constraint[Class[A], Class[B]] {
implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] =
new CanEqual[Class[A], Class[B]] {
def areEqual(a: Class[A], b: Class[B]) = a == b
}
implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: Constraint[Set[A], T] =
new Constraint[Set[A], T] {
implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] =
new CanEqual[Set[A], T] {
def areEqual(a: Set[A], b: T) = a == b
}
}
@ -180,6 +181,7 @@ object TypedSpec {
}
class TypedSpecSpec extends TypedSpec {
object `A TypedSpec` {
trait CommonTests {

View file

@ -54,6 +54,9 @@ private[typed] class ActorSystemStub(val name: String)
override def printTree: String = "no tree for ActorSystemStub"
val receptionistInbox = Inbox[patterns.Receptionist.Command]("receptionist")
override def receptionist: ActorRef[patterns.Receptionist.Command] = receptionistInbox.ref
def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] = {
Future.failed(new UnsupportedOperationException("ActorSystemStub cannot create system actors"))
}

View file

@ -125,6 +125,17 @@ class ReceptionistSpec extends TypedSpec {
}
})
def `must be present in the system`(): Unit = sync(runTest("systemReceptionist") {
StepWise[Listing[ServiceA]] { (ctx, startWith)
val self = ctx.self
startWith.withKeepTraces(true) {
ctx.system.receptionist ! Find(ServiceKeyA)(self)
}.expectMessage(1.second) { (msg, _)
msg.addresses should ===(Set())
}
}
})
}
object `A Receptionist (native)` extends CommonTests with NativeSystem

View file

@ -397,7 +397,13 @@ object MiMa extends AutoPlugin {
// #22277 changes to internal classes
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpServerHandler.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpClientHandler.this"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.netty.TcpHandlers.log")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.netty.TcpHandlers.log"),
// #22105 Akka Typed process DSL
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.addFunctionRef"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef$default$2")
)
Map(