add Akka Typed Java API #22293

... and matching Scala DSL, to replace the old ScalaDSl._ object.
This commit is contained in:
Roland Kuhn 2017-02-26 16:13:11 +01:00 committed by Patrik Nordwall
parent 6414aaccbc
commit 4368bed37a
29 changed files with 1822 additions and 767 deletions

View file

@ -6,7 +6,7 @@ package docs.akka.typed
//#imports
import akka.typed._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await

View file

@ -0,0 +1,384 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import static akka.typed.scaladsl.Actor.Empty;
import static akka.typed.scaladsl.Actor.Ignore;
import static akka.typed.scaladsl.Actor.Same;
import static akka.typed.scaladsl.Actor.Stopped;
import static akka.typed.scaladsl.Actor.Unhandled;
import java.util.function.Function;
import akka.japi.function.Function2;
import akka.japi.function.Procedure2;
import akka.japi.pf.PFBuilder;
import akka.typed.ActorRef;
import akka.typed.Behavior;
import akka.typed.PreStart;
import akka.typed.Signal;
import akka.typed.patterns.Restarter;
import akka.typed.scaladsl.Actor.Widened;
import scala.reflect.ClassTag;
public abstract class Actor {
/*
* This DSL is implemented in Java in order to ensure consistent usability from Java,
* taking possible Scala oddities out of the equation. There is some duplication in
* the behavior implementations, but that is unavoidable if both DSLs shall offer the
* same runtime performance (especially concerning allocations for function converters).
*/
private static class Deferred<T> extends Behavior<T> {
final akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer;
public Deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
this.producer = producer;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
if (msg instanceof PreStart) {
return Behavior.preStart(producer.apply(ctx), ctx);
} else
throw new IllegalStateException("Deferred behavior must receive PreStart as first signal");
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
throw new IllegalStateException("Deferred behavior must receive PreStart as first signal");
}
}
private static class Stateful<T> extends Behavior<T> {
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
final Function2<ActorContext<T>, T, Behavior<T>> message;
public Stateful(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
this.signal = signal;
this.message = message;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return signal.apply(ctx, msg);
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
return message.apply(ctx, msg);
}
}
private static class Stateless<T> extends Behavior<T> {
final Procedure2<ActorContext<T>, T> message;
public Stateless(Procedure2<ActorContext<T>, T> message) {
this.message = message;
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return Same();
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
message.apply(ctx, msg);
return Same();
}
}
private static class Tap<T> extends Behavior<T> {
final Procedure2<ActorContext<T>, Signal> signal;
final Procedure2<ActorContext<T>, T> message;
final Behavior<T> behavior;
public Tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
this.signal = signal;
this.message = message;
this.behavior = behavior;
}
private Behavior<T> canonicalize(Behavior<T> behv) {
if (Behavior.isUnhandled(behv))
return Unhandled();
else if (behv == Same())
return Same();
else if (Behavior.isAlive(behv))
return new Tap<T>(signal, message, behv);
else
return Stopped();
}
@Override
public Behavior<T> management(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
signal.apply(ctx, msg);
return canonicalize(behavior.management(ctx, msg));
}
@Override
public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
message.apply(ctx, msg);
return canonicalize(behavior.message(ctx, msg));
}
}
/**
* Mode selector for the {@link #restarter} wrapper that decides whether an actor
* upon a failure shall be restarted (to clean initial state) or resumed (keep
* on running, with potentially compromised state).
*
* Resuming is less safe. If you use <code>OnFailure.RESUME</code> you should at least
* not hold mutable data fields or collections within the actor as those might
* be in an inconsistent state (the exception might have interrupted normal
* processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*/
public enum OnFailure {
RESUME, RESTART;
}
private static Function2<Object, Object, Object> _unhandledFun = (ctx, msg) -> Unhandled();
@SuppressWarnings("unchecked")
private static <T> Function2<ActorContext<T>, Signal, Behavior<T>> unhandledFun() {
return (Function2<ActorContext<T>, Signal, Behavior<T>>) (Object) _unhandledFun;
}
private static Procedure2<Object, Object> _doNothing = (ctx, msg) -> {
};
@SuppressWarnings("unchecked")
private static <T> Procedure2<ActorContext<T>, Signal> doNothing() {
return (Procedure2<ActorContext<T>, Signal>) (Object) _doNothing;
}
/**
* Construct an actor behavior that can react both to lifecycle signals and
* incoming messages. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* In either casesignal or messagethe next behavior must be returned. If no
* change is desired, use {@link #same}.
*
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> signalOrMessage(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(signal, message);
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
* If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(unhandledFun(), message);
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateless because it cannot be replaced by
* another one after it has been installed. It is most useful for leaf actors
* that do not create child actors themselves.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> stateless(Procedure2<ActorContext<T>, T> message) {
return new Stateless<T>(message);
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior. This is provided in order to avoid the
* allocation overhead of recreating the current behavior where that is not
* necessary.
*
* @return pseudo-behavior marking no change
*/
static public <T> Behavior<T> same() {
return Same();
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior, including the hint that the message has not
* been handled. This hint may be used by composite behaviors that delegate
* (partial) handling to other behaviors.
*
* @return pseudo-behavior marking unhandled
*/
static public <T> Behavior<T> unhandled() {
return Unhandled();
}
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*
* @return the inert behavior
*/
static public <T> Behavior<T> stopped() {
return Stopped();
}
/**
* A behavior that treats every incoming message as unhandled.
*
* @return the empty behavior
*/
static public <T> Behavior<T> empty() {
return Empty();
}
/**
* A behavior that ignores every incoming message and returns same.
*
* @return the inert behavior
*/
static public <T> Behavior<T> ignore() {
return Ignore();
}
/**
* Behavior decorator that allows you to perform any side-effect before a
* signal or message is delivered to the wrapped behavior. The wrapped
* behavior can evolve (i.e. be stateful) without needing to be wrapped in a
* <code>tap(...)</code> call again.
*
* @param signal
* procedure to invoke with the {@link ActorContext} and the
* {@link akka.typed.Signal} as arguments before delivering the signal to
* the wrapped behavior
* @param message
* procedure to invoke with the {@link ActorContext} and the received
* message as arguments before delivering the signal to the wrapped
* behavior
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
return new Tap<T>(signal, message, behavior);
}
/**
* Behavior decorator that copies all received message to the designated
* monitor {@link akka.typed.ActorRef} before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. be stateful) without needing to be
* wrapped in a <code>monitor(...)</code> call again.
*
* @param monitor
* ActorRef to which to copy all received messages
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> monitor(ActorRef<T> monitor, Behavior<T> behavior) {
return new Tap<T>(doNothing(), (ctx, msg) -> monitor.tell(msg), behavior);
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of <code>Thr</code> will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify that the actor shall not be restarted but
* resumed. This entails keeping the same state as before the exception was
* thrown and is thus less safe. If you use <code>OnFailure.RESUME</code> you should at
* least not hold mutable data fields or collections within the actor as those
* might be in an inconsistent state (the exception might have interrupted
* normal processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*
* @param clazz
* the type of exceptions that shall be caught
* @param mode
* whether to restart or resume the actor upon a caught failure
* @param initialBehavior
* the initial behavior, that is also restored during a restart
* @return the wrapped behavior
*/
static public <T, Thr extends Throwable> Behavior<T> restarter(Class<Thr> clazz, OnFailure mode,
Behavior<T> initialBehavior) {
final ClassTag<Thr> catcher = akka.japi.Util.classTag(clazz);
return new Restarter<T, Thr>(initialBehavior, mode == OnFailure.RESUME, initialBehavior, catcher);
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* <code><pre>
* Behavior&lt;String> s = stateless((ctx, msg) -> System.out.println(msg))
* Behavior&lt;Number> n = widened(s, pf -> pf.
* match(BigInteger.class, i -> "BigInteger(" + i + ")").
* match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
* // drop all other kinds of Number
* );
* </pre></code>
*
* @param behavior
* the behavior that will receive the selected messages
* @param selector
* a partial function builder for describing the selection and
* transformation
* @return a behavior of the widened type
*/
static public <T, U> Behavior<U> widened(Behavior<T> behavior, Function<PFBuilder<U, T>, PFBuilder<U, T>> selector) {
return new Widened<T, U>(behavior, selector.apply(new PFBuilder<>()).build());
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior
* creation is deferred to the child actor instead of running within the
* parent.
*
* @param producer
* behavior factory that takes the child actors context as argument
* @return the deferred behavior
*/
static public <T> Behavior<T> deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
return new Deferred<T>(producer);
}
}

View file

@ -0,0 +1,159 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import akka.actor.Cancellable;
import akka.typed.ActorRef;
import akka.typed.ActorSystem;
import akka.typed.Behavior;
import akka.typed.DeploymentConfig;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;
/**
* An Actor is given by the combination of a {@link akka.typed.Behavior} and a context in which
* this behavior is executed. As per the Actor Model an Actor can perform the
* following actions when processing a message:
*
* <ul>
* <li>send a finite number of messages to other Actors it knows</li>
* <li>create a finite number of Actors</li>
* <li>designate the behavior for the next message</li>
* </ul>
*
* In Akka the first capability is accessed by using the {@link akka.typed.ActorRef#tell}
* method, the second is provided by {@link akka.typed.ActorContext#spawn} and the
* third is implicit in the signature of {@link akka.typed.Behavior} in that the next behavior
* is always returned from the message processing logic.
*
* An <code>ActorContext</code> in addition provides access to the Actors own identity
* ({@link #getSelf self}), the {@link akka.typed.ActorSystem} it is part of, methods for querying the list
* of child Actors it created, access to {@link akka.typed.Terminated DeathWatch} and timed
* message scheduling.
*/
public interface ActorContext<T> {
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different {@link akka.typed.ActorRef}.
*/
public ActorRef<T> getSelf();
/**
* Return the mailbox capacity that was configured by the parent for this
* actor.
*/
public int getMailboxCapacity();
/**
* The {@link akka.typed.ActorSystem} to which this Actor belongs.
*/
public ActorSystem<Void> getSystem();
/**
* The list of child Actors created by this Actor during its lifetime that are
* still alive, in no particular order.
*/
public List<ActorRef<Void>> getChildren();
/**
* The named child Actor if it is alive.
*/
public Optional<ActorRef<Void>> getChild(String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior, DeploymentConfig deployment);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name, DeploymentConfig deployment);
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in {@link akka.typed.ActorRef} points to a current child Actor
*/
public boolean stop(ActorRef<?> child);
/**
* Register for {@link akka.typed.Terminated} notification once the Actor identified by the
* given {@link akka.typed.ActorRef} terminates. This notification is also generated when the
* {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed
* (e.g. in reaction to being unreachable).
*/
public <U> ActorRef<U> watch(ActorRef<U> other);
/**
* Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated}
* notification will not subsequently be received for the referenced Actor.
*/
public <U> ActorRef<U> unwatch(ActorRef<U> other);
/**
* Schedule the sending of a notification in case no other message is received
* during the given period of time. The timeout starts anew with each received
* message. Provide <code>scala.concurrent.duration.Duration.Undefined</code> to switch off this mechanism.
*/
public void setReceiveTimeout(FiniteDuration d, T msg);
/**
* Cancel the sending of receive timeout notifications.
*/
public void cancelReceiveTimeout();
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled by
* invoking {@link akka.actor.Cancellable#cancel} on the returned handle.
*/
public <U> Cancellable schedule(FiniteDuration delay, ActorRef<U> target, U msg);
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like <code>scala.concurrent.Future</code> combinators.
*/
public ExecutionContextExecutor getExecutionContext();
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given <code>name</code> argument is appended,
* with an inserted hyphen between these two parts. Therefore the given <code>name</code>
* argument does not need to be unique within the scope of the parent actor.
*/
public <U> ActorRef<U> spawnAdapter(Function<U, T> f, String name);
/**
* Create an anonymous child actor that will wrap messages such that other
* Actors protocols can be ingested by this Actor. You are strongly advised
* to cache these ActorRefs or to stop them when no longer needed.
*/
public <U> ActorRef<U> spawnAdapter(Function<U, T> f);
}

View file

@ -9,136 +9,55 @@ import akka.util.Helpers
import akka.{ actor untyped }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import java.util.Optional
import java.util.ArrayList
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
* This trait is not meant to be extended by user code. If you do so, you may
* lose binary compatibility.
*/
trait ActorContext[T] {
@DoNotInherit
@ApiMayChange
trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
def getChild(name: String): Optional[ActorRef[Void]] =
child(name) match {
case Some(c) Optional.of(c.upcast[Void])
case None Optional.empty()
}
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
def getChildren(): java.util.List[akka.typed.ActorRef[Void]] = {
val c = children
val a = new ArrayList[ActorRef[Void]](c.size)
val i = c.iterator
while (i.hasNext) a.add(i.next().upcast[Void])
a
}
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
def getExecutionContext(): scala.concurrent.ExecutionContextExecutor =
executionContext
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
def getMailboxCapacity(): Int =
mailboxCapacity
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
def getSelf(): akka.typed.ActorRef[T] =
self
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
def getSystem(): akka.typed.ActorSystem[Void] =
system.asInstanceOf[ActorSystem[Void]]
/**
* Create a child Actor from the given [[Props]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
def spawn[U](behavior: akka.typed.Behavior[U], name: String): akka.typed.ActorRef[U] =
spawn(behavior, name, EmptyDeploymentConfig)
/**
* Create a child Actor from the given [[Props]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
def spawnAnonymous[U](behavior: akka.typed.Behavior[U]): akka.typed.ActorRef[U] =
spawnAnonymous(behavior, EmptyDeploymentConfig)
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop(child: ActorRef[Nothing]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): ActorRef[U]
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): ActorRef[U]
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable]] `cancel` on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.typed.ActorRef[U] =
spawnAdapter(f.apply _)
def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.typed.ActorRef[U] =
spawnAdapter(f.apply _, name)
}
/**
@ -180,7 +99,7 @@ class StubbedActorContext[T](
* Do not actually stop the child inbox, only simulate the liveness check.
* Removal is asynchronous, explicit removeInbox is needed from outside afterwards.
*/
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
_children.get(child.path.name) match {
case None false
case Some(inbox) inbox.ref == child

View file

@ -13,11 +13,11 @@ import scala.concurrent.Future
* only during the Actors lifetime and allows messages to be sent to that
* Actor instance. Sending a message to an Actor that has terminated before
* receiving the message will lead to that message being discarded; such
* messages are delivered to the [[akka.actor.DeadLetter]] channel of the
* [[akka.event.EventStream]] on a best effort basis
* messages are delivered to the [[DeadLetter]] channel of the
* [[EventStream]] on a best effort basis
* (i.e. this delivery is not reliable).
*/
abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[Nothing]] { this: internal.ActorRefImpl[T]
abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[_]] { this: internal.ActorRefImpl[T]
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
@ -49,7 +49,7 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act
/**
* Comparison takes path and the unique id of the actor cell into account.
*/
final override def compareTo(other: ActorRef[Nothing]) = {
final override def compareTo(other: ActorRef[_]) = {
val x = this.path compareTo other.path
if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1
else x

View file

@ -13,14 +13,18 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.typed.adapter.{ ActorSystemAdapter, PropsAdapter }
import akka.util.Timeout
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
/**
* An ActorSystem is home to a hierarchy of Actors. It is created using
* [[ActorSystem$]] `apply` from a [[Props]] object that describes the root
* [[ActorSystem#apply]] from a [[Behavior]] object that describes the root
* Actor of this hierarchy and which will create all other Actors beneath it.
* A system also implements the [[ActorRef]] type, and sending a message to
* the system directs that message to the root Actor.
*/
@DoNotInherit
@ApiMayChange
trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T]
/**
@ -131,11 +135,11 @@ trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒
* Ask the system guardian of this system to create an actor from the given
* behavior and deployment and with the given name. The name does not need to
* be unique since the guardian will prefix it with a running number when
* creating the child actor. The timeout sets the timeout used for the [[AskPattern$]]
* creating the child actor. The timeout sets the timeout used for the [[akka.typed.scaladsl.AskPattern$]]
* invocation when asking the guardian.
*
* The returned Future of [[ActorRef]] may be converted into an [[ActorRef]]
* to which messages can immediately be sent by using the [[ActorRef$apply]]
* to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]]
* method.
*/
def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]]

View file

@ -13,6 +13,10 @@ package akka.typed
* Behaviors can be formulated in a number of different ways, either by
* creating a derived class or by employing factory methods like the ones
* in the [[ScalaDSL$]] object.
*
* Closing over ActorContext makes a Behavior immobile: it cannot be moved to
* another context and executed there, and therefore it cannot be replicated or
* forked either.
*/
abstract class Behavior[T] {
/**
@ -30,6 +34,7 @@ abstract class Behavior[T] {
* Code calling this method should use [[Behavior$]] `canonicalize` to replace
* the special objects with real Behaviors.
*/
@throws(classOf[Exception])
def management(ctx: ActorContext[T], msg: Signal): Behavior[T]
/**
@ -45,6 +50,7 @@ abstract class Behavior[T] {
* Code calling this method should use [[Behavior$]] `canonicalize` to replace
* the special objects with real Behaviors.
*/
@throws(classOf[Exception])
def message(ctx: ActorContext[T], msg: T): Behavior[T]
/**
@ -55,14 +61,6 @@ abstract class Behavior[T] {
def narrow[U <: T]: Behavior[U] = this.asInstanceOf[Behavior[U]]
}
/*
* FIXME
*
* Closing over ActorContext makes a Behavior immobile: it cannot be moved to
* another context and executed there, and therefore it cannot be replicated or
* forked either.
*/
object Behavior {
/**
@ -123,8 +121,8 @@ object Behavior {
/**
* Given a possibly special behavior (same or unhandled) and a
* current behavior (which defines the meaning of encountering a `Same`
* behavior) this method unwraps the behavior such that the innermost behavior
* is returned, i.e. it removes the decorations.
* behavior) this method computes the next behavior, suitable for passing a
* message or signal.
*/
def canonicalize[T](behavior: Behavior[T], current: Behavior[T]): Behavior[T] =
behavior match {
@ -133,6 +131,11 @@ object Behavior {
case other other
}
/**
* Validate the given behavior as a suitable initial actor behavior; most
* notably the behavior can neither be `Same` nor `Unhandled`. Starting
* out with a `Stopped` behavior is allowed, though.
*/
def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] =
behavior match {
case `sameBehavior` | `unhandledBehavior`
@ -140,12 +143,22 @@ object Behavior {
case x x
}
/**
* Validate the given behavior as initial, pass it a [[PreStart]] message
* and canonicalize the result.
*/
def preStart[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
val b = validateAsInitial(behavior)
if (isAlive(b)) canonicalize(b.management(ctx, PreStart), b) else b
}
/**
* Returns true if the given behavior is not stopped.
*/
def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne stoppedBehavior
/**
* Returns true if the given behavior is the special `Unhandled` marker.
*/
def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq unhandledBehavior
}

View file

@ -73,7 +73,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
effectQueue.offer(Spawned(name))
super.spawn(behavior, name)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
effectQueue.offer(Stopped(child.path.name))
super.stop(child)
}

View file

@ -12,6 +12,12 @@ import scala.annotation.tailrec
import akka.actor.ActorRefProvider
import java.util.concurrent.ThreadLocalRandom
/**
* Utility for receiving messages outside of an actor. No methods are provided
* for synchronously awaiting a message, this is primarily useful for synchronous
* tests of behaviors that send messages to other actors, where an Inboxs ActorRef
* can conveniently be used as a stub.
*/
class Inbox[T](name: String) {
private val q = new ConcurrentLinkedQueue[T]

View file

@ -31,16 +31,21 @@ sealed trait Signal
* Lifecycle signal that is fired upon creation of the Actor. This will be the
* first message that the actor processes.
*/
@SerialVersionUID(1L)
final case object PreStart extends Signal
sealed abstract class PreStart extends Signal
final case object PreStart extends PreStart {
def instance: PreStart = this
}
/**
* Lifecycle signal that is fired upon restart of the Actor before replacing
* the behavior with the fresh one (i.e. this signal is received within the
* behavior that failed).
* behavior that failed). The replacement behavior will receive PreStart as its
* first signal.
*/
@SerialVersionUID(1L)
final case object PreRestart extends Signal
sealed abstract class PreRestart extends Signal
final case object PreRestart extends PreRestart {
def instance: PreRestart = this
}
/**
* Lifecycle signal that is fired after this actor and all its child actors
@ -51,8 +56,10 @@ final case object PreRestart extends Signal
* `Stopped` behavior then this signal will be ignored (i.e. the
* Stopped behavior will do nothing in reaction to it).
*/
@SerialVersionUID(1L)
final case object PostStop extends Signal
sealed abstract class PostStop extends Signal
final case object PostStop extends PostStop {
def instance: PostStop = this
}
/**
* Lifecycle signal that is fired when an Actor that was watched has terminated.
@ -64,70 +71,8 @@ final case object PostStop extends Signal
* have occurred. Termination of a remote Actor can also be effected by declaring
* the Actors home system as failed (e.g. as a result of being unreachable).
*/
@SerialVersionUID(1L)
final case class Terminated(ref: ActorRef[Nothing])(failed: Throwable) extends Signal {
def wasFailed: Boolean = failed ne null
def failure: Throwable = failed
def failureOption: Option[Throwable] = Option(failed)
}
/**
* FIXME correct this documentation when the Restarter behavior has been implemented
*
* The parent of an actor decides upon the fate of a failed child actor by
* encapsulating its next behavior in one of the four wrappers defined within
* this class.
*
* Failure responses have an associated precedence that ranks them, which is in
* descending importance:
*
* - Escalate
* - Stop
* - Restart
* - Resume
*/
object Failed {
sealed trait Decision
@SerialVersionUID(1L)
case object NoFailureResponse extends Decision
/**
* Resuming the child actor means that the result of processing the message
* on which it failed is just ignored, the previous state will be used to
* process the next message. The message that triggered the failure will not
* be processed again.
*/
@SerialVersionUID(1L)
case object Resume extends Decision
/**
* Restarting the child actor means resetting its behavior to the initial
* one that was provided during its creation (i.e. the one which was passed
* into the [[Props]] constructor). The previously failed behavior will
* receive a [[PreRestart]] signal before this happens and the replacement
* behavior will receive a [[PostRestart]] signal afterwards.
*/
@SerialVersionUID(1L)
case object Restart extends Decision
/**
* Stopping the child actor will free its resources and eventually
* (asynchronously) unregister its name from the parent. Completion of this
* process can be observed by watching the child actor and reacting to its
* [[Terminated]] signal.
*/
@SerialVersionUID(1L)
case object Stop extends Decision
/**
* The default response to a failure in a child actor is to escalate the
* failure, entailing that the parent actor fails as well. This is equivalent
* to an exception unwinding the call stack, but it applies to the supervision
* hierarchy instead.
*/
@SerialVersionUID(1L)
case object Escalate extends Decision
}

View file

@ -11,6 +11,7 @@ import akka.util.LineNumbers
* This object holds several behavior factories and combinators that can be
* used to construct Behavior instances.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
object ScalaDSL {
// FIXME check that all behaviors can cope with not getting PreStart as first message
@ -20,17 +21,20 @@ object ScalaDSL {
* Widen the type of this Behavior by providing a filter function that permits
* only a subtype of the widened set of messages.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def widen[U >: T](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher)
/**
* Combine the two behaviors such that incoming messages are distributed
* to both of them, each one evolving its state independently.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def &&(other: Behavior[T]): Behavior[T] = And(behavior, other)
/**
* Combine the two behaviors such that incoming messages are given first to
* the left behavior and are then only passed on to the right behavior if
* the left one returned Unhandled.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def ||(other: Behavior[T]): Behavior[T] = Or(behavior, other)
}
@ -40,6 +44,7 @@ object ScalaDSL {
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Widened[T, U >: T](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] {
private def postProcess(ctx: ActorContext[U], behv: Behavior[T]): Behavior[U] =
if (isUnhandled(behv)) Unhandled
@ -63,6 +68,7 @@ object ScalaDSL {
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Deferred[T](factory: () Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
@ -81,6 +87,7 @@ object ScalaDSL {
* avoid the allocation overhead of recreating the current behavior where
* that is not necessary.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]]
/**
@ -89,6 +96,7 @@ object ScalaDSL {
* message has not been handled. This hint may be used by composite
* behaviors that delegate (partial) handling to other behaviors.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]]
/*
@ -104,16 +112,19 @@ object ScalaDSL {
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]]
/**
* This behavior does not handle any inputs, it is completely inert.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]]
/**
* This behavior does not handle any inputs, it is completely inert.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]
/**
@ -122,17 +133,20 @@ object ScalaDSL {
* used by several of the behaviors defined in this DSL, see for example
* [[Full]].
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
sealed trait MessageOrSignal[T]
/**
* A message bundled together with the current [[ActorContext]].
*/
@SerialVersionUID(1L)
final case class Msg[T](ctx: ActorContext[T], msg: T) extends MessageOrSignal[T]
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Msg[T](ctx: scaladsl.ActorContext[T], msg: T) extends MessageOrSignal[T]
/**
* A signal bundled together with the current [[ActorContext]].
*/
@SerialVersionUID(1L)
final case class Sig[T](ctx: ActorContext[T], signal: Signal) extends MessageOrSignal[T]
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Sig[T](ctx: scaladsl.ActorContext[T], signal: Signal) extends MessageOrSignal[T]
/**
* This type of behavior allows to handle all incoming messages within
@ -144,9 +158,9 @@ object ScalaDSL {
* For the lifecycle notifications pertaining to the actor itself this
* behavior includes a fallback mechanism: an unhandled [[PreRestart]] signal
* will terminate all child actors (transitively) and then emit a [[PostStop]]
* signal in addition, whereas an unhandled [[PostRestart]] signal will emit
* an additional [[PreStart]] signal.
* signal in addition.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Full[T](behavior: PartialFunction[MessageOrSignal[T], Behavior[T]]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
lazy val fallback: (MessageOrSignal[T]) Behavior[T] = {
@ -173,6 +187,7 @@ object ScalaDSL {
* to create the supplied function then any message not matching the list of
* cases will fail this actor with a [[scala.MatchError]].
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class FullTotal[T](behavior: MessageOrSignal[T] Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal) = behavior(Sig(ctx, msg))
override def message(ctx: ActorContext[T], msg: T) = behavior(Msg(ctx, msg))
@ -189,6 +204,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Total[T](behavior: T Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match {
case _ Unhandled
@ -207,6 +223,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Partial[T](behavior: PartialFunction[T, Behavior[T]]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match {
case _ Unhandled
@ -220,6 +237,7 @@ object ScalaDSL {
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Tap[T](f: PartialFunction[MessageOrSignal[T], Unit], behavior: Behavior[T]) extends Behavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) Unhandled
@ -237,6 +255,7 @@ object ScalaDSL {
override def toString = s"Tap(${LineNumbers(f)},$behavior)"
}
object Tap {
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap({ case Msg(_, msg) monitor ! msg }, behavior)
}
@ -249,6 +268,7 @@ object ScalaDSL {
* This behavior type is most useful for leaf actors that do not create child
* actors themselves.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Static[T](behavior: T Unit) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
@ -267,6 +287,7 @@ object ScalaDSL {
* This decorator is useful for passing messages between the left and right
* sides of [[And]] and [[Or]] combinators.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class SynchronousSelf[T](f: ActorRef[T] Behavior[T]) extends Behavior[T] {
private class B extends Behavior[T] {
@ -308,9 +329,9 @@ object ScalaDSL {
* A behavior combinator that feeds incoming messages and signals both into
* the left and right sub-behavior and allows them to evolve independently of
* each other. When one of the sub-behaviors terminates the other takes over
* exclusively. When both sub-behaviors respond to a [[Failed]] signal, the
* response with the higher precedence is chosen (see [[Failed$]]).
* exclusively.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class And[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = {
@ -354,9 +375,9 @@ object ScalaDSL {
* each other. The message or signal is passed first into the left sub-behavior
* and only if that results in [[#Unhandled]] is it passed to the right
* sub-behavior. When one of the sub-behaviors terminates the other takes over
* exclusively. When both sub-behaviors respond to a [[Failed]] signal, the
* response with the higher precedence is chosen (see [[Failed$]]).
* exclusively.
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
final case class Or[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] {
override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] =
@ -418,10 +439,14 @@ object ScalaDSL {
* }
* }}}
*/
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def SelfAware[T](behavior: ActorRef[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx.self), ctx)
case msg throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)")
new Behavior[T] {
override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] =
if (sig == PreStart) Behavior.preStart(behavior(ctx.self), ctx)
else throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $sig)")
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)")
}
/**
@ -438,10 +463,14 @@ object ScalaDSL {
* }
* }}}
*/
def ContextAware[T](behavior: ActorContext[T] Behavior[T]): Behavior[T] =
FullTotal {
case Sig(ctx, PreStart) Behavior.preStart(behavior(ctx), ctx)
case msg throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $msg)")
@deprecated("use akka.typed.scaladsl.Actor", "2.5.0")
def ContextAware[T](behavior: scaladsl.ActorContext[T] Behavior[T]): Behavior[T] =
new Behavior[T] {
override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] =
if (sig == PreStart) Behavior.preStart(behavior(ctx), ctx)
else throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $sig)")
override def message(ctx: ActorContext[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got ${msg.getClass})")
}
/**

View file

@ -22,7 +22,7 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo
ctx.spawnAnonymous(behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ctx.spawn(behavior, name, deployment)
override def stop(child: ActorRef[Nothing]) =
override def stop(child: ActorRef[_]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef
val cell = ctx.asInstanceOf[akka.actor.ActorCell]

View file

@ -120,7 +120,7 @@ private[typed] class ActorCell[T](
spawn(behavior, name, deployment)
}
override def stop(child: ActorRef[Nothing]): Boolean = {
override def stop(child: ActorRef[_]): Boolean = {
val name = child.path.name
childrenMap.get(name) match {
case None false

View file

@ -21,6 +21,7 @@ import scala.util.Success
import akka.util.Timeout
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import akka.typed.scaladsl.AskPattern
object ActorSystemImpl {
import ScalaDSL._

View file

@ -12,6 +12,7 @@ import akka.util.{ ReentrantGuard, Subclassification, SubclassifiedIndex }
import scala.collection.immutable
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.typed.scaladsl.AskPattern
/**
* INTERNAL API

View file

@ -1,122 +0,0 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.patterns
import scala.concurrent.duration._
import akka.typed.ActorRef
import scala.collection.immutable
import akka.typed.Behavior
import scala.concurrent.duration.Deadline
import akka.typed.ActorContext
import java.util.LinkedList
import scala.collection.JavaConverters._
import scala.collection.immutable.Queue
// FIXME make this nice again once the Actor Algebra is implemented
object Receiver {
import akka.typed.ScalaDSL._
sealed trait InternalCommand[T]
case class ReceiveTimeout[T]() extends InternalCommand[T]
sealed trait Command[T] extends InternalCommand[T]
/**
* Retrieve one message from the Receiver, waiting at most for the given duration.
*/
final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T]
/**
* Retrieve all messages from the Receiver that it has queued after the given
* duration has elapsed.
*/
final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T]
/**
* Retrieve the external address of this Receiver (i.e. the side at which it
* takes in the messages of type T.
*/
final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T]
sealed trait Replies[T]
final case class GetOneResult[T](receiver: ActorRef[Command[T]], msg: Option[T]) extends Replies[T]
final case class GetAllResult[T](receiver: ActorRef[Command[T]], msgs: immutable.Seq[T]) extends Replies[T]
private final case class Enqueue[T](msg: T) extends Command[T]
def behavior[T]: Behavior[Command[T]] =
ContextAware[Any] { ctx
SynchronousSelf { syncself
Or(
empty(ctx).widen { case c: InternalCommand[t] c.asInstanceOf[InternalCommand[T]] },
Static[Any] {
case msg syncself ! Enqueue(msg)
})
}
}.narrow
private def empty[T](ctx: ActorContext[Any]): Behavior[InternalCommand[T]] =
Total {
case ExternalAddress(replyTo) { replyTo ! ctx.self; Same }
case g @ GetOne(d) if d <= Duration.Zero { g.replyTo ! GetOneResult(ctx.self, None); Same }
case g @ GetOne(d) asked(ctx, Queue(Asked(g.replyTo, Deadline.now + d)))
case g @ GetAll(d) if d <= Duration.Zero { g.replyTo ! GetAllResult(ctx.self, Nil); Same }
case g @ GetAll(d) { ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo)); Same }
case Enqueue(msg) queued(ctx, msg)
}
private def queued[T](ctx: ActorContext[Any], t: T): Behavior[InternalCommand[T]] = {
val queue = new LinkedList[T]
queue.add(t)
Total {
case ExternalAddress(replyTo)
replyTo ! ctx.self
Same
case g: GetOne[t]
g.replyTo ! GetOneResult(ctx.self, Some(queue.remove()))
if (queue.isEmpty) empty(ctx) else Same
case g @ GetAll(d) if d <= Duration.Zero
g.replyTo ! GetAllResult(ctx.self, queue.iterator.asScala.toVector)
empty(ctx)
case g @ GetAll(d)
ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo))
Same
case Enqueue(msg)
queue.add(msg)
Same
}
}
private case class Asked[T](replyTo: ActorRef[GetOneResult[T]], deadline: Deadline)
private def asked[T](ctx: ActorContext[Any], queue: Queue[Asked[T]]): Behavior[InternalCommand[T]] = {
ctx.setReceiveTimeout(queue.map(_.deadline).min.timeLeft, ReceiveTimeout())
Total {
case ReceiveTimeout()
val (overdue, remaining) = queue partition (_.deadline.isOverdue)
overdue foreach (a a.replyTo ! GetOneResult(ctx.self, None))
if (remaining.isEmpty) {
ctx.cancelReceiveTimeout()
empty(ctx)
} else asked(ctx, remaining)
case ExternalAddress(replyTo) { replyTo ! ctx.self; Same }
case g @ GetOne(d) if d <= Duration.Zero
g.replyTo ! GetOneResult(ctx.self, None)
asked(ctx, queue)
case g @ GetOne(d)
asked(ctx, queue enqueue Asked(g.replyTo, Deadline.now + d))
case g @ GetAll(d) if d <= Duration.Zero
g.replyTo ! GetAllResult(ctx.self, Nil)
asked(ctx, queue)
case g @ GetAll(d)
ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo))
asked(ctx, queue)
case Enqueue(msg)
val (ask, q) = queue.dequeue
ask.replyTo ! GetOneResult(ctx.self, Some(msg))
if (q.isEmpty) {
ctx.cancelReceiveTimeout()
empty(ctx)
} else asked(ctx, q)
}
}
}

