Merge pull request #22577 from akka/wip-typed-IntroSpec-patriknw

add Akka Typed Java API #22293
This commit is contained in:
Patrik Nordwall 2017-03-16 22:28:22 +01:00 committed by GitHub
commit cdc1eddc3b
30 changed files with 1871 additions and 828 deletions

View file

@ -5,8 +5,8 @@ package docs.akka.typed
//#imports
import akka.typed._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.Actor._
import akka.typed.scaladsl.AskPattern._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await
@ -21,7 +21,7 @@ object IntroSpec {
final case class Greet(whom: String, replyTo: ActorRef[Greeted])
final case class Greeted(whom: String)
val greeter = Static[Greet] { msg =>
val greeter = Stateless[Greet] { (_, msg)
println(s"Hello ${msg.whom}!")
msg.replyTo ! Greeted(msg.whom)
}
@ -50,22 +50,21 @@ object IntroSpec {
//#chatroom-protocol
//#chatroom-behavior
val behavior: Behavior[GetSession] =
ContextAware[Command] { ctx =>
var sessions = List.empty[ActorRef[SessionEvent]]
Static {
case GetSession(screenName, client) =>
sessions ::= client
def chatRoom(sessions: List[ActorRef[SessionEvent]] = List.empty): Behavior[Command] =
Stateful[Command] { (ctx, msg)
msg match {
case GetSession(screenName, client)
val wrapper = ctx.spawnAdapter {
p: PostMessage => PostSessionMessage(screenName, p.message)
p: PostMessage PostSessionMessage(screenName, p.message)
}
client ! SessionGranted(wrapper)
case PostSessionMessage(screenName, message) =>
chatRoom(client :: sessions)
case PostSessionMessage(screenName, message)
val mp = MessagePosted(screenName, message)
sessions foreach (_ ! mp)
Same
}
}.narrow // only expose GetSession to the outside
}
//#chatroom-behavior
}
//#chatroom-actor
@ -76,6 +75,7 @@ class IntroSpec extends TypedSpec {
import IntroSpec._
def `must say hello`(): Unit = {
// TODO Implicits.global is not something we would like to encourage in docs
//#hello-world
import HelloWorld._
// using global pool since we want to run tasks after system.terminate
@ -86,8 +86,8 @@ class IntroSpec extends TypedSpec {
val future: Future[Greeted] = system ? (Greet("world", _))
for {
greeting <- future.recover { case ex => ex.getMessage }
done <- { println(s"result: $greeting"); system.terminate() }
greeting future.recover { case ex ex.getMessage }
done { println(s"result: $greeting"); system.terminate() }
} println("system terminated")
//#hello-world
}
@ -96,32 +96,40 @@ class IntroSpec extends TypedSpec {
//#chatroom-gabbler
import ChatRoom._
val gabbler: Behavior[SessionEvent] =
Total {
case SessionDenied(reason) =>
println(s"cannot start chat room session: $reason")
Stopped
case SessionGranted(handle) =>
handle ! PostMessage("Hello World!")
Same
case MessagePosted(screenName, message) =>
println(s"message has been posted by '$screenName': $message")
Stopped
val gabbler =
Stateful[SessionEvent] { (_, msg)
msg match {
case SessionDenied(reason)
println(s"cannot start chat room session: $reason")
Stopped
case SessionGranted(handle)
handle ! PostMessage("Hello World!")
Same
case MessagePosted(screenName, message)
println(s"message has been posted by '$screenName': $message")
Stopped
}
}
//#chatroom-gabbler
//#chatroom-main
val main: Behavior[akka.NotUsed] =
Full {
case Sig(ctx, PreStart) =>
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Same
case Sig(_, Terminated(ref)) =>
Stopped
}
Stateful(
behavior = (_, _) Unhandled,
signal = { (ctx, sig)
sig match {
case PreStart
val chatRoom = ctx.spawn(ChatRoom.chatRoom(), "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Same
case Terminated(ref)
Stopped
case _
Unhandled
}
})
val system = ActorSystem("ChatRoomDemo", main)
Await.result(system.whenTerminated, 1.second)

View file

@ -30,8 +30,8 @@ supplies so that the :class:`HelloWorld` Actor can send back the confirmation
message.
The behavior of the Actor is defined as the :meth:`greeter` value with the help
of the :class:`Static` behavior constructor—there are many different ways of
formulating behaviors as we shall see in the following. The “static” behavior
of the :class:`Stateless` behavior constructor—there are many different ways of
formulating behaviors as we shall see in the following. The “stateless” behavior
is not capable of changing in response to a message, it will stay the same
until the Actor is stopped by its parent.
@ -175,10 +175,11 @@ as the following:
.. includecode:: code/docs/akka/typed/IntroSpec.scala#chatroom-behavior
The core of this behavior is again static, the chat room itself does not change
The core of this behavior is stateful, the chat room itself does not change
into something else when sessions are established, but we introduce a variable
that tracks the opened sessions. When a new :class:`GetSession` command comes
in we add that client to the list and then we need to create the sessions
that tracks the opened sessions. Note that by using a method parameter a ``var``
is not needed. When a new :class:`GetSession` command comes in we add that client to the
list that is in the returned behavior. Then we also need to create the sessions
:class:`ActorRef` that will be used to post messages. In this case we want to
create a very simple Actor that just repackages the :class:`PostMessage`
command into a :class:`PostSessionMessage` command which also includes the
@ -194,15 +195,8 @@ clients. But we do not want to give the ability to send
:class:`PostSessionMessage` commands to arbitrary clients, we reserve that
right to the wrappers we create—otherwise clients could pose as completely
different screen names (imagine the :class:`GetSession` protocol to include
authentication information to further secure this). Therefore we narrow the
behavior down to only accepting :class:`GetSession` commands before exposing it
to the world, hence the type of the ``behavior`` value is
:class:`Behavior[GetSession]` instead of :class:`Behavior[Command]`.
Narrowing the type of a behavior is always a safe operation since it only
restricts what clients can do. If we were to widen the type then clients could
send other messages that were not foreseen while writing the source code for
the behavior.
authentication information to further secure this). Therefore :class:`PostSessionMessage`
has ``private`` visibility and can't be created outside the actor.
If we did not care about securing the correspondence between a session and a
screen name then we could change the protocol such that :class:`PostMessage` is
@ -216,13 +210,6 @@ former simply speaks more languages than the latter. The opposite would be
problematic, so passing an :class:`ActorRef[PostSessionMessage]` where
:class:`ActorRef[Command]` is required will lead to a type error.
The final piece of this behavior definition is the :class:`ContextAware`
decorator that we use in order to obtain access to the :class:`ActorContext`
within the :class:`Static` behavior definition. This decorator invokes the
provided function when the first message is received and thereby creates the
real behavior that will be used going forward—the decorator is discarded after
it has done its job.
Trying it out
-------------
@ -261,11 +248,9 @@ Actor will perform its job on its own accord, we do not need to send messages
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
only external messages, they also are notified of certain system events,
so-called Signals. In order to get access to those we choose to implement this
particular one using the :class:`Full` behavior decorator. The name stems from
the fact that within this we have full access to all aspects of the Actor. The
provided function will be invoked for signals (wrapped in :class:`Sig`) or user
messages (wrapped in :class:`Msg`) and the wrapper also contains a reference to
the :class:`ActorContext`.
particular one using the :class:`Stateful` behavior decorator. The
provided ``signal`` function will be invoked for signals (subclasses of :class:`Signal`)
or the ``mesg`` function for user messages.
This particular main Actor reacts to two signals: when it is started it will
first receive the :class:`PreStart` signal, upon which the chat room and the

View file

@ -0,0 +1,385 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import static akka.typed.scaladsl.Actor.Empty;
import static akka.typed.scaladsl.Actor.Ignore;
import static akka.typed.scaladsl.Actor.Same;
import static akka.typed.scaladsl.Actor.Stopped;
import static akka.typed.scaladsl.Actor.Unhandled;
import java.util.function.Function;
import akka.japi.function.Function2;
import akka.japi.function.Procedure2;
import akka.japi.pf.PFBuilder;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.PreStart;
import akka.typed.Signal;
import akka.typed.patterns.Restarter;
import akka.typed.scaladsl.Actor.Widened;
import scala.reflect.ClassTag;
public abstract class Actor {
/*
* This DSL is implemented in Java in order to ensure consistent usability from Java,
* taking possible Scala oddities out of the equation. There is some duplication in
* the behavior implementations, but that is unavoidable if both DSLs shall offer the
* same runtime performance (especially concerning allocations for function converters).
*/
private static class Deferred<T> extends Behavior<T> {
final akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer;
public Deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
this.producer = producer;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
if (msg instanceof PreStart) {
return Behavior.preStart(producer.apply(ctx), ctx);
} else
throw new IllegalStateException("Deferred behavior must receive PreStart as first signal");
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
throw new IllegalStateException("Deferred behavior must receive PreStart as first signal");
}
}
private static class Stateful<T> extends Behavior<T> {
final Function2<ActorContext<T>, T, Behavior<T>> message;
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
public Stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
this.signal = signal;
this.message = message;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return signal.apply(ctx, msg);
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
return message.apply(ctx, msg);
}
}
private static class Stateless<T> extends Behavior<T> {
final Procedure2<ActorContext<T>, T> message;
public Stateless(Procedure2<ActorContext<T>, T> message) {
this.message = message;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return Same();
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
message.apply(ctx, msg);
return Same();
}
}
private static class Tap<T> extends Behavior<T> {
final Procedure2<ActorContext<T>, Signal> signal;
final Procedure2<ActorContext<T>, T> message;
final Behavior<T> behavior;
public Tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
this.signal = signal;
this.message = message;
this.behavior = behavior;
}
private Behavior<T> canonicalize(Behavior<T> behv) {
if (Behavior.isUnhandled(behv))
return Unhandled();
else if (behv == Same())
return Same();
else if (Behavior.isAlive(behv))
return new Tap<T>(signal, message, behv);
else
return Stopped();
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
signal.apply(ctx, msg);
return canonicalize(behavior.management(ctx, msg));
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
message.apply(ctx, msg);
return canonicalize(behavior.message(ctx, msg));
}
}
/**
* Mode selector for the {@link #restarter} wrapper that decides whether an actor
* upon a failure shall be restarted (to clean initial state) or resumed (keep
* on running, with potentially compromised state).
*
* Resuming is less safe. If you use <code>OnFailure.RESUME</code> you should at least
* not hold mutable data fields or collections within the actor as those might
* be in an inconsistent state (the exception might have interrupted normal
* processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*/
public enum OnFailure {
RESUME, RESTART;
}
private static Function2<Object, Object, Object> _unhandledFun = (ctx, msg) -> Unhandled();
@SuppressWarnings("unchecked")
private static <T> Function2<ActorContext<T>, Signal, Behavior<T>> unhandledFun() {
return (Function2<ActorContext<T>, Signal, Behavior<T>>) (Object) _unhandledFun;
}
private static Procedure2<Object, Object> _doNothing = (ctx, msg) -> {
};
@SuppressWarnings("unchecked")
private static <T> Procedure2<ActorContext<T>, Signal> doNothing() {
return (Procedure2<ActorContext<T>, Signal>) (Object) _doNothing;
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
* If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(message, unhandledFun());
}
/**
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
* If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
return new Stateful<T>(message, signal);
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateless because it cannot be replaced by
* another one after it has been installed. It is most useful for leaf actors
* that do not create child actors themselves.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> stateless(Procedure2<ActorContext<T>, T> message) {
return new Stateless<T>(message);
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior. This is provided in order to avoid the
* allocation overhead of recreating the current behavior where that is not
* necessary.
*
* @return pseudo-behavior marking no change
*/
static public <T> Behavior<T> same() {
return Same();
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior, including the hint that the message has not
* been handled. This hint may be used by composite behaviors that delegate
* (partial) handling to other behaviors.
*
* @return pseudo-behavior marking unhandled
*/
static public <T> Behavior<T> unhandled() {
return Unhandled();
}
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*
* @return the inert behavior
*/
static public <T> Behavior<T> stopped() {
return Stopped();
}
/**
* A behavior that treats every incoming message as unhandled.
*
* @return the empty behavior
*/
static public <T> Behavior<T> empty() {
return Empty();
}
/**
* A behavior that ignores every incoming message and returns same.
*
* @return the inert behavior
*/
static public <T> Behavior<T> ignore() {
return Ignore();
}
/**
* Behavior decorator that allows you to perform any side-effect before a
* signal or message is delivered to the wrapped behavior. The wrapped
* behavior can evolve (i.e. be stateful) without needing to be wrapped in a
* <code>tap(...)</code> call again.
*
* @param signal
* procedure to invoke with the {@link ActorContext} and the
* {@link akka.typed.Signal} as arguments before delivering the signal to
* the wrapped behavior
* @param message
* procedure to invoke with the {@link ActorContext} and the received
* message as arguments before delivering the signal to the wrapped
* behavior
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
return new Tap<T>(signal, message, behavior);
}
/**
* Behavior decorator that copies all received message to the designated
* monitor {@link akka.typed.ActorRef} before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. be stateful) without needing to be
* wrapped in a <code>monitor(...)</code> call again.
*
* @param monitor
* ActorRef to which to copy all received messages
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> monitor(ActorRef<T> monitor, Behavior<T> behavior) {
return new Tap<T>(doNothing(), (ctx, msg) -> monitor.tell(msg), behavior);
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of <code>Thr</code> will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify that the actor shall not be restarted but
* resumed. This entails keeping the same state as before the exception was
* thrown and is thus less safe. If you use <code>OnFailure.RESUME</code> you should at
* least not hold mutable data fields or collections within the actor as those
* might be in an inconsistent state (the exception might have interrupted
* normal processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*
* @param clazz
* the type of exceptions that shall be caught
* @param mode
* whether to restart or resume the actor upon a caught failure
* @param initialBehavior
* the initial behavior, that is also restored during a restart
* @return the wrapped behavior
*/
static public <T, Thr extends Throwable> Behavior<T> restarter(Class<Thr> clazz, OnFailure mode,
Behavior<T> initialBehavior) {
final ClassTag<Thr> catcher = akka.japi.Util.classTag(clazz);
return new Restarter<T, Thr>(initialBehavior, mode == OnFailure.RESUME, initialBehavior, catcher);
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* <code><pre>
* Behavior&lt;String> s = stateless((ctx, msg) -> System.out.println(msg))
* Behavior&lt;Number> n = widened(s, pf -> pf.
* match(BigInteger.class, i -> "BigInteger(" + i + ")").
* match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
* // drop all other kinds of Number
* );
* </pre></code>
*
* @param behavior
* the behavior that will receive the selected messages
* @param selector
* a partial function builder for describing the selection and
* transformation
* @return a behavior of the widened type
*/
static public <T, U> Behavior<U> widened(Behavior<T> behavior, Function<PFBuilder<U, T>, PFBuilder<U, T>> selector) {
return new Widened<T, U>(behavior, selector.apply(new PFBuilder<>()).build());
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior
* creation is deferred to the child actor instead of running within the
* parent.
*
* @param producer
* behavior factory that takes the child actors context as argument
* @return the deferred behavior
*/
static public <T> Behavior<T> deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
return new Deferred<T>(producer);
}
}

View file

@ -0,0 +1,159 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import akka.actor.Cancellable;
import akka.typed.ActorRef;
import akka.typed.ActorSystem;
import akka.typed.Behavior;
import akka.typed.DeploymentConfig;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;
/**
* An Actor is given by the combination of a {@link akka.typed.Behavior} and a context in which
* this behavior is executed. As per the Actor Model an Actor can perform the
* following actions when processing a message:
*
* <ul>
* <li>send a finite number of messages to other Actors it knows</li>
* <li>create a finite number of Actors</li>
* <li>designate the behavior for the next message</li>
* </ul>
*
* In Akka the first capability is accessed by using the {@link akka.typed.ActorRef#tell}
* method, the second is provided by {@link akka.typed.ActorContext#spawn} and the
* third is implicit in the signature of {@link akka.typed.Behavior} in that the next behavior
* is always returned from the message processing logic.
*
* An <code>ActorContext</code> in addition provides access to the Actors own identity
* ({@link #getSelf self}), the {@link akka.typed.ActorSystem} it is part of, methods for querying the list
* of child Actors it created, access to {@link akka.typed.Terminated DeathWatch} and timed
* message scheduling.
*/
public interface ActorContext<T> {
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different {@link akka.typed.ActorRef}.
*/
public ActorRef<T> getSelf();
/**
* Return the mailbox capacity that was configured by the parent for this
* actor.
*/
public int getMailboxCapacity();
/**
* The {@link akka.typed.ActorSystem} to which this Actor belongs.
*/
public ActorSystem<Void> getSystem();
/**
* The list of child Actors created by this Actor during its lifetime that are
* still alive, in no particular order.
*/
public List<ActorRef<Void>> getChildren();
/**
* The named child Actor if it is alive.
*/
public Optional<ActorRef<Void>> getChild(String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior, DeploymentConfig deployment);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name, DeploymentConfig deployment);
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in {@link akka.typed.ActorRef} points to a current child Actor
*/
public boolean stop(ActorRef<?> child);
/**
* Register for {@link akka.typed.Terminated} notification once the Actor identified by the
* given {@link akka.typed.ActorRef} terminates. This notification is also generated when the
* {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed
* (e.g. in reaction to being unreachable).
*/
public <U> ActorRef<U> watch(ActorRef<U> other);
/**
* Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated}
* notification will not subsequently be received for the referenced Actor.
*/
public <U> ActorRef<U> unwatch(ActorRef<U> other);
/**
* Schedule the sending of a notification in case no other message is received
* during the given period of time. The timeout starts anew with each received
* message. Provide <code>scala.concurrent.duration.Duration.Undefined</code> to switch off this mechanism.
*/
public void setReceiveTimeout(FiniteDuration d, T msg);
/**
* Cancel the sending of receive timeout notifications.
*/
public void cancelReceiveTimeout();
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled by
* invoking {@link akka.actor.Cancellable#cancel} on the returned handle.
*/
public <U> Cancellable schedule(FiniteDuration delay, ActorRef<U> target, U msg);
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like <code>scala.concurrent.Future</code> combinators.
*/
public ExecutionContextExecutor getExecutionContext();
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given <code>name</code> argument is appended,
* with an inserted hyphen between these two parts. Therefore the given <code>name</code>
* argument does not need to be unique within the scope of the parent actor.
*/
public <U> ActorRef<U> spawnAdapter(Function<U, T> f, String name);
/**
* Create an anonymous child actor that will wrap messages such that other
* Actors protocols can be ingested by this Actor. You are strongly advised
* to cache these ActorRefs or to stop them when no longer needed.
*/
public <U> ActorRef<U> spawnAdapter(Function<U, T> f);
}

View file

@ -9,136 +9,55 @@ import akka.util.Helpers
import akka.{ actor untyped }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import java.util.Optional
import java.util.ArrayList
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
* This trait is not meant to be extended by user code. If you do so, you may
* lose binary compatibility.
*/
trait ActorContext[T] {
@DoNotInherit
@ApiMayChange
trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
def getChild(name: String): Optional[ActorRef[Void]] =
child(name) match {
case Some(c) Optional.of(c.upcast[Void])
case None Optional.empty()
}
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
def getChildren(): java.util.List[akka.typed.ActorRef[Void]] = {
val c = children
val a = new ArrayList[ActorRef[Void]](c.size)
val i = c.iterator
while (i.hasNext) a.add(i.next().upcast[Void])
a
}
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
def getExecutionContext(): scala.concurrent.ExecutionContextExecutor =
executionContext
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
def getMailboxCapacity(): Int =
mailboxCapacity
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
def getSelf(): akka.typed.ActorRef[T] =
self
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
def getSystem(): akka.typed.ActorSystem[Void] =
system.asInstanceOf[ActorSystem[Void]]
/**
* Create a child Actor from the given [[Props]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
def spawn[U](behavior: akka.typed.Behavior[U], name: String): akka.typed.ActorRef[U] =
spawn(behavior, name, EmptyDeploymentConfig)
/**
* Create a child Actor from the given [[Props]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
def spawnAnonymous[U](behavior: akka.typed.Behavior[U]): akka.typed.ActorRef[U] =
spawnAnonymous(behavior, EmptyDeploymentConfig)
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop(child: ActorRef[Nothing]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): ActorRef[U]
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): ActorRef[U]
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable]] `cancel` on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.typed.ActorRef[U] =
spawnAdapter(f.apply _)
def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.typed.ActorRef[U] =
spawnAdapter(f.apply _, name)
}
/**
@ -180,7 +99,7 @@ class StubbedActorContext[T](
* Do not actually stop the child inbox, only simulate the liveness check.
* Removal is asynchronous, explicit removeInbox is needed from outside afterwards.
*/
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
_children.get(child.path.name) match {
case None false
case Some(inbox) inbox.ref == child

View file

@ -13,11 +13,11 @@ import scala.concurrent.Future
* only during the Actors lifetime and allows messages to be sent to that
* Actor instance. Sending a message to an Actor that has terminated before
* receiving the message will lead to that message being discarded; such
* messages are delivered to the [[akka.actor.DeadLetter]] channel of the
* [[akka.event.EventStream]] on a best effort basis
* messages are delivered to the [[DeadLetter]] channel of the
* [[EventStream]] on a best effort basis
* (i.e. this delivery is not reliable).
*/
abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[Nothing]] { this: internal.ActorRefImpl[T]
abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[_]] { this: internal.ActorRefImpl[T]
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
@ -49,7 +49,7 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act
/**
* Comparison takes path and the unique id of the actor cell into account.
*/
final override def compareTo(other: ActorRef[Nothing]) = {
final override def compareTo(other: ActorRef[_]) = {
val x = this.path compareTo other.path
if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1
else x

View file

@ -13,14 +13,18 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.typed.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.util.Timeout
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
/**
* An ActorSystem is home to a hierarchy of Actors. It is created using
* [[ActorSystem$]] `apply` from a [[Props]] object that describes the root
* [[ActorSystem#apply]] from a [[Behavior]] object that describes the root
* Actor of this hierarchy and which will create all other Actors beneath it.
* A system also implements the [[ActorRef]] type, and sending a message to
* the system directs that message to the root Actor.
*/
@DoNotInherit
@ApiMayChange
trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T]
/**
@ -131,11 +135,11 @@ trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒
* Ask the system guardian of this system to create an actor from the given
* behavior and deployment and with the given name. The name does not need to
* be unique since the guardian will prefix it with a running number when
* creating the child actor. The timeout sets the timeout used for the [[AskPattern$]]
* creating the child actor. The timeout sets the timeout used for the [[akka.typed.scaladsl.AskPattern$]]
* invocation when asking the guardian.
*
* The returned Future of [[ActorRef]] may be converted into an [[ActorRef]]
* to which messages can immediately be sent by using the [[ActorRef$apply]]
* to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]]
* method.
*/
def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]]

View file

@ -3,6 +3,8 @@
*/
package akka.typed
import akka.annotation.InternalApi
/**
* The behavior of an actor defines how it reacts to the messages that it
* receives. The message may either be of the type that the Actor declares
@ -13,6 +15,10 @@ package akka.typed
* Behaviors can be formulated in a number of different ways, either by
* creating a derived class or by employing factory methods like the ones
* in the [[ScalaDSL$]] object.
*
* Closing over ActorContext makes a Behavior immobile: it cannot be moved to
* another context and executed there, and therefore it cannot be replicated or
* forked either.
*/
abstract class Behavior[T] {
/**
@ -30,6 +36,7 @@ abstract class Behavior[T] {
* Code calling this method should use [[Behavior$]] `canonicalize` to replace
* the special objects with real Behaviors.
*/
@throws(classOf[Exception])
def management(ctx: ActorContext[T], msg: Signal): Behavior[T]
/**
@ -45,6 +52,7 @@ abstract class Behavior[T] {
* Code calling this method should use [[Behavior$]] `canonicalize` to replace
* the special objects with real Behaviors.
*/
@throws(classOf[Exception])
def message(ctx: ActorContext[T], msg: T): Behavior[T]
/**
@ -55,14 +63,6 @@ abstract class Behavior[T] {
def narrow[U <: T]: Behavior[U] = this.asInstanceOf[Behavior[U]]
}
/*
* FIXME
*
* Closing over ActorContext makes a Behavior immobile: it cannot be moved to
* another context and executed there, and therefore it cannot be replicated or
* forked either.
*/
object Behavior {
/**
@ -95,6 +95,12 @@ object Behavior {
override def toString = "Unhandled"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] val unhandledSignal: (ActorContext[Nothing], Signal) Behavior[Nothing] =
(_, _) unhandledBehavior
/**
* INTERNAL API.
*/
@ -123,8 +129,8 @@ object Behavior {
/**
* Given a possibly special behavior (same or unhandled) and a
* current behavior (which defines the meaning of encountering a `Same`
* behavior) this method unwraps the behavior such that the innermost behavior
* is returned, i.e. it removes the decorations.
* behavior) this method computes the next behavior, suitable for passing a
* message or signal.
*/
def canonicalize[T](behavior: Behavior[T], current: Behavior[T]): Behavior[T] =
behavior match {
@ -133,6 +139,11 @@ object Behavior {
case other other
}
/**
* Validate the given behavior as a suitable initial actor behavior; most
* notably the behavior can neither be `Same` nor `Unhandled`. Starting
* out with a `Stopped` behavior is allowed, though.
*/
def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] =
behavior match {
case `sameBehavior` | `unhandledBehavior`
@ -140,12 +151,22 @@ object Behavior {
case x x
}
/**
* Validate the given behavior as initial, pass it a [[PreStart]] message
* and canonicalize the result.
*/
def preStart[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
val b = validateAsInitial(behavior)
if (isAlive(b)) canonicalize(b.management(ctx, PreStart), b) else b
}
/**
* Returns true if the given behavior is not stopped.
*/
def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne stoppedBehavior
/**
* Returns true if the given behavior is the special `Unhandled` marker.
*/
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq unhandledBehavior
}

View file

@ -73,7 +73,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
effectQueue.offer(Spawned(name))
super.spawn(behavior, name)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
effectQueue.offer(Stopped(child.path.name))
super.stop(child)
}

View file

@ -12,6 +12,12 @@ import scala.annotation.tailrec
import akka.actor.ActorRefProvider
import java.util.concurrent.ThreadLocalRandom
/**
* Utility for receiving messages outside of an actor. No methods are provided
* for synchronously awaiting a message, this is primarily useful for synchronous
* tests of behaviors that send messages to other actors, where an Inboxs ActorRef
* can conveniently be used as a stub.
*/
class Inbox[T](name: String) {
private val q = new ConcurrentLinkedQueue[T]

View file

@ -31,16 +31,21 @@ sealed trait Signal
* Lifecycle signal that is fired upon creation of the Actor. This will be the
* first message that the actor processes.
*/
@SerialVersionUID(1L)
final case object PreStart extends Signal
sealed abstract class PreStart extends Signal
final case object PreStart extends PreStart {
def instance: PreStart = this
}
/**
* Lifecycle signal that is fired upon restart of the Actor before replacing
* the behavior with the fresh one (i.e. this signal is received within the
* behavior that failed).
* behavior that failed). The replacement behavior will receive PreStart as its
* first signal.
*/
@SerialVersionUID(1L)
final case object PreRestart extends Signal
sealed abstract class PreRestart extends Signal
final case object PreRestart extends PreRestart {
def instance: PreRestart = this
}
/**
* Lifecycle signal that is fired after this actor and all its child actors
@ -51,8 +56,10 @@ final case object PreRestart extends Signal
* `Stopped` behavior then this signal will be ignored (i.e. the
* Stopped behavior will do nothing in reaction to it).
*/
@SerialVersionUID(1L)
final case object PostStop extends Signal
sealed abstract class PostStop extends Signal
final case object PostStop extends PostStop {
def instance: PostStop = this
}
/**
* Lifecycle signal that is fired when an Actor that was watched has terminated.
@ -64,70 +71,8 @@ final case object PostStop extends Signal
* have occurred. Termination of a remote Actor can also be effected by declaring
* the Actors home system as failed (e.g. as a result of being unreachable).
*/
@SerialVersionUID(1L)
final case class Terminated(ref: ActorRef[Nothing])(failed: Throwable) extends Signal {
def wasFailed: Boolean = failed ne null
def failure: Throwable = failed
def failureOption: Option[Throwable] = Option(failed)
}
/**
* FIXME correct this documentation when the Restarter behavior has been implemented
*
* The parent of an actor decides upon the fate of a failed child actor by
* encapsulating its next behavior in one of the four wrappers defined within
* this class.
*
* Failure responses have an associated precedence that ranks them, which is in
* descending importance:
*
* - Escalate
* - Stop
* - Restart
* - Resume
*/
object Failed {
sealed trait Decision
@SerialVersionUID(1L)
case object NoFailureResponse extends Decision
/**
* Resuming the child actor means that the result of processing the message
* on which it failed is just ignored, the previous state will be used to
* process the next message. The message that triggered the failure will not
* be processed again.
*/
@SerialVersionUID(1L)
case object Resume extends Decision
/**
* Restarting the child actor means resetting its behavior to the initial
* one that was provided during its creation (i.e. the one which was passed
* into the [[Props]] constructor). The previously failed behavior will
* receive a [[PreRestart]] signal before this happens and the replacement
* behavior will receive a [[PostRestart]] signal afterwards.
*/
@SerialVersionUID(1L)
case object Restart extends Decision
/**
* Stopping the child actor will free its resources and eventually
* (asynchronously) unregister its name from the parent. Completion of this
* process can be observed by watching the child actor and reacting to its
* [[Terminated]] signal.
*/
@SerialVersionUID(1L)
case object Stop extends Decision
/**
* The default response to a failure in a child actor is to escalate the
* failure, entailing that the parent actor fails as well. This is equivalent
* to an exception unwinding the call stack, but it applies to the supervision
* hierarchy instead.
*/
@SerialVersionUID(1L)
case object Escalate extends Decision
}

View file

@ -11,6 +11,7 @@ import akka.util.LineNumbers
* This object holds several behavior factories and combinators that can be
* used to construct Behavior instances.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
object ScalaDSL {
// FIXME check that all behaviors can cope with not getting PreStart as first message
@ -20,17 +21,20 @@ object ScalaDSL {
* Widen the type of this Behavior by providing a filter function that permits
* only a subtype of the widened set of messages.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def widen[U >: T](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher)
/**
* Combine the two behaviors such that incoming messages are distributed
* to both of them, each one evolving its state independently.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def &&(other: Behavior[T]): Behavior[T] = And(behavior, other)
/**
* Combine the two behaviors such that incoming messages are given first to
* the left behavior and are then only passed on to the right behavior if
* the left one returned Unhandled.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def ||(other: Behavior[T]): Behavior[T] = Or(behavior, other)
}
@ -40,6 +44,7 @@ object ScalaDSL {
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Widened[T, U >: T](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] {
private def postProcess(ctx: ActorContext[U], behv: Behavior[T]): Behavior[U] =
if (isUnhandled(behv)) Unhandled
@ -63,6 +68,7 @@ object ScalaDSL {
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Deferred[T](factory: () Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
@ -81,6 +87,7 @@ object ScalaDSL {
* avoid the allocation overhead of recreating the current behavior where
* that is not necessary.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]]
/**
@ -89,6 +96,7 @@ object ScalaDSL {
* message has not been handled. This hint may be used by composite
* behaviors that delegate (partial) handling to other behaviors.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]]
/*
@ -104,16 +112,19 @@ object ScalaDSL {
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]]
/**
* This behavior does not handle any inputs, it is completely inert.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]]
/**
* This behavior does not handle any inputs, it is completely inert.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]
/**
@ -122,17 +133,20 @@ object ScalaDSL {
* used by several of the behaviors defined in this DSL, see for example
* [[Full]].
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
sealed trait MessageOrSignal[T]
/**
* A message bundled together with the current [[ActorContext]].
*/
@SerialVersionUID(1L)
final case class Msg[T](ctx: ActorContext[T], msg: T) extends MessageOrSignal[T]
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Msg[T](ctx: scaladsl.ActorContext[T], msg: T) extends MessageOrSignal[T]
/**
* A signal bundled together with the current [[ActorContext]].
*/
@SerialVersionUID(1L)
final case class Sig[T](ctx: ActorContext[T], signal: Signal) extends MessageOrSignal[T]
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Sig[T](ctx: scaladsl.ActorContext[T], signal: Signal) extends MessageOrSignal[T]
/**
* This type of behavior allows to handle all incoming messages within
@ -144,9 +158,9 @@ object ScalaDSL {
* For the lifecycle notifications pertaining to the actor itself this
* behavior includes a fallback mechanism: an unhandled [[PreRestart]] signal
* will terminate all child actors (transitively) and then emit a [[PostStop]]
* signal in addition, whereas an unhandled [[PostRestart]] signal will emit
* an additional [[PreStart]] signal.
* signal in addition.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Full[T](behavior: PartialFunction[MessageOrSignal[T], Behavior[T]]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
lazy val fallback: (MessageOrSignal[T]) Behavior[T] = {
@ -173,6 +187,7 @@ object ScalaDSL {
* to create the supplied function then any message not matching the list of
* cases will fail this actor with a [[scala.MatchError]].
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class FullTotal[T](behavior: MessageOrSignal[T] Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal) = behavior(Sig(ctx, msg))
override def message(ctx: ActorContext[T], msg: T) = behavior(Msg(ctx, msg))
@ -189,6 +204,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Total[T](behavior: T Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match {
case _ Unhandled
@ -207,6 +223,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Partial[T](behavior: PartialFunction[T, Behavior[T]]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match {
case _ Unhandled
@ -220,6 +237,7 @@ object ScalaDSL {
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Tap[T](f: PartialFunction[MessageOrSignal[T], Unit], behavior: Behavior[T]) extends Behavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) Unhandled
@ -237,6 +255,7 @@ object ScalaDSL {
override def toString = s"Tap(${LineNumbers(f)},$behavior)"
}
object Tap {
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap({ case Msg(_, msg) monitor ! msg }, behavior)
}
@ -249,6 +268,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Static[T](behavior: T Unit) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
@ -267,6 +287,7 @@ object ScalaDSL {
* This decorator is useful for passing messages between the left and right
* sides of [[And]] and [[Or]] combinators.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class SynchronousSelf[T](f: ActorRef[T] Behavior[T]) extends Behavior[T] {
private class B extends Behavior[T] {
@ -308,9 +329,9 @@ object ScalaDSL {
* A behavior combinator that feeds incoming messages and signals both into
* the left and right sub-behavior and allows them to evolve independently of
* each other. When one of the sub-behaviors terminates the other takes over
* exclusively. When both sub-behaviors respond to a [[Failed]] signal, the
* response with the higher precedence is chosen (see [[Failed$]]).
* exclusively.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class And[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
@ -354,9 +375,9 @@ object ScalaDSL {
* each other. The message or signal is passed first into the left sub-behavior
* and only if that results in [[#Unhandled]] is it passed to the right
* sub-behavior. When one of the sub-behaviors terminates the other takes over
* exclusively. When both sub-behaviors respond to a [[Failed]] signal, the
* response with the higher precedence is chosen (see [[Failed$]]).
* exclusively.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Or[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] =
@ -418,10 +439,14 @@ object ScalaDSL {
* }
* }}}
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def SelfAware[T](behavior: ActorRef[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx.self), ctx)
case msg throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)")
new Behavior[T] {
override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] =
if (sig == PreStart) Behavior.preStart(behavior(ctx.self), ctx)
else throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $sig)")
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)")
}
/**
@ -438,10 +463,14 @@ object ScalaDSL {
* }
* }}}
*/
def ContextAware[T](behavior: ActorContext[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx), ctx)
case msg throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $msg)")
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def ContextAware[T](behavior: scaladsl.ActorContext[T] Behavior[T]): Behavior[T] =
new Behavior[T] {
override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] =
if (sig == PreStart) Behavior.preStart(behavior(ctx), ctx)
else throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $sig)")
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got ${msg.getClass})")
}
/**

View file

@ -22,7 +22,7 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo
ctx.spawnAnonymous(behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawn(behavior, name, deployment)
override def stop(child: ActorRef[Nothing]) =
override def stop(child: ActorRef[_]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef
val cell = ctx.asInstanceOf[akka.actor.ActorCell]

View file

@ -106,6 +106,7 @@ private[typed] class ActorCell[T](
val dispatcher = deployment.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
val capacity = deployment.firstOrElse(MailboxCapacity(system.settings.DefaultMailboxCapacity))
val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self)
// TODO uid is still needed
val ref = new LocalActorRef[U](self.path / name, cell)
cell.setSelf(ref)
childrenMap = childrenMap.updated(name, ref)
@ -120,7 +121,7 @@ private[typed] class ActorCell[T](
spawn(behavior, name, deployment)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
val name = child.path.name
childrenMap.get(name) match {
case None false

View file

@ -21,6 +21,7 @@ import scala.util.Success
import akka.util.Timeout
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import akka.typed.scaladsl.AskPattern
object ActorSystemImpl {
import ScalaDSL._

View file

@ -12,6 +12,7 @@ import akka.util.{ ReentrantGuard, Subclassification, SubclassifiedIndex }
import scala.collection.immutable
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.typed.scaladsl.AskPattern
/**
* INTERNAL API

View file

@ -1,122 +0,0 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.patterns
import scala.concurrent.duration._
import akka.typed.ActorRef
import scala.collection.immutable
import akka.typed.Behavior
import scala.concurrent.duration.Deadline
import akka.typed.ActorContext
import java.util.LinkedList
import scala.collection.JavaConverters._
import scala.collection.immutable.Queue
// FIXME make this nice again once the Actor Algebra is implemented
object Receiver {
import akka.typed.ScalaDSL._
sealed trait InternalCommand[T]
case class ReceiveTimeout[T]() extends InternalCommand[T]
sealed trait Command[T] extends InternalCommand[T]
/**
* Retrieve one message from the Receiver, waiting at most for the given duration.
*/
final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]
/**
* Retrieve all messages from the Receiver that it has queued after the given
* duration has elapsed.
*/
final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]
/**
* Retrieve the external address of this Receiver (i.e. the side at which it
* takes in the messages of type T.
*/
final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]
sealed trait Replies[T]
final case class GetOneResult[T](receiver: ActorRef[Command[T]], msg: Option[T]) extends Replies[T]
final case class GetAllResult[T](receiver: ActorRef[Command[T]], msgs: immutable.Seq[T]) extends Replies[T]
private final case class Enqueue[T](msg: T) extends Command[T]
def behavior[T]: Behavior[Command[T]] =
ContextAware[Any] { ctx
SynchronousSelf { syncself
Or(
empty(ctx).widen { case c: InternalCommand[t] c.asInstanceOf[InternalCommand[T]] },
Static[Any] {
case msg syncself ! Enqueue(msg)
})
}
}.narrow
private def empty[T](ctx: ActorContext[Any]): Behavior[InternalCommand[T]] =
Total {
case ExternalAddress(replyTo) { replyTo ! ctx.self; Same }
case g @ GetOne(d) if d <= Duration.Zero { g.replyTo ! GetOneResult(ctx.self, None); Same }
case g @ GetOne(d) asked(ctx, Queue(Asked(g.replyTo, Deadline.now + d)))
case g @ GetAll(d) if d <= Duration.Zero { g.replyTo ! GetAllResult(ctx.self, Nil); Same }
case g @ GetAll(d) { ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo)); Same }
case Enqueue(msg) queued(ctx, msg)
}
private def queued[T](ctx: ActorContext[Any], t: T): Behavior[InternalCommand[T]] = {
val queue = new LinkedList[T]
queue.add(t)
Total {
case ExternalAddress(replyTo)
replyTo ! ctx.self
Same
case g: GetOne[t]
g.replyTo ! GetOneResult(ctx.self, Some(queue.remove()))
if (queue.isEmpty) empty(ctx) else Same
case g @ GetAll(d) if d <= Duration.Zero
g.replyTo ! GetAllResult(ctx.self, queue.iterator.asScala.toVector)
empty(ctx)
case g @ GetAll(d)
ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo))
Same
case Enqueue(msg)
queue.add(msg)
Same
}
}
private case class Asked[T](replyTo: ActorRef[GetOneResult[T]], deadline: Deadline)
private def asked[T](ctx: ActorContext[Any], queue: Queue[Asked[T]]): Behavior[InternalCommand[T]] = {
ctx.setReceiveTimeout(queue.map(_.deadline).min.timeLeft, ReceiveTimeout())
Total {
case ReceiveTimeout()
val (overdue, remaining) = queue partition (_.deadline.isOverdue)
overdue foreach (a a.replyTo ! GetOneResult(ctx.self, None))
if (remaining.isEmpty) {
ctx.cancelReceiveTimeout()
empty(ctx)
} else asked(ctx, remaining)
case ExternalAddress(replyTo) { replyTo ! ctx.self; Same }
case g @ GetOne(d) if d <= Duration.Zero
g.replyTo ! GetOneResult(ctx.self, None)
asked(ctx, queue)
case g @ GetOne(d)
asked(ctx, queue enqueue Asked(g.replyTo, Deadline.now + d))
case g @ GetAll(d) if d <= Duration.Zero
g.replyTo ! GetAllResult(ctx.self, Nil)
asked(ctx, queue)
case g @ GetAll(d)
ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo))
asked(ctx, queue)
case Enqueue(msg)
val (ask, q) = queue.dequeue
ask.replyTo ! GetOneResult(ctx.self, Some(msg))
if (q.isEmpty) {
ctx.cancelReceiveTimeout()
empty(ctx)
} else asked(ctx, q)
}
}
}

