diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index b08f4710ce..beed838f76 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -59,8 +59,10 @@ private[akka] trait Children { this: ActorCell ⇒ other } - private[akka] def addFunctionRef(f: (ActorRef, Any) ⇒ Unit): FunctionRef = { - val childPath = new ChildActorPath(self.path, randomName(new java.lang.StringBuilder("$$")), ActorCell.newUid()) + private[akka] def addFunctionRef(f: (ActorRef, Any) ⇒ Unit, name: String = ""): FunctionRef = { + val r = randomName(new java.lang.StringBuilder("$$")) + val n = if (name != "") s"$r-$name" else r + val childPath = new ChildActorPath(self.path, n, ActorCell.newUid()) val ref = new FunctionRef(childPath, provider, system.eventStream, f) @tailrec def rec(): Unit = { diff --git a/akka-typed/src/main/scala/akka/typed/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/ActorContext.scala index 72082f0707..5ce4e40e3e 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorContext.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorContext.scala @@ -123,8 +123,22 @@ trait ActorContext[T] { * Create a child actor that will wrap messages such that other Actor’s * protocols can be ingested by this Actor. You are strongly advised to cache * these ActorRefs or to stop them when no longer needed. + * + * The name of the child actor will be composed of a unique identifier + * starting with a dollar sign to which the given `name` argument is + * appended, with an inserted hyphen between these two parts. Therefore + * the given `name` argument does not need to be unique within the scope + * of the parent actor. */ - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] + def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] + + /** + * Create an anonymous child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + */ + def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAdapter(f, "") + } /** @@ -139,8 +153,8 @@ class StubbedActorContext[T]( override val mailboxCapacity: Int, override val system: ActorSystem[Nothing]) extends ActorContext[T] { - val inbox = Inbox[T](name) - override val self = inbox.ref + val selfInbox = Inbox[T](name) + override val self = selfInbox.ref private var _children = TreeMap.empty[String, Inbox[_]] private val childName = Iterator from 1 map (Helpers.base64(_)) @@ -172,37 +186,48 @@ class StubbedActorContext[T]( case Some(inbox) ⇒ inbox.ref == child } } - def watch[U](other: ActorRef[U]): ActorRef[U] = other - def watch(other: akka.actor.ActorRef): other.type = other - def unwatch[U](other: ActorRef[U]): ActorRef[U] = other - def unwatch(other: akka.actor.ActorRef): other.type = other - def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = () - def cancelReceiveTimeout(): Unit = () + override def watch[U](other: ActorRef[U]): ActorRef[U] = other + override def unwatch[U](other: ActorRef[U]): ActorRef[U] = other + override def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = () + override def cancelReceiveTimeout(): Unit = () - def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable = new untyped.Cancellable { - def cancel() = false - def isCancelled = true + override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable = new untyped.Cancellable { + override def cancel() = false + override def isCancelled = true } - def executionContext: ExecutionContextExecutor = system.executionContext + override def executionContext: ExecutionContextExecutor = system.executionContext - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAnonymous[Any](Behavior.emptyBehavior) + override def spawnAdapter[U](f: U ⇒ T, name: String = ""): ActorRef[U] = { + val n = if (name != "") s"${childName.next()}-$name" else childName.next() + val i = Inbox[U](n) + _children += i.ref.path.name → i + new internal.FunctionRef[U]( + self.path / i.ref.path.name, + (msg, _) ⇒ { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } }, + (self) ⇒ selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null))) + } /** - * Retrieve the named inbox. The passed ActorRef must be one that was returned + * Retrieve the inbox representing the given child actor. The passed ActorRef must be one that was returned * by one of the spawn methods earlier. */ - def getInbox[U](child: ActorRef[U]): Inbox[U] = { + def childInbox[U](child: ActorRef[U]): Inbox[U] = { val inbox = _children(child.path.name).asInstanceOf[Inbox[U]] if (inbox.ref != child) throw new IllegalArgumentException(s"$child is not a child of $this") inbox } + /** + * Retrieve the inbox representing the child actor with the given name. + */ + def childInbox[U](name: String): Inbox[U] = _children(name).asInstanceOf[Inbox[U]] + /** * Remove the given inbox from the list of children, for example after * having simulated its termination. */ - def removeInbox(child: ActorRef[Nothing]): Unit = _children -= child.path.name + def removeChildInbox(child: ActorRef[Nothing]): Unit = _children -= child.path.name override def toString: String = s"Inbox($self)" } diff --git a/akka-typed/src/main/scala/akka/typed/ActorRef.scala b/akka-typed/src/main/scala/akka/typed/ActorRef.scala index a3e1f94e0c..43cdcb3ef4 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorRef.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorRef.scala @@ -18,6 +18,7 @@ import scala.concurrent.Future * (i.e. this delivery is not reliable). */ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[Nothing]] { this: internal.ActorRefImpl[T] ⇒ + /** * Send a message to the Actor referenced by this ActorRef using *at-most-once* * messaging semantics. diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index ecece14dab..5d1e28a510 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -139,6 +139,11 @@ trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒ * method. */ def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] + + /** + * Return a reference to this system’s [[akka.typed.patterns.Receptionist$]]. + */ + def receptionist: ActorRef[patterns.Receptionist.Command] } object ActorSystem { diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index b2995810d0..b3f1eb2858 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -111,7 +111,9 @@ object Behavior { @SerialVersionUID(1L) private[akka] object stoppedBehavior extends Behavior[Nothing] { override def management(ctx: ActorContext[Nothing], msg: Signal): Behavior[Nothing] = { - assert(msg == PostStop, s"stoppedBehavior received $msg (only PostStop is expected)") + assert( + msg == PostStop || msg.isInstanceOf[Terminated], + s"stoppedBehavior received $msg (only PostStop or Terminated expected)") this } override def message(ctx: ActorContext[Nothing], msg: Nothing): Behavior[Nothing] = throw new UnsupportedOperationException("Not Implemented") diff --git a/akka-typed/src/main/scala/akka/typed/Effects.scala b/akka-typed/src/main/scala/akka/typed/Effects.scala index 678b188dfe..c8acad73ae 100644 --- a/akka-typed/src/main/scala/akka/typed/Effects.scala +++ b/akka-typed/src/main/scala/akka/typed/Effects.scala @@ -54,6 +54,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma if (Behavior.isAlive(current)) signal(PreStart) def currentBehavior: Behavior[T] = current + def isAlive: Boolean = Behavior.isAlive(current) def run(msg: T): Unit = current = Behavior.canonicalize(current.message(this, msg), current) def signal(signal: Signal): Unit = current = Behavior.canonicalize(current.management(this, signal), current) @@ -63,6 +64,11 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma effectQueue.offer(Spawned(ref.path.name)) ref } + override def spawnAdapter[U](f: U ⇒ T, name: String = ""): ActorRef[U] = { + val ref = super.spawnAdapter(f, name) + effectQueue.offer(Spawned(ref.path.name)) + ref + } override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = { effectQueue.offer(Spawned(name)) super.spawn(behavior, name) diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala index 7d8ae9d377..dcd5733b68 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala @@ -52,9 +52,9 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo import ctx.dispatcher ctx.system.scheduler.scheduleOnce(delay, toUntyped(target), msg) } - override def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = { + override def spawnAdapter[U](f: U ⇒ T, name: String = ""): ActorRef[U] = { val cell = ctx.asInstanceOf[akka.actor.ActorCell] - val ref = cell.addFunctionRef((_, msg) ⇒ ctx.self ! f(msg.asInstanceOf[U])) + val ref = cell.addFunctionRef((_, msg) ⇒ ctx.self ! f(msg.asInstanceOf[U]), name) ActorRefAdapter[U](ref) } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala index 7c44463705..1fd87b0a03 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala @@ -21,6 +21,8 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) extends ActorRef[T](a.RootActorPath(a.Address("akka", untyped.name)) / "user") with ActorSystem[T] with internal.ActorRefImpl[T] { + import ActorSystemAdapter._ + // Members declared in akka.typed.ActorRef override def tell(msg: T): Unit = untyped.guardian ! msg override def isLocal: Boolean = true @@ -52,6 +54,9 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) override def uptime: Long = untyped.uptime override def printTree: String = untyped.printTree + override def receptionist: ActorRef[patterns.Receptionist.Command] = + ReceptionistExtension(untyped).receptionist + import akka.dispatch.ExecutionContexts.sameThreadExecutionContext override def terminate(): scala.concurrent.Future[akka.typed.Terminated] = @@ -68,4 +73,10 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) private[typed] object ActorSystemAdapter { def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl]) + + object ReceptionistExtension extends a.ExtensionKey[ReceptionistExtension] + class ReceptionistExtension(system: a.ExtendedActorSystem) extends a.Extension { + val receptionist: ActorRef[patterns.Receptionist.Command] = + ActorRefAdapter(system.systemActorOf(PropsAdapter(patterns.Receptionist.behavior, EmptyDeploymentConfig), "receptionist")) + } } diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index 9ae334dfef..d3e13e9c1f 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -144,12 +144,13 @@ private[typed] class ActorCell[T]( override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable = system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext) - override def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = { - val name = Helpers.base64(nextName, new java.lang.StringBuilder("$!")) + override def spawnAdapter[U](f: U ⇒ T, _name: String = ""): ActorRef[U] = { + val baseName = Helpers.base64(nextName, new java.lang.StringBuilder("$!")) nextName += 1 + val name = if (_name != "") s"$baseName-${_name}" else baseName val ref = new FunctionRef[U]( self.path / name, - (msg, _) ⇒ send(f(msg)), + (msg, _) ⇒ { val m = f(msg); if (m != null) send(m) }, (self) ⇒ sendSystem(DeathWatchNotification(self, null))) childrenMap = childrenMap.updated(name, ref) ref diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorRefImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorRefImpl.scala index 42c6ab9dcd..0176727070 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorRefImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorRefImpl.scala @@ -136,7 +136,7 @@ private[typed] object WatchableRef { * promises are made about delivery delays: as long as the Future is not ready * messages will be queued, afterwards they get sent without waiting. */ -private[typed] class FutureRef[-T](_p: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_p) { +private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_path) { import FutureRef._ // Keep in synch with `targetOffset` in companion (could also change on mixing in a trait). diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala index cc0f5e5464..824c1d3c66 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala @@ -219,6 +219,10 @@ private[typed] class ActorSystemImpl[-T]( } private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyDeploymentConfig) + + override val receptionist: ActorRef[patterns.Receptionist.Command] = + ActorRef(systemActorOf(patterns.Receptionist.behavior, "receptionist")(settings.untyped.CreationTimeout)) + private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianDeployment) // now we can start up the loggers diff --git a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala index cc6e763f8d..418e9ba62a 100644 --- a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala @@ -70,7 +70,7 @@ object ActorContextSpec { final case class BecomeCareless(replyTo: ActorRef[BecameCareless.type]) extends Command case object BecameCareless extends Event - final case class GetAdapter(replyTo: ActorRef[Adapter]) extends Command + final case class GetAdapter(replyTo: ActorRef[Adapter], name: String = "") extends Command final case class Adapter(a: ActorRef[Command]) extends Event def subject(monitor: ActorRef[Monitor]): Behavior[Command] = @@ -151,14 +151,13 @@ object ActorContextSpec { monitor ! GotSignal(sig) Same } - case GetAdapter(replyTo) ⇒ - replyTo ! Adapter(ctx.spawnAdapter(identity)) + case GetAdapter(replyTo, name) ⇒ + replyTo ! Adapter(ctx.spawnAdapter(identity, name)) Same } } } -/* Kept failing on CI-server, disabled until someone has time to investigate more closely class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( """|akka { | loglevel = WARNING @@ -523,6 +522,15 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( msgs.toSet should ===(Set(Left(Terminated(adapter)(null)), Right(GotSignal(PostStop)))) } }) + + def `41 must create a named adapter`(): Unit = sync(setup("ctx41") { (ctx, startWith) ⇒ + startWith.keep { subj ⇒ + subj ! GetAdapter(ctx.self, "named") + }.expectMessage(expectTimeout) { (msg, subj) ⇒ + val Adapter(adapter) = msg + adapter.path.name should include("named") + } + }) } trait Normal extends Tests { @@ -603,4 +611,3 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( object `An ActorContext with Or (right, adapted)` extends OrRight with AdaptedSystem } -*/ diff --git a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala index 9fc9bf4474..d451cf45f8 100644 --- a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala @@ -3,7 +3,7 @@ */ package akka.typed -import org.scalatest.Spec +import org.scalatest.refspec.RefSpec import org.scalatest.Matchers import org.scalatest.BeforeAndAfterAll import akka.testkit.AkkaSpec @@ -19,17 +19,18 @@ import language.existentials import akka.testkit.EventFilter import akka.testkit.TestEvent.Mute import org.scalatest.concurrent.ScalaFutures -import org.scalactic.ConversionCheckedTripleEquals -import org.scalactic.Constraint +import org.scalactic.TypeCheckedTripleEquals +import org.scalactic.CanEqual import org.junit.runner.RunWith import scala.util.control.NonFatal import org.scalatest.exceptions.TestFailedException +import akka.util.TypedMultiMap /** * Helper class for writing tests for typed Actors with ScalaTest. */ @RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TypedSpecSetup extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals +class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with ScalaFutures with TypeCheckedTripleEquals /** * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. @@ -116,13 +117,13 @@ class TypedSpec(val config: Config) extends TypedSpecSetup { } // for ScalaTest === compare of Class objects - implicit def classEqualityConstraint[A, B]: Constraint[Class[A], Class[B]] = - new Constraint[Class[A], Class[B]] { + implicit def classEqualityConstraint[A, B]: CanEqual[Class[A], Class[B]] = + new CanEqual[Class[A], Class[B]] { def areEqual(a: Class[A], b: Class[B]) = a == b } - implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: Constraint[Set[A], T] = - new Constraint[Set[A], T] { + implicit def setEqualityConstraint[A, T <: Set[_ <: A]]: CanEqual[Set[A], T] = + new CanEqual[Set[A], T] { def areEqual(a: Set[A], b: T) = a == b } } @@ -180,6 +181,7 @@ object TypedSpec { } class TypedSpecSpec extends TypedSpec { + object `A TypedSpec` { trait CommonTests { diff --git a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemStub.scala b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemStub.scala index e51b6cc4c0..b05c424bbf 100644 --- a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemStub.scala +++ b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemStub.scala @@ -54,6 +54,9 @@ private[typed] class ActorSystemStub(val name: String) override def printTree: String = "no tree for ActorSystemStub" + val receptionistInbox = Inbox[patterns.Receptionist.Command]("receptionist") + override def receptionist: ActorRef[patterns.Receptionist.Command] = receptionistInbox.ref + def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] = { Future.failed(new UnsupportedOperationException("ActorSystemStub cannot create system actors")) } diff --git a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala index d4c548c8d4..a4c24c3c43 100644 --- a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala @@ -125,6 +125,17 @@ class ReceptionistSpec extends TypedSpec { } }) + def `must be present in the system`(): Unit = sync(runTest("systemReceptionist") { + StepWise[Listing[ServiceA]] { (ctx, startWith) ⇒ + val self = ctx.self + startWith.withKeepTraces(true) { + ctx.system.receptionist ! Find(ServiceKeyA)(self) + }.expectMessage(1.second) { (msg, _) ⇒ + msg.addresses should ===(Set()) + } + } + }) + } object `A Receptionist (native)` extends CommonTests with NativeSystem diff --git a/project/MiMa.scala b/project/MiMa.scala index 30c48d8ce9..d3ea8bf141 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -397,7 +397,13 @@ object MiMa extends AutoPlugin { // #22277 changes to internal classes ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpServerHandler.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.transport.netty.TcpClientHandler.this"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.netty.TcpHandlers.log") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.netty.TcpHandlers.log"), + + // #22105 Akka Typed process DSL + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.addFunctionRef"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.dungeon.Children.addFunctionRef$default$2") ) Map(