View file

@ -7,6 +7,52 @@ package patterns
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.event.Logging
import akka.typed.scaladsl.Actor._
/**
* Simple supervision strategy that restarts the underlying behavior for all
* failures of type Thr.
*
* FIXME add limited restarts and back-off (with limited buffering or vacation responder)
* FIXME write tests that ensure that restart is canonicalizing PreStart result correctly
*/
final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean)(
behavior: Behavior[T] = initialBehavior) extends Behavior[T] {
private def restart(ctx: ActorContext[T]): Behavior[T] = {
try behavior.management(ctx, PreRestart) catch { case NonFatal(_) }
Behavior.canonicalize(initialBehavior.management(ctx, PreStart), initialBehavior)
}
private def canonical(b: Behavior[T]): Behavior[T] =
if (Behavior.isUnhandled(b)) Unhandled
else if (b eq Behavior.sameBehavior) Same
else if (!Behavior.isAlive(b)) Stopped
else if (b eq behavior) Same
else Restarter[T, Thr](initialBehavior, resume)(b)
override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
val b =
try behavior.management(ctx, signal)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage))
if (resume) behavior else restart(ctx)
}
canonical(b)
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
val b =
try behavior.message(ctx, msg)
catch {
case ex: Thr
ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage))
if (resume) behavior else restart(ctx)
}
canonical(b)
}
}
/**
* Simple supervision strategy that restarts the underlying behavior for all
@ -15,7 +61,7 @@ import akka.event.Logging
* FIXME add limited restarts and back-off (with limited buffering or vacation responder)
* FIXME write tests that ensure that all Behaviors are okay with getting PostRestart as first signal
*/
final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] {
final case class MutableRestarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] {
private[this] var current = initialBehavior
@ -34,7 +80,7 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav
if (resume) current else restart(ctx)
}
current = Behavior.canonicalize(b, current)
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
if (Behavior.isAlive(current)) this else Stopped
}
override def message(ctx: ActorContext[T], msg: T): Behavior[T] = {
@ -46,14 +92,6 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav
if (resume) current else restart(ctx)
}
current = Behavior.canonicalize(b, current)
if (Behavior.isAlive(current)) this else ScalaDSL.Stopped
if (Behavior.isAlive(current)) this else Stopped
}
}
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) {
def wrap[T](b: Behavior[T]) = Restarter(Behavior.validateAsInitial(b), resume)(c)
}
def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume)
}

