diff --git a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala index ec9a14ed9d..ae911f2689 100644 --- a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala +++ b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala @@ -81,7 +81,7 @@ class IntroSpec extends TypedSpec { // using global pool since we want to run tasks after system.terminate import scala.concurrent.ExecutionContext.Implicits.global - val system: ActorSystem[Greet] = ActorSystem("hello", Props(greeter)) + val system: ActorSystem[Greet] = ActorSystem("hello", greeter) val future: Future[Greeted] = system ? (Greet("world", _)) @@ -111,11 +111,11 @@ class IntroSpec extends TypedSpec { //#chatroom-gabbler //#chatroom-main - val main: Behavior[Unit] = + val main: Behavior[akka.NotUsed] = Full { case Sig(ctx, PreStart) => - val chatRoom = ctx.spawn(Props(ChatRoom.behavior), "chatroom") - val gabblerRef = ctx.spawn(Props(gabbler), "gabbler") + val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom") + val gabblerRef = ctx.spawn(gabbler, "gabbler") ctx.watch(gabblerRef) chatRoom ! GetSession("ol’ Gabbler", gabblerRef) Same @@ -123,7 +123,7 @@ class IntroSpec extends TypedSpec { Stopped } - val system = ActorSystem("ChatRoomDemo", Props(main)) + val system = ActorSystem("ChatRoomDemo", main) Await.result(system.whenTerminated, 1.second) //#chatroom-main } diff --git a/akka-docs/rst/scala/typed.rst b/akka-docs/rst/scala/typed.rst index cff938b4d1..dbf9be61aa 100644 --- a/akka-docs/rst/scala/typed.rst +++ b/akka-docs/rst/scala/typed.rst @@ -57,10 +57,7 @@ Now we want to try out this Actor, so we must start an ActorSystem to host it: .. includecode:: code/docs/akka/typed/IntroSpec.scala#hello-world After importing the Actor’s protocol definition we start an Actor system from -the defined behavior, wrapping it in :class:`Props` like an actor on stage. The -props we are giving to this one are just the defaults, we could at this point -also configure which thread pool will be used to run it or its mailbox capacity -for incoming messages. +the defined behavior. As Carl Hewitt said, one Actor is no Actor—it would be quite lonely with nobody to talk to. In this sense the example is a little cruel because we only @@ -73,7 +70,7 @@ properly typed already, no type checks or casts needed. This is possible due to the type information that is part of the message protocol: the ``?`` operator takes as argument a function that accepts an :class:`ActorRef[U]` (which explains the ``_`` hole in the expression on line 7 above) and the ``replyTo`` -parameter which we fill in like that is of type ``ActorRef[Greeted]``, which +parameter which we fill in is of type ``ActorRef[Greeted]``, which means that the value that fulfills the :class:`Promise` can only be of type :class:`Greeted`. @@ -195,7 +192,7 @@ The behavior that we declare here can handle both subtypes of :class:`Command`. trigger the dissemination of the contained chat room message to all connected clients. But we do not want to give the ability to send :class:`PostSessionMessage` commands to arbitrary clients, we reserve that -right to the wrappers we create—otherwise clients could pose as complete +right to the wrappers we create—otherwise clients could pose as completely different screen names (imagine the :class:`GetSession` protocol to include authentication information to further secure this). Therefore we narrow the behavior down to only accepting :class:`GetSession` commands before exposing it @@ -261,7 +258,7 @@ choice: In good tradition we call the ``main`` Actor what it is, it directly corresponds to the ``main`` method in a traditional Java application. This Actor will perform its job on its own accord, we do not need to send messages -from the outside, so we declare it to be of type ``Unit``. Actors receive not +from the outside, so we declare it to be of type ``NotUsed``. Actors receive not only external messages, they also are notified of certain system events, so-called Signals. In order to get access to those we choose to implement this particular one using the :class:`Full` behavior decorator. The name stems from diff --git a/akka-typed/src/main/scala/akka/typed/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/ActorContext.scala index 32f360a94b..3dc3aa57d1 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorContext.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorContext.scala @@ -38,9 +38,9 @@ trait ActorContext[T] { def self: ActorRef[T] /** - * The [[Props]] from which this Actor was created. + * Return the mailbox capacity that was configured by the parent for this actor. */ - def props: Props[T] + def mailboxCapacity: Int /** * The [[ActorSystem]] to which this Actor belongs. @@ -62,12 +62,12 @@ trait ActorContext[T] { * Create a child Actor from the given [[Props]] under a randomly chosen name. * It is good practice to name Actors wherever practical. */ - def spawnAnonymous[U](props: Props[U]): ActorRef[U] + def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] /** * Create a child Actor from the given [[Props]] and with the given name. */ - def spawn[U](props: Props[U], name: String): ActorRef[U] + def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] /** * Force the child Actor under the given name to terminate after it finishes @@ -135,9 +135,9 @@ trait ActorContext[T] { * See [[EffectfulActorContext]] for more advanced uses. */ class StubbedActorContext[T]( - val name: String, - override val props: Props[T], - override val system: ActorSystem[Nothing]) extends ActorContext[T] { + val name: String, + override val mailboxCapacity: Int, + override val system: ActorSystem[Nothing]) extends ActorContext[T] { val inbox = Inbox[T](name) override val self = inbox.ref @@ -147,12 +147,12 @@ class StubbedActorContext[T]( override def children: Iterable[ActorRef[Nothing]] = _children.values map (_.ref) override def child(name: String): Option[ActorRef[Nothing]] = _children get name map (_.ref) - override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = { + override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = { val i = Inbox[U](childName.next()) _children += i.ref.path.name → i i.ref } - override def spawn[U](props: Props[U], name: String): ActorRef[U] = + override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = _children get name match { case Some(_) ⇒ throw new untyped.InvalidActorNameException(s"actor name $name is already taken") case None ⇒ @@ -186,7 +186,7 @@ class StubbedActorContext[T]( def executionContext: ExecutionContextExecutor = system.executionContext - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAnonymous(Props.empty) + def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAnonymous[Any](Behavior.emptyBehavior) /** * Retrieve the named inbox. The passed ActorRef must be one that was returned diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index f9073eaba8..e4b8ea1b4a 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -120,13 +120,15 @@ object ActorSystem { * Akka Typed [[Behavior]] hierarchies—this system cannot run untyped * [[akka.actor.Actor]] instances. */ - def apply[T](name: String, guardianProps: Props[T], - config: Option[Config] = None, - classLoader: Option[ClassLoader] = None, - executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { + def apply[T](name: String, guardianBehavior: Behavior[T], + guardianDeployment: DeploymentConfig = EmptyDeploymentConfig, + config: Option[Config] = None, + classLoader: Option[ClassLoader] = None, + executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { + Behavior.validateAsInitial(guardianBehavior) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) - new ActorSystemImpl(name, appConfig, cl, executionContext, guardianProps) + new ActorSystemImpl(name, appConfig, cl, executionContext, guardianBehavior, guardianDeployment) } /** @@ -134,13 +136,15 @@ object ActorSystem { * which runs Akka Typed [[Behavior]] on an emulation layer. In this * system typed and untyped actors can coexist. */ - def adapter[T](name: String, guardianProps: Props[T], - config: Option[Config] = None, - classLoader: Option[ClassLoader] = None, - executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { + def adapter[T](name: String, guardianBehavior: Behavior[T], + guardianDeployment: DeploymentConfig = EmptyDeploymentConfig, + config: Option[Config] = None, + classLoader: Option[ClassLoader] = None, + executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { + Behavior.validateAsInitial(guardianBehavior) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) - val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianProps))) + val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianBehavior, guardianDeployment))) untyped.start() new ActorSystemAdapter(untyped) } diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index 5b3e22e44b..89bb6959fe 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -131,6 +131,18 @@ object Behavior { case other ⇒ other } + def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] = + behavior match { + case `sameBehavior` | `unhandledBehavior` ⇒ + throw new IllegalArgumentException(s"cannot use $behavior as initial behavior") + case x ⇒ x + } + + def preStart[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { + val b = validateAsInitial(behavior) + if (isAlive(b)) canonicalize(b.management(ctx, PreStart), b) else b + } + def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne stoppedBehavior def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq unhandledBehavior diff --git a/akka-typed/src/main/scala/akka/typed/Deployment.scala b/akka-typed/src/main/scala/akka/typed/Deployment.scala new file mode 100644 index 0000000000..c0ba4591be --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/Deployment.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.typed + +import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor } +import java.util.concurrent.{ Executor, Executors } +import scala.reflect.ClassTag +import scala.annotation.tailrec + +/** + * Data structure for describing an actor’s deployment details like which + * executor to run it on. + * + * Deliberately not sealed in order to emphasize future extensibility by the + * framework—this is not intended to be extended by user code. + */ +abstract class DeploymentConfig extends Product with Serializable { + def next: DeploymentConfig + def ++(next: DeploymentConfig): DeploymentConfig + + def withDispatcherDefault: DeploymentConfig = DispatcherDefault(this) + def withDispatcherFromConfig(path: String): DeploymentConfig = DispatcherFromConfig(path, this) + def withDispatcherFromExecutor(executor: Executor): DeploymentConfig = DispatcherFromExecutor(executor, this) + def withDispatcherFromExecutionContext(ec: ExecutionContext): DeploymentConfig = DispatcherFromExecutionContext(ec, this) + + def withMailboxCapacity(capacity: Int): DeploymentConfig = MailboxCapacity(capacity, this) + + def firstOrElse[T <: DeploymentConfig: ClassTag](default: T): T = { + @tailrec def rec(d: DeploymentConfig): T = { + d match { + case EmptyDeploymentConfig ⇒ default + case t: T ⇒ t + case _ ⇒ rec(d.next) + } + } + rec(this) + } + + def allOf[T <: DeploymentConfig: ClassTag]: List[DeploymentConfig] = { + @tailrec def select(d: DeploymentConfig, acc: List[DeploymentConfig]): List[DeploymentConfig] = + d match { + case EmptyDeploymentConfig ⇒ acc.reverse + case t: T ⇒ select(d.next, (d ++ EmptyDeploymentConfig) :: acc) + case _ ⇒ select(d.next, acc) + } + select(this, Nil) + } + + def filterNot[T <: DeploymentConfig: ClassTag]: DeploymentConfig = { + @tailrec def select(d: DeploymentConfig, acc: List[DeploymentConfig]): List[DeploymentConfig] = + d match { + case EmptyDeploymentConfig ⇒ acc + case t: T ⇒ select(d.next, acc) + case _ ⇒ select(d.next, d :: acc) + } + @tailrec def link(l: List[DeploymentConfig], acc: DeploymentConfig): DeploymentConfig = + l match { + case d :: ds ⇒ link(ds, d ++ acc) + case Nil ⇒ acc + } + link(select(this, Nil), EmptyDeploymentConfig) + } +} + +final case class MailboxCapacity(capacity: Int, next: DeploymentConfig = EmptyDeploymentConfig) extends DeploymentConfig { + override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next) +} + +case object EmptyDeploymentConfig extends DeploymentConfig { + override def next = throw new NoSuchElementException("EmptyDeploymentConfig has no next") + override def ++(next: DeploymentConfig): DeploymentConfig = next +} + +sealed trait DispatcherSelector extends DeploymentConfig + +sealed case class DispatcherDefault(next: DeploymentConfig) extends DispatcherSelector { + override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next) +} +object DispatcherDefault { + // this is hidden in order to avoid having people match on this object + private val empty = DispatcherDefault(EmptyDeploymentConfig) + def apply(): DispatcherDefault = empty +} +final case class DispatcherFromConfig(path: String, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector { + override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next) +} +final case class DispatcherFromExecutor(executor: Executor, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector { + override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next) +} +final case class DispatcherFromExecutionContext(ec: ExecutionContext, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector { + override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next) +} + +trait Dispatchers { + def lookup(selector: DispatcherSelector): ExecutionContextExecutor + def shutdown(): Unit +} diff --git a/akka-typed/src/main/scala/akka/typed/Dispatchers.scala b/akka-typed/src/main/scala/akka/typed/Dispatchers.scala deleted file mode 100644 index 169c20cb4f..0000000000 --- a/akka-typed/src/main/scala/akka/typed/Dispatchers.scala +++ /dev/null @@ -1,13 +0,0 @@ -/** - * Copyright (C) 2016 Lightbend Inc. - */ -package akka.typed - -import scala.concurrent.ExecutionContextExecutor -import java.util.concurrent.Executors -import akka.event.LoggingAdapter - -trait Dispatchers { - def lookup(selector: DispatcherSelector): ExecutionContextExecutor - def shutdown(): Unit -} diff --git a/akka-typed/src/main/scala/akka/typed/Effects.scala b/akka-typed/src/main/scala/akka/typed/Effects.scala index 555aee3732..133e959e23 100644 --- a/akka-typed/src/main/scala/akka/typed/Effects.scala +++ b/akka-typed/src/main/scala/akka/typed/Effects.scala @@ -30,8 +30,8 @@ object Effect { * An [[ActorContext]] for testing purposes that records the effects performed * on it and otherwise stubs them out like a [[StubbedActorContext]]. */ -class EffectfulActorContext[T](_name: String, _props: Props[T], _system: ActorSystem[Nothing]) - extends StubbedActorContext[T](_name, _props, _system) { +class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _mailboxCapacity: Int, _system: ActorSystem[Nothing]) + extends StubbedActorContext[T](_name, _mailboxCapacity, _system) { import akka.{ actor ⇒ a } import Effect._ @@ -49,22 +49,23 @@ class EffectfulActorContext[T](_name: String, _props: Props[T], _system: ActorSy } def hasEffects: Boolean = effectQueue.peek() != null - private var current = props.creator() - signal(PreStart) + private var current = _initialBehavior + + if (Behavior.isAlive(current)) signal(PreStart) def currentBehavior: Behavior[T] = current def run(msg: T): Unit = current = Behavior.canonicalize(current.message(this, msg), current) def signal(signal: Signal): Unit = current = Behavior.canonicalize(current.management(this, signal), current) - override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = { - val ref = super.spawnAnonymous(props) + override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = { + val ref = super.spawnAnonymous(behavior) effectQueue.offer(Spawned(ref.path.name)) ref } - override def spawn[U](props: Props[U], name: String): ActorRef[U] = { + override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = { effectQueue.offer(Spawned(name)) - super.spawn(props, name) + super.spawn(behavior, name) } override def stop(child: ActorRef[Nothing]): Boolean = { effectQueue.offer(Stopped(child.path.name)) diff --git a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala index 0f33f6a0a5..9efb4988ad 100644 --- a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala +++ b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala @@ -42,14 +42,6 @@ final case object PreStart extends Signal @SerialVersionUID(1L) final case object PreRestart extends Signal -/** - * Lifecycle signal that is fired upon restart of the Actor after replacing - * the behavior with the fresh one (i.e. this signal is received within the - * fresh replacement behavior). - */ -@SerialVersionUID(1L) -final case object PostRestart extends Signal - /** * Lifecycle signal that is fired after this actor and all its child actors * (transitively) have terminated. The [[Terminated]] signal is only sent to diff --git a/akka-typed/src/main/scala/akka/typed/Props.scala b/akka-typed/src/main/scala/akka/typed/Props.scala deleted file mode 100644 index ca29cf1571..0000000000 --- a/akka-typed/src/main/scala/akka/typed/Props.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.typed - -import java.util.concurrent.Executor -import scala.concurrent.ExecutionContext - -sealed trait DispatcherSelector -case object DispatcherDefault extends DispatcherSelector -final case class DispatcherFromConfig(path: String) extends DispatcherSelector -final case class DispatcherFromExecutor(executor: Executor) extends DispatcherSelector -final case class DispatcherFromExecutionContext(ec: ExecutionContext) extends DispatcherSelector - -/** - * Props describe how to dress up a [[Behavior]] so that it can become an Actor. - */ -final case class Props[T](creator: () ⇒ Behavior[T], dispatcher: DispatcherSelector, mailboxCapacity: Int) { - def withDispatcher(configPath: String) = copy(dispatcher = DispatcherFromConfig(configPath)) - def withDispatcher(executor: Executor) = copy(dispatcher = DispatcherFromExecutor(executor)) - def withDispatcher(ec: ExecutionContext) = copy(dispatcher = DispatcherFromExecutionContext(ec)) - def withQueueSize(size: Int) = copy(mailboxCapacity = size) -} - -/** - * Props describe how to dress up a [[Behavior]] so that it can become an Actor. - */ -object Props { - /** - * Create a Props instance from a block of code that creates a [[Behavior]]. - * - * FIXME: investigate the pros and cons of making this take an explicit - * function instead of a by-name argument - */ - def apply[T](block: ⇒ Behavior[T]): Props[T] = Props(() ⇒ block, DispatcherDefault, Int.MaxValue) - - /** - * Props for a Behavior that just ignores all messages. - */ - def empty[T]: Props[T] = _empty.asInstanceOf[Props[T]] - private val _empty: Props[Any] = Props(ScalaDSL.Static[Any] { case _ ⇒ ScalaDSL.Unhandled }) - -} diff --git a/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala b/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala index b0a3488deb..42652b5a46 100644 --- a/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala +++ b/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala @@ -59,6 +59,22 @@ object ScalaDSL { override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" } + /** + * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation + * is deferred to the child actor instead of running within the parent. + */ + final case class Deferred[T](factory: () ⇒ Behavior[T]) extends Behavior[T] { + override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = { + if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)") + Behavior.preStart(factory(), ctx) + } + + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = + throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)") + + override def toString: String = s"Deferred(${LineNumbers(factory)})" + } + /** * Return this behavior from message processing in order to advise the * system to reuse the previous behavior. This is provided in order to @@ -140,8 +156,7 @@ object ScalaDSL { context.stop(child) } behavior.applyOrElse(Sig(context, PostStop), fallback) - case Sig(context, PostRestart) ⇒ behavior.applyOrElse(Sig(context, PreStart), fallback) - case _ ⇒ Unhandled + case _ ⇒ Unhandled } behavior.applyOrElse(Sig(ctx, msg), fallback) } @@ -253,27 +268,40 @@ object ScalaDSL { * sides of [[And]] and [[Or]] combinators. */ final case class SynchronousSelf[T](f: ActorRef[T] ⇒ Behavior[T]) extends Behavior[T] { - private val inbox = Inbox[T]("synchronousSelf") - private var _behavior = f(inbox.ref) - private def behavior = _behavior - private def setBehavior(ctx: ActorContext[T], b: Behavior[T]): Unit = - _behavior = canonicalize(b, _behavior) - // FIXME should we protect against infinite loops? - @tailrec private def run(ctx: ActorContext[T], next: Behavior[T]): Behavior[T] = { - setBehavior(ctx, next) - if (inbox.hasMessages) run(ctx, behavior.message(ctx, inbox.receiveMsg())) - else if (isUnhandled(next)) Unhandled - else if (isAlive(next)) this - else Stopped + private class B extends Behavior[T] { + private val inbox = Inbox[T]("synchronousSelf") + private var _behavior = Behavior.validateAsInitial(f(inbox.ref)) + private def behavior = _behavior + private def setBehavior(ctx: ActorContext[T], b: Behavior[T]): Unit = + _behavior = canonicalize(b, _behavior) + + // FIXME should we protect against infinite loops? + @tailrec private def run(ctx: ActorContext[T], next: Behavior[T]): Behavior[T] = { + setBehavior(ctx, next) + if (inbox.hasMessages) run(ctx, behavior.message(ctx, inbox.receiveMsg())) + else if (isUnhandled(next)) Unhandled + else if (isAlive(next)) this + else Stopped + } + + override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = + run(ctx, behavior.management(ctx, msg)) + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = + run(ctx, behavior.message(ctx, msg)) + + override def toString: String = s"SynchronousSelf($behavior)" } - override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = - run(ctx, behavior.management(ctx, msg)) - override def message(ctx: ActorContext[T], msg: T): Behavior[T] = - run(ctx, behavior.message(ctx, msg)) + override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = { + if (msg != PreStart) throw new IllegalStateException(s"SynchronousSelf must receive PreStart as first message (got $msg)") + Behavior.preStart(new B(), ctx) + } - override def toString: String = s"SynchronousSelf($behavior)" + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = + throw new IllegalStateException(s"SynchronousSelf must receive PreStart as first message (got $msg)") + + override def toString: String = s"SynchronousSelf(${LineNumbers(f)})" } /** @@ -392,12 +420,8 @@ object ScalaDSL { */ def SelfAware[T](behavior: ActorRef[T] ⇒ Behavior[T]): Behavior[T] = FullTotal { - case Sig(ctx, signal) ⇒ - val behv = behavior(ctx.self) - canonicalize(behv.management(ctx, signal), behv) - case Msg(ctx, msg) ⇒ - val behv = behavior(ctx.self) - canonicalize(behv.message(ctx, msg), behv) + case Sig(ctx, PreStart) ⇒ Behavior.preStart(behavior(ctx.self), ctx) + case msg ⇒ throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)") } /** @@ -416,12 +440,8 @@ object ScalaDSL { */ def ContextAware[T](behavior: ActorContext[T] ⇒ Behavior[T]): Behavior[T] = FullTotal { - case Sig(ctx, signal) ⇒ - val behv = behavior(ctx) - canonicalize(behv.management(ctx, signal), behv) - case Msg(ctx, msg) ⇒ - val behv = behavior(ctx) - canonicalize(behv.message(ctx, msg), behv) + case Sig(ctx, PreStart) ⇒ Behavior.preStart(behavior(ctx), ctx) + case msg ⇒ throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $msg)") } /** diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala index bf7237196e..fa5ca67a15 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorAdapter.scala @@ -6,16 +6,12 @@ package adapter import akka.{ actor ⇒ a } -private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) extends a.Actor { +private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor { import Behavior._ - var behavior: Behavior[T] = _ + var behavior: Behavior[T] = _initialBehavior - { - behavior = canonicalize(_initialBehavior(), behavior) - if (behavior == null) throw new IllegalStateException("initial behavior cannot be `same` or `unhandled`") - if (!isAlive(behavior)) context.stop(self) - } + if (!isAlive(behavior)) context.stop(self) val ctx = new ActorContextAdapter[T](context) @@ -59,7 +55,7 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten override def preRestart(reason: Throwable, message: Option[Any]): Unit = next(behavior.management(ctx, PreRestart), PreRestart) override def postRestart(reason: Throwable): Unit = - next(behavior.management(ctx, PostRestart), PostRestart) + next(behavior.management(ctx, PreStart), PreStart) override def postStop(): Unit = next(behavior.management(ctx, PostStop), PostStop) } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala index 5cc02bbf58..7818561d9e 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala @@ -14,12 +14,14 @@ import scala.concurrent.ExecutionContextExecutor private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorContext[T] { override def self = ActorRefAdapter(ctx.self) - override def props = PropsAdapter(ctx.props) override val system = ActorSystemAdapter(ctx.system) + override def mailboxCapacity = 1 << 29 // FIXME override def children = ctx.children.map(ActorRefAdapter(_)) override def child(name: String) = ctx.child(name).map(ActorRefAdapter(_)) - override def spawnAnonymous[U](props: Props[U]) = ctx.spawnAnonymous(props) - override def spawn[U](props: Props[U], name: String) = ctx.spawn(props, name) + override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) = + ctx.spawnAnonymous(behavior, deployment) + override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) = + ctx.spawn(behavior, name, deployment) override def stop(child: ActorRef[Nothing]) = toUntyped(child) match { case f: akka.actor.FunctionRef ⇒ diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala index 3b2dd72bd3..1d00842c63 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorSystemAdapter.scala @@ -29,10 +29,10 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) override def dispatchers: Dispatchers = new Dispatchers { override def lookup(selector: DispatcherSelector): ExecutionContextExecutor = selector match { - case DispatcherDefault ⇒ untyped.dispatcher - case DispatcherFromConfig(str) ⇒ untyped.dispatchers.lookup(str) - case DispatcherFromExecutionContext(_) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter") - case DispatcherFromExecutor(_) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter") + case DispatcherDefault(_) ⇒ untyped.dispatcher + case DispatcherFromConfig(str, _) ⇒ untyped.dispatchers.lookup(str) + case DispatcherFromExecutionContext(_, _) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter") + case DispatcherFromExecutor(_, _) ⇒ throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter") } override def shutdown(): Unit = () // there was no shutdown in untyped Akka } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala index 0377f93c4c..e0a15c3897 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/PropsAdapter.scala @@ -9,14 +9,14 @@ import akka.{ actor ⇒ a } private[typed] object PropsAdapter { // FIXME dispatcher and queue size - def apply(p: Props[_]): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (p.creator: AnyRef) :: Nil) + def apply(b: Behavior[_], deploy: DeploymentConfig): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (b: AnyRef) :: Nil) - def apply[T](p: a.Props): Props[T] = { + def apply[T](p: a.Props): Behavior[T] = { assert(p.clazz == classOf[ActorAdapter[_]], "typed.Actor must have typed.Props") p.args match { - case (creator: Function0[_]) :: Nil ⇒ + case (initial: Behavior[_]) :: Nil ⇒ // FIXME queue size - Props(creator.asInstanceOf[() ⇒ Behavior[T]], DispatcherFromConfig(p.deploy.dispatcher), Int.MaxValue) + initial.asInstanceOf[Behavior[T]] case _ ⇒ throw new AssertionError("typed.Actor args must be right") } } diff --git a/akka-typed/src/main/scala/akka/typed/adapter/package.scala b/akka-typed/src/main/scala/akka/typed/adapter/package.scala index 055fc6a17a..10e4e7a24d 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/package.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/package.scala @@ -9,17 +9,17 @@ package object adapter { import akka.dispatch.sysmsg implicit class ActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal { - def spawnAnonymous[T](props: Props[T]): ActorRef[T] = - ActorRefAdapter(sys.actorOf(PropsAdapter(props))) - def spawn[T](props: Props[T], name: String): ActorRef[T] = - ActorRefAdapter(sys.actorOf(PropsAdapter(props), name)) + def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment))) + def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name)) } implicit class ActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal { - def spawnAnonymous[T](props: Props[T]): ActorRef[T] = - ActorRefAdapter(ctx.actorOf(PropsAdapter(props))) - def spawn[T](props: Props[T], name: String): ActorRef[T] = - ActorRefAdapter(ctx.actorOf(PropsAdapter(props), name)) + def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment))) + def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] = + ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name)) } implicit def actorRefAdapter(ref: akka.actor.ActorRef): ActorRef[Any] = ActorRefAdapter(ref) diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index 5f335e8d08..fa19a03cd4 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -65,9 +65,11 @@ object ActorCell { * INTERNAL API */ private[typed] class ActorCell[T]( - override val system: ActorSystem[Nothing], - override val props: Props[T], - val parent: ActorRefImpl[Nothing]) + override val system: ActorSystem[Nothing], + protected val initialBehavior: Behavior[T], + override val executionContext: ExecutionContextExecutor, + override val mailboxCapacity: Int, + val parent: ActorRefImpl[Nothing]) extends ActorContext[T] with Runnable with SupervisionMechanics[T] with DeathWatch[T] { import ActorCell._ @@ -98,10 +100,12 @@ private[typed] class ActorCell[T]( protected def ctx: ActorContext[T] = this - override def spawn[U](props: Props[U], name: String): ActorRef[U] = { + override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRef[U] = { if (childrenMap contains name) throw new InvalidActorNameException(s"actor name [$name] is not unique") if (terminatingMap contains name) throw new InvalidActorNameException(s"actor name [$name] is not yet free") - val cell = new ActorCell[U](system, props, self) + val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext)) + val capacity = deployment.firstOrElse(MailboxCapacity(1000)) // FIXME where should this number come from? + val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self) val ref = new LocalActorRef[U](self.path / name, cell) cell.setSelf(ref) childrenMap = childrenMap.updated(name, ref) @@ -110,10 +114,10 @@ private[typed] class ActorCell[T]( } private var nextName = 0L - override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = { + override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig): ActorRef[U] = { val name = Helpers.base64(nextName) nextName += 1 - spawn(props, name) + spawn(behavior, name, deployment) } override def stop(child: ActorRef[Nothing]): Boolean = { @@ -140,8 +144,6 @@ private[typed] class ActorCell[T]( override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable = system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext) - override val executionContext: ExecutionContextExecutor = system.dispatchers.lookup(props.dispatcher) - override def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = { val name = Helpers.base64(nextName, new java.lang.StringBuilder("$!")) nextName += 1 @@ -172,7 +174,7 @@ private[typed] class ActorCell[T]( protected[typed] def getStatus: Int = _status private[this] val queue: Queue[T] = new ConcurrentLinkedQueue[T] private[typed] def peekMessage: T = queue.peek() - private[this] val maxQueue: Int = Math.min(props.mailboxCapacity, maxActivations) + private[this] val maxQueue: Int = Math.min(mailboxCapacity, maxActivations) @volatile private[this] var _systemQueue: LatestFirstSystemMessageList = SystemMessageList.LNil protected def maySend: Boolean = !isTerminating diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala index e84fc734d6..c21f9f862a 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala @@ -25,13 +25,13 @@ object ActorSystemImpl { import ScalaDSL._ sealed trait SystemCommand - case class CreateSystemActor[T](props: Props[T])(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand + case class CreateSystemActor[T](behavior: Behavior[T])(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand val systemGuardianBehavior: Behavior[SystemCommand] = ContextAware { ctx ⇒ Static { case create: CreateSystemActor[t] ⇒ - create.replyTo ! ctx.spawnAnonymous(create.props) + create.replyTo ! ctx.spawnAnonymous(create.behavior) } } } @@ -63,11 +63,12 @@ Distributed Data: */ private[typed] class ActorSystemImpl[-T]( - override val name: String, - _config: Config, - _cl: ClassLoader, - _ec: Option[ExecutionContext], - _userGuardianProps: Props[T]) + override val name: String, + _config: Config, + _cl: ClassLoader, + _ec: Option[ExecutionContext], + _userGuardianBehavior: Behavior[T], + _userGuardianDeployment: DeploymentConfig) extends ActorRef[T](a.RootActorPath(a.Address("akka", name)) / "user") with ActorSystem[T] with ActorRefImpl[T] { import ActorSystemImpl._ @@ -149,7 +150,7 @@ private[typed] class ActorSystemImpl[-T]( } override val dispatchers: Dispatchers = new DispatchersImpl(settings, log) - override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault) + override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault()) override val startTime: Long = System.currentTimeMillis() override def uptime: Long = (System.currentTimeMillis() - startTime) / 1000 @@ -182,8 +183,10 @@ private[typed] class ActorSystemImpl[-T]( override def isLocal: Boolean = true } - private def createTopLevel[U](props: Props[U], name: String): ActorRefImpl[U] = { - val cell = new ActorCell(this, props, theOneWhoWalksTheBubblesOfSpaceTime) + private def createTopLevel[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRefImpl[U] = { + val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext)) + val capacity = deployment.firstOrElse(MailboxCapacity(1000)) // FIXME where should this number come from? + val cell = new ActorCell(this, behavior, dispatchers.lookup(dispatcher), capacity.capacity, theOneWhoWalksTheBubblesOfSpaceTime) val ref = new LocalActorRef(rootPath / name, cell) cell.setSelf(ref) topLevelActors.add(ref) @@ -191,8 +194,8 @@ private[typed] class ActorSystemImpl[-T]( ref } - private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(Props(systemGuardianBehavior), "system") - private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianProps, "user") + private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyDeploymentConfig) + private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianDeployment) override def terminate(): Future[Terminated] = { theOneWhoWalksTheBubblesOfSpaceTime.sendSystem(Terminate()) @@ -217,10 +220,10 @@ private[typed] class ActorSystemImpl[-T]( override def sendSystem(msg: SystemMessage): Unit = userGuardian.sendSystem(msg) override def isLocal: Boolean = true - def systemActorOf[U](props: Props[U], name: String)(implicit timeout: Timeout): Future[ActorRef[U]] = { + def systemActorOf[U](behavior: Behavior[U], name: String)(implicit timeout: Timeout): Future[ActorRef[U]] = { import AskPattern._ implicit val sched = scheduler - systemGuardian ? CreateSystemActor(props) + systemGuardian ? CreateSystemActor(behavior) } def printTree: String = { diff --git a/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala b/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala index 8e920f4d00..58a324cef4 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/SupervisionMechanics.scala @@ -19,7 +19,7 @@ private[typed] trait SupervisionMechanics[T] { * INTERFACE WITH ACTOR CELL */ protected def system: ActorSystem[Nothing] - protected def props: Props[T] + protected def initialBehavior: Behavior[T] protected def self: ActorRefImpl[T] protected def parent: ActorRefImpl[Nothing] protected def behavior: Behavior[T] @@ -66,15 +66,11 @@ private[typed] trait SupervisionMechanics[T] { } private def create(): Boolean = { - behavior = Behavior.canonicalize(props.creator(), behavior) - if (behavior == null) { - fail(new IllegalStateException("cannot start actor with “same” or “unhandled” behavior, terminating")) - } else { - if (system.settings.DebugLifecycle) - publish(Logging.Debug(self.path.toString, clazz(behavior), "started")) - if (Behavior.isAlive(behavior)) next(behavior.management(ctx, PreStart), PreStart) - else self.sendSystem(Terminate()) - } + behavior = initialBehavior + if (system.settings.DebugLifecycle) + publish(Logging.Debug(self.path.toString, clazz(behavior), "started")) + if (Behavior.isAlive(behavior)) next(behavior.management(ctx, PreStart), PreStart) + else self.sendSystem(Terminate()) true } diff --git a/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala b/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala index 0297c1f0f7..b1a9c7caff 100644 --- a/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala +++ b/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala @@ -15,14 +15,19 @@ import akka.event.Logging * FIXME add limited restarts and back-off (with limited buffering or vacation responder) * FIXME write tests that ensure that all Behaviors are okay with getting PostRestart as first signal */ -final case class Restarter[T, Thr <: Throwable: ClassTag](behavior: () ⇒ Behavior[T], resume: Boolean) extends Behavior[T] { +final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] { - private[this] var current = behavior() + private[this] var current = initialBehavior - // FIXME remove allocation overhead once finalized - private def canonicalize(ctx: ActorContext[T], block: ⇒ Behavior[T]): Behavior[T] = { + private def restart(ctx: ActorContext[T]): Behavior[T] = { + try current.management(ctx, PreRestart) catch { case NonFatal(_) ⇒ } + current = initialBehavior + current.management(ctx, PreStart) + } + + override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = { val b = - try block + try current.management(ctx, signal) catch { case ex: Thr ⇒ ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, current.getClass, ex.getMessage)) @@ -32,22 +37,22 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](behavior: () ⇒ Behav if (Behavior.isAlive(current)) this else ScalaDSL.Stopped } - private def restart(ctx: ActorContext[T]): Behavior[T] = { - try current.management(ctx, PreRestart) catch { case NonFatal(_) ⇒ } - current = behavior() - current.management(ctx, PostRestart) + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = { + val b = + try current.message(ctx, msg) + catch { + case ex: Thr ⇒ + ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, current.getClass, ex.getMessage)) + if (resume) current else restart(ctx) + } + current = Behavior.canonicalize(b, current) + if (Behavior.isAlive(current)) this else ScalaDSL.Stopped } - - override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = - canonicalize(ctx, current.management(ctx, signal)) - - override def message(ctx: ActorContext[T], msg: T): Behavior[T] = - canonicalize(ctx, current.message(ctx, msg)) } object Restarter { class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) { - def wrap[T](p: Props[T]) = Props(() ⇒ Restarter(p.creator, resume)(c), p.dispatcher, p.mailboxCapacity) + def wrap[T](b: Behavior[T]) = Restarter(Behavior.validateAsInitial(b), resume)(c) } def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume) diff --git a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala index 3a711e9c3d..9ae349f9c1 100644 --- a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala @@ -54,7 +54,7 @@ object ActorContextSpec { case object Unwatched extends Event final case class GetInfo(replyTo: ActorRef[Info]) extends Command - final case class Info(self: ActorRef[Command], props: Props[Command], system: ActorSystem[Nothing]) extends Event + final case class Info(self: ActorRef[Command], system: ActorSystem[Nothing]) extends Event final case class GetChild(name: String, replyTo: ActorRef[Child]) extends Command final case class Child(c: Option[ActorRef[Nothing]]) extends Event @@ -95,8 +95,8 @@ object ActorContextSpec { throw ex case MkChild(name, mon, replyTo) ⇒ val child = name match { - case None ⇒ ctx.spawnAnonymous(Restarter[Throwable]().wrap(Props(subject(mon)))) - case Some(n) ⇒ ctx.spawn(Restarter[Throwable]().wrap(Props(subject(mon))), n) + case None ⇒ ctx.spawnAnonymous(Restarter[Throwable]().wrap(subject(mon))) + case Some(n) ⇒ ctx.spawn(Restarter[Throwable]().wrap(subject(mon)), n) } replyTo ! Created(child) Same @@ -125,7 +125,7 @@ object ActorContextSpec { replyTo ! Unwatched Same case GetInfo(replyTo) ⇒ - replyTo ! Info(ctx.self, ctx.props, ctx.system) + replyTo ! Info(ctx.self, ctx.system) Same case GetChild(name, replyTo) ⇒ replyTo ! Child(ctx.child(name)) @@ -190,7 +190,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( def setup(name: String, wrapper: Option[Restarter.Apply[_]] = None)( proc: (ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ - val props = wrapper.map(_.wrap(Props(behavior(ctx)))).getOrElse(Props(behavior(ctx))) + val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx)) val steps = startWith.withKeepTraces(true)(ctx.spawn(props, "subject")) .expectMessage(expectTimeout) { (msg, ref) ⇒ @@ -273,7 +273,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( log.assertDone(expectTimeout) subj }.expectMessage(expectTimeout) { (msg, subj) ⇒ - msg should ===(GotSignal(PostRestart)) + msg should ===(GotSignal(PreStart)) ctx.stop(subj) }.expectMessage(expectTimeout) { (msg, _) ⇒ msg should ===(GotSignal(PostStop)) @@ -301,7 +301,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( case (msgs, (subj, child, log)) ⇒ msgs should ===( ChildEvent(GotSignal(PreRestart)) :: - ChildEvent(GotSignal(PostRestart)) :: Nil) + ChildEvent(GotSignal(PreStart)) :: Nil) log.assertDone(expectTimeout) child ! BecomeInert(self) // necessary to avoid PostStop/Terminated interference (subj, child) @@ -346,7 +346,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( (subj, log) }.expectMessage(expectTimeout) { case (msg, (subj, log)) ⇒ - msg should ===(GotSignal(PostRestart)) + msg should ===(GotSignal(PreStart)) log.assertDone(expectTimeout) subj } @@ -380,7 +380,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( def `08 must not stop non-child actor`(): Unit = sync(setup("ctx08") { (ctx, startWith) ⇒ val self = ctx.self startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) { pair ⇒ - (pair._1, pair._2, ctx.spawn(Props(behavior(ctx)), "A")) + (pair._1, pair._2, ctx.spawn(behavior(ctx), "A")) }.expectMessage(expectTimeout) { case (msg, (subj, child, other)) ⇒ msg should ===(GotSignal(PreStart)) diff --git a/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala b/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala index e21e91bbdc..6e74a73793 100644 --- a/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala @@ -54,7 +54,7 @@ class BehaviorSpec extends TypedSpec { protected def mkCtx(requirePreStart: Boolean = false, factory: (ActorRef[Event]) ⇒ Behavior[Command] = behavior) = { val inbox = Inbox[Event]("evt") - val ctx = new EffectfulActorContext("ctx", Props(factory(inbox.ref)), system) + val ctx = new EffectfulActorContext("ctx", factory(inbox.ref), 1000, system) val msgs = inbox.receiveAll() if (requirePreStart) msgs should ===(GotSignal(PreStart) :: Nil) @@ -117,14 +117,6 @@ class BehaviorSpec extends TypedSpec { mkCtx().check(GetSelf).check(PreRestart) } - def `must react to PostRestart`(): Unit = { - mkCtx().check(PostRestart) - } - - def `must react to a message after PostRestart`(): Unit = { - mkCtx().check(PostRestart).check(GetSelf) - } - def `must react to Terminated`(): Unit = { mkCtx().check(Terminated(Inbox("x").ref)(null)) } @@ -201,10 +193,6 @@ class BehaviorSpec extends TypedSpec { mkCtx().check(Swap).check(GetSelf).check(PreRestart) } - def `must react to a message after PostRestart after swap`(): Unit = { - mkCtx().check(PostRestart).check(Swap).check(GetSelf) - } - def `must react to Terminated after swap`(): Unit = { mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null)) } diff --git a/akka-typed/src/test/scala/akka/typed/DeploymentConfigSpec.scala b/akka-typed/src/test/scala/akka/typed/DeploymentConfigSpec.scala new file mode 100644 index 0000000000..9b3a3dc02f --- /dev/null +++ b/akka-typed/src/test/scala/akka/typed/DeploymentConfigSpec.scala @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.typed + +class DeploymentConfigSpec extends TypedSpecSetup { + + val dispatcherFirst = DispatcherDefault(MailboxCapacity(666, DispatcherFromConfig("pool"))) + val mailboxFirst = MailboxCapacity(999) ++ dispatcherFirst + + object `A DeploymentConfig` { + + def `must get first dispatcher`(): Unit = { + dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst) + mailboxFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst) + } + + def `must get first mailbox`(): Unit = { + dispatcherFirst.firstOrElse[MailboxCapacity](null).capacity should ===(666) + mailboxFirst.firstOrElse[MailboxCapacity](null).capacity should ===(999) + } + + def `must get default value`(): Unit = { + mailboxFirst.firstOrElse[DispatcherFromExecutor](null) should ===(null) + } + + def `must filter out the right things`(): Unit = { + val filtered = mailboxFirst.filterNot[DispatcherSelector] + filtered.firstOrElse[MailboxCapacity](null).capacity should ===(999) + filtered.firstOrElse[DispatcherSelector](null) should ===(null) + } + + def `must yield all configs of some type`(): Unit = { + dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherDefault() :: DispatcherFromConfig("pool") :: Nil) + mailboxFirst.allOf[MailboxCapacity] should ===(List(999, 666).map(MailboxCapacity(_))) + } + + } +} diff --git a/akka-typed/src/test/scala/akka/typed/PerformanceSpec.scala b/akka-typed/src/test/scala/akka/typed/PerformanceSpec.scala index 2155b21e99..fe109009df 100644 --- a/akka-typed/src/test/scala/akka/typed/PerformanceSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/PerformanceSpec.scala @@ -28,15 +28,15 @@ class PerformanceSpec extends TypedSpec( StepWise[Pong] { (ctx, startWith) ⇒ startWith { - val pinger = Props(SelfAware[Ping](self ⇒ Static { msg ⇒ + val pinger = SelfAware[Ping](self ⇒ Static { msg ⇒ if (msg.x == 0) { msg.report ! Pong(0, self, msg.report) } else msg.pong ! Pong(msg.x - 1, self, msg.report) - })).withDispatcher(executor) + }) // FIXME .withDispatcher(executor) - val ponger = Props(SelfAware[Pong](self ⇒ Static { msg ⇒ + val ponger = SelfAware[Pong](self ⇒ Static { msg ⇒ msg.ping ! Ping(msg.x, self, msg.report) - })).withDispatcher(executor) + }) // FIXME .withDispatcher(executor) val actors = for (i ← 1 to pairs) diff --git a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala index 25b9aee260..24c1fce8e0 100644 --- a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala @@ -43,8 +43,8 @@ class TypedSpec(val config: Config) extends TypedSpecSetup { // extension point def setTimeout: Timeout = Timeout(1.minute) - val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf)) - val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf)) + val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) + val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) trait NativeSystem { def system = nativeSystem @@ -66,14 +66,14 @@ class TypedSpec(val config: Config) extends TypedSpecSetup { import akka.testkit._ def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) - val blackhole = await(nativeSystem ? Create(Props(ScalaDSL.Full[Any] { case _ ⇒ ScalaDSL.Same }), "blackhole")) + val blackhole = await(nativeSystem ? Create(ScalaDSL.Full[Any] { case _ ⇒ ScalaDSL.Same }, "blackhole")) /** * Run an Actor-based test. The test procedure is most conveniently * formulated using the [[StepWise$]] behavior type. */ def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] = - system ? (RunTest(name, Props(behavior), _, timeout.duration)) + system ? (RunTest(name, behavior, _, timeout.duration)) // TODO remove after basing on ScalaTest 3 with async support def sync(f: Future[Status])(implicit system: ActorSystem[Command]): Unit = { @@ -135,9 +135,9 @@ object TypedSpec { case object Start extends Start sealed trait Command - case class RunTest[T](name: String, props: Props[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command + case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command case class Terminate(reply: ActorRef[Status]) extends Command - case class Create[T](props: Props[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command + case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command sealed trait Status case object Success extends Status @@ -156,7 +156,7 @@ object TypedSpec { } case _: Sig[_] ⇒ Same case Msg(ctx, r: RunTest[t]) ⇒ - val test = ctx.spawn(r.props, r.name) + val test = ctx.spawn(r.behavior, r.name) ctx.schedule(r.timeout, r.replyTo, Timedout) ctx.watch(test) guardian(outstanding + ((test, r.replyTo))) @@ -164,7 +164,7 @@ object TypedSpec { reply ! Success Stopped case Msg(ctx, c: Create[t]) ⇒ - c.replyTo ! ctx.spawn(c.props, c.name) + c.replyTo ! ctx.spawn(c.behavior, c.name) Same } diff --git a/akka-typed/src/test/scala/akka/typed/internal/ActorCellSpec.scala b/akka-typed/src/test/scala/akka/typed/internal/ActorCellSpec.scala index ce891182e5..32438b4301 100644 --- a/akka-typed/src/test/scala/akka/typed/internal/ActorCellSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/internal/ActorCellSpec.scala @@ -22,7 +22,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must be creatable`(): Unit = { val parent = new DebugRef[String](sys.path / "creatable", true) - val cell = new ActorCell(sys, Props({ parent ! "created"; Static[String] { s ⇒ parent ! s } }), parent) + val cell = new ActorCell(sys, Deferred[String](() ⇒ { parent ! "created"; Static { s ⇒ parent ! s } }), ec, 1000, parent) debugCell(cell) { ec.queueSize should ===(0) cell.sendSystem(Create()) @@ -40,7 +40,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala val parent = new DebugRef[String](sys.path / "creatable???", true) val self = new DebugRef[String](sys.path / "creatableSelf", true) val ??? = new NotImplementedError - val cell = new ActorCell(sys, Props[String]({ parent ! "created"; throw ??? }), parent) + val cell = new ActorCell(sys, Deferred[String](() ⇒ { parent ! "created"; throw ??? }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -60,7 +60,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must be able to terminate after construction`(): Unit = { val parent = new DebugRef[String](sys.path / "terminate", true) val self = new DebugRef[String](sys.path / "terminateSelf", true) - val cell = new ActorCell(sys, Props({ parent ! "created"; Stopped[String] }), parent) + val cell = new ActorCell(sys, Deferred[String](() ⇒ { parent ! "created"; Stopped }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -80,7 +80,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must be able to terminate after PreStart`(): Unit = { val parent = new DebugRef[String](sys.path / "terminate", true) val self = new DebugRef[String](sys.path / "terminateSelf", true) - val cell = new ActorCell(sys, Props({ parent ! "created"; Full[String] { case Sig(ctx, PreStart) ⇒ Stopped } }), parent) + val cell = new ActorCell(sys, Deferred(() ⇒ { parent ! "created"; Full[String] { case Sig(ctx, PreStart) ⇒ Stopped } }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -101,7 +101,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala val parent = new DebugRef[String](sys.path / "terminate", true) val self = new DebugRef[String](sys.path / "terminateSelf", true) val ex = new AssertionError - val cell = new ActorCell(sys, Props({ parent ! "created"; Static[String](s ⇒ throw ex) }), parent) + val cell = new ActorCell(sys, Deferred(() ⇒ { parent ! "created"; Static[String](s ⇒ throw ex) }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -124,7 +124,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must signal failure when starting behavior is "same"`(): Unit = { val parent = new DebugRef[String](sys.path / "startSame", true) val self = new DebugRef[String](sys.path / "startSameSelf", true) - val cell = new ActorCell(sys, Props({ parent ! "created"; Same[String] }), parent) + val cell = new ActorCell(sys, Deferred(() ⇒ { parent ! "created"; Same[String] }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -140,8 +140,8 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala parent.receiveAll() match { case Left(DeathWatchNotification(`self`, exc)) :: Nil ⇒ exc should not be null - exc shouldBe an[IllegalStateException] - exc.getMessage should include("same") + exc shouldBe an[IllegalArgumentException] + exc.getMessage should include("Same") case other ⇒ fail(s"$other was not a DeathWatchNotification") } } @@ -150,7 +150,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must signal failure when starting behavior is "unhandled"`(): Unit = { val parent = new DebugRef[String](sys.path / "startSame", true) val self = new DebugRef[String](sys.path / "startSameSelf", true) - val cell = new ActorCell(sys, Props({ parent ! "created"; Unhandled[String] }), parent) + val cell = new ActorCell(sys, Deferred(() ⇒ { parent ! "created"; Unhandled[String] }), ec, 1000, parent) cell.setSelf(self) debugCell(cell) { ec.queueSize should ===(0) @@ -166,8 +166,8 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala parent.receiveAll() match { case Left(DeathWatchNotification(`self`, exc)) :: Nil ⇒ exc should not be null - exc shouldBe an[IllegalStateException] - exc.getMessage should include("same") + exc shouldBe an[IllegalArgumentException] + exc.getMessage should include("Unhandled") case other ⇒ fail(s"$other was not a DeathWatchNotification") } } @@ -181,7 +181,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala */ def `must not execute more messages than were batched naturally`(): Unit = { val parent = new DebugRef[String](sys.path / "batching", true) - val cell = new ActorCell(sys, Props(SelfAware[String] { self ⇒ Static { s ⇒ self ! s; parent ! s } }), parent) + val cell = new ActorCell(sys, SelfAware[String] { self ⇒ Static { s ⇒ self ! s; parent ! s } }, ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -214,7 +214,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must signal DeathWatch when terminating normally`(): Unit = { val parent = new DebugRef[String](sys.path / "watchNormal", true) val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Props(Empty[String]), parent) + val cell = new ActorCell(sys, Empty[String], ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -238,7 +238,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala val parent = new DebugRef[String](sys.path / "watchAbnormal", true) val client = new DebugRef[String](parent.path / "client", true) val other = new DebugRef[String](parent.path / "other", true) - val cell = new ActorCell(sys, Props(ContextAware[String] { ctx ⇒ ctx.watch(parent); Empty }), parent) + val cell = new ActorCell(sys, ContextAware[String] { ctx ⇒ ctx.watch(parent); Empty }, ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -270,7 +270,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must signal DeathWatch when watching after termination`(): Unit = { val parent = new DebugRef[String](sys.path / "watchLate", true) val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Props(Stopped[String]), parent) + val cell = new ActorCell(sys, Stopped[String], ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -289,7 +289,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must signal DeathWatch when watching after termination but before deactivation`(): Unit = { val parent = new DebugRef[String](sys.path / "watchSomewhatLate", true) val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Props(Empty[String]), parent) + val cell = new ActorCell(sys, Empty[String], ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -309,7 +309,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must not signal DeathWatch after Unwatch has been processed`(): Unit = { val parent = new DebugRef[String](sys.path / "watchUnwatch", true) val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Props(Empty[String]), parent) + val cell = new ActorCell(sys, Empty[String], ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -326,7 +326,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must send messages to deadLetters after being terminated`(): Unit = { val parent = new DebugRef[String](sys.path / "sendDeadLetters", true) - val cell = new ActorCell(sys, Props(Stopped[String]), parent) + val cell = new ActorCell(sys, Stopped[String], ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -347,10 +347,10 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala */ def `must not terminate before children have terminated`(): Unit = { val parent = new DebugRef[ActorRef[Nothing]](sys.path / "waitForChild", true) - val cell = new ActorCell(sys, Props(ContextAware[String] { ctx ⇒ - ctx.spawn(Props(SelfAware[String] { self ⇒ parent ! self; Empty }), "child") + val cell = new ActorCell(sys, ContextAware[String] { ctx ⇒ + ctx.spawn(SelfAware[String] { self ⇒ parent ! self; Empty }, "child") Empty - }), parent) + }, ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -380,10 +380,10 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must properly terminate if failing while handling Terminated for child actor`(): Unit = { val parent = new DebugRef[ActorRef[Nothing]](sys.path / "terminateWhenDeathPact", true) - val cell = new ActorCell(sys, Props(ContextAware[String] { ctx ⇒ - ctx.watch(ctx.spawn(Props(SelfAware[String] { self ⇒ parent ! self; Empty }), "child")) + val cell = new ActorCell(sys, ContextAware[String] { ctx ⇒ + ctx.watch(ctx.spawn(SelfAware[String] { self ⇒ parent ! self; Empty }, "child")) Empty - }), parent) + }, ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { @@ -416,7 +416,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala def `must not terminate twice if failing in PostStop`(): Unit = { val parent = new DebugRef[String](sys.path / "terminateProperlyPostStop", true) - val cell = new ActorCell(sys, Props(Full[String] { case Sig(_, PostStop) ⇒ ??? }), parent) + val cell = new ActorCell(sys, Full[String] { case Sig(_, PostStop) ⇒ ??? }, ec, 1000, parent) val ref = new LocalActorRef(parent.path / "child", cell) cell.setSelf(ref) debugCell(cell) { diff --git a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala index 79d46e4327..3fcd5084a0 100644 --- a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala @@ -23,11 +23,11 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca case class Probe(msg: String, replyTo: ActorRef[String]) trait CommonTests { - def system[T](name: String, props: Props[T]): ActorSystem[T] + def system[T](name: String, behavior: Behavior[T]): ActorSystem[T] def suite: String - def withSystem[T](name: String, props: Props[T], doTerminate: Boolean = true)(block: ActorSystem[T] ⇒ Unit): Terminated = { - val sys = system(s"$suite-$name", props) + def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] ⇒ Unit): Terminated = { + val sys = system(s"$suite-$name", behavior) try { block(sys) if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue @@ -39,7 +39,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca } def `must start the guardian actor and terminate when it terminates`(): Unit = { - val t = withSystem("a", Props(Total[Probe] { p ⇒ p.replyTo ! p.msg; Stopped }), doTerminate = false) { sys ⇒ + val t = withSystem("a", Total[Probe] { p ⇒ p.replyTo ! p.msg; Stopped }, doTerminate = false) { sys ⇒ val inbox = Inbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { inbox.hasMessages should ===(true) } @@ -52,11 +52,11 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca def `must terminate the guardian actor`(): Unit = { val inbox = Inbox[String]("terminate") - val sys = system("terminate", Props(Full[Probe] { + val sys = system("terminate", Full[Probe] { case Sig(ctx, PostStop) ⇒ inbox.ref ! "done" Same - })) + }) sys.terminate().futureValue inbox.receiveAll() should ===("done" :: Nil) } @@ -64,19 +64,19 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca def `must log to the event stream`(): Unit = pending def `must have a name`(): Unit = - withSystem("name", Props(Empty[String])) { sys ⇒ + withSystem("name", Empty[String]) { sys ⇒ sys.name should ===(suite + "-name") } def `must report its uptime`(): Unit = - withSystem("uptime", Props(Empty[String])) { sys ⇒ + withSystem("uptime", Empty[String]) { sys ⇒ sys.uptime should be < 1L Thread.sleep(1000) sys.uptime should be >= 1L } def `must have a working thread factory`(): Unit = - withSystem("thread", Props(Empty[String])) { sys ⇒ + withSystem("thread", Empty[String]) { sys ⇒ val p = Promise[Int] sys.threadFactory.newThread(new Runnable { def run(): Unit = p.success(42) @@ -85,7 +85,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca } def `must be able to run Futures`(): Unit = - withSystem("futures", Props(Empty[String])) { sys ⇒ + withSystem("futures", Empty[String]) { sys ⇒ val f = Future(42)(sys.executionContext) f.futureValue should ===(42) } @@ -93,12 +93,12 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca } object `An ActorSystemImpl` extends CommonTests { - def system[T](name: String, props: Props[T]): ActorSystem[T] = ActorSystem(name, props) + def system[T](name: String, behavior: Behavior[T]): ActorSystem[T] = ActorSystem(name, behavior) def suite = "native" // this is essential to complete ActorCellSpec, see there def `must correctly treat Watch dead letters`(): Unit = - withSystem("deadletters", Props(Empty[String])) { sys ⇒ + withSystem("deadletters", Empty[String]) { sys ⇒ val client = new DebugRef[Int](sys.path / "debug", true) sys.deadLetters.sorry.sendSystem(Watch(sys, client)) client.receiveAll() should ===(Left(DeathWatchNotification(sys, null)) :: Nil) @@ -106,7 +106,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca } object `An ActorSystemAdapter` extends CommonTests { - def system[T](name: String, props: Props[T]): ActorSystem[T] = ActorSystem.adapter(name, props) + def system[T](name: String, behavior: Behavior[T]): ActorSystem[T] = ActorSystem.adapter(name, behavior) def suite = "adapter" } } diff --git a/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala b/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala index b7d7b4db4c..fd4f1a11e7 100644 --- a/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala @@ -25,34 +25,39 @@ class ReceiverSpec extends TypedSpec { private def afterGetOneFirst(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = behavior[Msg] + .management(ctx, PreStart) .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] .message(ctx, GetOne(Duration.Zero)(dummyInbox.ref)) private def afterGetOneLater(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = behavior[Msg] + .management(ctx, PreStart) .message(ctx, GetOne(1.second)(dummyInbox.ref)) .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] private def afterGetOneTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = behavior[Msg] + .management(ctx, PreStart) .message(ctx, GetOne(1.nano)(dummyInbox.ref)) .asInstanceOf[Behavior[InternalCommand[Msg]]].message(ctx.asInstanceOf[ActorContext[InternalCommand[Msg]]], ReceiveTimeout()).asInstanceOf[Behavior[Command[Msg]]] private def afterGetAll(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = behavior[Msg] + .management(ctx, PreStart) .message(ctx, GetAll(1.nano)(dummyInbox.ref)) .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] .message(ctx, GetAll(Duration.Zero)(dummyInbox.ref)) private def afterGetAllTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = behavior[Msg] + .management(ctx, PreStart) .message(ctx, GetAll(1.nano)(dummyInbox.ref)) .message(ctx, GetAll(Duration.Zero)(dummyInbox.ref)) private def setup(name: String, behv: Behavior[Command[Msg]] = behavior[Msg])( proc: (EffectfulActorContext[Command[Msg]], EffectfulActorContext[Msg], Inbox[Replies[Msg]]) ⇒ Unit): Unit = for (Setup(description, behv, messages, effects) ← startingPoints) { - val ctx = new EffectfulActorContext("ctx", Props(ScalaDSL.ContextAware(behv)), nativeSystem) + val ctx = new EffectfulActorContext("ctx", ScalaDSL.ContextAware(behv), 1000, nativeSystem) withClue(s"[running for starting point '$description' (${ctx.currentBehavior})]: ") { dummyInbox.receiveAll() should have size messages ctx.getAllEffects() should have size effects diff --git a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala index 8940722f33..05920bbbe1 100644 --- a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala @@ -13,17 +13,17 @@ class ReceptionistSpec extends TypedSpec { trait ServiceA case object ServiceKeyA extends ServiceKey[ServiceA] - val propsA = Props(Static[ServiceA](msg ⇒ ())) + val behaviorA = Static[ServiceA](msg ⇒ ()) trait ServiceB case object ServiceKeyB extends ServiceKey[ServiceB] - val propsB = Props(Static[ServiceB](msg ⇒ ())) + val behaviorB = Static[ServiceB](msg ⇒ ()) trait CommonTests { implicit def system: ActorSystem[TypedSpec.Command] def `must register a service`(): Unit = { - val ctx = new EffectfulActorContext("register", Props(behavior), system) + val ctx = new EffectfulActorContext("register", behavior, 1000, system) val a = Inbox[ServiceA]("a") val r = Inbox[Registered[_]]("r") ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) @@ -37,7 +37,7 @@ class ReceptionistSpec extends TypedSpec { } def `must register two services`(): Unit = { - val ctx = new EffectfulActorContext("registertwo", Props(behavior), system) + val ctx = new EffectfulActorContext("registertwo", behavior, 1000, system) val a = Inbox[ServiceA]("a") val r = Inbox[Registered[_]]("r") ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) @@ -54,7 +54,7 @@ class ReceptionistSpec extends TypedSpec { } def `must register two services with the same key`(): Unit = { - val ctx = new EffectfulActorContext("registertwosame", Props(behavior), system) + val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system) val a1 = Inbox[ServiceA]("a1") val r = Inbox[Registered[_]]("r") ctx.run(Register(ServiceKeyA, a1.ref)(r.ref)) @@ -71,7 +71,7 @@ class ReceptionistSpec extends TypedSpec { } def `must unregister services when they terminate`(): Unit = { - val ctx = new EffectfulActorContext("registertwosame", Props(behavior), system) + val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system) val r = Inbox[Registered[_]]("r") val a = Inbox[ServiceA]("a") ctx.run(Register(ServiceKeyA, a.ref)(r.ref)) @@ -108,8 +108,8 @@ class ReceptionistSpec extends TypedSpec { StepWise[Registered[ServiceA]] { (ctx, startWith) ⇒ val self = ctx.self startWith.withKeepTraces(true) { - val r = ctx.spawnAnonymous(Props(behavior)) - val s = ctx.spawnAnonymous(propsA) + val r = ctx.spawnAnonymous(behavior) + val s = ctx.spawnAnonymous(behaviorA) val f = r ? Register(ServiceKeyA, s) r ! Register(ServiceKeyA, s)(self) (f, s)