View file

@ -7,6 +7,52 @@ package patterns
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.event.Logging
import akka.typed.scaladsl.Actor._
/**
* Simple supervision strategy that restarts the underlying behavior for all
* failures of type Thr.
*
* FIXME add limited restarts and back-off (with limited buffering or vacation responder)
* FIXME write tests that ensure that restart is canonicalizing PreStart result correctly
*/
final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean)(
behavior: Behavior[T] = initialBehavior) extends Behavior[T] {
private def restart(ctx: ActorContext[T]): Behavior[T] = {
try behavior.management(ctx, PreRestart) catch { case NonFatal(_) }
Behavior.canonicalize(initialBehavior.management(ctx, PreStart), initialBehavior)
}
private def canonical(b: Behavior[T]): Behavior[T] =
if (Behavior.isUnhandled(b)) Unhandled
else if (b eq Behavior.sameBehavior) Same
else if (!Behavior.isAlive(b)) Stopped
else if (b eq behavior) Same
else Restarter[T, Thr](initialBehavior, resume)(b)
override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
val b =
try behavior.management(ctx, signal)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage))
if (resume) behavior else restart(ctx)
}
canonical(b)
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
val b =
try behavior.message(ctx, msg)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage))
if (resume) behavior else restart(ctx)
}
canonical(b)
}
}
/**
* Simple supervision strategy that restarts the underlying behavior for all
@ -15,7 +61,7 @@ import akka.event.Logging
* FIXME add limited restarts and back-off (with limited buffering or vacation responder)
* FIXME write tests that ensure that all Behaviors are okay with getting PostRestart as first signal
*/
final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] {
final case class MutableRestarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] {
private[this] var current = initialBehavior
@ -34,7 +80,7 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav
if (resume) current else restart(ctx)
}
current = Behavior.canonicalize(b, current)
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
if (Behavior.isAlive(current)) this else Stopped
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
@ -46,14 +92,6 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav
if (resume) current else restart(ctx)
}
current = Behavior.canonicalize(b, current)
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
if (Behavior.isAlive(current)) this else Stopped
}
}
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) {
def wrap[T](b: Behavior[T]) = Restarter(Behavior.validateAsInitial(b), resume)(c)
}
def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume)
}