View file

@ -0,0 +1,404 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed
package scaladsl
import akka.util.LineNumbers
import scala.reflect.ClassTag
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import scala.deprecatedInheritance
import akka.typed.{ ActorContext AC }
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T]
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop(child: ActorRef[_]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): ActorRef[U]
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): ActorRef[U]
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable#cancel]] on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
}
@ApiMayChange
object Actor {
import Behavior._
// FIXME check that all behaviors can cope with not getting PreStart as first message
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* see also [[Actor.Widened]]
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher)
}
private val _nullFun = (_: Any) null
private def nullFun[T] = _nullFun.asInstanceOf[Any T]
private implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
def as[U] = ctx.asInstanceOf[AC[U]]
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* Stateless[String]((ctx, msg) => println(msg)).widen[Number] {
* case b: BigDecimal => s"BigDecimal(&dollar;b)"
* case i: BigInteger => s"BigInteger(&dollar;i)"
* // drop all other kinds of Number
* }
* }}}
*/
final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] {
private def postProcess(behv: Behavior[T]): Behavior[U] =
if (isUnhandled(behv)) Unhandled
else if (isAlive(behv)) {
val next = canonicalize(behv, behavior)
if (next eq behavior) Same else Widened(next, matcher)
} else Stopped
override def management(ctx: AC[U], msg: Signal): Behavior[U] =
postProcess(behavior.management(ctx.as[T], msg))
override def message(ctx: AC[U], msg: U): Behavior[U] =
matcher.applyOrElse(msg, nullFun) match {
case null Unhandled
case transformed postProcess(behavior.message(ctx.as[T], transformed))
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
final case class Deferred[T](factory: ActorContext[T] Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = {
if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
Behavior.preStart(factory(ctx), ctx)
}
override def message(ctx: AC[T], msg: T): Behavior[T] =
throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)")
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
* avoid the allocation overhead of recreating the current behavior where
* that is not necessary.
*/
def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]]
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior, including the hint that the
* message has not been handled. This hint may be used by composite
* behaviors that delegate (partial) handling to other behaviors.
*/
def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]]
/*
* TODO write a Behavior that waits for all child actors to stop and then
* runs some cleanup before stopping. The factory for this behavior should
* stop and watch all children to get the process started.
*/
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*/
def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]]
/**
* A behavior that treats every incoming message as unhandled.
*/
def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]]
/**
* A behavior that ignores every incoming message and returns same.
*/
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]
/**
* Construct an actor behavior that can react both to lifecycle signals and
* incoming messages. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* In either casesignal or messagethe next behavior must be returned. If no
* change is desired, use `Actor.same()`.
*/
final case class SignalOrMessage[T](
signal: (ActorContext[T], Signal) Behavior[T],
mesg: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T): Behavior[T] = mesg(ctx, msg)
override def toString = s"SignalOrMessage(${LineNumbers(signal)},${LineNumbers(mesg)})"
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
*/
final case class Stateful[T](behavior: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: AC[T], msg: T) = behavior(ctx, msg)
override def toString = s"Stateful(${LineNumbers(behavior)})"
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateless because it cannot be replaced by
* another one after it has been installed. It is most useful for leaf actors
* that do not create child actors themselves.
*/
final case class Stateless[T](behavior: (ActorContext[T], T) Any) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
override def message(ctx: AC[T], msg: T): Behavior[T] = {
behavior(ctx, msg)
this
}
override def toString = s"Static(${LineNumbers(behavior)})"
}
/**
* This type of Behavior wraps another Behavior while allowing you to perform
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
final case class Tap[T](
signal: (ActorContext[T], Signal) _,
mesg: (ActorContext[T], T) _,
behavior: Behavior[T]) extends Behavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) Unhandled
else if (behv eq sameBehavior) Same
else if (isAlive(behv)) Tap(signal, mesg, behv)
else Stopped
override def management(ctx: AC[T], msg: Signal): Behavior[T] = {
signal(ctx, msg)
canonical(behavior.management(ctx, msg))
}
override def message(ctx: AC[T], msg: T): Behavior[T] = {
mesg(ctx, msg)
canonical(behavior.message(ctx, msg))
}
override def toString = s"Tap(${LineNumbers(signal)},${LineNumbers(mesg)},$behavior)"
}
/**
* Behavior decorator that copies all received message to the designated
* monitor [[akka.typed.ActorRef]] before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. be stateful) without needing to be
* wrapped in a `monitor` call again.
*/
object Monitor {
def apply[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap(unitFunction, (_, msg) monitor ! msg, behavior)
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of `Thr` will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify that the actor shall not be restarted but
* resumed. This entails keeping the same state as before the exception was
* thrown and is thus less safe. If you use `OnFailure.RESUME` you should at
* least not hold mutable data fields or collections within the actor as those
* might be in an inconsistent state (the exception might have interrupted
* normal processing); avoiding mutable state is possible by returning a fresh
* behavior with the new state after every message.
*
* Example:
* {{{
* val dbConnector: Behavior[DbCommand] = ...
* val dbRestarts = Restarter[DbException].wrap(dbConnector)
* }}}
*/
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) {
def wrap[T](b: Behavior[T]): Behavior[T] = patterns.Restarter(Behavior.validateAsInitial(b), resume)()(c)
def mutableWrap[T](b: Behavior[T]): Behavior[T] = patterns.MutableRestarter(Behavior.validateAsInitial(b), resume)(c)
}
def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume)
}
// TODO
// final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () Behavior[T])
/**
* INTERNAL API.
*/
private[akka] val _unhandledFunction = (_: Any) Unhandled[Nothing]
/**
* INTERNAL API.
*/
private[akka] def unhandledFunction[T, U] = _unhandledFunction.asInstanceOf[(T Behavior[U])]
/**
* INTERNAL API.
*/
private[akka] val _unitFunction = (_: ActorContext[Any], _: Any) ()
/**
* INTERNAL API.
*/
private[akka] def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
}

