remove Props in akka-typed

The deployment configuration is realized as an internally linked list,
which makes it allocation-free in the common case but retains full
extensibility.
This commit is contained in:
Roland Kuhn 2016-08-26 20:41:49 +02:00
parent 577f43233a
commit bf928af092
29 changed files with 397 additions and 293 deletions

View file

@ -81,7 +81,7 @@ class IntroSpec extends TypedSpec {
// using global pool since we want to run tasks after system.terminate
import scala.concurrent.ExecutionContext.Implicits.global
val system: ActorSystem[Greet] = ActorSystem("hello", Props(greeter))
val system: ActorSystem[Greet] = ActorSystem("hello", greeter)
val future: Future[Greeted] = system ? (Greet("world", _))
@ -111,11 +111,11 @@ class IntroSpec extends TypedSpec {
//#chatroom-gabbler
//#chatroom-main
val main: Behavior[Unit] =
val main: Behavior[akka.NotUsed] =
Full {
case Sig(ctx, PreStart) =>
val chatRoom = ctx.spawn(Props(ChatRoom.behavior), "chatroom")
val gabblerRef = ctx.spawn(Props(gabbler), "gabbler")
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Same
@ -123,7 +123,7 @@ class IntroSpec extends TypedSpec {
Stopped
}
val system = ActorSystem("ChatRoomDemo", Props(main))
val system = ActorSystem("ChatRoomDemo", main)
Await.result(system.whenTerminated, 1.second)
//#chatroom-main
}

View file

@ -57,10 +57,7 @@ Now we want to try out this Actor, so we must start an ActorSystem to host it:
.. includecode:: code/docs/akka/typed/IntroSpec.scala#hello-world
After importing the Actors protocol definition we start an Actor system from
the defined behavior, wrapping it in :class:`Props` like an actor on stage. The
props we are giving to this one are just the defaults, we could at this point
also configure which thread pool will be used to run it or its mailbox capacity
for incoming messages.
the defined behavior.
As Carl Hewitt said, one Actor is no Actor—it would be quite lonely with
nobody to talk to. In this sense the example is a little cruel because we only
@ -73,7 +70,7 @@ properly typed already, no type checks or casts needed. This is possible due to
the type information that is part of the message protocol: the ``?`` operator
takes as argument a function that accepts an :class:`ActorRef[U]` (which
explains the ``_`` hole in the expression on line 7 above) and the ``replyTo``
parameter which we fill in like that is of type ``ActorRef[Greeted]``, which
parameter which we fill in is of type ``ActorRef[Greeted]``, which
means that the value that fulfills the :class:`Promise` can only be of type
:class:`Greeted`.
@ -195,7 +192,7 @@ The behavior that we declare here can handle both subtypes of :class:`Command`.
trigger the dissemination of the contained chat room message to all connected
clients. But we do not want to give the ability to send
:class:`PostSessionMessage` commands to arbitrary clients, we reserve that
right to the wrappers we create—otherwise clients could pose as complete
right to the wrappers we create—otherwise clients could pose as completely
different screen names (imagine the :class:`GetSession` protocol to include
authentication information to further secure this). Therefore we narrow the
behavior down to only accepting :class:`GetSession` commands before exposing it
@ -261,7 +258,7 @@ choice:
In good tradition we call the ``main`` Actor what it is, it directly
corresponds to the ``main`` method in a traditional Java application. This
Actor will perform its job on its own accord, we do not need to send messages
from the outside, so we declare it to be of type ``Unit``. Actors receive not
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
only external messages, they also are notified of certain system events,
so-called Signals. In order to get access to those we choose to implement this
particular one using the :class:`Full` behavior decorator. The name stems from

View file

@ -38,9 +38,9 @@ trait ActorContext[T] {
def self: ActorRef[T]
/**
* The [[Props]] from which this Actor was created.
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def props: Props[T]
def mailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
@ -62,12 +62,12 @@ trait ActorContext[T] {
* Create a child Actor from the given [[Props]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](props: Props[U]): ActorRef[U]
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[Props]] and with the given name.
*/
def spawn[U](props: Props[U], name: String): ActorRef[U]
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
@ -136,7 +136,7 @@ trait ActorContext[T] {
*/
class StubbedActorContext[T](
val name: String,
override val props: Props[T],
override val mailboxCapacity: Int,
override val system: ActorSystem[Nothing]) extends ActorContext[T] {
val inbox = Inbox[T](name)
@ -147,12 +147,12 @@ class StubbedActorContext[T](
override def children: Iterable[ActorRef[Nothing]] = _children.values map (_.ref)
override def child(name: String): Option[ActorRef[Nothing]] = _children get name map (_.ref)
override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = {
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = {
val i = Inbox[U](childName.next())
_children += i.ref.path.name i
i.ref
}
override def spawn[U](props: Props[U], name: String): ActorRef[U] =
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] =
_children get name match {
case Some(_) throw new untyped.InvalidActorNameException(s"actor name $name is already taken")
case None
@ -186,7 +186,7 @@ class StubbedActorContext[T](
def executionContext: ExecutionContextExecutor = system.executionContext
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAnonymous(Props.empty)
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAnonymous[Any](Behavior.emptyBehavior)
/**
* Retrieve the named inbox. The passed ActorRef must be one that was returned

View file

@ -120,13 +120,15 @@ object ActorSystem {
* Akka Typed [[Behavior]] hierarchiesthis system cannot run untyped
* [[akka.actor.Actor]] instances.
*/
def apply[T](name: String, guardianProps: Props[T],
def apply[T](name: String, guardianBehavior: Behavior[T],
guardianDeployment: DeploymentConfig = EmptyDeploymentConfig,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
executionContext: Option[ExecutionContext] = None): ActorSystem[T] = {
Behavior.validateAsInitial(guardianBehavior)
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
new ActorSystemImpl(name, appConfig, cl, executionContext, guardianProps)
new ActorSystemImpl(name, appConfig, cl, executionContext, guardianBehavior, guardianDeployment)
}
/**
@ -134,13 +136,15 @@ object ActorSystem {
* which runs Akka Typed [[Behavior]] on an emulation layer. In this
* system typed and untyped actors can coexist.
*/
def adapter[T](name: String, guardianProps: Props[T],
def adapter[T](name: String, guardianBehavior: Behavior[T],
guardianDeployment: DeploymentConfig = EmptyDeploymentConfig,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
executionContext: Option[ExecutionContext] = None): ActorSystem[T] = {
Behavior.validateAsInitial(guardianBehavior)
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianProps)))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(guardianBehavior, guardianDeployment)))
untyped.start()
new ActorSystemAdapter(untyped)
}

View file

@ -131,6 +131,18 @@ object Behavior {
case other other
}
def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] =
behavior match {
case `sameBehavior` | `unhandledBehavior`
throw new IllegalArgumentException(s"cannot use $behavior as initial behavior")
case x x
}
def preStart[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
val b = validateAsInitial(behavior)
if (isAlive(b)) canonicalize(b.management(ctx, PreStart), b) else b
}
def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne stoppedBehavior
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq unhandledBehavior

View file

@ -0,0 +1,98 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor }
import java.util.concurrent.{ Executor, Executors }
import scala.reflect.ClassTag
import scala.annotation.tailrec
/**
* Data structure for describing an actors deployment details like which
* executor to run it on.
*
* Deliberately not sealed in order to emphasize future extensibility by the
* frameworkthis is not intended to be extended by user code.
*/
abstract class DeploymentConfig extends Product with Serializable {
def next: DeploymentConfig
def ++(next: DeploymentConfig): DeploymentConfig
def withDispatcherDefault: DeploymentConfig = DispatcherDefault(this)
def withDispatcherFromConfig(path: String): DeploymentConfig = DispatcherFromConfig(path, this)
def withDispatcherFromExecutor(executor: Executor): DeploymentConfig = DispatcherFromExecutor(executor, this)
def withDispatcherFromExecutionContext(ec: ExecutionContext): DeploymentConfig = DispatcherFromExecutionContext(ec, this)
def withMailboxCapacity(capacity: Int): DeploymentConfig = MailboxCapacity(capacity, this)
def firstOrElse[T <: DeploymentConfig: ClassTag](default: T): T = {
@tailrec def rec(d: DeploymentConfig): T = {
d match {
case EmptyDeploymentConfig default
case t: T t
case _ rec(d.next)
}
}
rec(this)
}
def allOf[T <: DeploymentConfig: ClassTag]: List[DeploymentConfig] = {
@tailrec def select(d: DeploymentConfig, acc: List[DeploymentConfig]): List[DeploymentConfig] =
d match {
case EmptyDeploymentConfig acc.reverse
case t: T select(d.next, (d ++ EmptyDeploymentConfig) :: acc)
case _ select(d.next, acc)
}
select(this, Nil)
}
def filterNot[T <: DeploymentConfig: ClassTag]: DeploymentConfig = {
@tailrec def select(d: DeploymentConfig, acc: List[DeploymentConfig]): List[DeploymentConfig] =
d match {
case EmptyDeploymentConfig acc
case t: T select(d.next, acc)
case _ select(d.next, d :: acc)
}
@tailrec def link(l: List[DeploymentConfig], acc: DeploymentConfig): DeploymentConfig =
l match {
case d :: ds link(ds, d ++ acc)
case Nil acc
}
link(select(this, Nil), EmptyDeploymentConfig)
}
}
final case class MailboxCapacity(capacity: Int, next: DeploymentConfig = EmptyDeploymentConfig) extends DeploymentConfig {
override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next)
}
case object EmptyDeploymentConfig extends DeploymentConfig {
override def next = throw new NoSuchElementException("EmptyDeploymentConfig has no next")
override def ++(next: DeploymentConfig): DeploymentConfig = next
}
sealed trait DispatcherSelector extends DeploymentConfig
sealed case class DispatcherDefault(next: DeploymentConfig) extends DispatcherSelector {
override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next)
}
object DispatcherDefault {
// this is hidden in order to avoid having people match on this object
private val empty = DispatcherDefault(EmptyDeploymentConfig)
def apply(): DispatcherDefault = empty
}
final case class DispatcherFromConfig(path: String, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector {
override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next)
}
final case class DispatcherFromExecutor(executor: Executor, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector {
override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next)
}
final case class DispatcherFromExecutionContext(ec: ExecutionContext, next: DeploymentConfig = EmptyDeploymentConfig) extends DispatcherSelector {
override def ++(next: DeploymentConfig): DeploymentConfig = copy(next = next)
}
trait Dispatchers {
def lookup(selector: DispatcherSelector): ExecutionContextExecutor
def shutdown(): Unit
}

View file

@ -1,13 +0,0 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
import scala.concurrent.ExecutionContextExecutor
import java.util.concurrent.Executors
import akka.event.LoggingAdapter
trait Dispatchers {
def lookup(selector: DispatcherSelector): ExecutionContextExecutor
def shutdown(): Unit
}

View file

@ -30,8 +30,8 @@ object Effect {
* An [[ActorContext]] for testing purposes that records the effects performed
* on it and otherwise stubs them out like a [[StubbedActorContext]].
*/
class EffectfulActorContext[T](_name: String, _props: Props[T], _system: ActorSystem[Nothing])
extends StubbedActorContext[T](_name, _props, _system) {
class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _mailboxCapacity: Int, _system: ActorSystem[Nothing])
extends StubbedActorContext[T](_name, _mailboxCapacity, _system) {
import akka.{ actor a }
import Effect._
@ -49,22 +49,23 @@ class EffectfulActorContext[T](_name: String, _props: Props[T], _system: ActorSy
}
def hasEffects: Boolean = effectQueue.peek() != null
private var current = props.creator()
signal(PreStart)
private var current = _initialBehavior
if (Behavior.isAlive(current)) signal(PreStart)
def currentBehavior: Behavior[T] = current
def run(msg: T): Unit = current = Behavior.canonicalize(current.message(this, msg), current)
def signal(signal: Signal): Unit = current = Behavior.canonicalize(current.management(this, signal), current)
override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = {
val ref = super.spawnAnonymous(props)
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = {
val ref = super.spawnAnonymous(behavior)
effectQueue.offer(Spawned(ref.path.name))
ref
}
override def spawn[U](props: Props[U], name: String): ActorRef[U] = {
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] = {
effectQueue.offer(Spawned(name))
super.spawn(props, name)
super.spawn(behavior, name)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
effectQueue.offer(Stopped(child.path.name))

View file

@ -42,14 +42,6 @@ final case object PreStart extends Signal
@SerialVersionUID(1L)
final case object PreRestart extends Signal
/**
* Lifecycle signal that is fired upon restart of the Actor after replacing
* the behavior with the fresh one (i.e. this signal is received within the
* fresh replacement behavior).
*/
@SerialVersionUID(1L)
final case object PostRestart extends Signal
/**
* Lifecycle signal that is fired after this actor and all its child actors
* (transitively) have terminated. The [[Terminated]] signal is only sent to

View file

@ -1,43 +0,0 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
import java.util.concurrent.Executor
import scala.concurrent.ExecutionContext
sealed trait DispatcherSelector
case object DispatcherDefault extends DispatcherSelector
final case class DispatcherFromConfig(path: String) extends DispatcherSelector
final case class DispatcherFromExecutor(executor: Executor) extends DispatcherSelector
final case class DispatcherFromExecutionContext(ec: ExecutionContext) extends DispatcherSelector
/**
* Props describe how to dress up a [[Behavior]] so that it can become an Actor.
*/
final case class Props[T](creator: () Behavior[T], dispatcher: DispatcherSelector, mailboxCapacity: Int) {
def withDispatcher(configPath: String) = copy(dispatcher = DispatcherFromConfig(configPath))
def withDispatcher(executor: Executor) = copy(dispatcher = DispatcherFromExecutor(executor))
def withDispatcher(ec: ExecutionContext) = copy(dispatcher = DispatcherFromExecutionContext(ec))
def withQueueSize(size: Int) = copy(mailboxCapacity = size)
}
/**
* Props describe how to dress up a [[Behavior]] so that it can become an Actor.
*/
object Props {
/**
* Create a Props instance from a block of code that creates a [[Behavior]].
*
* FIXME: investigate the pros and cons of making this take an explicit
* function instead of a by-name argument
*/
def apply[T](block: Behavior[T]): Props[T] = Props(() block, DispatcherDefault, Int.MaxValue)
/**
* Props for a Behavior that just ignores all messages.
*/
def empty[T]: Props[T] = _empty.asInstanceOf[Props[T]]
private val _empty: Props[Any] = Props(ScalaDSL.Static[Any] { case _ ScalaDSL.Unhandled })
}

View file

@ -59,6 +59,22 @@ object ScalaDSL {
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
final case class Deferred[T](factory: () Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
Behavior.preStart(factory(), ctx)
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
@ -140,7 +156,6 @@ object ScalaDSL {
context.stop(child)
}
behavior.applyOrElse(Sig(context, PostStop), fallback)
case Sig(context, PostRestart) behavior.applyOrElse(Sig(context, PreStart), fallback)
case _ Unhandled
}
behavior.applyOrElse(Sig(ctx, msg), fallback)
@ -253,8 +268,10 @@ object ScalaDSL {
* sides of [[And]] and [[Or]] combinators.
*/
final case class SynchronousSelf[T](f: ActorRef[T] Behavior[T]) extends Behavior[T] {
private class B extends Behavior[T] {
private val inbox = Inbox[T]("synchronousSelf")
private var _behavior = f(inbox.ref)
private var _behavior = Behavior.validateAsInitial(f(inbox.ref))
private def behavior = _behavior
private def setBehavior(ctx: ActorContext[T], b: Behavior[T]): Unit =
_behavior = canonicalize(b, _behavior)
@ -276,6 +293,17 @@ object ScalaDSL {
override def toString: String = s"SynchronousSelf($behavior)"
}
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"SynchronousSelf must receive PreStart as first message (got $msg)")
Behavior.preStart(new B(), ctx)
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"SynchronousSelf must receive PreStart as first message (got $msg)")
override def toString: String = s"SynchronousSelf(${LineNumbers(f)})"
}
/**
* A behavior combinator that feeds incoming messages and signals both into
* the left and right sub-behavior and allows them to evolve independently of
@ -392,12 +420,8 @@ object ScalaDSL {
*/
def SelfAware[T](behavior: ActorRef[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, signal)
val behv = behavior(ctx.self)
canonicalize(behv.management(ctx, signal), behv)
case Msg(ctx, msg)
val behv = behavior(ctx.self)
canonicalize(behv.message(ctx, msg), behv)
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx.self), ctx)
case msg throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)")
}
/**
@ -416,12 +440,8 @@ object ScalaDSL {
*/
def ContextAware[T](behavior: ActorContext[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, signal)
val behv = behavior(ctx)
canonicalize(behv.management(ctx, signal), behv)
case Msg(ctx, msg)
val behv = behavior(ctx)
canonicalize(behv.message(ctx, msg), behv)
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx), ctx)
case msg throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $msg)")
}
/**

View file

@ -6,16 +6,12 @@ package adapter
import akka.{ actor a }
private[typed] class ActorAdapter[T](_initialBehavior: () Behavior[T]) extends a.Actor {
private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor {
import Behavior._
var behavior: Behavior[T] = _
var behavior: Behavior[T] = _initialBehavior
{
behavior = canonicalize(_initialBehavior(), behavior)
if (behavior == null) throw new IllegalStateException("initial behavior cannot be `same` or `unhandled`")
if (!isAlive(behavior)) context.stop(self)
}
val ctx = new ActorContextAdapter[T](context)
@ -59,7 +55,7 @@ private[typed] class ActorAdapter[T](_initialBehavior: () ⇒ Behavior[T]) exten
override def preRestart(reason: Throwable, message: Option[Any]): Unit =
next(behavior.management(ctx, PreRestart), PreRestart)
override def postRestart(reason: Throwable): Unit =
next(behavior.management(ctx, PostRestart), PostRestart)
next(behavior.management(ctx, PreStart), PreStart)
override def postStop(): Unit =
next(behavior.management(ctx, PostStop), PostStop)
}

View file

@ -14,12 +14,14 @@ import scala.concurrent.ExecutionContextExecutor
private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorContext[T] {
override def self = ActorRefAdapter(ctx.self)
override def props = PropsAdapter(ctx.props)
override val system = ActorSystemAdapter(ctx.system)
override def mailboxCapacity = 1 << 29 // FIXME
override def children = ctx.children.map(ActorRefAdapter(_))
override def child(name: String) = ctx.child(name).map(ActorRefAdapter(_))
override def spawnAnonymous[U](props: Props[U]) = ctx.spawnAnonymous(props)
override def spawn[U](props: Props[U], name: String) = ctx.spawn(props, name)
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawnAnonymous(behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawn(behavior, name, deployment)
override def stop(child: ActorRef[Nothing]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef

View file

@ -29,10 +29,10 @@ private[typed] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
override def dispatchers: Dispatchers = new Dispatchers {
override def lookup(selector: DispatcherSelector): ExecutionContextExecutor =
selector match {
case DispatcherDefault untyped.dispatcher
case DispatcherFromConfig(str) untyped.dispatchers.lookup(str)
case DispatcherFromExecutionContext(_) throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter")
case DispatcherFromExecutor(_) throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter")
case DispatcherDefault(_) untyped.dispatcher
case DispatcherFromConfig(str, _) untyped.dispatchers.lookup(str)
case DispatcherFromExecutionContext(_, _) throw new UnsupportedOperationException("cannot use DispatcherFromExecutionContext with ActorSystemAdapter")
case DispatcherFromExecutor(_, _) throw new UnsupportedOperationException("cannot use DispatcherFromExecutor with ActorSystemAdapter")
}
override def shutdown(): Unit = () // there was no shutdown in untyped Akka
}

View file

@ -9,14 +9,14 @@ import akka.{ actor ⇒ a }
private[typed] object PropsAdapter {
// FIXME dispatcher and queue size
def apply(p: Props[_]): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (p.creator: AnyRef) :: Nil)
def apply(b: Behavior[_], deploy: DeploymentConfig): a.Props = new a.Props(a.Deploy(), classOf[ActorAdapter[_]], (b: AnyRef) :: Nil)
def apply[T](p: a.Props): Props[T] = {
def apply[T](p: a.Props): Behavior[T] = {
assert(p.clazz == classOf[ActorAdapter[_]], "typed.Actor must have typed.Props")
p.args match {
case (creator: Function0[_]) :: Nil
case (initial: Behavior[_]) :: Nil
// FIXME queue size
Props(creator.asInstanceOf[() Behavior[T]], DispatcherFromConfig(p.deploy.dispatcher), Int.MaxValue)
initial.asInstanceOf[Behavior[T]]
case _ throw new AssertionError("typed.Actor args must be right")
}
}

View file

@ -9,17 +9,17 @@ package object adapter {
import akka.dispatch.sysmsg
implicit class ActorSystemOps(val sys: akka.actor.ActorSystem) extends AnyVal {
def spawnAnonymous[T](props: Props[T]): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(props)))
def spawn[T](props: Props[T], name: String): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(props), name))
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment)))
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(sys.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name))
}
implicit class ActorContextOps(val ctx: akka.actor.ActorContext) extends AnyVal {
def spawnAnonymous[T](props: Props[T]): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(props)))
def spawn[T](props: Props[T], name: String): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(props), name))
def spawnAnonymous[T](behavior: Behavior[T], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment)))
def spawn[T](behavior: Behavior[T], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[T] =
ActorRefAdapter(ctx.actorOf(PropsAdapter(Behavior.validateAsInitial(behavior), deployment), name))
}
implicit def actorRefAdapter(ref: akka.actor.ActorRef): ActorRef[Any] = ActorRefAdapter(ref)

View file

@ -66,7 +66,9 @@ object ActorCell {
*/
private[typed] class ActorCell[T](
override val system: ActorSystem[Nothing],
override val props: Props[T],
protected val initialBehavior: Behavior[T],
override val executionContext: ExecutionContextExecutor,
override val mailboxCapacity: Int,
val parent: ActorRefImpl[Nothing])
extends ActorContext[T] with Runnable with SupervisionMechanics[T] with DeathWatch[T] {
import ActorCell._
@ -98,10 +100,12 @@ private[typed] class ActorCell[T](
protected def ctx: ActorContext[T] = this
override def spawn[U](props: Props[U], name: String): ActorRef[U] = {
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRef[U] = {
if (childrenMap contains name) throw new InvalidActorNameException(s"actor name [$name] is not unique")
if (terminatingMap contains name) throw new InvalidActorNameException(s"actor name [$name] is not yet free")
val cell = new ActorCell[U](system, props, self)
val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
val capacity = deployment.firstOrElse(MailboxCapacity(1000)) // FIXME where should this number come from?
val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self)
val ref = new LocalActorRef[U](self.path / name, cell)
cell.setSelf(ref)
childrenMap = childrenMap.updated(name, ref)
@ -110,10 +114,10 @@ private[typed] class ActorCell[T](
}
private var nextName = 0L
override def spawnAnonymous[U](props: Props[U]): ActorRef[U] = {
override def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig): ActorRef[U] = {
val name = Helpers.base64(nextName)
nextName += 1
spawn(props, name)
spawn(behavior, name, deployment)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
@ -140,8 +144,6 @@ private[typed] class ActorCell[T](
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable =
system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext)
override val executionContext: ExecutionContextExecutor = system.dispatchers.lookup(props.dispatcher)
override def spawnAdapter[U](f: U T): ActorRef[U] = {
val name = Helpers.base64(nextName, new java.lang.StringBuilder("$!"))
nextName += 1
@ -172,7 +174,7 @@ private[typed] class ActorCell[T](
protected[typed] def getStatus: Int = _status
private[this] val queue: Queue[T] = new ConcurrentLinkedQueue[T]
private[typed] def peekMessage: T = queue.peek()
private[this] val maxQueue: Int = Math.min(props.mailboxCapacity, maxActivations)
private[this] val maxQueue: Int = Math.min(mailboxCapacity, maxActivations)
@volatile private[this] var _systemQueue: LatestFirstSystemMessageList = SystemMessageList.LNil
protected def maySend: Boolean = !isTerminating

View file

@ -25,13 +25,13 @@ object ActorSystemImpl {
import ScalaDSL._
sealed trait SystemCommand
case class CreateSystemActor[T](props: Props[T])(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand
case class CreateSystemActor[T](behavior: Behavior[T])(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand
val systemGuardianBehavior: Behavior[SystemCommand] =
ContextAware { ctx
Static {
case create: CreateSystemActor[t]
create.replyTo ! ctx.spawnAnonymous(create.props)
create.replyTo ! ctx.spawnAnonymous(create.behavior)
}
}
}
@ -67,7 +67,8 @@ private[typed] class ActorSystemImpl[-T](
_config: Config,
_cl: ClassLoader,
_ec: Option[ExecutionContext],
_userGuardianProps: Props[T])
_userGuardianBehavior: Behavior[T],
_userGuardianDeployment: DeploymentConfig)
extends ActorRef[T](a.RootActorPath(a.Address("akka", name)) / "user") with ActorSystem[T] with ActorRefImpl[T] {
import ActorSystemImpl._
@ -149,7 +150,7 @@ private[typed] class ActorSystemImpl[-T](
}
override val dispatchers: Dispatchers = new DispatchersImpl(settings, log)
override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault)
override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault())
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = (System.currentTimeMillis() - startTime) / 1000
@ -182,8 +183,10 @@ private[typed] class ActorSystemImpl[-T](
override def isLocal: Boolean = true
}
private def createTopLevel[U](props: Props[U], name: String): ActorRefImpl[U] = {
val cell = new ActorCell(this, props, theOneWhoWalksTheBubblesOfSpaceTime)
private def createTopLevel[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRefImpl[U] = {
val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
val capacity = deployment.firstOrElse(MailboxCapacity(1000)) // FIXME where should this number come from?
val cell = new ActorCell(this, behavior, dispatchers.lookup(dispatcher), capacity.capacity, theOneWhoWalksTheBubblesOfSpaceTime)
val ref = new LocalActorRef(rootPath / name, cell)
cell.setSelf(ref)
topLevelActors.add(ref)
@ -191,8 +194,8 @@ private[typed] class ActorSystemImpl[-T](
ref
}
private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(Props(systemGuardianBehavior), "system")
private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianProps, "user")
private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyDeploymentConfig)
private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianDeployment)
override def terminate(): Future[Terminated] = {
theOneWhoWalksTheBubblesOfSpaceTime.sendSystem(Terminate())
@ -217,10 +220,10 @@ private[typed] class ActorSystemImpl[-T](
override def sendSystem(msg: SystemMessage): Unit = userGuardian.sendSystem(msg)
override def isLocal: Boolean = true
def systemActorOf[U](props: Props[U], name: String)(implicit timeout: Timeout): Future[ActorRef[U]] = {
def systemActorOf[U](behavior: Behavior[U], name: String)(implicit timeout: Timeout): Future[ActorRef[U]] = {
import AskPattern._
implicit val sched = scheduler
systemGuardian ? CreateSystemActor(props)
systemGuardian ? CreateSystemActor(behavior)
}
def printTree: String = {

View file

@ -19,7 +19,7 @@ private[typed] trait SupervisionMechanics[T] {
* INTERFACE WITH ACTOR CELL
*/
protected def system: ActorSystem[Nothing]
protected def props: Props[T]
protected def initialBehavior: Behavior[T]
protected def self: ActorRefImpl[T]
protected def parent: ActorRefImpl[Nothing]
protected def behavior: Behavior[T]
@ -66,15 +66,11 @@ private[typed] trait SupervisionMechanics[T] {
}
private def create(): Boolean = {
behavior = Behavior.canonicalize(props.creator(), behavior)
if (behavior == null) {
fail(new IllegalStateException("cannot start actor with “same” or “unhandled” behavior, terminating"))
} else {
behavior = initialBehavior
if (system.settings.DebugLifecycle)
publish(Logging.Debug(self.path.toString, clazz(behavior), "started"))
if (Behavior.isAlive(behavior)) next(behavior.management(ctx, PreStart), PreStart)
else self.sendSystem(Terminate())
}
true
}

View file

@ -15,14 +15,19 @@ import akka.event.Logging
* FIXME add limited restarts and back-off (with limited buffering or vacation responder)
* FIXME write tests that ensure that all Behaviors are okay with getting PostRestart as first signal
*/
final case class Restarter[T, Thr <: Throwable: ClassTag](behavior: () Behavior[T], resume: Boolean) extends Behavior[T] {
final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] {
private[this] var current = behavior()
private[this] var current = initialBehavior
// FIXME remove allocation overhead once finalized
private def canonicalize(ctx: ActorContext[T], block: Behavior[T]): Behavior[T] = {
private def restart(ctx: ActorContext[T]): Behavior[T] = {
try current.management(ctx, PreRestart) catch { case NonFatal(_) }
current = initialBehavior
current.management(ctx, PreStart)
}
override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
val b =
try block
try current.management(ctx, signal)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, current.getClass, ex.getMessage))
@ -32,22 +37,22 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](behavior: () ⇒ Behav
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
}
private def restart(ctx: ActorContext[T]): Behavior[T] = {
try current.management(ctx, PreRestart) catch { case NonFatal(_) }
current = behavior()
current.management(ctx, PostRestart)
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
val b =
try current.message(ctx, msg)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, current.getClass, ex.getMessage))
if (resume) current else restart(ctx)
}
current = Behavior.canonicalize(b, current)
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
}
override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] =
canonicalize(ctx, current.management(ctx, signal))
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
canonicalize(ctx, current.message(ctx, msg))
}
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) {
def wrap[T](p: Props[T]) = Props(() Restarter(p.creator, resume)(c), p.dispatcher, p.mailboxCapacity)
def wrap[T](b: Behavior[T]) = Restarter(Behavior.validateAsInitial(b), resume)(c)
}
def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume)

View file

@ -54,7 +54,7 @@ object ActorContextSpec {
case object Unwatched extends Event
final case class GetInfo(replyTo: ActorRef[Info]) extends Command
final case class Info(self: ActorRef[Command], props: Props[Command], system: ActorSystem[Nothing]) extends Event
final case class Info(self: ActorRef[Command], system: ActorSystem[Nothing]) extends Event
final case class GetChild(name: String, replyTo: ActorRef[Child]) extends Command
final case class Child(c: Option[ActorRef[Nothing]]) extends Event
@ -95,8 +95,8 @@ object ActorContextSpec {
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Restarter[Throwable]().wrap(Props(subject(mon))))
case Some(n) ctx.spawn(Restarter[Throwable]().wrap(Props(subject(mon))), n)
case None ctx.spawnAnonymous(Restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Restarter[Throwable]().wrap(subject(mon)), n)
}
replyTo ! Created(child)
Same
@ -125,7 +125,7 @@ object ActorContextSpec {
replyTo ! Unwatched
Same
case GetInfo(replyTo)
replyTo ! Info(ctx.self, ctx.props, ctx.system)
replyTo ! Info(ctx.self, ctx.system)
Same
case GetChild(name, replyTo)
replyTo ! Child(ctx.child(name))
@ -190,7 +190,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
def setup(name: String, wrapper: Option[Restarter.Apply[_]] = None)(
proc: (ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val props = wrapper.map(_.wrap(Props(behavior(ctx)))).getOrElse(Props(behavior(ctx)))
val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx))
val steps =
startWith.withKeepTraces(true)(ctx.spawn(props, "subject"))
.expectMessage(expectTimeout) { (msg, ref)
@ -273,7 +273,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
log.assertDone(expectTimeout)
subj
}.expectMessage(expectTimeout) { (msg, subj)
msg should ===(GotSignal(PostRestart))
msg should ===(GotSignal(PreStart))
ctx.stop(subj)
}.expectMessage(expectTimeout) { (msg, _)
msg should ===(GotSignal(PostStop))
@ -301,7 +301,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
case (msgs, (subj, child, log))
msgs should ===(
ChildEvent(GotSignal(PreRestart)) ::
ChildEvent(GotSignal(PostRestart)) :: Nil)
ChildEvent(GotSignal(PreStart)) :: Nil)
log.assertDone(expectTimeout)
child ! BecomeInert(self) // necessary to avoid PostStop/Terminated interference
(subj, child)
@ -346,7 +346,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
(subj, log)
}.expectMessage(expectTimeout) {
case (msg, (subj, log))
msg should ===(GotSignal(PostRestart))
msg should ===(GotSignal(PreStart))
log.assertDone(expectTimeout)
subj
}
@ -380,7 +380,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
def `08 must not stop non-child actor`(): Unit = sync(setup("ctx08") { (ctx, startWith)
val self = ctx.self
startWith.mkChild(Some("A"), ctx.spawnAdapter(ChildEvent), self) { pair
(pair._1, pair._2, ctx.spawn(Props(behavior(ctx)), "A"))
(pair._1, pair._2, ctx.spawn(behavior(ctx), "A"))
}.expectMessage(expectTimeout) {
case (msg, (subj, child, other))
msg should ===(GotSignal(PreStart))

View file

@ -54,7 +54,7 @@ class BehaviorSpec extends TypedSpec {
protected def mkCtx(requirePreStart: Boolean = false, factory: (ActorRef[Event]) Behavior[Command] = behavior) = {
val inbox = Inbox[Event]("evt")
val ctx = new EffectfulActorContext("ctx", Props(factory(inbox.ref)), system)
val ctx = new EffectfulActorContext("ctx", factory(inbox.ref), 1000, system)
val msgs = inbox.receiveAll()
if (requirePreStart)
msgs should ===(GotSignal(PreStart) :: Nil)
@ -117,14 +117,6 @@ class BehaviorSpec extends TypedSpec {
mkCtx().check(GetSelf).check(PreRestart)
}
def `must react to PostRestart`(): Unit = {
mkCtx().check(PostRestart)
}
def `must react to a message after PostRestart`(): Unit = {
mkCtx().check(PostRestart).check(GetSelf)
}
def `must react to Terminated`(): Unit = {
mkCtx().check(Terminated(Inbox("x").ref)(null))
}
@ -201,10 +193,6 @@ class BehaviorSpec extends TypedSpec {
mkCtx().check(Swap).check(GetSelf).check(PreRestart)
}
def `must react to a message after PostRestart after swap`(): Unit = {
mkCtx().check(PostRestart).check(Swap).check(GetSelf)
}
def `must react to Terminated after swap`(): Unit = {
mkCtx().check(Swap).check(Terminated(Inbox("x").ref)(null))
}

View file

@ -0,0 +1,39 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
class DeploymentConfigSpec extends TypedSpecSetup {
val dispatcherFirst = DispatcherDefault(MailboxCapacity(666, DispatcherFromConfig("pool")))
val mailboxFirst = MailboxCapacity(999) ++ dispatcherFirst
object `A DeploymentConfig` {
def `must get first dispatcher`(): Unit = {
dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst)
mailboxFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst)
}
def `must get first mailbox`(): Unit = {
dispatcherFirst.firstOrElse[MailboxCapacity](null).capacity should ===(666)
mailboxFirst.firstOrElse[MailboxCapacity](null).capacity should ===(999)
}
def `must get default value`(): Unit = {
mailboxFirst.firstOrElse[DispatcherFromExecutor](null) should ===(null)
}
def `must filter out the right things`(): Unit = {
val filtered = mailboxFirst.filterNot[DispatcherSelector]
filtered.firstOrElse[MailboxCapacity](null).capacity should ===(999)
filtered.firstOrElse[DispatcherSelector](null) should ===(null)
}
def `must yield all configs of some type`(): Unit = {
dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherDefault() :: DispatcherFromConfig("pool") :: Nil)
mailboxFirst.allOf[MailboxCapacity] should ===(List(999, 666).map(MailboxCapacity(_)))
}
}
}

View file

@ -28,15 +28,15 @@ class PerformanceSpec extends TypedSpec(
StepWise[Pong] { (ctx, startWith)
startWith {
val pinger = Props(SelfAware[Ping](self Static { msg
val pinger = SelfAware[Ping](self Static { msg
if (msg.x == 0) {
msg.report ! Pong(0, self, msg.report)
} else msg.pong ! Pong(msg.x - 1, self, msg.report)
})).withDispatcher(executor)
}) // FIXME .withDispatcher(executor)
val ponger = Props(SelfAware[Pong](self Static { msg
val ponger = SelfAware[Pong](self Static { msg
msg.ping ! Ping(msg.x, self, msg.report)
})).withDispatcher(executor)
}) // FIXME .withDispatcher(executor)
val actors =
for (i 1 to pairs)

View file

@ -43,8 +43,8 @@ class TypedSpec(val config: Config) extends TypedSpecSetup {
// extension point
def setTimeout: Timeout = Timeout(1.minute)
val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf))
val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), Props(guardian()), Some(config withFallback AkkaSpec.testConf))
val nativeSystem = ActorSystem(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
val adaptedSystem = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
trait NativeSystem {
def system = nativeSystem
@ -66,14 +66,14 @@ class TypedSpec(val config: Config) extends TypedSpecSetup {
import akka.testkit._
def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1)
val blackhole = await(nativeSystem ? Create(Props(ScalaDSL.Full[Any] { case _ ScalaDSL.Same }), "blackhole"))
val blackhole = await(nativeSystem ? Create(ScalaDSL.Full[Any] { case _ ScalaDSL.Same }, "blackhole"))
/**
* Run an Actor-based test. The test procedure is most conveniently
* formulated using the [[StepWise$]] behavior type.
*/
def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] =
system ? (RunTest(name, Props(behavior), _, timeout.duration))
system ? (RunTest(name, behavior, _, timeout.duration))
// TODO remove after basing on ScalaTest 3 with async support
def sync(f: Future[Status])(implicit system: ActorSystem[Command]): Unit = {
@ -135,9 +135,9 @@ object TypedSpec {
case object Start extends Start
sealed trait Command
case class RunTest[T](name: String, props: Props[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command
case class RunTest[T](name: String, behavior: Behavior[T], replyTo: ActorRef[Status], timeout: FiniteDuration) extends Command
case class Terminate(reply: ActorRef[Status]) extends Command
case class Create[T](props: Props[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command
case class Create[T](behavior: Behavior[T], name: String)(val replyTo: ActorRef[ActorRef[T]]) extends Command
sealed trait Status
case object Success extends Status
@ -156,7 +156,7 @@ object TypedSpec {
}
case _: Sig[_] Same
case Msg(ctx, r: RunTest[t])
val test = ctx.spawn(r.props, r.name)
val test = ctx.spawn(r.behavior, r.name)
ctx.schedule(r.timeout, r.replyTo, Timedout)
ctx.watch(test)
guardian(outstanding + ((test, r.replyTo)))
@ -164,7 +164,7 @@ object TypedSpec {
reply ! Success
Stopped
case Msg(ctx, c: Create[t])
c.replyTo ! ctx.spawn(c.props, c.name)
c.replyTo ! ctx.spawn(c.behavior, c.name)
Same
}

View file

@ -22,7 +22,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must be creatable`(): Unit = {
val parent = new DebugRef[String](sys.path / "creatable", true)
val cell = new ActorCell(sys, Props({ parent ! "created"; Static[String] { s parent ! s } }), parent)
val cell = new ActorCell(sys, Deferred[String](() { parent ! "created"; Static { s parent ! s } }), ec, 1000, parent)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
@ -40,7 +40,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
val parent = new DebugRef[String](sys.path / "creatable???", true)
val self = new DebugRef[String](sys.path / "creatableSelf", true)
val ??? = new NotImplementedError
val cell = new ActorCell(sys, Props[String]({ parent ! "created"; throw ??? }), parent)
val cell = new ActorCell(sys, Deferred[String](() { parent ! "created"; throw ??? }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -60,7 +60,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must be able to terminate after construction`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val cell = new ActorCell(sys, Props({ parent ! "created"; Stopped[String] }), parent)
val cell = new ActorCell(sys, Deferred[String](() { parent ! "created"; Stopped }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -80,7 +80,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must be able to terminate after PreStart`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val cell = new ActorCell(sys, Props({ parent ! "created"; Full[String] { case Sig(ctx, PreStart) Stopped } }), parent)
val cell = new ActorCell(sys, Deferred(() { parent ! "created"; Full[String] { case Sig(ctx, PreStart) Stopped } }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -101,7 +101,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val ex = new AssertionError
val cell = new ActorCell(sys, Props({ parent ! "created"; Static[String](s throw ex) }), parent)
val cell = new ActorCell(sys, Deferred(() { parent ! "created"; Static[String](s throw ex) }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -124,7 +124,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must signal failure when starting behavior is "same"`(): Unit = {
val parent = new DebugRef[String](sys.path / "startSame", true)
val self = new DebugRef[String](sys.path / "startSameSelf", true)
val cell = new ActorCell(sys, Props({ parent ! "created"; Same[String] }), parent)
val cell = new ActorCell(sys, Deferred(() { parent ! "created"; Same[String] }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -140,8 +140,8 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
parent.receiveAll() match {
case Left(DeathWatchNotification(`self`, exc)) :: Nil
exc should not be null
exc shouldBe an[IllegalStateException]
exc.getMessage should include("same")
exc shouldBe an[IllegalArgumentException]
exc.getMessage should include("Same")
case other fail(s"$other was not a DeathWatchNotification")
}
}
@ -150,7 +150,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must signal failure when starting behavior is "unhandled"`(): Unit = {
val parent = new DebugRef[String](sys.path / "startSame", true)
val self = new DebugRef[String](sys.path / "startSameSelf", true)
val cell = new ActorCell(sys, Props({ parent ! "created"; Unhandled[String] }), parent)
val cell = new ActorCell(sys, Deferred(() { parent ! "created"; Unhandled[String] }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
@ -166,8 +166,8 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
parent.receiveAll() match {
case Left(DeathWatchNotification(`self`, exc)) :: Nil
exc should not be null
exc shouldBe an[IllegalStateException]
exc.getMessage should include("same")
exc shouldBe an[IllegalArgumentException]
exc.getMessage should include("Unhandled")
case other fail(s"$other was not a DeathWatchNotification")
}
}
@ -181,7 +181,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
*/
def `must not execute more messages than were batched naturally`(): Unit = {
val parent = new DebugRef[String](sys.path / "batching", true)
val cell = new ActorCell(sys, Props(SelfAware[String] { self Static { s self ! s; parent ! s } }), parent)
val cell = new ActorCell(sys, SelfAware[String] { self Static { s self ! s; parent ! s } }, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -214,7 +214,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must signal DeathWatch when terminating normally`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchNormal", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Props(Empty[String]), parent)
val cell = new ActorCell(sys, Empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -238,7 +238,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
val parent = new DebugRef[String](sys.path / "watchAbnormal", true)
val client = new DebugRef[String](parent.path / "client", true)
val other = new DebugRef[String](parent.path / "other", true)
val cell = new ActorCell(sys, Props(ContextAware[String] { ctx ctx.watch(parent); Empty }), parent)
val cell = new ActorCell(sys, ContextAware[String] { ctx ctx.watch(parent); Empty }, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -270,7 +270,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must signal DeathWatch when watching after termination`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchLate", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Props(Stopped[String]), parent)
val cell = new ActorCell(sys, Stopped[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -289,7 +289,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must signal DeathWatch when watching after termination but before deactivation`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchSomewhatLate", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Props(Empty[String]), parent)
val cell = new ActorCell(sys, Empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -309,7 +309,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must not signal DeathWatch after Unwatch has been processed`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchUnwatch", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Props(Empty[String]), parent)
val cell = new ActorCell(sys, Empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -326,7 +326,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must send messages to deadLetters after being terminated`(): Unit = {
val parent = new DebugRef[String](sys.path / "sendDeadLetters", true)
val cell = new ActorCell(sys, Props(Stopped[String]), parent)
val cell = new ActorCell(sys, Stopped[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -347,10 +347,10 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
*/
def `must not terminate before children have terminated`(): Unit = {
val parent = new DebugRef[ActorRef[Nothing]](sys.path / "waitForChild", true)
val cell = new ActorCell(sys, Props(ContextAware[String] { ctx
ctx.spawn(Props(SelfAware[String] { self parent ! self; Empty }), "child")
val cell = new ActorCell(sys, ContextAware[String] { ctx
ctx.spawn(SelfAware[String] { self parent ! self; Empty }, "child")
Empty
}), parent)
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -380,10 +380,10 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must properly terminate if failing while handling Terminated for child actor`(): Unit = {
val parent = new DebugRef[ActorRef[Nothing]](sys.path / "terminateWhenDeathPact", true)
val cell = new ActorCell(sys, Props(ContextAware[String] { ctx
ctx.watch(ctx.spawn(Props(SelfAware[String] { self parent ! self; Empty }), "child"))
val cell = new ActorCell(sys, ContextAware[String] { ctx
ctx.watch(ctx.spawn(SelfAware[String] { self parent ! self; Empty }, "child"))
Empty
}), parent)
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
@ -416,7 +416,7 @@ class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with Scala
def `must not terminate twice if failing in PostStop`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminateProperlyPostStop", true)
val cell = new ActorCell(sys, Props(Full[String] { case Sig(_, PostStop) ??? }), parent)
val cell = new ActorCell(sys, Full[String] { case Sig(_, PostStop) ??? }, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {

View file

@ -23,11 +23,11 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
case class Probe(msg: String, replyTo: ActorRef[String])
trait CommonTests {
def system[T](name: String, props: Props[T]): ActorSystem[T]
def system[T](name: String, behavior: Behavior[T]): ActorSystem[T]
def suite: String
def withSystem[T](name: String, props: Props[T], doTerminate: Boolean = true)(block: ActorSystem[T] Unit): Terminated = {
val sys = system(s"$suite-$name", props)
def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)(block: ActorSystem[T] Unit): Terminated = {
val sys = system(s"$suite-$name", behavior)
try {
block(sys)
if (doTerminate) sys.terminate().futureValue else sys.whenTerminated.futureValue
@ -39,7 +39,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}
def `must start the guardian actor and terminate when it terminates`(): Unit = {
val t = withSystem("a", Props(Total[Probe] { p p.replyTo ! p.msg; Stopped }), doTerminate = false) { sys
val t = withSystem("a", Total[Probe] { p p.replyTo ! p.msg; Stopped }, doTerminate = false) { sys
val inbox = Inbox[String]("a")
sys ! Probe("hello", inbox.ref)
eventually { inbox.hasMessages should ===(true) }
@ -52,11 +52,11 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
def `must terminate the guardian actor`(): Unit = {
val inbox = Inbox[String]("terminate")
val sys = system("terminate", Props(Full[Probe] {
val sys = system("terminate", Full[Probe] {
case Sig(ctx, PostStop)
inbox.ref ! "done"
Same
}))
})
sys.terminate().futureValue
inbox.receiveAll() should ===("done" :: Nil)
}
@ -64,19 +64,19 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
def `must log to the event stream`(): Unit = pending
def `must have a name`(): Unit =
withSystem("name", Props(Empty[String])) { sys
withSystem("name", Empty[String]) { sys
sys.name should ===(suite + "-name")
}
def `must report its uptime`(): Unit =
withSystem("uptime", Props(Empty[String])) { sys
withSystem("uptime", Empty[String]) { sys
sys.uptime should be < 1L
Thread.sleep(1000)
sys.uptime should be >= 1L
}
def `must have a working thread factory`(): Unit =
withSystem("thread", Props(Empty[String])) { sys
withSystem("thread", Empty[String]) { sys
val p = Promise[Int]
sys.threadFactory.newThread(new Runnable {
def run(): Unit = p.success(42)
@ -85,7 +85,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}
def `must be able to run Futures`(): Unit =
withSystem("futures", Props(Empty[String])) { sys
withSystem("futures", Empty[String]) { sys
val f = Future(42)(sys.executionContext)
f.futureValue should ===(42)
}
@ -93,12 +93,12 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}
object `An ActorSystemImpl` extends CommonTests {
def system[T](name: String, props: Props[T]): ActorSystem[T] = ActorSystem(name, props)
def system[T](name: String, behavior: Behavior[T]): ActorSystem[T] = ActorSystem(name, behavior)
def suite = "native"
// this is essential to complete ActorCellSpec, see there
def `must correctly treat Watch dead letters`(): Unit =
withSystem("deadletters", Props(Empty[String])) { sys
withSystem("deadletters", Empty[String]) { sys
val client = new DebugRef[Int](sys.path / "debug", true)
sys.deadLetters.sorry.sendSystem(Watch(sys, client))
client.receiveAll() should ===(Left(DeathWatchNotification(sys, null)) :: Nil)
@ -106,7 +106,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}
object `An ActorSystemAdapter` extends CommonTests {
def system[T](name: String, props: Props[T]): ActorSystem[T] = ActorSystem.adapter(name, props)
def system[T](name: String, behavior: Behavior[T]): ActorSystem[T] = ActorSystem.adapter(name, behavior)
def suite = "adapter"
}
}

View file

@ -25,34 +25,39 @@ class ReceiverSpec extends TypedSpec {
private def afterGetOneFirst(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetOne(Duration.Zero)(dummyInbox.ref))
private def afterGetOneLater(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.second)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetOneTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[InternalCommand[Msg]]].message(ctx.asInstanceOf[ActorContext[InternalCommand[Msg]]], ReceiveTimeout()).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetAll(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def afterGetAllTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def setup(name: String, behv: Behavior[Command[Msg]] = behavior[Msg])(
proc: (EffectfulActorContext[Command[Msg]], EffectfulActorContext[Msg], Inbox[Replies[Msg]]) Unit): Unit =
for (Setup(description, behv, messages, effects) startingPoints) {
val ctx = new EffectfulActorContext("ctx", Props(ScalaDSL.ContextAware(behv)), nativeSystem)
val ctx = new EffectfulActorContext("ctx", ScalaDSL.ContextAware(behv), 1000, nativeSystem)
withClue(s"[running for starting point '$description' (${ctx.currentBehavior})]: ") {
dummyInbox.receiveAll() should have size messages
ctx.getAllEffects() should have size effects

View file

@ -13,17 +13,17 @@ class ReceptionistSpec extends TypedSpec {
trait ServiceA
case object ServiceKeyA extends ServiceKey[ServiceA]
val propsA = Props(Static[ServiceA](msg ()))
val behaviorA = Static[ServiceA](msg ())
trait ServiceB
case object ServiceKeyB extends ServiceKey[ServiceB]
val propsB = Props(Static[ServiceB](msg ()))
val behaviorB = Static[ServiceB](msg ())
trait CommonTests {
implicit def system: ActorSystem[TypedSpec.Command]
def `must register a service`(): Unit = {
val ctx = new EffectfulActorContext("register", Props(behavior), system)
val ctx = new EffectfulActorContext("register", behavior, 1000, system)
val a = Inbox[ServiceA]("a")
val r = Inbox[Registered[_]]("r")
ctx.run(Register(ServiceKeyA, a.ref)(r.ref))
@ -37,7 +37,7 @@ class ReceptionistSpec extends TypedSpec {
}
def `must register two services`(): Unit = {
val ctx = new EffectfulActorContext("registertwo", Props(behavior), system)
val ctx = new EffectfulActorContext("registertwo", behavior, 1000, system)
val a = Inbox[ServiceA]("a")
val r = Inbox[Registered[_]]("r")
ctx.run(Register(ServiceKeyA, a.ref)(r.ref))
@ -54,7 +54,7 @@ class ReceptionistSpec extends TypedSpec {
}
def `must register two services with the same key`(): Unit = {
val ctx = new EffectfulActorContext("registertwosame", Props(behavior), system)
val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system)
val a1 = Inbox[ServiceA]("a1")
val r = Inbox[Registered[_]]("r")
ctx.run(Register(ServiceKeyA, a1.ref)(r.ref))
@ -71,7 +71,7 @@ class ReceptionistSpec extends TypedSpec {
}
def `must unregister services when they terminate`(): Unit = {
val ctx = new EffectfulActorContext("registertwosame", Props(behavior), system)
val ctx = new EffectfulActorContext("registertwosame", behavior, 1000, system)
val r = Inbox[Registered[_]]("r")
val a = Inbox[ServiceA]("a")
ctx.run(Register(ServiceKeyA, a.ref)(r.ref))
@ -108,8 +108,8 @@ class ReceptionistSpec extends TypedSpec {
StepWise[Registered[ServiceA]] { (ctx, startWith)
val self = ctx.self
startWith.withKeepTraces(true) {
val r = ctx.spawnAnonymous(Props(behavior))
val s = ctx.spawnAnonymous(propsA)
val r = ctx.spawnAnonymous(behavior)
val s = ctx.spawnAnonymous(behaviorA)
val f = r ? Register(ServiceKeyA, s)
r ! Register(ServiceKeyA, s)(self)
(f, s)