View file

@ -0,0 +1,389 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package scaladsl
import akka.util.LineNumbers
import scala.reflect.ClassTag
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import scala.deprecatedInheritance
import akka.typed.{ ActorContext AC }
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T]
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop(child: ActorRef[_]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): ActorRef[U]
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): ActorRef[U]
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable#cancel]] on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
}
@ApiMayChange
object Actor {
import Behavior._
// FIXME check that all behaviors can cope with not getting PreStart as first message
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* see also [[Actor.Widened]]
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher)
}
private val _nullFun = (_: Any) null
private def nullFun[T] = _nullFun.asInstanceOf[Any T]
private implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
def as[U] = ctx.asInstanceOf[AC[U]]
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* Stateless[String]((ctx, msg) => println(msg)).widen[Number] {
* case b: BigDecimal => s"BigDecimal(&dollar;b)"
* case i: BigInteger => s"BigInteger(&dollar;i)"
* // drop all other kinds of Number
* }
* }}}
*/
final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] {
private def postProcess(behv: Behavior[T]): Behavior[U] =
if (isUnhandled(behv)) Unhandled
else if (isAlive(behv)) {
val next = canonicalize(behv, behavior)
if (next eq behavior) Same else Widened(next, matcher)
} else Stopped
override def management(ctx: AC[U], msg: Signal): Behavior[U] =
postProcess(behavior.management(ctx.as[T], msg))
override def message(ctx: AC[U], msg: U): Behavior[U] =
matcher.applyOrElse(msg, nullFun) match {
case null Unhandled
case transformed postProcess(behavior.message(ctx.as[T], transformed))
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
final case class Deferred[T](factory: ActorContext[T] Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
Behavior.preStart(factory(ctx), ctx)
}
override def message(ctx: AC[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
* avoid the allocation overhead of recreating the current behavior where
* that is not necessary.
*/
def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior, including the hint that the
* message has not been handled. This hint may be used by composite
* behaviors that delegate (partial) handling to other behaviors.
*/
def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]]
/*
* TODO write a Behavior that waits for all child actors to stop and then
* runs some cleanup before stopping. The factory for this behavior should
* stop and watch all children to get the process started.
*/
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*/
def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]]
/**
* A behavior that treats every incoming message as unhandled.
*/
def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]]
/**
* A behavior that ignores every incoming message and returns same.
*/
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]
/**
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
*/
final case class Stateful[T](
behavior: (ActorContext[T], T) Behavior[T],
signal: (ActorContext[T], Signal) Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) Behavior[T]])
extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T) = behavior(ctx, msg)
override def toString = s"Stateful(${LineNumbers(behavior)})"
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateless because it cannot be replaced by
* another one after it has been installed. It is most useful for leaf actors
* that do not create child actors themselves.
*/
final case class Stateless[T](behavior: (ActorContext[T], T) Any) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: AC[T], msg: T): Behavior[T] = {
behavior(ctx, msg)
this
}
override def toString = s"Static(${LineNumbers(behavior)})"
}
/**
* This type of Behavior wraps another Behavior while allowing you to perform
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
final case class Tap[T](
signal: (ActorContext[T], Signal) _,
mesg: (ActorContext[T], T) _,
behavior: Behavior[T]) extends Behavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) Unhandled
else if (behv eq sameBehavior) Same
else if (isAlive(behv)) Tap(signal, mesg, behv)
else Stopped
override def management(ctx: AC[T], msg: Signal): Behavior[T] = {
signal(ctx, msg)
canonical(behavior.management(ctx, msg))
}
override def message(ctx: AC[T], msg: T): Behavior[T] = {
mesg(ctx, msg)
canonical(behavior.message(ctx, msg))
}
override def toString = s"Tap(${LineNumbers(signal)},${LineNumbers(mesg)},$behavior)"
}
/**
* Behavior decorator that copies all received message to the designated
* monitor [[akka.typed.ActorRef]] before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. be stateful) without needing to be
* wrapped in a `monitor` call again.
*/
object Monitor {
def apply[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap(unitFunction, (_, msg) monitor ! msg, behavior)
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of `Thr` will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify that the actor shall not be restarted but
* resumed. This entails keeping the same state as before the exception was
* thrown and is thus less safe. If you use `OnFailure.RESUME` you should at
* least not hold mutable data fields or collections within the actor as those
* might be in an inconsistent state (the exception might have interrupted
* normal processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*
* Example:
* {{{
* val dbConnector: Behavior[DbCommand] = ...
* val dbRestarts = Restarter[DbException].wrap(dbConnector)
* }}}
*/
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) {
def wrap[T](b: Behavior[T]): Behavior[T] = patterns.Restarter(Behavior.validateAsInitial(b), resume)()(c)
def mutableWrap[T](b: Behavior[T]): Behavior[T] = patterns.MutableRestarter(Behavior.validateAsInitial(b), resume)(c)
}
def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume)
}
// TODO
// final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () Behavior[T])
/**
* INTERNAL API.
*/
private[akka] val _unhandledFunction = (_: Any) Unhandled[Nothing]
/**
* INTERNAL API.
*/
private[akka] def unhandledFunction[T, U] = _unhandledFunction.asInstanceOf[(T Behavior[U])]
/**
* INTERNAL API.
*/
private[akka] val _unitFunction = (_: ActorContext[Any], _: Any) ()
/**
* INTERNAL API.
*/
private[akka] def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
}