View file

@ -1,19 +1,19 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
package akka.typed.scaladsl
import scala.concurrent.{ Future, Promise }
import akka.util.Timeout
import akka.actor.InternalActorRef
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import java.lang.IllegalArgumentException
import akka.actor.Scheduler
import akka.typed.internal.FunctionRef
import akka.actor.RootActorPath
import akka.actor.Address
import akka.util.LineNumbers
import akka.typed.ActorRef
import akka.typed.adapter
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
@ -82,7 +82,7 @@ object AskPattern {
AskPath,
(msg, self) {
p.trySuccess(msg)
self.sendSystem(internal.Terminate())
self.sendSystem(akka.typed.internal.Terminate())
},
(self) if (!p.isCompleted) p.tryFailure(new NoSuchElementException("ask pattern terminated before value was received")))
actorRef ! f(ref)

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import akka.typed.*;
import static akka.typed.javadsl.Actor.*;
public class ActorCompile {
interface MyMsg {}
class MyMsgA implements MyMsg {
final ActorRef<String> replyTo;
public MyMsgA(ActorRef<String> replyTo) {
this.replyTo = replyTo;
}
}
class MyMsgB implements MyMsg {
final String greeting;
public MyMsgB(String greeting) {
this.greeting = greeting;
}
}
Behavior<MyMsg> actor1 = signalOrMessage((ctx, signal) -> same(), (ctx, msg) -> stopped());
Behavior<MyMsg> actor2 = stateful((ctx, msg) -> unhandled());
Behavior<MyMsg> actor3 = stateless((ctx, msg) -> {});
Behavior<MyMsg> actor4 = empty();
Behavior<MyMsg> actor5 = ignore();
Behavior<MyMsg> actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5);
Behavior<MyMsgA> actor7 = actor6.narrow();
Behavior<MyMsg> actor8 = deferred(ctx -> {
final ActorRef<MyMsg> self = ctx.getSelf();
return monitor(self, ignore());
});
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
{
Actor.<MyMsg>stateful((ctx, msg) -> {
if (msg instanceof MyMsgA) {
return stateless((ctx2, msg2) -> {
if (msg2 instanceof MyMsgB) {
((MyMsgA) msg).replyTo.tell(((MyMsgB) msg2).greeting);
}
});
} else return unhandled();
});
}
}