View file

@ -1,19 +1,19 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
package akka.typed.scaladsl
import scala.concurrent.{ Future, Promise }
import akka.util.Timeout
import akka.actor.InternalActorRef
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import java.lang.IllegalArgumentException
import akka.actor.Scheduler
import akka.typed.internal.FunctionRef
import akka.actor.RootActorPath
import akka.actor.Address
import akka.util.LineNumbers
import akka.typed.ActorRef
import akka.typed.adapter
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
@ -82,7 +82,7 @@ object AskPattern {
AskPath,
(msg, self) {
p.trySuccess(msg)
self.sendSystem(internal.Terminate())
self.sendSystem(akka.typed.internal.Terminate())
},
(self) if (!p.isCompleted) p.tryFailure(new NoSuchElementException("ask pattern terminated before value was received")))
actorRef ! f(ref)

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import akka.typed.*;
import static akka.typed.javadsl.Actor.*;
public class ActorCompile {
interface MyMsg {}
class MyMsgA implements MyMsg {
final ActorRef<String> replyTo;
public MyMsgA(ActorRef<String> replyTo) {
this.replyTo = replyTo;
}
}
class MyMsgB implements MyMsg {
final String greeting;
public MyMsgB(String greeting) {
this.greeting = greeting;
}
}
Behavior<MyMsg> actor1 = stateful((ctx, msg) -> stopped(), (ctx, signal) -> same());
Behavior<MyMsg> actor2 = stateful((ctx, msg) -> unhandled());
Behavior<MyMsg> actor3 = stateless((ctx, msg) -> {});
Behavior<MyMsg> actor4 = empty();
Behavior<MyMsg> actor5 = ignore();
Behavior<MyMsg> actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5);
Behavior<MyMsgA> actor7 = actor6.narrow();
Behavior<MyMsg> actor8 = deferred(ctx -> {
final ActorRef<MyMsg> self = ctx.getSelf();
return monitor(self, ignore());
});
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
{
Actor.<MyMsg>stateful((ctx, msg) -> {
if (msg instanceof MyMsgA) {
return stateless((ctx2, msg2) -> {
if (msg2 instanceof MyMsgB) {
((MyMsgA) msg).replyTo.tell(((MyMsgB) msg2).greeting);
}
});
} else return unhandled();
});
}
}