View file

@ -4,8 +4,8 @@ import scala.concurrent.duration._
import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.DeadLetterSuppression
import akka.typed.ScalaDSL._
import akka.typed.patterns._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.SignalOrMessage
object ActorContextSpec {
@ -74,6 +74,91 @@ object ActorContextSpec {
final case class Adapter(a: ActorRef[Command]) extends Event
def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
Actor.SignalOrMessage(
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same },
(ctx, message) message match {
case ReceiveTimeout
monitor ! GotReceiveTimeout
Actor.Same
case Ping(replyTo)
replyTo ! Pong1
Actor.Same
case Miss(replyTo)
replyTo ! Missed
Actor.Unhandled
case Renew(replyTo)
replyTo ! Renewed
subject(monitor)
case Throw(ex)
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Actor.Restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Actor.Restarter[Throwable]().wrap(subject(mon)), n)
}
replyTo ! Created(child)
Actor.Same
case SetTimeout(d, replyTo)
d match {
case f: FiniteDuration ctx.setReceiveTimeout(f, ReceiveTimeout)
case _ ctx.cancelReceiveTimeout()
}
replyTo ! TimeoutSet
Actor.Same
case Schedule(delay, target, msg, replyTo)
replyTo ! Scheduled
ctx.schedule(delay, target, msg)
Actor.Same
case Stop Actor.Stopped
case Kill(ref, replyTo)
if (ctx.stop(ref)) replyTo ! Killed
else replyTo ! NotKilled
Actor.Same
case Watch(ref, replyTo)
ctx.watch[Nothing](ref)
replyTo ! Watched
Actor.Same
case Unwatch(ref, replyTo)
ctx.unwatch[Nothing](ref)
replyTo ! Unwatched
Actor.Same
case GetInfo(replyTo)
replyTo ! Info(ctx.self, ctx.system)
Actor.Same
case GetChild(name, replyTo)
replyTo ! Child(ctx.child(name))
Actor.Same
case GetChildren(replyTo)
replyTo ! Children(ctx.children.toSet)
Actor.Same
case BecomeInert(replyTo)
replyTo ! BecameInert
Actor.Stateless {
case (_, Ping(replyTo))
replyTo ! Pong2
case (_, Throw(ex))
throw ex
case _ ()
}
case BecomeCareless(replyTo)
replyTo ! BecameCareless
Actor.SignalOrMessage(
(ctx, signal) signal match {
case Terminated(_) Actor.Unhandled
case sig
monitor ! GotSignal(sig)
Actor.Same
},
(ctx, message) Actor.Unhandled
)
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
Actor.Same
}
)
def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = {
import ScalaDSL._
FullTotal {
case Sig(ctx, signal)
monitor ! GotSignal(signal)
@ -95,8 +180,8 @@ object ActorContextSpec {
throw ex
case MkChild(name, mon, replyTo)
val child = name match {
case None ctx.spawnAnonymous(Restarter[Throwable]().wrap(subject(mon)))
case Some(n) ctx.spawn(Restarter[Throwable]().wrap(subject(mon)), n)
case None ctx.spawnAnonymous(patterns.Restarter[Command, Throwable](subject(mon), false)())
case Some(n) ctx.spawn(patterns.Restarter[Command, Throwable](subject(mon), false)(), n)
}
replyTo ! Created(child)
Same
@ -156,6 +241,8 @@ object ActorContextSpec {
Same
}
}
}
}
class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
@ -179,7 +266,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
/**
* The behavior against which to run all the tests.
*/
def behavior(ctx: ActorContext[Event]): Behavior[Command]
def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command]
implicit def system: ActorSystem[TypedSpec.Command]
@ -187,8 +274,8 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
if (system eq nativeSystem) suite + "Native"
else suite + "Adapted"
def setup(name: String, wrapper: Option[Restarter.Apply[_]] = None)(
proc: (ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
def setup(name: String, wrapper: Option[Actor.Restarter.Apply[_]] = None)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx))
val steps =
@ -260,7 +347,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Restarter[Throwable]())) { (ctx, startWith)
def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Actor.Restarter[Throwable]())) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM1")
startWith { subj
@ -335,7 +422,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
})
def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Restarter[Exception]())) { (ctx, startWith)
def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Actor.Restarter[Exception]())) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM05")
startWith
@ -353,7 +440,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
.stimulate(_ ! Ping(self), _ Pong1)
})
def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Restarter[Exception](resume = true))) { (ctx, startWith)
def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Actor.Restarter[Exception](resume = true))) { (ctx, startWith)
val self = ctx.self
val ex = new Exception("KABOOM06")
startWith
@ -534,80 +621,115 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
}
trait Normal extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self)
override def suite = "normal"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self)
}
object `An ActorContext (native)` extends Normal with NativeSystem
object `An ActorContext (adapted)` extends Normal with AdaptedSystem
trait Widened extends Tests {
import Actor._
override def suite = "widened"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self).widen { case x x }
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
subject(ctx.self).widen { case x x }
}
object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem
object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem
trait SynchronousSelf extends Tests {
trait Deferred extends Tests {
override def suite = "deferred"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.Deferred(_ subject(ctx.self))
}
object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem
object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem
trait Tap extends Tests {
override def suite = "tap"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
Actor.Tap((_, _) (), (_, _) (), subject(ctx.self))
}
object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem
object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem
trait NormalOld extends Tests {
override def suite = "basic"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
oldSubject(ctx.self)
}
object `An ActorContext (old-native)` extends NormalOld with NativeSystem
object `An ActorContext (old-adapted)` extends NormalOld with AdaptedSystem
trait WidenedOld extends Tests {
import ScalaDSL._
override def suite = "widened"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
oldSubject(ctx.self).widen { case x x }
}
object `An ActorContext with widened Behavior (old-native)` extends WidenedOld with NativeSystem
object `An ActorContext with widened Behavior (old-adapted)` extends WidenedOld with AdaptedSystem
trait SynchronousSelfOld extends Tests {
override def suite = "synchronous"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = SynchronousSelf(self subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.SynchronousSelf(self oldSubject(ctx.self))
}
object `An ActorContext with SynchronousSelf (native)` extends SynchronousSelf with NativeSystem
object `An ActorContext with SynchronousSelf (adapted)` extends SynchronousSelf with AdaptedSystem
object `An ActorContext with SynchronousSelf (old-native)` extends SynchronousSelfOld with NativeSystem
object `An ActorContext with SynchronousSelf (old-adapted)` extends SynchronousSelfOld with AdaptedSystem
trait NonMatchingTap extends Tests {
trait NonMatchingTapOld extends Tests {
override def suite = "TapNonMatch"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case null }, subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case null }, oldSubject(ctx.self))
}
object `An ActorContext with non-matching Tap (native)` extends NonMatchingTap with NativeSystem
object `An ActorContext with non-matching Tap (adapted)` extends NonMatchingTap with AdaptedSystem
object `An ActorContext with non-matching Tap (old-native)` extends NonMatchingTapOld with NativeSystem
object `An ActorContext with non-matching Tap (old-adapted)` extends NonMatchingTapOld with AdaptedSystem
trait MatchingTap extends Tests {
trait MatchingTapOld extends Tests {
override def suite = "TapMatch"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case _ }, subject(ctx.self))
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case _ }, oldSubject(ctx.self))
}
object `An ActorContext with matching Tap (native)` extends MatchingTap with NativeSystem
object `An ActorContext with matching Tap (adapted)` extends MatchingTap with AdaptedSystem
object `An ActorContext with matching Tap (old-native)` extends MatchingTapOld with NativeSystem
object `An ActorContext with matching Tap (old-adapted)` extends MatchingTapOld with AdaptedSystem
private val stoppingBehavior = Full[Command] { case Msg(_, Stop) Stopped }
private val stoppingBehavior = ScalaDSL.Full[Command] { case ScalaDSL.Msg(_, Stop) ScalaDSL.Stopped }
trait AndLeft extends Tests {
override def suite = "and"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
And(subject(ctx.self), stoppingBehavior)
trait AndLeftOld extends Tests {
override def suite = "andLeft"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.And(oldSubject(ctx.self), stoppingBehavior)
}
object `An ActorContext with And (left, native)` extends AndLeft with NativeSystem
object `An ActorContext with And (left, adapted)` extends AndLeft with AdaptedSystem
object `An ActorContext with And (left, native)` extends AndLeftOld with NativeSystem
object `An ActorContext with And (left, adapted)` extends AndLeftOld with AdaptedSystem
trait AndRight extends Tests {
override def suite = "and"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
And(stoppingBehavior, subject(ctx.self))
trait AndRightOld extends Tests {
override def suite = "andRight"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.And(stoppingBehavior, oldSubject(ctx.self))
}
object `An ActorContext with And (right, native)` extends AndRight with NativeSystem
object `An ActorContext with And (right, adapted)` extends AndRight with AdaptedSystem
object `An ActorContext with And (right, native)` extends AndRightOld with NativeSystem
object `An ActorContext with And (right, adapted)` extends AndRightOld with AdaptedSystem
trait OrLeft extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
Or(subject(ctx.self), stoppingBehavior)
trait OrLeftOld extends Tests {
override def suite = "orLeft"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.Or(oldSubject(ctx.self), stoppingBehavior)
override def stop(ref: ActorRef[Command]) = {
ref ! Stop
ref ! Stop
}
}
object `An ActorContext with Or (left, native)` extends OrLeft with NativeSystem
object `An ActorContext with Or (left, adapted)` extends OrLeft with AdaptedSystem
object `An ActorContext with Or (left, native)` extends OrLeftOld with NativeSystem
object `An ActorContext with Or (left, adapted)` extends OrLeftOld with AdaptedSystem
trait OrRight extends Tests {
override def suite = "basic"
override def behavior(ctx: ActorContext[Event]): Behavior[Command] =
Or(stoppingBehavior, subject(ctx.self))
trait OrRightOld extends Tests {
override def suite = "orRight"
override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] =
ScalaDSL.Or(stoppingBehavior, oldSubject(ctx.self))
override def stop(ref: ActorRef[Command]) = {
ref ! Stop
ref ! Stop
}
}
object `An ActorContext with Or (right, native)` extends OrRight with NativeSystem
object `An ActorContext with Or (right, adapted)` extends OrRight with AdaptedSystem
object `An ActorContext with Or (right, native)` extends OrRightOld with NativeSystem
object `An ActorContext with Or (right, adapted)` extends OrRightOld with AdaptedSystem
}

View file

@ -12,7 +12,7 @@ import akka.util.Timeout
import akka.pattern.AskTimeoutException
import ScalaDSL._
import AskPattern._
import akka.typed.scaladsl.AskPattern._
object AskSpec {

View file

@ -3,6 +3,12 @@
*/
package akka.typed
import akka.typed.scaladsl.{ Actor SActor }
import akka.typed.javadsl.{ Actor JActor, ActorContext JActorContext }
import akka.japi.function.{ Function F1e, Function2 F2, Procedure2 P2 }
import akka.japi.pf.{ FI, PFBuilder }
import java.util.function.{ Function F1 }
class BehaviorSpec extends TypedSpec {
sealed trait Command {
@ -25,9 +31,8 @@ class BehaviorSpec extends TypedSpec {
case object Swap extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Swapped :: Nil
}
case class GetState(replyTo: ActorRef[State]) extends Command
object GetState {
def apply()(implicit inbox: Inbox[State]): GetState = GetState(inbox.ref)
case class GetState()(s: State) extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = s :: Nil
}
case class AuxPing(id: Int) extends Command {
override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Pong :: Nil
@ -42,58 +47,86 @@ class BehaviorSpec extends TypedSpec {
case object Pong extends Event
case object Swapped extends Event
trait State { def next: State }
sealed trait State extends Event { def next: State }
val StateA: State = new State { override def toString = "StateA"; override def next = StateB }
val StateB: State = new State { override def toString = "StateB"; override def next = StateA }
trait Common {
type Aux >: Null <: AnyRef
def system: ActorSystem[TypedSpec.Command]
def behavior(monitor: ActorRef[Event]): Behavior[Command]
def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux)
def checkAux(signal: Signal, aux: Aux): Unit = ()
def checkAux(command: Command, aux: Aux): Unit = ()
case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event])
case class Init(behv: Behavior[Command], inbox: Inbox[Event], aux: Aux) {
def mkCtx(requirePreStart: Boolean = false): Setup = {
val ctx = new EffectfulActorContext("ctx", behv, 1000, system)
val msgs = inbox.receiveAll()
if (requirePreStart) msgs should ===(GotSignal(PreStart) :: Nil)
checkAux(PreStart, aux)
Setup(ctx, inbox, aux)
}
}
case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event], aux: Aux)
protected def mkCtx(requirePreStart: Boolean = false, factory: (ActorRef[Event]) Behavior[Command] = behavior) = {
def init(): Init = {
val inbox = Inbox[Event]("evt")
val ctx = new EffectfulActorContext("ctx", factory(inbox.ref), 1000, system)
val msgs = inbox.receiveAll()
if (requirePreStart)
msgs should ===(GotSignal(PreStart) :: Nil)
Setup(ctx, inbox)
val (behv, aux) = behavior(inbox.ref)
Init(behv, inbox, aux)
}
protected implicit class Check(val setup: Setup) {
def init(factory: ActorRef[Event] (Behavior[Command], Aux)): Init = {
val inbox = Inbox[Event]("evt")
val (behv, aux) = factory(inbox.ref)
Init(behv, inbox, aux)
}
def mkCtx(requirePreStart: Boolean = false): Setup =
init().mkCtx(requirePreStart)
implicit class Check(val setup: Setup) {
def check(signal: Signal): Setup = {
setup.ctx.signal(signal)
setup.inbox.receiveAll() should ===(GotSignal(signal) :: Nil)
checkAux(signal, setup.aux)
setup
}
def check(command: Command): Setup = {
setup.ctx.run(command)
setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx))
setup
}
def check[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = {
setup.ctx.run(command)
setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx))
inbox.receiveAll() should ===(aux)
checkAux(command, setup.aux)
setup
}
def check2(command: Command): Setup = {
setup.ctx.run(command)
val expected = command.expectedResponse(setup.ctx)
setup.inbox.receiveAll() should ===(expected ++ expected)
setup
}
def check2[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = {
setup.ctx.run(command)
val expected = command.expectedResponse(setup.ctx)
setup.inbox.receiveAll() should ===(expected ++ expected)
inbox.receiveAll() should ===(aux ++ aux)
checkAux(command, setup.aux)
setup
}
}
protected val ex = new Exception("mine!")
val ex = new Exception("mine!")
}
trait Siphon extends Common {
override type Aux = Inbox[Command]
override def checkAux(command: Command, aux: Aux): Unit = {
aux.receiveAll() should ===(command :: Nil)
}
}
trait SignalSiphon extends Common {
override type Aux = Inbox[Either[Signal, Command]]
override def checkAux(command: Command, aux: Aux): Unit = {
aux.receiveAll() should ===(Right(command) :: Nil)
}
override def checkAux(signal: Signal, aux: Aux): Unit = {
aux.receiveAll() should ===(Left(signal) :: Nil)
}
}
trait Lifecycle extends Common {
@ -146,17 +179,19 @@ class BehaviorSpec extends TypedSpec {
trait Unhandled extends Common {
def `must return Unhandled`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
ctx.currentBehavior.message(ctx, Miss) should ===(ScalaDSL.Unhandled[Command])
val Setup(ctx, inbox, aux) = mkCtx()
ctx.currentBehavior.message(ctx, Miss) should be(Behavior.unhandledBehavior)
inbox.receiveAll() should ===(Missed :: Nil)
checkAux(Miss, aux)
}
}
trait Stoppable extends Common {
def `must stop`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, aux) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Stopped[Command])
ctx.currentBehavior should be(Behavior.stoppedBehavior)
checkAux(Stop, aux)
}
}
@ -164,15 +199,15 @@ class BehaviorSpec extends TypedSpec {
private implicit val inbox = Inbox[State]("state")
def `must be in state A`(): Unit = {
mkCtx().check(GetState(), StateA)
mkCtx().check(GetState()(StateA))
}
def `must switch to state B`(): Unit = {
mkCtx().check(Swap).check(GetState(), StateB)
mkCtx().check(Swap).check(GetState()(StateB))
}
def `must switch back to state A`(): Unit = {
mkCtx().check(Swap).check(Swap).check(GetState(), StateA)
mkCtx().check(Swap).check(Swap).check(GetState()(StateA))
}
}
@ -206,6 +241,18 @@ class BehaviorSpec extends TypedSpec {
}
}
/**
* This targets behavior wrappers to ensure that the wrapper does not
* hold on to the changed behavior. Wrappers must be immutable.
*/
trait Reuse extends Common {
def `must be reusable`(): Unit = {
val i = init()
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB))
}
}
private def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = {
import ScalaDSL.{ Full, Msg, Sig, Same, Unhandled, Stopped }
Full {
@ -227,21 +274,21 @@ class BehaviorSpec extends TypedSpec {
case Msg(ctx, Swap)
monitor ! Swapped
mkFull(monitor, state.next)
case Msg(ctx, GetState(replyTo))
replyTo ! state
case Msg(ctx, GetState())
monitor ! state
Same
case Msg(ctx, Stop) Stopped
}
}
trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = mkFull(monitor)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) null
}
object `A Full Behavior (native)` extends FullBehavior with NativeSystem
object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem
trait FullTotalBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = {
import ScalaDSL.{ FullTotal, Msg, Sig, Same, Unhandled, Stopped }
FullTotal {
@ -263,8 +310,8 @@ class BehaviorSpec extends TypedSpec {
case Msg(_, Swap)
monitor ! Swapped
behv(monitor, state.next)
case Msg(_, GetState(replyTo))
replyTo ! state
case Msg(_, GetState())
monitor ! state
Same
case Msg(_, Stop) Stopped
case Msg(_, _: AuxPing) Unhandled
@ -275,36 +322,36 @@ class BehaviorSpec extends TypedSpec {
object `A FullTotal Behavior (adapted)` extends FullTotalBehavior with AdaptedSystem
trait WidenedBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Widened(mkFull(monitor), { case x x })
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Widened(mkFull(monitor), { case x x }), null)
}
object `A Widened Behavior (native)` extends WidenedBehavior with NativeSystem
object `A Widened Behavior (adapted)` extends WidenedBehavior with AdaptedSystem
trait ContextAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.ContextAware(ctx mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.ContextAware(ctx mkFull(monitor)), null)
}
object `A ContextAware Behavior (native)` extends ContextAwareBehavior with NativeSystem
object `A ContextAware Behavior (adapted)` extends ContextAwareBehavior with AdaptedSystem
trait SelfAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.SelfAware(self mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.SelfAware(self mkFull(monitor)), null)
}
object `A SelfAware Behavior (native)` extends SelfAwareBehavior with NativeSystem
object `A SelfAware Behavior (adapted)` extends SelfAwareBehavior with AdaptedSystem
trait NonMatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Tap({ case null }, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Tap({ case null }, mkFull(monitor)), null)
}
object `A non-matching Tap Behavior (native)` extends NonMatchingTapBehavior with NativeSystem
object `A non-matching Tap Behavior (adapted)` extends NonMatchingTapBehavior with AdaptedSystem
trait MatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Tap({ case _ }, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Tap({ case _ }, mkFull(monitor)), null)
}
object `A matching Tap Behavior (native)` extends MatchingTapBehavior with NativeSystem
object `A matching Tap Behavior (adapted)` extends MatchingTapBehavior with AdaptedSystem
@ -312,12 +359,13 @@ class BehaviorSpec extends TypedSpec {
trait SynchronousSelfBehavior extends Messages with BecomeWithLifecycle with Stoppable {
import ScalaDSL._
implicit private val inbox = Inbox[Command]("syncself")
type Aux = Inbox[Command]
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
SynchronousSelf(self mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(SynchronousSelf(self mkFull(monitor)), null)
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] = {
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("syncself")
def first(self: ActorRef[Command]) = Tap.monitor(inbox.ref, Partial[Command] {
case AuxPing(id) { self ! AuxPing(0); second(self) }
})
@ -330,46 +378,51 @@ class BehaviorSpec extends TypedSpec {
case AuxPing(3) { self ! Ping; Same }
case AuxPing(4) { self ! Stop; Stopped }
}
SynchronousSelf(self Or(mkFull(monitor), first(self)))
(SynchronousSelf(self Or(mkFull(monitor), first(self))), inbox)
}
override def checkAux(cmd: Command, aux: Aux) =
(cmd, aux) match {
case (AuxPing(42), i: Inbox[_]) i.receiveAll() should ===(Seq(42, 0, 1, 2, 3) map AuxPing: Seq[Command])
case (AuxPing(4), i: Inbox[_]) i.receiveAll() should ===(AuxPing(4) :: Nil)
case _ // ignore
}
def `must send messages to itself and stop correctly`(): Unit = {
val Setup(ctx, _) = mkCtx(factory = behavior2).check[Command](AuxPing(42), Seq(42, 0, 1, 2, 3) map AuxPing: _*)
val Setup(ctx, _, _) = init(behavior2).mkCtx().check(AuxPing(42))
ctx.run(AuxPing(4))
inbox.receiveAll() should ===(AuxPing(4) :: Nil)
ctx.currentBehavior should ===(Stopped[Command])
}
}
object `A SynchronourSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem
object `A SynchronousSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem
object `A SynchronousSelf Behavior (adapted)` extends SynchronousSelfBehavior with AdaptedSystem
trait And extends Common {
private implicit val inbox = Inbox[State]("and")
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(mkFull(monitor), mkFull(monitor))
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(mkFull(monitor), mkFull(monitor)) null
def `must pass message to both parts`(): Unit = {
mkCtx(factory = behavior2).check2(Swap).check2[State](GetState(), StateB)
init(behavior2).mkCtx().check2(Swap).check2(GetState()(StateB))
}
def `must half-terminate`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, _) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Empty[Command])
}
}
trait BehaviorAndLeft extends Messages with BecomeWithLifecycle with And {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty) null
}
object `A Behavior combined with And (left, native)` extends BehaviorAndLeft with NativeSystem
object `A Behavior combined with And (left, adapted)` extends BehaviorAndLeft with NativeSystem
trait BehaviorAndRight extends Messages with BecomeWithLifecycle with And {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor)) null
}
object `A Behavior combined with And (right, native)` extends BehaviorAndRight with NativeSystem
object `A Behavior combined with And (right, adapted)` extends BehaviorAndRight with NativeSystem
@ -382,43 +435,43 @@ class BehaviorSpec extends TypedSpec {
ScalaDSL.Unhandled
}
private def behavior2(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(mkFull(monitor), strange(monitor))
private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(mkFull(monitor), strange(monitor)) null
private def behavior3(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(strange(monitor), mkFull(monitor))
private def behavior3(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(strange(monitor), mkFull(monitor)) null
def `must pass message only to first interested party`(): Unit = {
mkCtx(factory = behavior2).check(Ping).check(AuxPing(0))
init(behavior2).mkCtx().check(Ping).check(AuxPing(0))
}
def `must pass message through both if first is uninterested`(): Unit = {
mkCtx(factory = behavior3).check2(Ping).check(AuxPing(0))
init(behavior3).mkCtx().check2(Ping).check(AuxPing(0))
}
def `must half-terminate`(): Unit = {
val Setup(ctx, inbox) = mkCtx()
val Setup(ctx, inbox, _) = mkCtx()
ctx.run(Stop)
ctx.currentBehavior should ===(ScalaDSL.Empty[Command])
}
}
trait BehaviorOrLeft extends Messages with BecomeWithLifecycle with Or {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty) null
}
object `A Behavior combined with Or (left, native)` extends BehaviorOrLeft with NativeSystem
object `A Behavior combined with Or (left, adapted)` extends BehaviorOrLeft with NativeSystem
trait BehaviorOrRight extends Messages with BecomeWithLifecycle with Or {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor))
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor)) null
}
object `A Behavior combined with Or (right, native)` extends BehaviorOrRight with NativeSystem
object `A Behavior combined with Or (right, adapted)` extends BehaviorOrRight with NativeSystem
trait PartialBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
ScalaDSL.Partial {
case Ping
@ -433,8 +486,8 @@ class BehaviorSpec extends TypedSpec {
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState(replyTo)
replyTo ! state
case GetState()
monitor ! state
ScalaDSL.Same
case Stop ScalaDSL.Stopped
}
@ -443,7 +496,7 @@ class BehaviorSpec extends TypedSpec {
object `A Partial Behavior (adapted)` extends PartialBehavior with AdaptedSystem
trait TotalBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA)
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
ScalaDSL.Total {
case Ping
@ -459,8 +512,8 @@ class BehaviorSpec extends TypedSpec {
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState(replyTo)
replyTo ! state
case GetState()
monitor ! state
ScalaDSL.Same
case Stop ScalaDSL.Stopped
case _: AuxPing ScalaDSL.Unhandled
@ -470,18 +523,317 @@ class BehaviorSpec extends TypedSpec {
object `A Total Behavior (adapted)` extends TotalBehavior with AdaptedSystem
trait StaticBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): Behavior[Command] =
ScalaDSL.Static {
case Ping monitor ! Pong
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case GetSelf
case Swap
case GetState(_)
case Stop
case _: AuxPing
}
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(ScalaDSL.Static {
case Ping monitor ! Pong
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case GetSelf
case Swap
case GetState()
case Stop
case _: AuxPing
}, null)
}
object `A Static Behavior (native)` extends StaticBehavior with NativeSystem
object `A Static Behavior (adapted)` extends StaticBehavior with AdaptedSystem
trait SignalOrMessageScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.SignalOrMessage((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}, (ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
})
}
object `A SignalOrMessage Behavior (scala,native)` extends SignalOrMessageScalaBehavior with NativeSystem
object `A SignalOrMessage Behavior (scala,adapted)` extends SignalOrMessageScalaBehavior with AdaptedSystem
trait StatefulScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
SActor.Stateful { (ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}
}
}
object `A Stateful Behavior (scala,native)` extends StatefulScalaBehavior with NativeSystem
object `A Stateful Behavior (scala,adapted)` extends StatefulScalaBehavior with AdaptedSystem
trait StatelessScalaBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(SActor.Stateless { (ctx, msg)
msg match {
case GetSelf monitor ! Self(ctx.self)
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case Ping monitor ! Pong
case Swap monitor ! Swapped
case GetState() monitor ! StateA
case Stop
case _: AuxPing
}
}, null)
}
object `A Stateless Behavior (scala,native)` extends StatelessScalaBehavior with NativeSystem
object `A Stateless Behavior (scala,adapted)` extends StatelessScalaBehavior with AdaptedSystem
trait WidenedScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with Siphon {
import SActor.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox
}
}
object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem
trait DeferredScalaBehavior extends SignalOrMessageScalaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[PreStart]("deferredListener")
(SActor.Deferred(ctx {
inbox.ref ! PreStart
super.behavior(monitor)._1
}), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
signal match {
case PreStart aux.receiveAll() should ===(PreStart :: Nil)
case _ aux.receiveAll() should ===(Nil)
}
}
object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem
trait TapScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.Tap(
(_, sig) inbox.ref ! Left(sig),
(_, msg) inbox.ref ! Right(msg),
super.behavior(monitor)._1
), inbox)
}
}
object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem
trait RestarterScalaBehavior extends SignalOrMessageScalaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.Restarter[Exception]().wrap(super.behavior(monitor)._1) null
}
}
object `A restarter Behavior (scala,native)` extends RestarterScalaBehavior with NativeSystem
object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem
/*
* function converters for Java, to ease the pain on Scala 2.11
*/
def fs(f: (JActorContext[Command], Signal) Behavior[Command]) =
new F2[JActorContext[Command], Signal, Behavior[Command]] {
override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig)
}
def fc(f: (JActorContext[Command], Command) Behavior[Command]) =
new F2[JActorContext[Command], Command, Behavior[Command]] {
override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command)
}
def ps(f: (JActorContext[Command], Signal) Unit) =
new P2[JActorContext[Command], Signal] {
override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig)
}
def pc(f: (JActorContext[Command], Command) Unit) =
new P2[JActorContext[Command], Command] {
override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command)
}
def pf(f: PFBuilder[Command, Command] PFBuilder[Command, Command]) =
new F1[PFBuilder[Command, Command], PFBuilder[Command, Command]] {
override def apply(in: PFBuilder[Command, Command]) = f(in)
}
def fi(f: Command Command) =
new FI.Apply[Command, Command] {
override def apply(in: Command) = f(in)
}
def df(f: JActorContext[Command] Behavior[Command]) =
new F1e[JActorContext[Command], Behavior[Command]] {
override def apply(in: JActorContext[Command]) = f(in)
}
trait SignalOrMessageJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.signalOrMessage(fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}), fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}))
}
object `A SignalOrMessage Behavior (java,native)` extends SignalOrMessageJavaBehavior with NativeSystem
object `A SignalOrMessage Behavior (java,adapted)` extends SignalOrMessageJavaBehavior with AdaptedSystem
trait StatefulJavaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
def behv(monitor: ActorRef[Event], state: State): Behavior[Command] =
JActor.stateful {
fc((ctx, msg)
msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
})
}
}
object `A Stateful Behavior (java,native)` extends StatefulJavaBehavior with NativeSystem
object `A Stateful Behavior (java,adapted)` extends StatefulJavaBehavior with AdaptedSystem
trait StatelessJavaBehavior extends Messages {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) =
(JActor.stateless {
pc((ctx, msg)
msg match {
case GetSelf monitor ! Self(ctx.getSelf)
case Miss monitor ! Missed
case Ignore monitor ! Ignored
case Ping monitor ! Pong
case Swap monitor ! Swapped
case GetState() monitor ! StateA
case Stop
case _: AuxPing
})
}, null)
}
object `A Stateless Behavior (java,native)` extends StatelessJavaBehavior with NativeSystem
object `A Stateless Behavior (java,adapted)` extends StatelessJavaBehavior with AdaptedSystem
trait WidenedJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
}
}
object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem
trait DeferredJavaBehavior extends SignalOrMessageJavaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[PreStart]("deferredListener")
(JActor.deferred(df(ctx {
inbox.ref ! PreStart
super.behavior(monitor)._1
})), inbox)
}
override def checkAux(signal: Signal, aux: Aux): Unit =
signal match {
case PreStart aux.receiveAll() should ===(PreStart :: Nil)
case _ aux.receiveAll() should ===(Nil)
}
}
object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem
trait TapJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
ps((_, sig) inbox.ref ! Left(sig)),
pc((_, msg) inbox.ref ! Right(msg)),
super.behavior(monitor)._1
), inbox)
}
}
object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem
trait RestarterJavaBehavior extends SignalOrMessageJavaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.restarter(classOf[Exception], JActor.OnFailure.RESTART, super.behavior(monitor)._1) null
}
}
object `A restarter Behavior (java,native)` extends RestarterJavaBehavior with NativeSystem
object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem
}

View file

@ -106,7 +106,7 @@ object StepWise {
def withKeepTraces(b: Boolean): StartWith[T] = new StartWith(b)
}
def apply[T](f: (ActorContext[T], StartWith[T]) Steps[T, _]): Behavior[T] =
def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) Steps[T, _]): Behavior[T] =
Full[Any] {
case Sig(ctx, PreStart) run(ctx, f(ctx.asInstanceOf[ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ())
}.narrow
@ -127,7 +127,7 @@ object StepWise {
}
}
private def run[T](ctx: ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] =
private def run[T](ctx: scaladsl.ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] =
ops match {
case Thunk(f) :: tail run(ctx, tail, f())
case ThunkV(f) :: tail run(ctx, tail, f(value))

View file

@ -25,6 +25,7 @@ import org.junit.runner.RunWith
import scala.util.control.NonFatal
import org.scalatest.exceptions.TestFailedException
import akka.util.TypedMultiMap
import akka.typed.scaladsl.AskPattern
/**
* Helper class for writing tests for typed Actors with ScalaTest.

View file

@ -108,7 +108,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
def `must start system actors and mangle their names`(): Unit = {
withSystem("systemActorOf", Empty[String]) { sys
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
implicit val timeout = Timeout(1.second)
implicit val sched = sys.scheduler

View file

@ -8,7 +8,7 @@ import scala.concurrent.duration._
import akka.Done
import akka.event.Logging._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.Executor
import org.scalatest.concurrent.Eventually

View file

@ -1,254 +0,0 @@
package akka.typed.patterns
import akka.typed._
import scala.concurrent.duration._
import akka.typed.Effect.{ ReceiveTimeoutSet, Scheduled }
import Receiver._
object ReceiverSpec {
case class Msg(x: Int)
case class Setup(name: String, creator: ActorContext[Command[Msg]] Behavior[Command[Msg]], messages: Int, effects: Int)
}
class ReceiverSpec extends TypedSpec {
import ReceiverSpec._
private val dummyInbox = Inbox[Replies[Msg]]("dummy")
private val startingPoints: Seq[Setup] = Seq(
Setup("initial", ctx behavior[Msg], 0, 0),
Setup("afterGetOneFirst", afterGetOneFirst, 1, 0),
Setup("afterGetOneLater", afterGetOneLater, 1, 2),
Setup("afterGetOneTimeout", afterGetOneTimeout, 1, 2),
Setup("afterGetAll", afterGetAll, 1, 1),
Setup("afterGetAllTimeout", afterGetAllTimeout, 1, 1))
private def afterGetOneFirst(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetOne(Duration.Zero)(dummyInbox.ref))
private def afterGetOneLater(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.second)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetOneTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetOne(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[InternalCommand[Msg]]].message(ctx.asInstanceOf[ActorContext[InternalCommand[Msg]]], ReceiveTimeout()).asInstanceOf[Behavior[Command[Msg]]]
private def afterGetAll(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]]
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def afterGetAllTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] =
behavior[Msg]
.management(ctx, PreStart)
.message(ctx, GetAll(1.nano)(dummyInbox.ref))
.message(ctx, GetAll(Duration.Zero)(dummyInbox.ref))
private def setup(name: String, behv: Behavior[Command[Msg]] = behavior[Msg])(
proc: (EffectfulActorContext[Command[Msg]], EffectfulActorContext[Msg], Inbox[Replies[Msg]]) Unit): Unit =
for (Setup(description, behv, messages, effects) startingPoints) {
val ctx = new EffectfulActorContext("ctx", ScalaDSL.ContextAware(behv), 1000, nativeSystem)
withClue(s"[running for starting point '$description' (${ctx.currentBehavior})]: ") {
dummyInbox.receiveAll() should have size messages
ctx.getAllEffects() should have size effects
proc(ctx, ctx.asInstanceOf[EffectfulActorContext[Msg]], Inbox[Replies[Msg]](name))
}
}
object `A Receiver` {
/*
* This test suite assumes that the Receiver is only one actor with two
* sides that share the same ActorRef.
*/
def `must return "self" as external address`(): Unit =
setup("") { (int, ext, _)
val inbox = Inbox[ActorRef[Msg]]("extAddr")
int.run(ExternalAddress(inbox.ref))
int.hasEffects should be(false)
inbox.receiveAll() should be(List(int.self))
}
def `must receive one message which arrived first`(): Unit =
setup("getOne") { (int, ext, inbox)
// first with zero timeout
ext.run(Msg(1))
int.run(GetOne(Duration.Zero)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
// then with positive timeout
ext.run(Msg(2))
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil)
// then with negative timeout
ext.run(Msg(3))
int.run(GetOne(-1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: Nil)
}
def `must receive one message which arrives later`(): Unit =
setup("getOneLater") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d > Duration.Zero should be(true)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
ext.run(Msg(1))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d should be theSameInstanceAs (Duration.Undefined)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
}
def `must reply with no message when asked for immediate value`(): Unit =
setup("getNone") { (int, ext, inbox)
int.run(GetOne(Duration.Zero)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
int.run(GetOne(-1.second)(inbox.ref))
int.getAllEffects() should be(Nil)
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
}
def `must reply with no message after a timeout`(): Unit =
setup("getNoneTimeout") { (int, ext, inbox)
int.run(GetOne(1.nano)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
// currently this all takes >1ns, but who knows what the future brings
Thread.sleep(1)
int.asInstanceOf[EffectfulActorContext[InternalCommand[Msg]]].run(ReceiveTimeout())
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil d should be theSameInstanceAs (Duration.Undefined)
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil)
}
def `must reply with messages which arrived first in the same order (GetOne)`(): Unit =
setup("getMoreOrderOne") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
ext.run(Msg(3))
ext.run(Msg(4))
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil)
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil)
int.run(GetOne(Duration.Zero)(inbox.ref))
int.run(GetOne(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: GetOneResult(int.self, Some(Msg(4))) :: Nil)
int.hasEffects should be(false)
}
def `must reply with messages which arrived first in the same order (GetAll)`(): Unit =
setup("getMoreOrderAll") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(Duration.Zero)(inbox.ref))
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
// now with negative timeout
ext.run(Msg(3))
ext.run(Msg(4))
int.run(GetAll(-1.second)(inbox.ref))
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(3), Msg(4))) :: Nil)
int.hasEffects should be(false)
}
private def assertScheduled[T, U](s: Scheduled[T], target: ActorRef[U]): U = {
s.target should be(target)
// unfortunately Scala cannot automatically transfer the hereby established type knowledge
s.msg.asInstanceOf[U]
}
def `must reply to GetAll with messages which arrived first`(): Unit =
setup("getAllFirst") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll with messages which arrived first and later`(): Unit =
setup("getAllFirstAndLater") { (int, ext, inbox)
ext.run(Msg(1))
ext.run(Msg(2))
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
inbox.hasMessages should be(false)
ext.run(Msg(3))
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2), Msg(3))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll with messages which arrived later`(): Unit =
setup("getAllLater") { (int, ext, inbox)
int.run(GetAll(1.second)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: Nil assertScheduled(s, int.self)
}
ext.run(Msg(1))
ext.run(Msg(2))
inbox.hasMessages should be(false)
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil)
int.hasEffects should be(false)
}
def `must reply to GetAll immediately while GetOne is pending`(): Unit =
setup("getAllWhileGetOne") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
int.run(GetAll(Duration.Zero)(inbox.ref))
int.getAllEffects() should have size 1
inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil)
}
def `must reply to GetAll later while GetOne is pending`(): Unit =
setup("getAllWhileGetOne") { (int, ext, inbox)
int.run(GetOne(1.second)(inbox.ref))
int.getAllEffects() match {
case ReceiveTimeoutSet(d, _) :: Nil // okay
case other fail(s"$other was not List(ReceiveTimeoutSet(_))")
}
inbox.hasMessages should be(false)
int.run(GetAll(1.nano)(inbox.ref))
val msg = int.getAllEffects() match {
case (s: Scheduled[_]) :: ReceiveTimeoutSet(_, _) :: Nil assertScheduled(s, int.self)
}
inbox.receiveAll() should be(Nil)
int.run(msg)
inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil)
}
}
}

View file

@ -5,7 +5,7 @@ package akka.typed.patterns
import Receptionist._
import akka.typed.ScalaDSL._
import akka.typed.AskPattern._
import akka.typed.scaladsl.AskPattern._
import scala.concurrent.duration._
import akka.typed._