View file

@ -4,8 +4,8 @@ import scala.concurrent.duration._
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.DeadLetterSuppression
import akka.typed.ScalaDSL._
import akka.typed.patterns._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.Stateful
object ActorContextSpec {
@ -74,6 +74,89 @@ object ActorContextSpec {
final case class Adapter(a: ActorRef[Command]) extends Event
def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
Actor.Stateful(
(ctx, message) message match {
case ReceiveTimeout
monitor ! GotReceiveTimeout
Actor.Same
case Ping(replyTo)
replyTo ! Pong1
Actor.Same
case Miss(replyTo)
replyTo ! Missed
Actor.Unhandled
case Renew(replyTo)
replyTo ! Renewed
subject(monitor)
case Throw(ex)
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Actor.Restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Actor.Restarter[Throwable]().wrap(subject(mon)), n)
}
replyTo ! Created(child)
Actor.Same
case SetTimeout(d, replyTo)
d match {
case f: FiniteDuration ctx.setReceiveTimeout(f, ReceiveTimeout)
case _ ctx.cancelReceiveTimeout()
}
replyTo ! TimeoutSet
Actor.Same
case Schedule(delay, target, msg, replyTo)
replyTo ! Scheduled
ctx.schedule(delay, target, msg)
Actor.Same
case Stop Actor.Stopped
case Kill(ref, replyTo)
if (ctx.stop(ref)) replyTo ! Killed
else replyTo ! NotKilled
Actor.Same
case Watch(ref, replyTo)
ctx.watch[Nothing](ref)
replyTo ! Watched
Actor.Same
case Unwatch(ref, replyTo)
ctx.unwatch[Nothing](ref)
replyTo ! Unwatched
Actor.Same
case GetInfo(replyTo)
replyTo ! Info(ctx.self, ctx.system)
Actor.Same
case GetChild(name, replyTo)
replyTo ! Child(ctx.child(name))
Actor.Same
case GetChildren(replyTo)
replyTo ! Children(ctx.children.toSet)
Actor.Same
case BecomeInert(replyTo)
replyTo ! BecameInert
Actor.Stateless {
case (_, Ping(replyTo))
replyTo ! Pong2
case (_, Throw(ex))
throw ex
case _ ()
}
case BecomeCareless(replyTo)
replyTo ! BecameCareless
Actor.Stateful(
(ctx, message) Actor.Unhandled,
(ctx, signal) signal match {
case Terminated(_) Actor.Unhandled
case sig
monitor ! GotSignal(sig)
Actor.Same
})
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
Actor.Same
},
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same })
def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = {
import ScalaDSL._
FullTotal {
case Sig(ctx, signal)
monitor ! GotSignal(signal)
@ -95,8 +178,8 @@ object ActorContextSpec {
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Restarter[Throwable]().wrap(subject(mon)), n)
case None ctx.spawnAnonymous(patterns.Restarter[Command, Throwable](subject(mon), false)())
case Some(n) ctx.spawn(patterns.Restarter[Command, Throwable](subject(mon), false)(), n)
}
replyTo ! Created(child)
Same
@ -156,6 +239,8 @@ object ActorContextSpec {
Same
}
}
}
}
class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
@ -179,7 +264,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
/**
* The behavior against which to run all the tests.
*/
def behavior(ctx: ActorContext[Event]): Behavior[Command]
def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command]
implicit def system: ActorSystem[TypedSpec.Command]
@ -187,8 +272,8 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
if (system eq nativeSystem) suite + "Native"
else suite + "Adapted"
def setup(name: String, wrapper: Option[Restarter.Apply[_]] = None)(
proc: (ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
def setup(name: String, wrapper: Option[Actor.Restarter.Apply[_]] = None)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx))
val steps =
@ -260,7 +345,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Restarter[Throwable]())) { (ctx, startWith)
def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Actor.Restarter[Throwable]())) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM1")
startWith { subj
@ -335,7 +420,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Restarter[Exception]())) { (ctx, startWith)
def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Actor.Restarter[Exception]())) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM05")
startWith
@ -353,7 +438,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
.stimulate(_ ! Ping(self), _ Pong1)
})
def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Restarter[Exception](resume = true))) { (ctx, startWith)
def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Actor.Restarter[Exception](resume = true))) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM06")
startWith
@ -534,80 +619,115 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
trait Normal extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self)
override def suite = "normal"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self)
}
object `An ActorContext (native)` extends Normal with NativeSystem
object `An ActorContext (adapted)` extends Normal with AdaptedSystem
trait Widened extends Tests {
import Actor._
override def suite = "widened"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self).widen { case x x }
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self).widen { case x x }
}
object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem
object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem
trait SynchronousSelf extends Tests {
trait Deferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.Deferred(_ subject(ctx.self))
}
object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem
object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem
trait Tap extends Tests {
override def suite = "tap"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.Tap((_, _) (), (_, _) (), subject(ctx.self))
}
object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem
object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem
trait NormalOld extends Tests {
override def suite = "basic"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
oldSubject(ctx.self)
}
object `An ActorContext (old-native)` extends NormalOld with NativeSystem
object `An ActorContext (old-adapted)` extends NormalOld with AdaptedSystem
trait WidenedOld extends Tests {
import ScalaDSL._
override def suite = "widened"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
oldSubject(ctx.self).widen { case x x }
}
object `An ActorContext with widened Behavior (old-native)` extends WidenedOld with NativeSystem
object `An ActorContext with widened Behavior (old-adapted)` extends WidenedOld with AdaptedSystem
trait SynchronousSelfOld extends Tests {
override def suite = "synchronous"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = SynchronousSelf(self subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.SynchronousSelf(self oldSubject(ctx.self))
}
object `An ActorContext with SynchronousSelf (native)` extends SynchronousSelf with NativeSystem
object `An ActorContext with SynchronousSelf (adapted)` extends SynchronousSelf with AdaptedSystem
object `An ActorContext with SynchronousSelf (old-native)` extends SynchronousSelfOld with NativeSystem
object `An ActorContext with SynchronousSelf (old-adapted)` extends SynchronousSelfOld with AdaptedSystem
trait NonMatchingTap extends Tests {
trait NonMatchingTapOld extends Tests {
override def suite = "TapNonMatch"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case null }, subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case null }, oldSubject(ctx.self))
}
object `An ActorContext with non-matching Tap (native)` extends NonMatchingTap with NativeSystem
object `An ActorContext with non-matching Tap (adapted)` extends NonMatchingTap with AdaptedSystem
object `An ActorContext with non-matching Tap (old-native)` extends NonMatchingTapOld with NativeSystem
object `An ActorContext with non-matching Tap (old-adapted)` extends NonMatchingTapOld with AdaptedSystem
trait MatchingTap extends Tests {
trait MatchingTapOld extends Tests {
override def suite = "TapMatch"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case _ }, subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case _ }, oldSubject(ctx.self))
}
object `An ActorContext with matching Tap (native)` extends MatchingTap with NativeSystem
object `An ActorContext with matching Tap (adapted)` extends MatchingTap with AdaptedSystem
object `An ActorContext with matching Tap (old-native)` extends MatchingTapOld with NativeSystem
object `An ActorContext with matching Tap (old-adapted)` extends MatchingTapOld with AdaptedSystem
private val stoppingBehavior = Full[Command] { case Msg(_, Stop) Stopped }
private val stoppingBehavior = ScalaDSL.Full[Command] { case ScalaDSL.Msg(_, Stop) ScalaDSL.Stopped }
trait AndLeft extends Tests {
override def suite = "and"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
And(subject(ctx.self), stoppingBehavior)
trait AndLeftOld extends Tests {
override def suite = "andLeft"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.And(oldSubject(ctx.self), stoppingBehavior)
}
object `An ActorContext with And (left, native)` extends AndLeft with NativeSystem
object `An ActorContext with And (left, adapted)` extends AndLeft with AdaptedSystem
object `An ActorContext with And (left, native)` extends AndLeftOld with NativeSystem
object `An ActorContext with And (left, adapted)` extends AndLeftOld with AdaptedSystem
trait AndRight extends Tests {
override def suite = "and"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
And(stoppingBehavior, subject(ctx.self))
trait AndRightOld extends Tests {
override def suite = "andRight"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.And(stoppingBehavior, oldSubject(ctx.self))
}
object `An ActorContext with And (right, native)` extends AndRight with NativeSystem
object `An ActorContext with And (right, adapted)` extends AndRight with AdaptedSystem
object `An ActorContext with And (right, native)` extends AndRightOld with NativeSystem
object `An ActorContext with And (right, adapted)` extends AndRightOld with AdaptedSystem
trait OrLeft extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
Or(subject(ctx.self), stoppingBehavior)
trait OrLeftOld extends Tests {
override def suite = "orLeft"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.Or(oldSubject(ctx.self), stoppingBehavior)
override def stop(ref: ActorRef[Command]) = {
ref ! Stop
ref ! Stop
}
}
object `An ActorContext with Or (left, native)` extends OrLeft with NativeSystem
object `An ActorContext with Or (left, adapted)` extends OrLeft with AdaptedSystem
object `An ActorContext with Or (left, native)` extends OrLeftOld with NativeSystem
object `An ActorContext with Or (left, adapted)` extends OrLeftOld with AdaptedSystem
trait OrRight extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
Or(stoppingBehavior, subject(ctx.self))
trait OrRightOld extends Tests {
override def suite = "orRight"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.Or(stoppingBehavior, oldSubject(ctx.self))
override def stop(ref: ActorRef[Command]) = {
ref ! Stop
ref ! Stop
}
}
object `An ActorContext with Or (right, native)` extends OrRight with NativeSystem
object `An ActorContext with Or (right, adapted)` extends OrRight with AdaptedSystem
object `An ActorContext with Or (right, native)` extends OrRightOld with NativeSystem
object `An ActorContext with Or (right, adapted)` extends OrRightOld with AdaptedSystem
}

View file

@ -12,7 +12,7 @@ import akka.util.Timeout
import akka.pattern.AskTimeoutException
import ScalaDSL._
import AskPattern._
import akka.typed.scaladsl.AskPattern._
object AskSpec {

View file

@ -3,6 +3,12 @@
*/
package akka.typed
import akka.typed.scaladsl.{ Actor SActor }
import akka.typed.javadsl.{ Actor JActor, ActorContext JActorContext }
import akka.japi.function.{ Function F1e, Function2 F2, Procedure2 P2 }
import akka.japi.pf.{ FI, PFBuilder }
import java.util.function.{ Function F1 }
class BehaviorSpec extends TypedSpec {
sealed trait Command {
@ -25,9 +31,8 @@ class BehaviorSpec extends TypedSpec {
case object Swap extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Swapped :: Nil
}
case class GetState(replyTo: ActorRef[State]) extends Command
object GetState {
def apply()(implicit inbox: Inbox[State]): GetState = GetState(inbox.ref)
case class GetState()(s: State) extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = s :: Nil
}
case class AuxPing(id: Int) extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Pong :: Nil
@ -42,58 +47,86 @@ class BehaviorSpec extends TypedSpec {
case object Pong extends Event
case object Swapped extends Event
trait State { def next: State }
sealed trait State extends Event { def next: State }
val StateA: State = new State { override def toString = "StateA"; override def next = StateB }
val StateB: State = new State { override def toString = "StateB"; override def next = StateA }
trait Common {
type Aux >: Null <: AnyRef
def system: ActorSystem[TypedSpec.Command]
def behavior(monitor: ActorRef[Event]): Behavior[Command]
def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux)
def checkAux(signal: Signal, aux: Aux): Unit = ()
def checkAux(command: Command, aux: Aux): Unit = ()
case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event])
case class Init(behv: Behavior[Command], inbox: Inbox[Event], aux: Aux) {
def mkCtx(requirePreStart: Boolean = false): Setup = {
val ctx = new EffectfulActorContext("ctx", behv, 1000, system)
val msgs = inbox.receiveAll()
if (requirePreStart) msgs should ===(GotSignal(PreStart) :: Nil)
checkAux(PreStart, aux)
Setup(ctx, inbox, aux)
}
}
case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event], aux: Aux)
protected def mkCtx(requirePreStart: Boolean = false, factory: (ActorRef[Event]) Behavior[Command] = behavior) = {
def init(): Init = {
val inbox = Inbox[Event]("evt")
val ctx = new EffectfulActorContext("ctx", factory(inbox.ref), 1000, system)
val msgs = inbox.receiveAll()
if (requirePreStart)
msgs should ===(GotSignal(PreStart) :: Nil)
Setup(ctx, inbox)
val (behv, aux) = behavior(inbox.ref)
Init(behv, inbox, aux)
}
protected implicit class Check(val setup: Setup) {
def init(factory: ActorRef[Event] (Behavior[Command], Aux)): Init = {
val inbox = Inbox[Event]("evt")
val (behv, aux) = factory(inbox.ref)
Init(behv, inbox, aux)
}
def mkCtx(requirePreStart: Boolean = false): Setup =
init().mkCtx(requirePreStart)
implicit class Check(val setup: Setup) {
def check(signal: Signal): Setup = {
setup.ctx.signal(signal)
setup.inbox.receiveAll() should ===(GotSignal(signal) :: Nil)
checkAux(signal, setup.aux)
setup
}
def check(command: Command): Setup = {
setup.ctx.run(command)
setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx))
setup
}
def check[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = {
setup.ctx.run(command)
setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx))
inbox.receiveAll() should ===(aux)
checkAux(command, setup.aux)
setup
}
def check2(command: Command): Setup = {
setup.ctx.run(command)
val expected = command.expectedResponse(setup.ctx)
setup.inbox.receiveAll() should ===(expected ++ expected)
setup
}
def check2[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = {
setup.ctx.run(command)
val expected = command.expectedResponse(setup.ctx)
setup.inbox.receiveAll() should ===(expected ++ expected)
inbox.receiveAll() should ===(aux ++ aux)
checkAux(command, setup.aux)
setup
}
}
protected val ex = new Exception("mine!")
val ex = new Exception("mine!")
}
trait Siphon extends Common {
override type Aux = Inbox[Command]
override def checkAux(command: Command, aux: Aux): Unit = {
aux.receiveAll() should ===(command :: Nil)
}
}
trait SignalSiphon extends Common {
override type Aux = Inbox[Either[Signal, Command]]
override def checkAux(command: Command, aux: Aux): Unit = {
aux.receiveAll() should ===(Right(command) :: Nil)
}
override def checkAux(signal: Signal, aux: Aux): Unit = {
aux.receiveAll() should ===(Left(signal) :: Nil)
}
}
trait Lifecycle extends Common {
@ -146,17 +179,19 @@ class BehaviorSpec extends TypedSpec {
trait Unhandled extends Common {
def `must return Unhandled`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
ctx.currentBehavior.message(ctx, Miss) should ===(ScalaDSL.Unhandled[Command])
val Setup(ctx, inbox, aux) = mkCtx()
ctx.currentBehavior.message(ctx, Miss) should be(Behavior.unhandledBehavior)
inbox.receiveAll() should ===(Missed :: Nil)
checkAux(Miss, aux)
}
}
trait Stoppable extends Common {
def `must stop`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, aux) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Stopped[Command])
ctx.currentBehavior should be(Behavior.stoppedBehavior)
checkAux(Stop, aux)
}
}
@ -164,15 +199,15 @@ class BehaviorSpec extends TypedSpec {
private implicit val inbox = Inbox[State]("state")
def `must be in state A`(): Unit = {
mkCtx().check(GetState(), StateA)
mkCtx().check(GetState()(StateA))
}
def `must switch to state B`(): Unit = {
mkCtx().check(Swap).check(GetState(), StateB)
mkCtx().check(Swap).check(GetState()(StateB))
}
def `must switch back to state A`(): Unit = {
mkCtx().check(Swap).check(Swap).check(GetState(), StateA)
mkCtx().check(Swap).check(Swap).check(GetState()(StateA))
}
}
@ -206,6 +241,18 @@ class BehaviorSpec extends TypedSpec {
}
}
/**
* This targets behavior wrappers to ensure that the wrapper does not
* hold on to the changed behavior. Wrappers must be immutable.
*/
trait Reuse extends Common {
def `must be reusable`(): Unit = {
val i = init()
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
}
}
private def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = {
import ScalaDSL.{ Full, Msg, Sig, Same, Unhandled, Stopped }
Full {
@ -227,21 +274,21 @@ class BehaviorSpec extends TypedSpec {
case Msg(ctx, Swap)
monitor ! Swapped
mkFull(monitor, state.next)
case Msg(ctx, GetState(replyTo))
replyTo ! state
case Msg(ctx, GetState())
monitor ! state
Same
case Msg(ctx, Stop) Stopped
}
}
trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = mkFull(monitor)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) null
}
object `A Full Behavior (native)` extends FullBehavior with NativeSystem
object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem
trait FullTotalBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = {
import ScalaDSL.{ FullTotal, Msg, Sig, Same, Unhandled, Stopped }
FullTotal {
@ -263,8 +310,8 @@ class BehaviorSpec extends TypedSpec {
case Msg(_, Swap)
monitor ! Swapped
behv(monitor, state.next)
case Msg(_, GetState(replyTo))
replyTo ! state
case Msg(_, GetState())
monitor ! state
Same
case Msg(_, Stop) Stopped
case Msg(_, _: AuxPing) Unhandled
@ -275,36 +322,36 @@ class BehaviorSpec extends TypedSpec {
object `A FullTotal Behavior (adapted)` extends FullTotalBehavior with AdaptedSystem
trait WidenedBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Widened(mkFull(monitor), { case x x })
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Widened(mkFull(monitor), { case x x }), null)
}
object `A Widened Behavior (native)` extends WidenedBehavior with NativeSystem
object `A Widened Behavior (adapted)` extends WidenedBehavior with AdaptedSystem
trait ContextAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.ContextAware(ctx mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.ContextAware(ctx mkFull(monitor)), null)
}
object `A ContextAware Behavior (native)` extends ContextAwareBehavior with NativeSystem
object `A ContextAware Behavior (adapted)` extends ContextAwareBehavior with AdaptedSystem
trait SelfAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.SelfAware(self mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.SelfAware(self mkFull(monitor)), null)
}
object `A SelfAware Behavior (native)` extends SelfAwareBehavior with NativeSystem
object `A SelfAware Behavior (adapted)` extends SelfAwareBehavior with AdaptedSystem
trait NonMatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Tap({ case null }, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Tap({ case null }, mkFull(monitor)), null)
}
object `A non-matching Tap Behavior (native)` extends NonMatchingTapBehavior with NativeSystem
object `A non-matching Tap Behavior (adapted)` extends NonMatchingTapBehavior with AdaptedSystem
trait MatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Tap({ case _ }, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Tap({ case _ }, mkFull(monitor)), null)
}
object `A matching Tap Behavior (native)` extends MatchingTapBehavior with NativeSystem
object `A matching Tap Behavior (adapted)` extends MatchingTapBehavior with AdaptedSystem
@ -312,12 +359,13 @@ class BehaviorSpec extends TypedSpec {
trait SynchronousSelfBehavior extends Messages with BecomeWithLifecycle with Stoppable {
import ScalaDSL._
implicit private val inbox = Inbox[Command]("syncself")
type Aux = Inbox[Command]
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
SynchronousSelf(self mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(SynchronousSelf(self mkFull(monitor)), null)
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] = {
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("syncself")
def first(self: ActorRef[Command]) = Tap.monitor(inbox.ref, Partial[Command] {
case AuxPing(id) { self ! AuxPing(0); second(self) }
})
@ -330,46 +378,51 @@ class BehaviorSpec extends TypedSpec {
case AuxPing(3) { self ! Ping; Same }
case AuxPing(4) { self ! Stop; Stopped }
}
SynchronousSelf(self Or(mkFull(monitor), first(self)))
(SynchronousSelf(self Or(mkFull(monitor), first(self))), inbox)
}
override def checkAux(cmd: Command, aux: Aux) =
(cmd, aux) match {
case (AuxPing(42), i: Inbox[_]) i.receiveAll() should ===(Seq(42, 0, 1, 2, 3) map AuxPing: Seq[Command])
case (AuxPing(4), i: Inbox[_]) i.receiveAll() should ===(AuxPing(4) :: Nil)
case _ // ignore
}
def `must send messages to itself and stop correctly`(): Unit = {
val Setup(ctx, _) = mkCtx(factory = behavior2).check[Command](AuxPing(42), Seq(42, 0, 1, 2, 3) map AuxPing: _*)
val Setup(ctx, _, _) = init(behavior2).mkCtx().check(AuxPing(42))
ctx.run(AuxPing(4))
inbox.receiveAll() should ===(AuxPing(4) :: Nil)
ctx.currentBehavior should ===(Stopped[Command])
}
}
object `A SynchronourSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem
object `A SynchronousSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem
object `A SynchronousSelf Behavior (adapted)` extends SynchronousSelfBehavior with AdaptedSystem
trait And extends Common {
private implicit val inbox = Inbox[State]("and")
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(mkFull(monitor), mkFull(monitor))
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(mkFull(monitor), mkFull(monitor)) null
def `must pass message to both parts`(): Unit = {
mkCtx(factory = behavior2).check2(Swap).check2[State](GetState(), StateB)
init(behavior2).mkCtx().check2(Swap).check2(GetState()(StateB))
}
def `must half-terminate`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, _) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Empty[Command])
}
}
trait BehaviorAndLeft extends Messages with BecomeWithLifecycle with And {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty) null
}
object `A Behavior combined with And (left, native)` extends BehaviorAndLeft with NativeSystem
object `A Behavior combined with And (left, adapted)` extends BehaviorAndLeft with NativeSystem
trait BehaviorAndRight extends Messages with BecomeWithLifecycle with And {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor)) null
}
object `A Behavior combined with And (right, native)` extends BehaviorAndRight with NativeSystem
object `A Behavior combined with And (right, adapted)` extends BehaviorAndRight with NativeSystem
@ -382,43 +435,43 @@ class BehaviorSpec extends TypedSpec {
ScalaDSL.Unhandled
}
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(mkFull(monitor), strange(monitor))
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(mkFull(monitor), strange(monitor)) null
private def behavior3(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(strange(monitor), mkFull(monitor))
private def behavior3(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(strange(monitor), mkFull(monitor)) null
def `must pass message only to first interested party`(): Unit = {
mkCtx(factory = behavior2).check(Ping).check(AuxPing(0))
init(behavior2).mkCtx().check(Ping).check(AuxPing(0))
}
def `must pass message through both if first is uninterested`(): Unit = {
mkCtx(factory = behavior3).check2(Ping).check(AuxPing(0))
init(behavior3).mkCtx().check2(Ping).check(AuxPing(0))
}
def `must half-terminate`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, _) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Empty[Command])
}
}
trait BehaviorOrLeft extends Messages with BecomeWithLifecycle with Or {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty) null
}
object `A Behavior combined with Or (left, native)` extends BehaviorOrLeft with NativeSystem
object `A Behavior combined with Or (left, adapted)` extends BehaviorOrLeft with NativeSystem
trait BehaviorOrRight extends Messages with BecomeWithLifecycle with Or {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor)) null
}
object `A Behavior combined with Or (right, native)` extends BehaviorOrRight with NativeSystem
object `A Behavior combined with Or (right, adapted)` extends BehaviorOrRight with NativeSystem
trait PartialBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
ScalaDSL.Partial {
case Ping
@ -433,8 +486,8 @@ class BehaviorSpec extends TypedSpec {
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState(replyTo)
replyTo ! state
case GetState()
monitor ! state
ScalaDSL.Same
case Stop ScalaDSL.Stopped
}
@ -443,7 +496,7 @@ class BehaviorSpec extends TypedSpec {
object `A Partial Behavior (adapted)` extends PartialBehavior with AdaptedSystem
trait TotalBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
ScalaDSL.Total {
case Ping
@ -459,8 +512,8 @@ class BehaviorSpec extends TypedSpec {
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState(replyTo)
replyTo ! state
case GetState()
monitor ! state
ScalaDSL.Same
case Stop ScalaDSL.Stopped
case _: AuxPing ScalaDSL.Unhandled
@ -470,18 +523,319 @@ class BehaviorSpec extends TypedSpec {
object `A Total Behavior (adapted)` extends TotalBehavior with AdaptedSystem
trait StaticBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Static {
case Ping monitor ! Pong
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case GetSelf
case Swap
case GetState(_)
case Stop
case _: AuxPing
}
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Static {
case Ping monitor ! Pong
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case GetSelf
case Swap
case GetState()
case Stop
case _: AuxPing
}, null)
}
object `A Static Behavior (native)` extends StaticBehavior with NativeSystem
object `A Static Behavior (adapted)` extends StaticBehavior with AdaptedSystem
trait StatefulWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.Stateful(
(ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
},
(ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
})
}
object `A StatefulWithSignal Behavior (scala,native)` extends StatefulWithSignalScalaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (scala,adapted)` extends StatefulWithSignalScalaBehavior with AdaptedSystem
trait StatefulScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
SActor.Stateful { (ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}
}
}
object `A Stateful Behavior (scala,native)` extends StatefulScalaBehavior with NativeSystem
object `A Stateful Behavior (scala,adapted)` extends StatefulScalaBehavior with AdaptedSystem
trait StatelessScalaBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(SActor.Stateless { (ctx, msg)
msg match {
case GetSelf monitor ! Self(ctx.self)
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case Ping monitor ! Pong
case Swap monitor ! Swapped
case GetState() monitor ! StateA
case Stop
case _: AuxPing
}
}, null)
}
object `A Stateless Behavior (scala,native)` extends StatelessScalaBehavior with NativeSystem
object `A Stateless Behavior (scala,adapted)` extends StatelessScalaBehavior with AdaptedSystem
trait WidenedScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with Siphon {
import SActor.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox
}
}
object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem
trait DeferredScalaBehavior extends StatefulWithSignalScalaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[PreStart]("deferredListener")
(SActor.Deferred(ctx {
inbox.ref ! PreStart
super.behavior(monitor)._1
}), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
signal match {
case PreStart aux.receiveAll() should ===(PreStart :: Nil)
case _ aux.receiveAll() should ===(Nil)
}
}
object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem
trait TapScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.Tap(
(_, sig) inbox.ref ! Left(sig),
(_, msg) inbox.ref ! Right(msg),
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem
trait RestarterScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.Restarter[Exception]().wrap(super.behavior(monitor)._1) null
}
}
object `A restarter Behavior (scala,native)` extends RestarterScalaBehavior with NativeSystem
object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem
/*
* function converters for Java, to ease the pain on Scala 2.11
*/
def fs(f: (JActorContext[Command], Signal) Behavior[Command]) =
new F2[JActorContext[Command], Signal, Behavior[Command]] {
override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig)
}
def fc(f: (JActorContext[Command], Command) Behavior[Command]) =
new F2[JActorContext[Command], Command, Behavior[Command]] {
override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command)
}
def ps(f: (JActorContext[Command], Signal) Unit) =
new P2[JActorContext[Command], Signal] {
override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig)
}
def pc(f: (JActorContext[Command], Command) Unit) =
new P2[JActorContext[Command], Command] {
override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command)
}
def pf(f: PFBuilder[Command, Command] PFBuilder[Command, Command]) =
new F1[PFBuilder[Command, Command], PFBuilder[Command, Command]] {
override def apply(in: PFBuilder[Command, Command]) = f(in)
}
def fi(f: Command Command) =
new FI.Apply[Command, Command] {
override def apply(in: Command) = f(in)
}
def df(f: JActorContext[Command] Behavior[Command]) =
new F1e[JActorContext[Command], Behavior[Command]] {
override def apply(in: JActorContext[Command]) = f(in)
}
trait StatefulWithSignalJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.stateful(
fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}),
fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}))
}
object `A StatefulWithSignal Behavior (java,native)` extends StatefulWithSignalJavaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (java,adapted)` extends StatefulWithSignalJavaBehavior with AdaptedSystem
trait StatefulJavaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
JActor.stateful {
fc((ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
})
}
}
object `A Stateful Behavior (java,native)` extends StatefulJavaBehavior with NativeSystem
object `A Stateful Behavior (java,adapted)` extends StatefulJavaBehavior with AdaptedSystem
trait StatelessJavaBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(JActor.stateless {
pc((ctx, msg)
msg match {
case GetSelf monitor ! Self(ctx.getSelf)
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case Ping monitor ! Pong
case Swap monitor ! Swapped
case GetState() monitor ! StateA
case Stop
case _: AuxPing
})
}, null)
}
object `A Stateless Behavior (java,native)` extends StatelessJavaBehavior with NativeSystem
object `A Stateless Behavior (java,adapted)` extends StatelessJavaBehavior with AdaptedSystem
trait WidenedJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
}
}
object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem
trait DeferredJavaBehavior extends StatefulWithSignalJavaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[PreStart]("deferredListener")
(JActor.deferred(df(ctx {
inbox.ref ! PreStart
super.behavior(monitor)._1
})), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
signal match {
case PreStart aux.receiveAll() should ===(PreStart :: Nil)
case _ aux.receiveAll() should ===(Nil)
}
}
object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem
trait TapJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
ps((_, sig) inbox.ref ! Left(sig)),
pc((_, msg) inbox.ref ! Right(msg)),
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem
trait RestarterJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.restarter(classOf[Exception], JActor.OnFailure.RESTART, super.behavior(monitor)._1) null
}
}
object `A restarter Behavior (java,native)` extends RestarterJavaBehavior with NativeSystem
object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem
}

View file

@ -106,7 +106,7 @@ object StepWise {
def withKeepTraces(b: Boolean): StartWith[T] = new StartWith(b)
}
def apply[T](f: (ActorContext[T], StartWith[T]) Steps[T, _]): Behavior[T] =
def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) Steps[T, _]): Behavior[T] =
Full[Any] {
case Sig(ctx, PreStart) run(ctx, f(ctx.asInstanceOf[ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ())
}.narrow
@ -127,7 +127,7 @@ object StepWise {
}
}
private def run[T](ctx: ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] =
private def run[T](ctx: scaladsl.ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] =
ops match {
case Thunk(f) :: tail run(ctx, tail, f())
case ThunkV(f) :: tail run(ctx, tail, f(value))

View file

@ -25,6 +25,7 @@ import org.junit.runner.RunWith
import scala.util.control.NonFatal
import org.scalatest.exceptions.TestFailedException
import akka.util.TypedMultiMap
import akka.typed.scaladsl.AskPattern
/**
* Helper class for writing tests for typed Actors with ScalaTest.

View file

@ -108,7 +108,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
def `must start system actors and mangle their names`(): Unit = {
withSystem("systemActorOf", Empty[String]) { sys
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
implicit val timeout = Timeout(1.second)
implicit val sched = sys.scheduler

View file

@ -8,7 +8,7 @@ import scala.concurrent.duration._
import akka.Done
import akka.event.Logging._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.Executor
import org.scalatest.concurrent.Eventually

View file

@ -1,254 +0,0 @@
package akka.typed.patterns
import akka.typed._
import scala.concurrent.duration._
import akka.typed.Effect.{ ReceiveTimeoutSet, Scheduled }
import Receiver._
object ReceiverSpec {
case class Msg(x: Int)
case class Setup(name: String, creator: ActorContext[Command[Msg]] Behavior[Command[Msg]], messages: Int, effects: Int)
}
class ReceiverSpec extends TypedSpec {
import ReceiverSpec._
private val dummyInbox = Inbox[Replies[Msg]]("dummy")
private val startingPoints: Seq[Setup] = Seq(
Setup("initial", ctx behavior[Msg], 0, 0),
Setup("afterGetOneFirst", afterGetOneFirst, 1, 0),
Setup("afterGetOneLater", afterGetOneLater, 1, 2),
Setup("afterGetOneTimeout", afterGetOneTimeout, 1, 2),
Setup("afterGetAll", afterGetAll, 1, 1),
Setup("afterGetAllTimeout", afterGetAllTimeout, 1, 1))
private def afterGetOneFirst(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetOne(Duration.Zero)(dummyInbox.ref))
private def afterGetOneLater(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.second)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetOneTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[InternalCommand[Msg]]].message(ctx.asInstanceOf[ActorContext[InternalCommand[Msg]]], ReceiveTimeout()).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetAll(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def afterGetAllTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def setup(name: String, behv: Behavior[Command[Msg]] = behavior[Msg])(
proc: (EffectfulActorContext[Command[Msg]], EffectfulActorContext[Msg], Inbox[Replies[Msg]]) Unit): Unit =
for (Setup(description, behv, messages, effects) startingPoints) {
val ctx = new EffectfulActorContext("ctx", ScalaDSL.ContextAware(behv), 1000, nativeSystem)
withClue(s"[running for starting point '$description' (${ctx.currentBehavior})]: ") {
dummyInbox.receiveAll() should have size messages
ctx.getAllEffects() should have size effects
proc(ctx, ctx.asInstanceOf[EffectfulActorContext[Msg]], Inbox[Replies[Msg]](name))
}
}
object `A Receiver` {
/*
* This test suite assumes that the Receiver is only one actor with two
* sides that share the same ActorRef.
*/
def `must return "self" as external address`(): Unit =
setup("") { (int, ext, _)
val inbox = Inbox[ActorRef[Msg]]("extAddr")
int.run(ExternalAddress(inbox.ref))
int.hasEffects should be(false)
inbox.receiveAll() should be(List(int.self))
}
def `must receive one message which arrived first`(): Unit =
setup("getOne") { (int, ext, inbox)
// first with zero timeout
ext.run(Msg(1))
int.run(GetOne(Duration.Zero)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
// then with positive timeout
ext.run(Msg(2))
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil)
// then with negative timeout
ext.run(Msg(3))
int.run(GetOne(-1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: Nil)
}
def `must receive one message which arrives later`(): Unit =
setup("getOneLater") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d > Duration.Zero should be(true)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
ext.run(Msg(1))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d should be theSameInstanceAs (Duration.Undefined)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
}
def `must reply with no message when asked for immediate value`(): Unit =
setup("getNone") { (int, ext, inbox)
int.run(GetOne(Duration.Zero)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
int.run(GetOne(-1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
}
def `must reply with no message after a timeout`(): Unit =
setup("getNoneTimeout") { (int, ext, inbox)
int.run(GetOne(1.nano)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
// currently this all takes >1ns, but who knows what the future brings
Thread.sleep(1)
int.asInstanceOf[EffectfulActorContext[InternalCommand[Msg]]].run(ReceiveTimeout())
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d should be theSameInstanceAs (Duration.Undefined)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
}
def `must reply with messages which arrived first in the same order (GetOne)`(): Unit =
setup("getMoreOrderOne") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
ext.run(Msg(3))
ext.run(Msg(4))
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil)
int.run(GetOne(Duration.Zero)(inbox.ref))
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: GetOneResult(int.self, Some(Msg(4))) :: Nil)
int.hasEffects should be(false)
}
def `must reply with messages which arrived first in the same order (GetAll)`(): Unit =
setup("getMoreOrderAll") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
// now with negative timeout
ext.run(Msg(3))
ext.run(Msg(4))
int.run(GetAll(-1.second)(inbox.ref))
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(3), Msg(4))) :: Nil)
int.hasEffects should be(false)
}
private def assertScheduled[T, U](s: Scheduled[T], target: ActorRef[U]): U = {
s.target should be(target)
// unfortunately Scala cannot automatically transfer the hereby established type knowledge
s.msg.asInstanceOf[U]
}
def `must reply to GetAll with messages which arrived first`(): Unit =
setup("getAllFirst") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll with messages which arrived first and later`(): Unit =
setup("getAllFirstAndLater") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
inbox.hasMessages should be(false)
ext.run(Msg(3))
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2), Msg(3))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll with messages which arrived later`(): Unit =
setup("getAllLater") { (int, ext, inbox)
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
ext.run(Msg(1))
ext.run(Msg(2))
inbox.hasMessages should be(false)
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll immediately while GetOne is pending`(): Unit =
setup("getAllWhileGetOne") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
int.run(GetAll(Duration.Zero)(inbox.ref))
int.getAllEffects() should have size 1
inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil)
}
def `must reply to GetAll later while GetOne is pending`(): Unit =
setup("getAllWhileGetOne") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
int.run(GetAll(1.nano)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: ReceiveTimeoutSet(_, _) :: Nil assertScheduled(s, int.self)
}
inbox.receiveAll() should be(Nil)
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil)
}
}
}

View file

@ -5,7 +5,7 @@ package akka.typed.patterns
import Receptionist._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
import scala.concurrent.duration._
import akka.typed._