From 4368bed37a62f8fb93d6a8be05fed7813335a23f Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 26 Feb 2017 16:13:11 +0100 Subject: [PATCH] add Akka Typed Java API #22293 ... and matching Scala DSL, to replace the old ScalaDSl._ object. --- .../code/docs/akka/typed/IntroSpec.scala | 2 +- .../main/java/akka/typed/javadsl/Actor.java | 384 +++++++++++++ .../java/akka/typed/javadsl/ActorContext.java | 159 +++++ .../main/scala/akka/typed/ActorContext.scala | 157 ++--- .../src/main/scala/akka/typed/ActorRef.scala | 8 +- .../main/scala/akka/typed/ActorSystem.scala | 10 +- .../src/main/scala/akka/typed/Behavior.scala | 33 +- .../src/main/scala/akka/typed/Effects.scala | 2 +- .../src/main/scala/akka/typed/Inbox.scala | 6 + .../scala/akka/typed/MessageAndSignals.scala | 83 +-- .../src/main/scala/akka/typed/ScalaDSL.scala | 59 +- .../typed/adapter/ActorContextAdapter.scala | 2 +- .../scala/akka/typed/internal/ActorCell.scala | 2 +- .../akka/typed/internal/ActorSystemImpl.scala | 1 + .../akka/typed/internal/EventStreamImpl.scala | 1 + .../scala/akka/typed/patterns/Receiver.scala | 122 ---- .../scala/akka/typed/patterns/Restarter.scala | 60 +- .../scala/akka/typed/scaladsl/Actor.scala | 404 +++++++++++++ .../scala/akka/typed/{ => scaladsl}/Ask.scala | 8 +- .../java/akka/typed/javadsl/ActorCompile.java | 53 ++ .../scala/akka/typed/ActorContextSpec.scala | 224 ++++++-- .../src/test/scala/akka/typed/AskSpec.scala | 2 +- .../test/scala/akka/typed/BehaviorSpec.scala | 542 +++++++++++++++--- .../src/test/scala/akka/typed/StepWise.scala | 4 +- .../src/test/scala/akka/typed/TypedSpec.scala | 1 + .../akka/typed/internal/ActorSystemSpec.scala | 2 +- .../akka/typed/internal/EventStreamSpec.scala | 2 +- .../akka/typed/patterns/ReceiverSpec.scala | 254 -------- .../typed/patterns/ReceptionistSpec.scala | 2 +- 29 files changed, 1822 insertions(+), 767 deletions(-) create mode 100644 akka-typed/src/main/java/akka/typed/javadsl/Actor.java create mode 100644 akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java delete mode 100644 akka-typed/src/main/scala/akka/typed/patterns/Receiver.scala create mode 100644 akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala rename akka-typed/src/main/scala/akka/typed/{ => scaladsl}/Ask.scala (96%) create mode 100644 akka-typed/src/test/java/akka/typed/javadsl/ActorCompile.java delete mode 100644 akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala diff --git a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala index d70b20d4bb..ace52e1b18 100644 --- a/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala +++ b/akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala @@ -6,7 +6,7 @@ package docs.akka.typed //#imports import akka.typed._ import akka.typed.ScalaDSL._ -import akka.typed.AskPattern._ +import akka.typed.scaladsl.AskPattern._ import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.Await diff --git a/akka-typed/src/main/java/akka/typed/javadsl/Actor.java b/akka-typed/src/main/java/akka/typed/javadsl/Actor.java new file mode 100644 index 0000000000..7cb4be0ce8 --- /dev/null +++ b/akka-typed/src/main/java/akka/typed/javadsl/Actor.java @@ -0,0 +1,384 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl; + +import static akka.typed.scaladsl.Actor.Empty; +import static akka.typed.scaladsl.Actor.Ignore; +import static akka.typed.scaladsl.Actor.Same; +import static akka.typed.scaladsl.Actor.Stopped; +import static akka.typed.scaladsl.Actor.Unhandled; + +import java.util.function.Function; + +import akka.japi.function.Function2; +import akka.japi.function.Procedure2; +import akka.japi.pf.PFBuilder; +import akka.typed.ActorRef; +import akka.typed.Behavior; +import akka.typed.PreStart; +import akka.typed.Signal; +import akka.typed.patterns.Restarter; +import akka.typed.scaladsl.Actor.Widened; +import scala.reflect.ClassTag; + +public abstract class Actor { + /* + * This DSL is implemented in Java in order to ensure consistent usability from Java, + * taking possible Scala oddities out of the equation. There is some duplication in + * the behavior implementations, but that is unavoidable if both DSLs shall offer the + * same runtime performance (especially concerning allocations for function converters). + */ + + private static class Deferred extends Behavior { + final akka.japi.function.Function, Behavior> producer; + + public Deferred(akka.japi.function.Function, Behavior> producer) { + this.producer = producer; + } + + @Override + public Behavior management(akka.typed.ActorContext ctx, Signal msg) throws Exception { + if (msg instanceof PreStart) { + return Behavior.preStart(producer.apply(ctx), ctx); + } else + throw new IllegalStateException("Deferred behavior must receive PreStart as first signal"); + } + + @Override + public Behavior message(akka.typed.ActorContext ctx, T msg) throws Exception { + throw new IllegalStateException("Deferred behavior must receive PreStart as first signal"); + } + } + + private static class Stateful extends Behavior { + final Function2, Signal, Behavior> signal; + final Function2, T, Behavior> message; + + public Stateful(Function2, Signal, Behavior> signal, + Function2, T, Behavior> message) { + this.signal = signal; + this.message = message; + } + + @Override + public Behavior management(akka.typed.ActorContext ctx, Signal msg) throws Exception { + return signal.apply(ctx, msg); + } + + @Override + public Behavior message(akka.typed.ActorContext ctx, T msg) throws Exception { + return message.apply(ctx, msg); + } + } + + private static class Stateless extends Behavior { + final Procedure2, T> message; + + public Stateless(Procedure2, T> message) { + this.message = message; + } + + @Override + public Behavior management(akka.typed.ActorContext ctx, Signal msg) throws Exception { + return Same(); + } + + @Override + public Behavior message(akka.typed.ActorContext ctx, T msg) throws Exception { + message.apply(ctx, msg); + return Same(); + } + } + + private static class Tap extends Behavior { + final Procedure2, Signal> signal; + final Procedure2, T> message; + final Behavior behavior; + + public Tap(Procedure2, Signal> signal, Procedure2, T> message, + Behavior behavior) { + this.signal = signal; + this.message = message; + this.behavior = behavior; + } + + private Behavior canonicalize(Behavior behv) { + if (Behavior.isUnhandled(behv)) + return Unhandled(); + else if (behv == Same()) + return Same(); + else if (Behavior.isAlive(behv)) + return new Tap(signal, message, behv); + else + return Stopped(); + } + + @Override + public Behavior management(akka.typed.ActorContext ctx, Signal msg) throws Exception { + signal.apply(ctx, msg); + return canonicalize(behavior.management(ctx, msg)); + } + + @Override + public Behavior message(akka.typed.ActorContext ctx, T msg) throws Exception { + message.apply(ctx, msg); + return canonicalize(behavior.message(ctx, msg)); + } + } + + /** + * Mode selector for the {@link #restarter} wrapper that decides whether an actor + * upon a failure shall be restarted (to clean initial state) or resumed (keep + * on running, with potentially compromised state). + * + * Resuming is less safe. If you use OnFailure.RESUME you should at least + * not hold mutable data fields or collections within the actor as those might + * be in an inconsistent state (the exception might have interrupted normal + * processing); avoiding mutable state is possible by returning a fresh + * behavior with the new state after every message. + */ + public enum OnFailure { + RESUME, RESTART; + } + + private static Function2 _unhandledFun = (ctx, msg) -> Unhandled(); + + @SuppressWarnings("unchecked") + private static Function2, Signal, Behavior> unhandledFun() { + return (Function2, Signal, Behavior>) (Object) _unhandledFun; + } + + private static Procedure2 _doNothing = (ctx, msg) -> { + }; + + @SuppressWarnings("unchecked") + private static Procedure2, Signal> doNothing() { + return (Procedure2, Signal>) (Object) _doNothing; + } + + /** + * Construct an actor behavior that can react both to lifecycle signals and + * incoming messages. After spawning this actor from another actor (or as the + * guardian of an {@link akka.typed.ActorSystem}) it will be executed within an + * {@link ActorContext} that allows access to the system, spawning and watching + * other actors, etc. + * + * In either case—signal or message—the next behavior must be returned. If no + * change is desired, use {@link #same}. + * + * @param signal + * the function that describes how this actor reacts to the given + * signal + * @param message + * the function that describes how this actor reacts to the next + * message + * @return the behavior + */ + static public Behavior signalOrMessage(Function2, Signal, Behavior> signal, + Function2, T, Behavior> message) { + return new Stateful(signal, message); + } + + /** + * Construct an actor behavior that can react to incoming messages but not to + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an {@link akka.typed.ActorSystem}) it will be executed within an + * {@link ActorContext} that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called stateful because processing the next message + * results in a new behavior that can potentially be different from this one. + * If no change is desired, use {@link #same}. + * + * @param message + * the function that describes how this actor reacts to the next + * message + * @return the behavior + */ + static public Behavior stateful(Function2, T, Behavior> message) { + return new Stateful(unhandledFun(), message); + } + + /** + * Construct an actor behavior that can react to incoming messages but not to + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an {@link akka.typed.ActorSystem}) it will be executed within an + * {@link ActorContext} that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called stateless because it cannot be replaced by + * another one after it has been installed. It is most useful for leaf actors + * that do not create child actors themselves. + * + * @param message + * the function that describes how this actor reacts to the next + * message + * @return the behavior + */ + static public Behavior stateless(Procedure2, T> message) { + return new Stateless(message); + } + + /** + * Return this behavior from message processing in order to advise the system + * to reuse the previous behavior. This is provided in order to avoid the + * allocation overhead of recreating the current behavior where that is not + * necessary. + * + * @return pseudo-behavior marking “no change” + */ + static public Behavior same() { + return Same(); + } + + /** + * Return this behavior from message processing in order to advise the system + * to reuse the previous behavior, including the hint that the message has not + * been handled. This hint may be used by composite behaviors that delegate + * (partial) handling to other behaviors. + * + * @return pseudo-behavior marking “unhandled” + */ + static public Behavior unhandled() { + return Unhandled(); + } + + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. The PostStop + * signal that results from stopping this actor will NOT be passed to the + * current behavior, it will be effectively ignored. + * + * @return the inert behavior + */ + static public Behavior stopped() { + return Stopped(); + } + + /** + * A behavior that treats every incoming message as unhandled. + * + * @return the empty behavior + */ + static public Behavior empty() { + return Empty(); + } + + /** + * A behavior that ignores every incoming message and returns “same”. + * + * @return the inert behavior + */ + static public Behavior ignore() { + return Ignore(); + } + + /** + * Behavior decorator that allows you to perform any side-effect before a + * signal or message is delivered to the wrapped behavior. The wrapped + * behavior can evolve (i.e. be stateful) without needing to be wrapped in a + * tap(...) call again. + * + * @param signal + * procedure to invoke with the {@link ActorContext} and the + * {@link akka.typed.Signal} as arguments before delivering the signal to + * the wrapped behavior + * @param message + * procedure to invoke with the {@link ActorContext} and the received + * message as arguments before delivering the signal to the wrapped + * behavior + * @param behavior + * initial behavior to be wrapped + * @return the decorated behavior + */ + static public Behavior tap(Procedure2, Signal> signal, Procedure2, T> message, + Behavior behavior) { + return new Tap(signal, message, behavior); + } + + /** + * Behavior decorator that copies all received message to the designated + * monitor {@link akka.typed.ActorRef} before invoking the wrapped behavior. The + * wrapped behavior can evolve (i.e. be stateful) without needing to be + * wrapped in a monitor(...) call again. + * + * @param monitor + * ActorRef to which to copy all received messages + * @param behavior + * initial behavior to be wrapped + * @return the decorated behavior + */ + static public Behavior monitor(ActorRef monitor, Behavior behavior) { + return new Tap(doNothing(), (ctx, msg) -> monitor.tell(msg), behavior); + } + + /** + * Wrap the given behavior such that it is restarted (i.e. reset to its + * initial state) whenever it throws an exception of the given class or a + * subclass thereof. Exceptions that are not subtypes of Thr will not be + * caught and thus lead to the termination of the actor. + * + * It is possible to specify that the actor shall not be restarted but + * resumed. This entails keeping the same state as before the exception was + * thrown and is thus less safe. If you use OnFailure.RESUME you should at + * least not hold mutable data fields or collections within the actor as those + * might be in an inconsistent state (the exception might have interrupted + * normal processing); avoiding mutable state is possible by returning a fresh + * behavior with the new state after every message. + * + * @param clazz + * the type of exceptions that shall be caught + * @param mode + * whether to restart or resume the actor upon a caught failure + * @param initialBehavior + * the initial behavior, that is also restored during a restart + * @return the wrapped behavior + */ + static public Behavior restarter(Class clazz, OnFailure mode, + Behavior initialBehavior) { + final ClassTag catcher = akka.japi.Util.classTag(clazz); + return new Restarter(initialBehavior, mode == OnFailure.RESUME, initialBehavior, catcher); + } + + /** + * Widen the wrapped Behavior by placing a funnel in front of it: the supplied + * PartialFunction decides which message to pull in (those that it is defined + * at) and may transform the incoming message to place them into the wrapped + * Behavior’s type hierarchy. Signals are not transformed. + * + *
+   * Behavior<String> s = stateless((ctx, msg) -> System.out.println(msg))
+   * Behavior<Number> n = widened(s, pf -> pf.
+   *         match(BigInteger.class, i -> "BigInteger(" + i + ")").
+   *         match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
+   *         // drop all other kinds of Number
+   *     );
+   * 
+ * + * @param behavior + * the behavior that will receive the selected messages + * @param selector + * a partial function builder for describing the selection and + * transformation + * @return a behavior of the widened type + */ + static public Behavior widened(Behavior behavior, Function, PFBuilder> selector) { + return new Widened(behavior, selector.apply(new PFBuilder<>()).build()); + } + + /** + * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior + * creation is deferred to the child actor instead of running within the + * parent. + * + * @param producer + * behavior factory that takes the child actor’s context as argument + * @return the deferred behavior + */ + static public Behavior deferred(akka.japi.function.Function, Behavior> producer) { + return new Deferred(producer); + } + +} diff --git a/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java b/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java new file mode 100644 index 0000000000..fa2050efd9 --- /dev/null +++ b/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl; + +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import akka.actor.Cancellable; +import akka.typed.ActorRef; +import akka.typed.ActorSystem; +import akka.typed.Behavior; +import akka.typed.DeploymentConfig; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.duration.FiniteDuration; + +/** + * An Actor is given by the combination of a {@link akka.typed.Behavior} and a context in which + * this behavior is executed. As per the Actor Model an Actor can perform the + * following actions when processing a message: + * + *
    + *
  • send a finite number of messages to other Actors it knows
  • + *
  • create a finite number of Actors
  • + *
  • designate the behavior for the next message
  • + *
+ * + * In Akka the first capability is accessed by using the {@link akka.typed.ActorRef#tell} + * method, the second is provided by {@link akka.typed.ActorContext#spawn} and the + * third is implicit in the signature of {@link akka.typed.Behavior} in that the next behavior + * is always returned from the message processing logic. + * + * An ActorContext in addition provides access to the Actor’s own identity + * ({@link #getSelf “self”}), the {@link akka.typed.ActorSystem} it is part of, methods for querying the list + * of child Actors it created, access to {@link akka.typed.Terminated DeathWatch} and timed + * message scheduling. + */ +public interface ActorContext { + + /** + * The identity of this Actor, bound to the lifecycle of this Actor instance. + * An Actor with the same name that lives before or after this instance will + * have a different {@link akka.typed.ActorRef}. + */ + public ActorRef getSelf(); + + /** + * Return the mailbox capacity that was configured by the parent for this + * actor. + */ + public int getMailboxCapacity(); + + /** + * The {@link akka.typed.ActorSystem} to which this Actor belongs. + */ + public ActorSystem getSystem(); + + /** + * The list of child Actors created by this Actor during its lifetime that are + * still alive, in no particular order. + */ + public List> getChildren(); + + /** + * The named child Actor if it is alive. + */ + public Optional> getChild(String name); + + /** + * Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + public ActorRef spawnAnonymous(Behavior behavior); + + /** + * Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + public ActorRef spawnAnonymous(Behavior behavior, DeploymentConfig deployment); + + /** + * Create a child Actor from the given {@link akka.typed.Behavior} and with the given name. + */ + public ActorRef spawn(Behavior behavior, String name); + + /** + * Create a child Actor from the given {@link akka.typed.Behavior} and with the given name. + */ + public ActorRef spawn(Behavior behavior, String name, DeploymentConfig deployment); + + /** + * Force the child Actor under the given name to terminate after it finishes + * processing its current message. Nothing happens if the ActorRef does not + * refer to a current child actor. + * + * @return whether the passed-in {@link akka.typed.ActorRef} points to a current child Actor + */ + public boolean stop(ActorRef child); + + /** + * Register for {@link akka.typed.Terminated} notification once the Actor identified by the + * given {@link akka.typed.ActorRef} terminates. This notification is also generated when the + * {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed + * (e.g. in reaction to being unreachable). + */ + public ActorRef watch(ActorRef other); + + /** + * Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated} + * notification will not subsequently be received for the referenced Actor. + */ + public ActorRef unwatch(ActorRef other); + + /** + * Schedule the sending of a notification in case no other message is received + * during the given period of time. The timeout starts anew with each received + * message. Provide scala.concurrent.duration.Duration.Undefined to switch off this mechanism. + */ + public void setReceiveTimeout(FiniteDuration d, T msg); + + /** + * Cancel the sending of receive timeout notifications. + */ + public void cancelReceiveTimeout(); + + /** + * Schedule the sending of the given message to the given target Actor after + * the given time period has elapsed. The scheduled action can be cancelled by + * invoking {@link akka.actor.Cancellable#cancel} on the returned handle. + */ + public Cancellable schedule(FiniteDuration delay, ActorRef target, U msg); + + /** + * This Actor’s execution context. It can be used to run asynchronous tasks + * like scala.concurrent.Future combinators. + */ + public ExecutionContextExecutor getExecutionContext(); + + /** + * Create a child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + * + * The name of the child actor will be composed of a unique identifier + * starting with a dollar sign to which the given name argument is appended, + * with an inserted hyphen between these two parts. Therefore the given name + * argument does not need to be unique within the scope of the parent actor. + */ + public ActorRef spawnAdapter(Function f, String name); + + /** + * Create an anonymous child actor that will wrap messages such that other + * Actor’s protocols can be ingested by this Actor. You are strongly advised + * to cache these ActorRefs or to stop them when no longer needed. + */ + public ActorRef spawnAdapter(Function f); + +} diff --git a/akka-typed/src/main/scala/akka/typed/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/ActorContext.scala index 5ce4e40e3e..119b8538d4 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorContext.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorContext.scala @@ -9,136 +9,55 @@ import akka.util.Helpers import akka.{ actor ⇒ untyped } import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor +import java.util.Optional +import java.util.ArrayList +import akka.annotation.DoNotInherit +import akka.annotation.ApiMayChange /** - * An Actor is given by the combination of a [[Behavior]] and a context in - * which this behavior is executed. As per the Actor Model an Actor can perform - * the following actions when processing a message: - * - * - send a finite number of messages to other Actors it knows - * - create a finite number of Actors - * - designate the behavior for the next message - * - * In Akka the first capability is accessed by using the `!` or `tell` method - * on an [[ActorRef]], the second is provided by [[ActorContext#spawn]] - * and the third is implicit in the signature of [[Behavior]] in that the next - * behavior is always returned from the message processing logic. - * - * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), - * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it - * created, access to [[Terminated DeathWatch]] and timed message scheduling. + * This trait is not meant to be extended by user code. If you do so, you may + * lose binary compatibility. */ -trait ActorContext[T] { +@DoNotInherit +@ApiMayChange +trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext[T] { + def getChild(name: String): Optional[ActorRef[Void]] = + child(name) match { + case Some(c) ⇒ Optional.of(c.upcast[Void]) + case None ⇒ Optional.empty() + } - /** - * The identity of this Actor, bound to the lifecycle of this Actor instance. - * An Actor with the same name that lives before or after this instance will - * have a different [[ActorRef]]. - */ - def self: ActorRef[T] + def getChildren(): java.util.List[akka.typed.ActorRef[Void]] = { + val c = children + val a = new ArrayList[ActorRef[Void]](c.size) + val i = c.iterator + while (i.hasNext) a.add(i.next().upcast[Void]) + a + } - /** - * Return the mailbox capacity that was configured by the parent for this actor. - */ - def mailboxCapacity: Int + def getExecutionContext(): scala.concurrent.ExecutionContextExecutor = + executionContext - /** - * The [[ActorSystem]] to which this Actor belongs. - */ - def system: ActorSystem[Nothing] + def getMailboxCapacity(): Int = + mailboxCapacity - /** - * The list of child Actors created by this Actor during its lifetime that - * are still alive, in no particular order. - */ - def children: Iterable[ActorRef[Nothing]] + def getSelf(): akka.typed.ActorRef[T] = + self - /** - * The named child Actor if it is alive. - */ - def child(name: String): Option[ActorRef[Nothing]] + def getSystem(): akka.typed.ActorSystem[Void] = + system.asInstanceOf[ActorSystem[Void]] - /** - * Create a child Actor from the given [[Props]] under a randomly chosen name. - * It is good practice to name Actors wherever practical. - */ - def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + def spawn[U](behavior: akka.typed.Behavior[U], name: String): akka.typed.ActorRef[U] = + spawn(behavior, name, EmptyDeploymentConfig) - /** - * Create a child Actor from the given [[Props]] and with the given name. - */ - def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + def spawnAnonymous[U](behavior: akka.typed.Behavior[U]): akka.typed.ActorRef[U] = + spawnAnonymous(behavior, EmptyDeploymentConfig) - /** - * Force the child Actor under the given name to terminate after it finishes - * processing its current message. Nothing happens if the ActorRef does not - * refer to a current child actor. - * - * @return whether the passed-in [[ActorRef]] points to a current child Actor - */ - def stop(child: ActorRef[Nothing]): Boolean - - /** - * Register for [[Terminated]] notification once the Actor identified by the - * given [[ActorRef]] terminates. This notification is also generated when the - * [[ActorSystem]] to which the referenced Actor belongs is declared as - * failed (e.g. in reaction to being unreachable). - */ - def watch[U](other: ActorRef[U]): ActorRef[U] - - /** - * Revoke the registration established by `watch`. A [[Terminated]] - * notification will not subsequently be received for the referenced Actor. - */ - def unwatch[U](other: ActorRef[U]): ActorRef[U] - - /** - * Schedule the sending of a notification in case no other - * message is received during the given period of time. The timeout starts anew - * with each received message. Provide `Duration.Undefined` to switch off this - * mechanism. - */ - def setReceiveTimeout(d: FiniteDuration, msg: T): Unit - - /** - * Cancel the sending of receive timeout notifications. - */ - def cancelReceiveTimeout(): Unit - - /** - * Schedule the sending of the given message to the given target Actor after - * the given time period has elapsed. The scheduled action can be cancelled - * by invoking [[akka.actor.Cancellable]] `cancel` on the returned - * handle. - */ - def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): untyped.Cancellable - - /** - * This Actor’s execution context. It can be used to run asynchronous tasks - * like [[scala.concurrent.Future]] combinators. - */ - implicit def executionContext: ExecutionContextExecutor - - /** - * Create a child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - * - * The name of the child actor will be composed of a unique identifier - * starting with a dollar sign to which the given `name` argument is - * appended, with an inserted hyphen between these two parts. Therefore - * the given `name` argument does not need to be unique within the scope - * of the parent actor. - */ - def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] - - /** - * Create an anonymous child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - */ - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAdapter(f, "") + def spawnAdapter[U](f: java.util.function.Function[U, T]): akka.typed.ActorRef[U] = + spawnAdapter(f.apply _) + def spawnAdapter[U](f: java.util.function.Function[U, T], name: String): akka.typed.ActorRef[U] = + spawnAdapter(f.apply _, name) } /** @@ -180,7 +99,7 @@ class StubbedActorContext[T]( * Do not actually stop the child inbox, only simulate the liveness check. * Removal is asynchronous, explicit removeInbox is needed from outside afterwards. */ - override def stop(child: ActorRef[Nothing]): Boolean = { + override def stop(child: ActorRef[_]): Boolean = { _children.get(child.path.name) match { case None ⇒ false case Some(inbox) ⇒ inbox.ref == child diff --git a/akka-typed/src/main/scala/akka/typed/ActorRef.scala b/akka-typed/src/main/scala/akka/typed/ActorRef.scala index 43cdcb3ef4..0fbc0b28bf 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorRef.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorRef.scala @@ -13,11 +13,11 @@ import scala.concurrent.Future * only during the Actor’s lifetime and allows messages to be sent to that * Actor instance. Sending a message to an Actor that has terminated before * receiving the message will lead to that message being discarded; such - * messages are delivered to the [[akka.actor.DeadLetter]] channel of the - * [[akka.event.EventStream]] on a best effort basis + * messages are delivered to the [[DeadLetter]] channel of the + * [[EventStream]] on a best effort basis * (i.e. this delivery is not reliable). */ -abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[Nothing]] { this: internal.ActorRefImpl[T] ⇒ +abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[ActorRef[_]] { this: internal.ActorRefImpl[T] ⇒ /** * Send a message to the Actor referenced by this ActorRef using *at-most-once* @@ -49,7 +49,7 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act /** * Comparison takes path and the unique id of the actor cell into account. */ - final override def compareTo(other: ActorRef[Nothing]) = { + final override def compareTo(other: ActorRef[_]) = { val x = this.path compareTo other.path if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1 else x diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index 5d1e28a510..9557befad6 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -13,14 +13,18 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.{ ExecutionContextExecutor, Future } import akka.typed.adapter.{ ActorSystemAdapter, PropsAdapter } import akka.util.Timeout +import akka.annotation.DoNotInherit +import akka.annotation.ApiMayChange /** * An ActorSystem is home to a hierarchy of Actors. It is created using - * [[ActorSystem$]] `apply` from a [[Props]] object that describes the root + * [[ActorSystem#apply]] from a [[Behavior]] object that describes the root * Actor of this hierarchy and which will create all other Actors beneath it. * A system also implements the [[ActorRef]] type, and sending a message to * the system directs that message to the root Actor. */ +@DoNotInherit +@ApiMayChange trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒ /** @@ -131,11 +135,11 @@ trait ActorSystem[-T] extends ActorRef[T] { this: internal.ActorRefImpl[T] ⇒ * Ask the system guardian of this system to create an actor from the given * behavior and deployment and with the given name. The name does not need to * be unique since the guardian will prefix it with a running number when - * creating the child actor. The timeout sets the timeout used for the [[AskPattern$]] + * creating the child actor. The timeout sets the timeout used for the [[akka.typed.scaladsl.AskPattern$]] * invocation when asking the guardian. * * The returned Future of [[ActorRef]] may be converted into an [[ActorRef]] - * to which messages can immediately be sent by using the [[ActorRef$apply]] + * to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]] * method. */ def systemActorOf[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig)(implicit timeout: Timeout): Future[ActorRef[U]] diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index b3f1eb2858..5429d57dd8 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -13,6 +13,10 @@ package akka.typed * Behaviors can be formulated in a number of different ways, either by * creating a derived class or by employing factory methods like the ones * in the [[ScalaDSL$]] object. + * + * Closing over ActorContext makes a Behavior immobile: it cannot be moved to + * another context and executed there, and therefore it cannot be replicated or + * forked either. */ abstract class Behavior[T] { /** @@ -30,6 +34,7 @@ abstract class Behavior[T] { * Code calling this method should use [[Behavior$]] `canonicalize` to replace * the special objects with real Behaviors. */ + @throws(classOf[Exception]) def management(ctx: ActorContext[T], msg: Signal): Behavior[T] /** @@ -45,6 +50,7 @@ abstract class Behavior[T] { * Code calling this method should use [[Behavior$]] `canonicalize` to replace * the special objects with real Behaviors. */ + @throws(classOf[Exception]) def message(ctx: ActorContext[T], msg: T): Behavior[T] /** @@ -55,14 +61,6 @@ abstract class Behavior[T] { def narrow[U <: T]: Behavior[U] = this.asInstanceOf[Behavior[U]] } -/* - * FIXME - * - * Closing over ActorContext makes a Behavior immobile: it cannot be moved to - * another context and executed there, and therefore it cannot be replicated or - * forked either. - */ - object Behavior { /** @@ -123,8 +121,8 @@ object Behavior { /** * Given a possibly special behavior (same or unhandled) and a * “current” behavior (which defines the meaning of encountering a `Same` - * behavior) this method unwraps the behavior such that the innermost behavior - * is returned, i.e. it removes the decorations. + * behavior) this method computes the next behavior, suitable for passing a + * message or signal. */ def canonicalize[T](behavior: Behavior[T], current: Behavior[T]): Behavior[T] = behavior match { @@ -133,6 +131,11 @@ object Behavior { case other ⇒ other } + /** + * Validate the given behavior as a suitable initial actor behavior; most + * notably the behavior can neither be `Same` nor `Unhandled`. Starting + * out with a `Stopped` behavior is allowed, though. + */ def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] = behavior match { case `sameBehavior` | `unhandledBehavior` ⇒ @@ -140,12 +143,22 @@ object Behavior { case x ⇒ x } + /** + * Validate the given behavior as initial, pass it a [[PreStart]] message + * and canonicalize the result. + */ def preStart[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { val b = validateAsInitial(behavior) if (isAlive(b)) canonicalize(b.management(ctx, PreStart), b) else b } + /** + * Returns true if the given behavior is not stopped. + */ def isAlive[T](behavior: Behavior[T]): Boolean = behavior ne stoppedBehavior + /** + * Returns true if the given behavior is the special `Unhandled` marker. + */ def isUnhandled[T](behavior: Behavior[T]): Boolean = behavior eq unhandledBehavior } diff --git a/akka-typed/src/main/scala/akka/typed/Effects.scala b/akka-typed/src/main/scala/akka/typed/Effects.scala index c8acad73ae..1b13fb9e61 100644 --- a/akka-typed/src/main/scala/akka/typed/Effects.scala +++ b/akka-typed/src/main/scala/akka/typed/Effects.scala @@ -73,7 +73,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma effectQueue.offer(Spawned(name)) super.spawn(behavior, name) } - override def stop(child: ActorRef[Nothing]): Boolean = { + override def stop(child: ActorRef[_]): Boolean = { effectQueue.offer(Stopped(child.path.name)) super.stop(child) } diff --git a/akka-typed/src/main/scala/akka/typed/Inbox.scala b/akka-typed/src/main/scala/akka/typed/Inbox.scala index 90a3804e07..233a12713e 100644 --- a/akka-typed/src/main/scala/akka/typed/Inbox.scala +++ b/akka-typed/src/main/scala/akka/typed/Inbox.scala @@ -12,6 +12,12 @@ import scala.annotation.tailrec import akka.actor.ActorRefProvider import java.util.concurrent.ThreadLocalRandom +/** + * Utility for receiving messages outside of an actor. No methods are provided + * for synchronously awaiting a message, this is primarily useful for synchronous + * tests of behaviors that send messages to other actors, where an Inbox’s ActorRef + * can conveniently be used as a stub. + */ class Inbox[T](name: String) { private val q = new ConcurrentLinkedQueue[T] diff --git a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala index 9da2377c41..e5b06d7b93 100644 --- a/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala +++ b/akka-typed/src/main/scala/akka/typed/MessageAndSignals.scala @@ -31,16 +31,21 @@ sealed trait Signal * Lifecycle signal that is fired upon creation of the Actor. This will be the * first message that the actor processes. */ -@SerialVersionUID(1L) -final case object PreStart extends Signal +sealed abstract class PreStart extends Signal +final case object PreStart extends PreStart { + def instance: PreStart = this +} /** * Lifecycle signal that is fired upon restart of the Actor before replacing * the behavior with the fresh one (i.e. this signal is received within the - * behavior that failed). + * behavior that failed). The replacement behavior will receive PreStart as its + * first signal. */ -@SerialVersionUID(1L) -final case object PreRestart extends Signal +sealed abstract class PreRestart extends Signal +final case object PreRestart extends PreRestart { + def instance: PreRestart = this +} /** * Lifecycle signal that is fired after this actor and all its child actors @@ -51,8 +56,10 @@ final case object PreRestart extends Signal * `Stopped` behavior then this signal will be ignored (i.e. the * Stopped behavior will do nothing in reaction to it). */ -@SerialVersionUID(1L) -final case object PostStop extends Signal +sealed abstract class PostStop extends Signal +final case object PostStop extends PostStop { + def instance: PostStop = this +} /** * Lifecycle signal that is fired when an Actor that was watched has terminated. @@ -64,70 +71,8 @@ final case object PostStop extends Signal * have occurred. Termination of a remote Actor can also be effected by declaring * the Actor’s home system as failed (e.g. as a result of being unreachable). */ -@SerialVersionUID(1L) final case class Terminated(ref: ActorRef[Nothing])(failed: Throwable) extends Signal { def wasFailed: Boolean = failed ne null def failure: Throwable = failed def failureOption: Option[Throwable] = Option(failed) } - -/** - * FIXME correct this documentation when the Restarter behavior has been implemented - * - * The parent of an actor decides upon the fate of a failed child actor by - * encapsulating its next behavior in one of the four wrappers defined within - * this class. - * - * Failure responses have an associated precedence that ranks them, which is in - * descending importance: - * - * - Escalate - * - Stop - * - Restart - * - Resume - */ -object Failed { - - sealed trait Decision - - @SerialVersionUID(1L) - case object NoFailureResponse extends Decision - - /** - * Resuming the child actor means that the result of processing the message - * on which it failed is just ignored, the previous state will be used to - * process the next message. The message that triggered the failure will not - * be processed again. - */ - @SerialVersionUID(1L) - case object Resume extends Decision - - /** - * Restarting the child actor means resetting its behavior to the initial - * one that was provided during its creation (i.e. the one which was passed - * into the [[Props]] constructor). The previously failed behavior will - * receive a [[PreRestart]] signal before this happens and the replacement - * behavior will receive a [[PostRestart]] signal afterwards. - */ - @SerialVersionUID(1L) - case object Restart extends Decision - - /** - * Stopping the child actor will free its resources and eventually - * (asynchronously) unregister its name from the parent. Completion of this - * process can be observed by watching the child actor and reacting to its - * [[Terminated]] signal. - */ - @SerialVersionUID(1L) - case object Stop extends Decision - - /** - * The default response to a failure in a child actor is to escalate the - * failure, entailing that the parent actor fails as well. This is equivalent - * to an exception unwinding the call stack, but it applies to the supervision - * hierarchy instead. - */ - @SerialVersionUID(1L) - case object Escalate extends Decision - -} diff --git a/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala b/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala index 0f4be63377..bb3b46bca2 100644 --- a/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala +++ b/akka-typed/src/main/scala/akka/typed/ScalaDSL.scala @@ -11,6 +11,7 @@ import akka.util.LineNumbers * This object holds several behavior factories and combinators that can be * used to construct Behavior instances. */ +@deprecated("use akka.typed.scaladsl.Actor", "2.5.0") object ScalaDSL { // FIXME check that all behaviors can cope with not getting PreStart as first message @@ -20,17 +21,20 @@ object ScalaDSL { * Widen the type of this Behavior by providing a filter function that permits * only a subtype of the widened set of messages. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def widen[U >: T](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher) /** * Combine the two behaviors such that incoming messages are distributed * to both of them, each one evolving its state independently. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def &&(other: Behavior[T]): Behavior[T] = And(behavior, other) /** * Combine the two behaviors such that incoming messages are given first to * the left behavior and are then only passed on to the right behavior if * the left one returned Unhandled. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def ||(other: Behavior[T]): Behavior[T] = Or(behavior, other) } @@ -40,6 +44,7 @@ object ScalaDSL { * at) and may transform the incoming message to place them into the wrapped * Behavior’s type hierarchy. Signals are not transformed. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Widened[T, U >: T](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] { private def postProcess(ctx: ActorContext[U], behv: Behavior[T]): Behavior[U] = if (isUnhandled(behv)) Unhandled @@ -63,6 +68,7 @@ object ScalaDSL { * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation * is deferred to the child actor instead of running within the parent. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Deferred[T](factory: () ⇒ Behavior[T]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = { if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)") @@ -81,6 +87,7 @@ object ScalaDSL { * avoid the allocation overhead of recreating the current behavior where * that is not necessary. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]] /** @@ -89,6 +96,7 @@ object ScalaDSL { * message has not been handled. This hint may be used by composite * behaviors that delegate (partial) handling to other behaviors. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]] /* @@ -104,16 +112,19 @@ object ScalaDSL { * signal that results from stopping this actor will NOT be passed to the * current behavior, it will be effectively ignored. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]] /** * This behavior does not handle any inputs, it is completely inert. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]] /** * This behavior does not handle any inputs, it is completely inert. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]] /** @@ -122,17 +133,20 @@ object ScalaDSL { * used by several of the behaviors defined in this DSL, see for example * [[Full]]. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") sealed trait MessageOrSignal[T] /** * A message bundled together with the current [[ActorContext]]. */ @SerialVersionUID(1L) - final case class Msg[T](ctx: ActorContext[T], msg: T) extends MessageOrSignal[T] + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") + final case class Msg[T](ctx: scaladsl.ActorContext[T], msg: T) extends MessageOrSignal[T] /** * A signal bundled together with the current [[ActorContext]]. */ @SerialVersionUID(1L) - final case class Sig[T](ctx: ActorContext[T], signal: Signal) extends MessageOrSignal[T] + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") + final case class Sig[T](ctx: scaladsl.ActorContext[T], signal: Signal) extends MessageOrSignal[T] /** * This type of behavior allows to handle all incoming messages within @@ -144,9 +158,9 @@ object ScalaDSL { * For the lifecycle notifications pertaining to the actor itself this * behavior includes a fallback mechanism: an unhandled [[PreRestart]] signal * will terminate all child actors (transitively) and then emit a [[PostStop]] - * signal in addition, whereas an unhandled [[PostRestart]] signal will emit - * an additional [[PreStart]] signal. + * signal in addition. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Full[T](behavior: PartialFunction[MessageOrSignal[T], Behavior[T]]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = { lazy val fallback: (MessageOrSignal[T]) ⇒ Behavior[T] = { @@ -173,6 +187,7 @@ object ScalaDSL { * to create the supplied function then any message not matching the list of * cases will fail this actor with a [[scala.MatchError]]. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class FullTotal[T](behavior: MessageOrSignal[T] ⇒ Behavior[T]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal) = behavior(Sig(ctx, msg)) override def message(ctx: ActorContext[T], msg: T) = behavior(Msg(ctx, msg)) @@ -189,6 +204,7 @@ object ScalaDSL { * This behavior type is most useful for leaf actors that do not create child * actors themselves. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Total[T](behavior: T ⇒ Behavior[T]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match { case _ ⇒ Unhandled @@ -207,6 +223,7 @@ object ScalaDSL { * This behavior type is most useful for leaf actors that do not create child * actors themselves. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Partial[T](behavior: PartialFunction[T, Behavior[T]]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = msg match { case _ ⇒ Unhandled @@ -220,6 +237,7 @@ object ScalaDSL { * some action upon each received message or signal. It is most commonly used * for logging or tracing what a certain Actor does. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Tap[T](f: PartialFunction[MessageOrSignal[T], Unit], behavior: Behavior[T]) extends Behavior[T] { private def canonical(behv: Behavior[T]): Behavior[T] = if (isUnhandled(behv)) Unhandled @@ -237,6 +255,7 @@ object ScalaDSL { override def toString = s"Tap(${LineNumbers(f)},$behavior)" } object Tap { + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap({ case Msg(_, msg) ⇒ monitor ! msg }, behavior) } @@ -249,6 +268,7 @@ object ScalaDSL { * This behavior type is most useful for leaf actors that do not create child * actors themselves. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Static[T](behavior: T ⇒ Unit) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = Unhandled override def message(ctx: ActorContext[T], msg: T): Behavior[T] = { @@ -267,6 +287,7 @@ object ScalaDSL { * This decorator is useful for passing messages between the left and right * sides of [[And]] and [[Or]] combinators. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class SynchronousSelf[T](f: ActorRef[T] ⇒ Behavior[T]) extends Behavior[T] { private class B extends Behavior[T] { @@ -308,9 +329,9 @@ object ScalaDSL { * A behavior combinator that feeds incoming messages and signals both into * the left and right sub-behavior and allows them to evolve independently of * each other. When one of the sub-behaviors terminates the other takes over - * exclusively. When both sub-behaviors respond to a [[Failed]] signal, the - * response with the higher precedence is chosen (see [[Failed$]]). + * exclusively. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class And[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = { @@ -354,9 +375,9 @@ object ScalaDSL { * each other. The message or signal is passed first into the left sub-behavior * and only if that results in [[#Unhandled]] is it passed to the right * sub-behavior. When one of the sub-behaviors terminates the other takes over - * exclusively. When both sub-behaviors respond to a [[Failed]] signal, the - * response with the higher precedence is chosen (see [[Failed$]]). + * exclusively. */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") final case class Or[T](left: Behavior[T], right: Behavior[T]) extends Behavior[T] { override def management(ctx: ActorContext[T], msg: Signal): Behavior[T] = @@ -418,10 +439,14 @@ object ScalaDSL { * } * }}} */ + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") def SelfAware[T](behavior: ActorRef[T] ⇒ Behavior[T]): Behavior[T] = - FullTotal { - case Sig(ctx, PreStart) ⇒ Behavior.preStart(behavior(ctx.self), ctx) - case msg ⇒ throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)") + new Behavior[T] { + override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] = + if (sig == PreStart) Behavior.preStart(behavior(ctx.self), ctx) + else throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $sig)") + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = + throw new IllegalStateException(s"SelfAware must receive PreStart as first message (got $msg)") } /** @@ -438,10 +463,14 @@ object ScalaDSL { * } * }}} */ - def ContextAware[T](behavior: ActorContext[T] ⇒ Behavior[T]): Behavior[T] = - FullTotal { - case Sig(ctx, PreStart) ⇒ Behavior.preStart(behavior(ctx), ctx) - case msg ⇒ throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $msg)") + @deprecated("use akka.typed.scaladsl.Actor", "2.5.0") + def ContextAware[T](behavior: scaladsl.ActorContext[T] ⇒ Behavior[T]): Behavior[T] = + new Behavior[T] { + override def management(ctx: ActorContext[T], sig: Signal): Behavior[T] = + if (sig == PreStart) Behavior.preStart(behavior(ctx), ctx) + else throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got $sig)") + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = + throw new IllegalStateException(s"ContextAware must receive PreStart as first message (got ${msg.getClass})") } /** diff --git a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala index dcd5733b68..129b756a72 100644 --- a/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/adapter/ActorContextAdapter.scala @@ -22,7 +22,7 @@ private[typed] class ActorContextAdapter[T](ctx: a.ActorContext) extends ActorCo ctx.spawnAnonymous(behavior, deployment) override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) = ctx.spawn(behavior, name, deployment) - override def stop(child: ActorRef[Nothing]) = + override def stop(child: ActorRef[_]) = toUntyped(child) match { case f: akka.actor.FunctionRef ⇒ val cell = ctx.asInstanceOf[akka.actor.ActorCell] diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index d3e13e9c1f..0bed92475e 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -120,7 +120,7 @@ private[typed] class ActorCell[T]( spawn(behavior, name, deployment) } - override def stop(child: ActorRef[Nothing]): Boolean = { + override def stop(child: ActorRef[_]): Boolean = { val name = child.path.name childrenMap.get(name) match { case None ⇒ false diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala index 824c1d3c66..d65d0255c6 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorSystemImpl.scala @@ -21,6 +21,7 @@ import scala.util.Success import akka.util.Timeout import java.io.Closeable import java.util.concurrent.atomic.AtomicInteger +import akka.typed.scaladsl.AskPattern object ActorSystemImpl { import ScalaDSL._ diff --git a/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala index 8f700ca126..ccee43cd3d 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala @@ -12,6 +12,7 @@ import akka.util.{ ReentrantGuard, Subclassification, SubclassifiedIndex } import scala.collection.immutable import java.util.concurrent.TimeoutException import akka.util.Timeout +import akka.typed.scaladsl.AskPattern /** * INTERNAL API diff --git a/akka-typed/src/main/scala/akka/typed/patterns/Receiver.scala b/akka-typed/src/main/scala/akka/typed/patterns/Receiver.scala deleted file mode 100644 index 6028fd9a86..0000000000 --- a/akka-typed/src/main/scala/akka/typed/patterns/Receiver.scala +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright (C) 2014-2017 Lightbend Inc. - */ -package akka.typed.patterns - -import scala.concurrent.duration._ -import akka.typed.ActorRef -import scala.collection.immutable -import akka.typed.Behavior -import scala.concurrent.duration.Deadline -import akka.typed.ActorContext -import java.util.LinkedList -import scala.collection.JavaConverters._ -import scala.collection.immutable.Queue - -// FIXME make this nice again once the Actor Algebra is implemented -object Receiver { - import akka.typed.ScalaDSL._ - - sealed trait InternalCommand[T] - case class ReceiveTimeout[T]() extends InternalCommand[T] - - sealed trait Command[T] extends InternalCommand[T] - - /** - * Retrieve one message from the Receiver, waiting at most for the given duration. - */ - final case class GetOne[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetOneResult[T]]) extends Command[T] - /** - * Retrieve all messages from the Receiver that it has queued after the given - * duration has elapsed. - */ - final case class GetAll[T](timeout: FiniteDuration)(val replyTo: ActorRef[GetAllResult[T]]) extends Command[T] - /** - * Retrieve the external address of this Receiver (i.e. the side at which it - * takes in the messages of type T. - */ - final case class ExternalAddress[T](replyTo: ActorRef[ActorRef[T]]) extends Command[T] - - sealed trait Replies[T] - final case class GetOneResult[T](receiver: ActorRef[Command[T]], msg: Option[T]) extends Replies[T] - final case class GetAllResult[T](receiver: ActorRef[Command[T]], msgs: immutable.Seq[T]) extends Replies[T] - - private final case class Enqueue[T](msg: T) extends Command[T] - - def behavior[T]: Behavior[Command[T]] = - ContextAware[Any] { ctx ⇒ - SynchronousSelf { syncself ⇒ - Or( - empty(ctx).widen { case c: InternalCommand[t] ⇒ c.asInstanceOf[InternalCommand[T]] }, - Static[Any] { - case msg ⇒ syncself ! Enqueue(msg) - }) - } - }.narrow - - private def empty[T](ctx: ActorContext[Any]): Behavior[InternalCommand[T]] = - Total { - case ExternalAddress(replyTo) ⇒ { replyTo ! ctx.self; Same } - case g @ GetOne(d) if d <= Duration.Zero ⇒ { g.replyTo ! GetOneResult(ctx.self, None); Same } - case g @ GetOne(d) ⇒ asked(ctx, Queue(Asked(g.replyTo, Deadline.now + d))) - case g @ GetAll(d) if d <= Duration.Zero ⇒ { g.replyTo ! GetAllResult(ctx.self, Nil); Same } - case g @ GetAll(d) ⇒ { ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo)); Same } - case Enqueue(msg) ⇒ queued(ctx, msg) - } - - private def queued[T](ctx: ActorContext[Any], t: T): Behavior[InternalCommand[T]] = { - val queue = new LinkedList[T] - queue.add(t) - Total { - case ExternalAddress(replyTo) ⇒ - replyTo ! ctx.self - Same - case g: GetOne[t] ⇒ - g.replyTo ! GetOneResult(ctx.self, Some(queue.remove())) - if (queue.isEmpty) empty(ctx) else Same - case g @ GetAll(d) if d <= Duration.Zero ⇒ - g.replyTo ! GetAllResult(ctx.self, queue.iterator.asScala.toVector) - empty(ctx) - case g @ GetAll(d) ⇒ - ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo)) - Same - case Enqueue(msg) ⇒ - queue.add(msg) - Same - } - } - - private case class Asked[T](replyTo: ActorRef[GetOneResult[T]], deadline: Deadline) - private def asked[T](ctx: ActorContext[Any], queue: Queue[Asked[T]]): Behavior[InternalCommand[T]] = { - ctx.setReceiveTimeout(queue.map(_.deadline).min.timeLeft, ReceiveTimeout()) - - Total { - case ReceiveTimeout() ⇒ - val (overdue, remaining) = queue partition (_.deadline.isOverdue) - overdue foreach (a ⇒ a.replyTo ! GetOneResult(ctx.self, None)) - if (remaining.isEmpty) { - ctx.cancelReceiveTimeout() - empty(ctx) - } else asked(ctx, remaining) - case ExternalAddress(replyTo) ⇒ { replyTo ! ctx.self; Same } - case g @ GetOne(d) if d <= Duration.Zero ⇒ - g.replyTo ! GetOneResult(ctx.self, None) - asked(ctx, queue) - case g @ GetOne(d) ⇒ - asked(ctx, queue enqueue Asked(g.replyTo, Deadline.now + d)) - case g @ GetAll(d) if d <= Duration.Zero ⇒ - g.replyTo ! GetAllResult(ctx.self, Nil) - asked(ctx, queue) - case g @ GetAll(d) ⇒ - ctx.schedule(d, ctx.self, GetAll(Duration.Zero)(g.replyTo)) - asked(ctx, queue) - case Enqueue(msg) ⇒ - val (ask, q) = queue.dequeue - ask.replyTo ! GetOneResult(ctx.self, Some(msg)) - if (q.isEmpty) { - ctx.cancelReceiveTimeout() - empty(ctx) - } else asked(ctx, q) - } - } -} diff --git a/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala b/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala index a2b10f09a6..171b5b4875 100644 --- a/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala +++ b/akka-typed/src/main/scala/akka/typed/patterns/Restarter.scala @@ -7,6 +7,52 @@ package patterns import scala.reflect.ClassTag import scala.util.control.NonFatal import akka.event.Logging +import akka.typed.scaladsl.Actor._ + +/** + * Simple supervision strategy that restarts the underlying behavior for all + * failures of type Thr. + * + * FIXME add limited restarts and back-off (with limited buffering or vacation responder) + * FIXME write tests that ensure that restart is canonicalizing PreStart result correctly + */ +final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean)( + behavior: Behavior[T] = initialBehavior) extends Behavior[T] { + + private def restart(ctx: ActorContext[T]): Behavior[T] = { + try behavior.management(ctx, PreRestart) catch { case NonFatal(_) ⇒ } + Behavior.canonicalize(initialBehavior.management(ctx, PreStart), initialBehavior) + } + + private def canonical(b: Behavior[T]): Behavior[T] = + if (Behavior.isUnhandled(b)) Unhandled + else if (b eq Behavior.sameBehavior) Same + else if (!Behavior.isAlive(b)) Stopped + else if (b eq behavior) Same + else Restarter[T, Thr](initialBehavior, resume)(b) + + override def management(ctx: ActorContext[T], signal: Signal): Behavior[T] = { + val b = + try behavior.management(ctx, signal) + catch { + case ex: Thr ⇒ + ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage)) + if (resume) behavior else restart(ctx) + } + canonical(b) + } + + override def message(ctx: ActorContext[T], msg: T): Behavior[T] = { + val b = + try behavior.message(ctx, msg) + catch { + case ex: Thr ⇒ + ctx.system.eventStream.publish(Logging.Error(ex, ctx.self.toString, behavior.getClass, ex.getMessage)) + if (resume) behavior else restart(ctx) + } + canonical(b) + } +} /** * Simple supervision strategy that restarts the underlying behavior for all @@ -15,7 +61,7 @@ import akka.event.Logging * FIXME add limited restarts and back-off (with limited buffering or vacation responder) * FIXME write tests that ensure that all Behaviors are okay with getting PostRestart as first signal */ -final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] { +final case class MutableRestarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], resume: Boolean) extends Behavior[T] { private[this] var current = initialBehavior @@ -34,7 +80,7 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav if (resume) current else restart(ctx) } current = Behavior.canonicalize(b, current) - if (Behavior.isAlive(current)) this else ScalaDSL.Stopped + if (Behavior.isAlive(current)) this else Stopped } override def message(ctx: ActorContext[T], msg: T): Behavior[T] = { @@ -46,14 +92,6 @@ final case class Restarter[T, Thr <: Throwable: ClassTag](initialBehavior: Behav if (resume) current else restart(ctx) } current = Behavior.canonicalize(b, current) - if (Behavior.isAlive(current)) this else ScalaDSL.Stopped + if (Behavior.isAlive(current)) this else Stopped } } - -object Restarter { - class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) { - def wrap[T](b: Behavior[T]) = Restarter(Behavior.validateAsInitial(b), resume)(c) - } - - def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume) -} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala new file mode 100644 index 0000000000..f7eabf4fc9 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala @@ -0,0 +1,404 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed +package scaladsl + +import akka.util.LineNumbers +import scala.reflect.ClassTag +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.ExecutionContextExecutor +import scala.deprecatedInheritance +import akka.typed.{ ActorContext ⇒ AC } +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit + +/** + * An Actor is given by the combination of a [[Behavior]] and a context in + * which this behavior is executed. As per the Actor Model an Actor can perform + * the following actions when processing a message: + * + * - send a finite number of messages to other Actors it knows + * - create a finite number of Actors + * - designate the behavior for the next message + * + * In Akka the first capability is accessed by using the `!` or `tell` method + * on an [[ActorRef]], the second is provided by [[ActorContext#spawn]] + * and the third is implicit in the signature of [[Behavior]] in that the next + * behavior is always returned from the message processing logic. + * + * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), + * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it + * created, access to [[Terminated DeathWatch]] and timed message scheduling. + */ +@DoNotInherit +@ApiMayChange +trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T] ⇒ + + /** + * The identity of this Actor, bound to the lifecycle of this Actor instance. + * An Actor with the same name that lives before or after this instance will + * have a different [[ActorRef]]. + */ + def self: ActorRef[T] + + /** + * Return the mailbox capacity that was configured by the parent for this actor. + */ + def mailboxCapacity: Int + + /** + * The [[ActorSystem]] to which this Actor belongs. + */ + def system: ActorSystem[Nothing] + + /** + * The list of child Actors created by this Actor during its lifetime that + * are still alive, in no particular order. + */ + def children: Iterable[ActorRef[Nothing]] + + /** + * The named child Actor if it is alive. + */ + def child(name: String): Option[ActorRef[Nothing]] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] and with the given name. + */ + def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + + /** + * Force the child Actor under the given name to terminate after it finishes + * processing its current message. Nothing happens if the ActorRef does not + * refer to a current child actor. + * + * @return whether the passed-in [[ActorRef]] points to a current child Actor + */ + def stop(child: ActorRef[_]): Boolean + + /** + * Register for [[Terminated]] notification once the Actor identified by the + * given [[ActorRef]] terminates. This notification is also generated when the + * [[ActorSystem]] to which the referenced Actor belongs is declared as + * failed (e.g. in reaction to being unreachable). + */ + def watch[U](other: ActorRef[U]): ActorRef[U] + + /** + * Revoke the registration established by `watch`. A [[Terminated]] + * notification will not subsequently be received for the referenced Actor. + */ + def unwatch[U](other: ActorRef[U]): ActorRef[U] + + /** + * Schedule the sending of a notification in case no other + * message is received during the given period of time. The timeout starts anew + * with each received message. Provide `Duration.Undefined` to switch off this + * mechanism. + */ + def setReceiveTimeout(d: FiniteDuration, msg: T): Unit + + /** + * Cancel the sending of receive timeout notifications. + */ + def cancelReceiveTimeout(): Unit + + /** + * Schedule the sending of the given message to the given target Actor after + * the given time period has elapsed. The scheduled action can be cancelled + * by invoking [[akka.actor.Cancellable#cancel]] on the returned + * handle. + */ + def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable + + /** + * This Actor’s execution context. It can be used to run asynchronous tasks + * like [[scala.concurrent.Future]] combinators. + */ + implicit def executionContext: ExecutionContextExecutor + + /** + * Create a child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + * + * The name of the child actor will be composed of a unique identifier + * starting with a dollar sign to which the given `name` argument is + * appended, with an inserted hyphen between these two parts. Therefore + * the given `name` argument does not need to be unique within the scope + * of the parent actor. + */ + def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] + + /** + * Create an anonymous child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + */ + def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAdapter(f, "") + +} + +@ApiMayChange +object Actor { + import Behavior._ + + // FIXME check that all behaviors can cope with not getting PreStart as first message + + final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { + /** + * Widen the wrapped Behavior by placing a funnel in front of it: the supplied + * PartialFunction decides which message to pull in (those that it is defined + * at) and may transform the incoming message to place them into the wrapped + * Behavior’s type hierarchy. Signals are not transformed. + * + * see also [[Actor.Widened]] + */ + def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher) + } + + private val _nullFun = (_: Any) ⇒ null + private def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] + private implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal { + def as[U] = ctx.asInstanceOf[AC[U]] + } + + /** + * Widen the wrapped Behavior by placing a funnel in front of it: the supplied + * PartialFunction decides which message to pull in (those that it is defined + * at) and may transform the incoming message to place them into the wrapped + * Behavior’s type hierarchy. Signals are not transformed. + * + * Example: + * {{{ + * Stateless[String]((ctx, msg) => println(msg)).widen[Number] { + * case b: BigDecimal => s"BigDecimal($b)" + * case i: BigInteger => s"BigInteger($i)" + * // drop all other kinds of Number + * } + * }}} + */ + final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends Behavior[U] { + private def postProcess(behv: Behavior[T]): Behavior[U] = + if (isUnhandled(behv)) Unhandled + else if (isAlive(behv)) { + val next = canonicalize(behv, behavior) + if (next eq behavior) Same else Widened(next, matcher) + } else Stopped + + override def management(ctx: AC[U], msg: Signal): Behavior[U] = + postProcess(behavior.management(ctx.as[T], msg)) + + override def message(ctx: AC[U], msg: U): Behavior[U] = + matcher.applyOrElse(msg, nullFun) match { + case null ⇒ Unhandled + case transformed ⇒ postProcess(behavior.message(ctx.as[T], transformed)) + } + + override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" + } + + /** + * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation + * is deferred to the child actor instead of running within the parent. + */ + final case class Deferred[T](factory: ActorContext[T] ⇒ Behavior[T]) extends Behavior[T] { + override def management(ctx: AC[T], msg: Signal): Behavior[T] = { + if (msg != PreStart) throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)") + Behavior.preStart(factory(ctx), ctx) + } + + override def message(ctx: AC[T], msg: T): Behavior[T] = + throw new IllegalStateException(s"Deferred must receive PreStart as first message (got $msg)") + + override def toString: String = s"Deferred(${LineNumbers(factory)})" + } + + /** + * Return this behavior from message processing in order to advise the + * system to reuse the previous behavior. This is provided in order to + * avoid the allocation overhead of recreating the current behavior where + * that is not necessary. + */ + def Same[T]: Behavior[T] = sameBehavior.asInstanceOf[Behavior[T]] + + /** + * Return this behavior from message processing in order to advise the + * system to reuse the previous behavior, including the hint that the + * message has not been handled. This hint may be used by composite + * behaviors that delegate (partial) handling to other behaviors. + */ + def Unhandled[T]: Behavior[T] = unhandledBehavior.asInstanceOf[Behavior[T]] + + /* + * TODO write a Behavior that waits for all child actors to stop and then + * runs some cleanup before stopping. The factory for this behavior should + * stop and watch all children to get the process started. + */ + + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. The PostStop + * signal that results from stopping this actor will NOT be passed to the + * current behavior, it will be effectively ignored. + */ + def Stopped[T]: Behavior[T] = stoppedBehavior.asInstanceOf[Behavior[T]] + + /** + * A behavior that treats every incoming message as unhandled. + */ + def Empty[T]: Behavior[T] = emptyBehavior.asInstanceOf[Behavior[T]] + + /** + * A behavior that ignores every incoming message and returns “same”. + */ + def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]] + + /** + * Construct an actor behavior that can react both to lifecycle signals and + * incoming messages. After spawning this actor from another actor (or as the + * guardian of an [[akka.typed.ActorSystem]]) it will be executed within an + * [[ActorContext]] that allows access to the system, spawning and watching + * other actors, etc. + * + * In either case—signal or message—the next behavior must be returned. If no + * change is desired, use `Actor.same()`. + */ + final case class SignalOrMessage[T]( + signal: (ActorContext[T], Signal) ⇒ Behavior[T], + mesg: (ActorContext[T], T) ⇒ Behavior[T]) extends Behavior[T] { + override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg) + override def message(ctx: AC[T], msg: T): Behavior[T] = mesg(ctx, msg) + override def toString = s"SignalOrMessage(${LineNumbers(signal)},${LineNumbers(mesg)})" + } + + /** + * Construct an actor behavior that can react to incoming messages but not to + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an [[akka.typed.ActorSystem]]) it will be executed within an + * [[ActorContext]] that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called stateful because processing the next message + * results in a new behavior that can potentially be different from this one. + */ + final case class Stateful[T](behavior: (ActorContext[T], T) ⇒ Behavior[T]) extends Behavior[T] { + override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled + override def message(ctx: AC[T], msg: T) = behavior(ctx, msg) + override def toString = s"Stateful(${LineNumbers(behavior)})" + } + + /** + * Construct an actor behavior that can react to incoming messages but not to + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an [[akka.typed.ActorSystem]]) it will be executed within an + * [[ActorContext]] that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called stateless because it cannot be replaced by + * another one after it has been installed. It is most useful for leaf actors + * that do not create child actors themselves. + */ + final case class Stateless[T](behavior: (ActorContext[T], T) ⇒ Any) extends Behavior[T] { + override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled + override def message(ctx: AC[T], msg: T): Behavior[T] = { + behavior(ctx, msg) + this + } + override def toString = s"Static(${LineNumbers(behavior)})" + } + + /** + * This type of Behavior wraps another Behavior while allowing you to perform + * some action upon each received message or signal. It is most commonly used + * for logging or tracing what a certain Actor does. + */ + final case class Tap[T]( + signal: (ActorContext[T], Signal) ⇒ _, + mesg: (ActorContext[T], T) ⇒ _, + behavior: Behavior[T]) extends Behavior[T] { + private def canonical(behv: Behavior[T]): Behavior[T] = + if (isUnhandled(behv)) Unhandled + else if (behv eq sameBehavior) Same + else if (isAlive(behv)) Tap(signal, mesg, behv) + else Stopped + override def management(ctx: AC[T], msg: Signal): Behavior[T] = { + signal(ctx, msg) + canonical(behavior.management(ctx, msg)) + } + override def message(ctx: AC[T], msg: T): Behavior[T] = { + mesg(ctx, msg) + canonical(behavior.message(ctx, msg)) + } + override def toString = s"Tap(${LineNumbers(signal)},${LineNumbers(mesg)},$behavior)" + } + + /** + * Behavior decorator that copies all received message to the designated + * monitor [[akka.typed.ActorRef]] before invoking the wrapped behavior. The + * wrapped behavior can evolve (i.e. be stateful) without needing to be + * wrapped in a `monitor` call again. + */ + object Monitor { + def apply[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap(unitFunction, (_, msg) ⇒ monitor ! msg, behavior) + } + + /** + * Wrap the given behavior such that it is restarted (i.e. reset to its + * initial state) whenever it throws an exception of the given class or a + * subclass thereof. Exceptions that are not subtypes of `Thr` will not be + * caught and thus lead to the termination of the actor. + * + * It is possible to specify that the actor shall not be restarted but + * resumed. This entails keeping the same state as before the exception was + * thrown and is thus less safe. If you use `OnFailure.RESUME` you should at + * least not hold mutable data fields or collections within the actor as those + * might be in an inconsistent state (the exception might have interrupted + * normal processing); avoiding mutable state is possible by returning a fresh + * behavior with the new state after every message. + * + * Example: + * {{{ + * val dbConnector: Behavior[DbCommand] = ... + * val dbRestarts = Restarter[DbException].wrap(dbConnector) + * }}} + */ + object Restarter { + class Apply[Thr <: Throwable](c: ClassTag[Thr], resume: Boolean) { + def wrap[T](b: Behavior[T]): Behavior[T] = patterns.Restarter(Behavior.validateAsInitial(b), resume)()(c) + def mutableWrap[T](b: Behavior[T]): Behavior[T] = patterns.MutableRestarter(Behavior.validateAsInitial(b), resume)(c) + } + + def apply[Thr <: Throwable: ClassTag](resume: Boolean = false): Apply[Thr] = new Apply(implicitly, resume) + } + + // TODO + // final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () ⇒ Behavior[T]) + + /** + * INTERNAL API. + */ + private[akka] val _unhandledFunction = (_: Any) ⇒ Unhandled[Nothing] + /** + * INTERNAL API. + */ + private[akka] def unhandledFunction[T, U] = _unhandledFunction.asInstanceOf[(T ⇒ Behavior[U])] + + /** + * INTERNAL API. + */ + private[akka] val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () + /** + * INTERNAL API. + */ + private[akka] def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] + +} diff --git a/akka-typed/src/main/scala/akka/typed/Ask.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala similarity index 96% rename from akka-typed/src/main/scala/akka/typed/Ask.scala rename to akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala index e27e7c74ae..e45b31ce8d 100644 --- a/akka-typed/src/main/scala/akka/typed/Ask.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Ask.scala @@ -1,19 +1,19 @@ /** * Copyright (C) 2014-2017 Lightbend Inc. */ -package akka.typed +package akka.typed.scaladsl import scala.concurrent.{ Future, Promise } import akka.util.Timeout import akka.actor.InternalActorRef import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef -import java.lang.IllegalArgumentException import akka.actor.Scheduler import akka.typed.internal.FunctionRef import akka.actor.RootActorPath import akka.actor.Address -import akka.util.LineNumbers +import akka.typed.ActorRef +import akka.typed.adapter /** * The ask-pattern implements the initiator side of a request–reply protocol. @@ -82,7 +82,7 @@ object AskPattern { AskPath, (msg, self) ⇒ { p.trySuccess(msg) - self.sendSystem(internal.Terminate()) + self.sendSystem(akka.typed.internal.Terminate()) }, (self) ⇒ if (!p.isCompleted) p.tryFailure(new NoSuchElementException("ask pattern terminated before value was received"))) actorRef ! f(ref) diff --git a/akka-typed/src/test/java/akka/typed/javadsl/ActorCompile.java b/akka-typed/src/test/java/akka/typed/javadsl/ActorCompile.java new file mode 100644 index 0000000000..ca7ad3e9d3 --- /dev/null +++ b/akka-typed/src/test/java/akka/typed/javadsl/ActorCompile.java @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl; + +import akka.typed.*; +import static akka.typed.javadsl.Actor.*; + +public class ActorCompile { + + interface MyMsg {} + + class MyMsgA implements MyMsg { + final ActorRef replyTo; + + public MyMsgA(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + class MyMsgB implements MyMsg { + final String greeting; + + public MyMsgB(String greeting) { + this.greeting = greeting; + } + } + + Behavior actor1 = signalOrMessage((ctx, signal) -> same(), (ctx, msg) -> stopped()); + Behavior actor2 = stateful((ctx, msg) -> unhandled()); + Behavior actor3 = stateless((ctx, msg) -> {}); + Behavior actor4 = empty(); + Behavior actor5 = ignore(); + Behavior actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5); + Behavior actor7 = actor6.narrow(); + Behavior actor8 = deferred(ctx -> { + final ActorRef self = ctx.getSelf(); + return monitor(self, ignore()); + }); + Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); + + { + Actor.stateful((ctx, msg) -> { + if (msg instanceof MyMsgA) { + return stateless((ctx2, msg2) -> { + if (msg2 instanceof MyMsgB) { + ((MyMsgA) msg).replyTo.tell(((MyMsgB) msg2).greeting); + } + }); + } else return unhandled(); + }); + } +} diff --git a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala index 6c34d76f1f..13dad7e2d2 100644 --- a/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala @@ -4,8 +4,8 @@ import scala.concurrent.duration._ import scala.concurrent.Future import com.typesafe.config.ConfigFactory import akka.actor.DeadLetterSuppression -import akka.typed.ScalaDSL._ -import akka.typed.patterns._ +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.Actor.SignalOrMessage object ActorContextSpec { @@ -74,6 +74,91 @@ object ActorContextSpec { final case class Adapter(a: ActorRef[Command]) extends Event def subject(monitor: ActorRef[Monitor]): Behavior[Command] = + Actor.SignalOrMessage( + (ctx, signal) ⇒ { monitor ! GotSignal(signal); Actor.Same }, + (ctx, message) ⇒ message match { + case ReceiveTimeout ⇒ + monitor ! GotReceiveTimeout + Actor.Same + case Ping(replyTo) ⇒ + replyTo ! Pong1 + Actor.Same + case Miss(replyTo) ⇒ + replyTo ! Missed + Actor.Unhandled + case Renew(replyTo) ⇒ + replyTo ! Renewed + subject(monitor) + case Throw(ex) ⇒ + throw ex + case MkChild(name, mon, replyTo) ⇒ + val child = name match { + case None ⇒ ctx.spawnAnonymous(Actor.Restarter[Throwable]().wrap(subject(mon))) + case Some(n) ⇒ ctx.spawn(Actor.Restarter[Throwable]().wrap(subject(mon)), n) + } + replyTo ! Created(child) + Actor.Same + case SetTimeout(d, replyTo) ⇒ + d match { + case f: FiniteDuration ⇒ ctx.setReceiveTimeout(f, ReceiveTimeout) + case _ ⇒ ctx.cancelReceiveTimeout() + } + replyTo ! TimeoutSet + Actor.Same + case Schedule(delay, target, msg, replyTo) ⇒ + replyTo ! Scheduled + ctx.schedule(delay, target, msg) + Actor.Same + case Stop ⇒ Actor.Stopped + case Kill(ref, replyTo) ⇒ + if (ctx.stop(ref)) replyTo ! Killed + else replyTo ! NotKilled + Actor.Same + case Watch(ref, replyTo) ⇒ + ctx.watch[Nothing](ref) + replyTo ! Watched + Actor.Same + case Unwatch(ref, replyTo) ⇒ + ctx.unwatch[Nothing](ref) + replyTo ! Unwatched + Actor.Same + case GetInfo(replyTo) ⇒ + replyTo ! Info(ctx.self, ctx.system) + Actor.Same + case GetChild(name, replyTo) ⇒ + replyTo ! Child(ctx.child(name)) + Actor.Same + case GetChildren(replyTo) ⇒ + replyTo ! Children(ctx.children.toSet) + Actor.Same + case BecomeInert(replyTo) ⇒ + replyTo ! BecameInert + Actor.Stateless { + case (_, Ping(replyTo)) ⇒ + replyTo ! Pong2 + case (_, Throw(ex)) ⇒ + throw ex + case _ ⇒ () + } + case BecomeCareless(replyTo) ⇒ + replyTo ! BecameCareless + Actor.SignalOrMessage( + (ctx, signal) ⇒ signal match { + case Terminated(_) ⇒ Actor.Unhandled + case sig ⇒ + monitor ! GotSignal(sig) + Actor.Same + }, + (ctx, message) ⇒ Actor.Unhandled + ) + case GetAdapter(replyTo, name) ⇒ + replyTo ! Adapter(ctx.spawnAdapter(identity, name)) + Actor.Same + } + ) + + def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = { + import ScalaDSL._ FullTotal { case Sig(ctx, signal) ⇒ monitor ! GotSignal(signal) @@ -95,8 +180,8 @@ object ActorContextSpec { throw ex case MkChild(name, mon, replyTo) ⇒ val child = name match { - case None ⇒ ctx.spawnAnonymous(Restarter[Throwable]().wrap(subject(mon))) - case Some(n) ⇒ ctx.spawn(Restarter[Throwable]().wrap(subject(mon)), n) + case None ⇒ ctx.spawnAnonymous(patterns.Restarter[Command, Throwable](subject(mon), false)()) + case Some(n) ⇒ ctx.spawn(patterns.Restarter[Command, Throwable](subject(mon), false)(), n) } replyTo ! Created(child) Same @@ -156,6 +241,8 @@ object ActorContextSpec { Same } } + } + } class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( @@ -179,7 +266,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( /** * The behavior against which to run all the tests. */ - def behavior(ctx: ActorContext[Event]): Behavior[Command] + def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] implicit def system: ActorSystem[TypedSpec.Command] @@ -187,8 +274,8 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( if (system eq nativeSystem) suite + "Native" else suite + "Adapted" - def setup(name: String, wrapper: Option[Restarter.Apply[_]] = None)( - proc: (ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = + def setup(name: String, wrapper: Option[Actor.Restarter.Apply[_]] = None)( + proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx)) val steps = @@ -260,7 +347,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) - def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Restarter[Throwable]())) { (ctx, startWith) ⇒ + def `01 must correctly wire the lifecycle hooks`(): Unit = sync(setup("ctx01", Some(Actor.Restarter[Throwable]())) { (ctx, startWith) ⇒ val self = ctx.self val ex = new Exception("KABOOM1") startWith { subj ⇒ @@ -335,7 +422,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } }) - def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Restarter[Exception]())) { (ctx, startWith) ⇒ + def `05 must reset behavior upon Restart`(): Unit = sync(setup("ctx05", Some(Actor.Restarter[Exception]())) { (ctx, startWith) ⇒ val self = ctx.self val ex = new Exception("KABOOM05") startWith @@ -353,7 +440,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( .stimulate(_ ! Ping(self), _ ⇒ Pong1) }) - def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Restarter[Exception](resume = true))) { (ctx, startWith) ⇒ + def `06 must not reset behavior upon Resume`(): Unit = sync(setup("ctx06", Some(Actor.Restarter[Exception](resume = true))) { (ctx, startWith) ⇒ val self = ctx.self val ex = new Exception("KABOOM06") startWith @@ -534,80 +621,115 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( } trait Normal extends Tests { - override def suite = "basic" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self) + override def suite = "normal" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + subject(ctx.self) } object `An ActorContext (native)` extends Normal with NativeSystem object `An ActorContext (adapted)` extends Normal with AdaptedSystem trait Widened extends Tests { + import Actor._ override def suite = "widened" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = subject(ctx.self).widen { case x ⇒ x } + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + subject(ctx.self).widen { case x ⇒ x } } object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem - trait SynchronousSelf extends Tests { + trait Deferred extends Tests { + override def suite = "deferred" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + Actor.Deferred(_ ⇒ subject(ctx.self)) + } + object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem + object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem + + trait Tap extends Tests { + override def suite = "tap" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + Actor.Tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self)) + } + object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem + object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem + + trait NormalOld extends Tests { + override def suite = "basic" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + oldSubject(ctx.self) + } + object `An ActorContext (old-native)` extends NormalOld with NativeSystem + object `An ActorContext (old-adapted)` extends NormalOld with AdaptedSystem + + trait WidenedOld extends Tests { + import ScalaDSL._ + override def suite = "widened" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + oldSubject(ctx.self).widen { case x ⇒ x } + } + object `An ActorContext with widened Behavior (old-native)` extends WidenedOld with NativeSystem + object `An ActorContext with widened Behavior (old-adapted)` extends WidenedOld with AdaptedSystem + + trait SynchronousSelfOld extends Tests { override def suite = "synchronous" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = SynchronousSelf(self ⇒ subject(ctx.self)) + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.SynchronousSelf(self ⇒ oldSubject(ctx.self)) } - object `An ActorContext with SynchronousSelf (native)` extends SynchronousSelf with NativeSystem - object `An ActorContext with SynchronousSelf (adapted)` extends SynchronousSelf with AdaptedSystem + object `An ActorContext with SynchronousSelf (old-native)` extends SynchronousSelfOld with NativeSystem + object `An ActorContext with SynchronousSelf (old-adapted)` extends SynchronousSelfOld with AdaptedSystem - trait NonMatchingTap extends Tests { + trait NonMatchingTapOld extends Tests { override def suite = "TapNonMatch" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case null ⇒ }, subject(ctx.self)) + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case null ⇒ }, oldSubject(ctx.self)) } - object `An ActorContext with non-matching Tap (native)` extends NonMatchingTap with NativeSystem - object `An ActorContext with non-matching Tap (adapted)` extends NonMatchingTap with AdaptedSystem + object `An ActorContext with non-matching Tap (old-native)` extends NonMatchingTapOld with NativeSystem + object `An ActorContext with non-matching Tap (old-adapted)` extends NonMatchingTapOld with AdaptedSystem - trait MatchingTap extends Tests { + trait MatchingTapOld extends Tests { override def suite = "TapMatch" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = Tap({ case _ ⇒ }, subject(ctx.self)) + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = ScalaDSL.Tap({ case _ ⇒ }, oldSubject(ctx.self)) } - object `An ActorContext with matching Tap (native)` extends MatchingTap with NativeSystem - object `An ActorContext with matching Tap (adapted)` extends MatchingTap with AdaptedSystem + object `An ActorContext with matching Tap (old-native)` extends MatchingTapOld with NativeSystem + object `An ActorContext with matching Tap (old-adapted)` extends MatchingTapOld with AdaptedSystem - private val stoppingBehavior = Full[Command] { case Msg(_, Stop) ⇒ Stopped } + private val stoppingBehavior = ScalaDSL.Full[Command] { case ScalaDSL.Msg(_, Stop) ⇒ ScalaDSL.Stopped } - trait AndLeft extends Tests { - override def suite = "and" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = - And(subject(ctx.self), stoppingBehavior) + trait AndLeftOld extends Tests { + override def suite = "andLeft" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + ScalaDSL.And(oldSubject(ctx.self), stoppingBehavior) } - object `An ActorContext with And (left, native)` extends AndLeft with NativeSystem - object `An ActorContext with And (left, adapted)` extends AndLeft with AdaptedSystem + object `An ActorContext with And (left, native)` extends AndLeftOld with NativeSystem + object `An ActorContext with And (left, adapted)` extends AndLeftOld with AdaptedSystem - trait AndRight extends Tests { - override def suite = "and" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = - And(stoppingBehavior, subject(ctx.self)) + trait AndRightOld extends Tests { + override def suite = "andRight" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + ScalaDSL.And(stoppingBehavior, oldSubject(ctx.self)) } - object `An ActorContext with And (right, native)` extends AndRight with NativeSystem - object `An ActorContext with And (right, adapted)` extends AndRight with AdaptedSystem + object `An ActorContext with And (right, native)` extends AndRightOld with NativeSystem + object `An ActorContext with And (right, adapted)` extends AndRightOld with AdaptedSystem - trait OrLeft extends Tests { - override def suite = "basic" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = - Or(subject(ctx.self), stoppingBehavior) + trait OrLeftOld extends Tests { + override def suite = "orLeft" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + ScalaDSL.Or(oldSubject(ctx.self), stoppingBehavior) override def stop(ref: ActorRef[Command]) = { ref ! Stop ref ! Stop } } - object `An ActorContext with Or (left, native)` extends OrLeft with NativeSystem - object `An ActorContext with Or (left, adapted)` extends OrLeft with AdaptedSystem + object `An ActorContext with Or (left, native)` extends OrLeftOld with NativeSystem + object `An ActorContext with Or (left, adapted)` extends OrLeftOld with AdaptedSystem - trait OrRight extends Tests { - override def suite = "basic" - override def behavior(ctx: ActorContext[Event]): Behavior[Command] = - Or(stoppingBehavior, subject(ctx.self)) + trait OrRightOld extends Tests { + override def suite = "orRight" + override def behavior(ctx: scaladsl.ActorContext[Event]): Behavior[Command] = + ScalaDSL.Or(stoppingBehavior, oldSubject(ctx.self)) override def stop(ref: ActorRef[Command]) = { ref ! Stop ref ! Stop } } - object `An ActorContext with Or (right, native)` extends OrRight with NativeSystem - object `An ActorContext with Or (right, adapted)` extends OrRight with AdaptedSystem - + object `An ActorContext with Or (right, native)` extends OrRightOld with NativeSystem + object `An ActorContext with Or (right, adapted)` extends OrRightOld with AdaptedSystem } diff --git a/akka-typed/src/test/scala/akka/typed/AskSpec.scala b/akka-typed/src/test/scala/akka/typed/AskSpec.scala index ada32b825c..39fb3b3fd8 100644 --- a/akka-typed/src/test/scala/akka/typed/AskSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/AskSpec.scala @@ -12,7 +12,7 @@ import akka.util.Timeout import akka.pattern.AskTimeoutException import ScalaDSL._ -import AskPattern._ +import akka.typed.scaladsl.AskPattern._ object AskSpec { diff --git a/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala b/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala index 80e6b419c7..dcc88f6656 100644 --- a/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala @@ -3,6 +3,12 @@ */ package akka.typed +import akka.typed.scaladsl.{ Actor ⇒ SActor } +import akka.typed.javadsl.{ Actor ⇒ JActor, ActorContext ⇒ JActorContext } +import akka.japi.function.{ Function ⇒ F1e, Function2 ⇒ F2, Procedure2 ⇒ P2 } +import akka.japi.pf.{ FI, PFBuilder } +import java.util.function.{ Function ⇒ F1 } + class BehaviorSpec extends TypedSpec { sealed trait Command { @@ -25,9 +31,8 @@ class BehaviorSpec extends TypedSpec { case object Swap extends Command { override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Swapped :: Nil } - case class GetState(replyTo: ActorRef[State]) extends Command - object GetState { - def apply()(implicit inbox: Inbox[State]): GetState = GetState(inbox.ref) + case class GetState()(s: State) extends Command { + override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = s :: Nil } case class AuxPing(id: Int) extends Command { override def expectedResponse(ctx: ActorContext[Command]): Seq[Event] = Pong :: Nil @@ -42,58 +47,86 @@ class BehaviorSpec extends TypedSpec { case object Pong extends Event case object Swapped extends Event - trait State { def next: State } + sealed trait State extends Event { def next: State } val StateA: State = new State { override def toString = "StateA"; override def next = StateB } val StateB: State = new State { override def toString = "StateB"; override def next = StateA } trait Common { + type Aux >: Null <: AnyRef def system: ActorSystem[TypedSpec.Command] - def behavior(monitor: ActorRef[Event]): Behavior[Command] + def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) + def checkAux(signal: Signal, aux: Aux): Unit = () + def checkAux(command: Command, aux: Aux): Unit = () - case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event]) + case class Init(behv: Behavior[Command], inbox: Inbox[Event], aux: Aux) { + def mkCtx(requirePreStart: Boolean = false): Setup = { + val ctx = new EffectfulActorContext("ctx", behv, 1000, system) + val msgs = inbox.receiveAll() + if (requirePreStart) msgs should ===(GotSignal(PreStart) :: Nil) + checkAux(PreStart, aux) + Setup(ctx, inbox, aux) + } + } + case class Setup(ctx: EffectfulActorContext[Command], inbox: Inbox[Event], aux: Aux) - protected def mkCtx(requirePreStart: Boolean = false, factory: (ActorRef[Event]) ⇒ Behavior[Command] = behavior) = { + def init(): Init = { val inbox = Inbox[Event]("evt") - val ctx = new EffectfulActorContext("ctx", factory(inbox.ref), 1000, system) - val msgs = inbox.receiveAll() - if (requirePreStart) - msgs should ===(GotSignal(PreStart) :: Nil) - Setup(ctx, inbox) + val (behv, aux) = behavior(inbox.ref) + Init(behv, inbox, aux) } - protected implicit class Check(val setup: Setup) { + def init(factory: ActorRef[Event] ⇒ (Behavior[Command], Aux)): Init = { + val inbox = Inbox[Event]("evt") + val (behv, aux) = factory(inbox.ref) + Init(behv, inbox, aux) + } + + def mkCtx(requirePreStart: Boolean = false): Setup = + init().mkCtx(requirePreStart) + + implicit class Check(val setup: Setup) { def check(signal: Signal): Setup = { setup.ctx.signal(signal) setup.inbox.receiveAll() should ===(GotSignal(signal) :: Nil) + checkAux(signal, setup.aux) setup } def check(command: Command): Setup = { setup.ctx.run(command) setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx)) - setup - } - def check[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = { - setup.ctx.run(command) - setup.inbox.receiveAll() should ===(command.expectedResponse(setup.ctx)) - inbox.receiveAll() should ===(aux) + checkAux(command, setup.aux) setup } def check2(command: Command): Setup = { setup.ctx.run(command) val expected = command.expectedResponse(setup.ctx) setup.inbox.receiveAll() should ===(expected ++ expected) - setup - } - def check2[T](command: Command, aux: T*)(implicit inbox: Inbox[T]): Setup = { - setup.ctx.run(command) - val expected = command.expectedResponse(setup.ctx) - setup.inbox.receiveAll() should ===(expected ++ expected) - inbox.receiveAll() should ===(aux ++ aux) + checkAux(command, setup.aux) setup } } - protected val ex = new Exception("mine!") + val ex = new Exception("mine!") + } + + trait Siphon extends Common { + override type Aux = Inbox[Command] + + override def checkAux(command: Command, aux: Aux): Unit = { + aux.receiveAll() should ===(command :: Nil) + } + } + + trait SignalSiphon extends Common { + override type Aux = Inbox[Either[Signal, Command]] + + override def checkAux(command: Command, aux: Aux): Unit = { + aux.receiveAll() should ===(Right(command) :: Nil) + } + + override def checkAux(signal: Signal, aux: Aux): Unit = { + aux.receiveAll() should ===(Left(signal) :: Nil) + } } trait Lifecycle extends Common { @@ -146,17 +179,19 @@ class BehaviorSpec extends TypedSpec { trait Unhandled extends Common { def `must return Unhandled`(): Unit = { - val Setup(ctx, inbox) = mkCtx() - ctx.currentBehavior.message(ctx, Miss) should ===(ScalaDSL.Unhandled[Command]) + val Setup(ctx, inbox, aux) = mkCtx() + ctx.currentBehavior.message(ctx, Miss) should be(Behavior.unhandledBehavior) inbox.receiveAll() should ===(Missed :: Nil) + checkAux(Miss, aux) } } trait Stoppable extends Common { def `must stop`(): Unit = { - val Setup(ctx, inbox) = mkCtx() + val Setup(ctx, inbox, aux) = mkCtx() ctx.run(Stop) - ctx.currentBehavior should ===(ScalaDSL.Stopped[Command]) + ctx.currentBehavior should be(Behavior.stoppedBehavior) + checkAux(Stop, aux) } } @@ -164,15 +199,15 @@ class BehaviorSpec extends TypedSpec { private implicit val inbox = Inbox[State]("state") def `must be in state A`(): Unit = { - mkCtx().check(GetState(), StateA) + mkCtx().check(GetState()(StateA)) } def `must switch to state B`(): Unit = { - mkCtx().check(Swap).check(GetState(), StateB) + mkCtx().check(Swap).check(GetState()(StateB)) } def `must switch back to state A`(): Unit = { - mkCtx().check(Swap).check(Swap).check(GetState(), StateA) + mkCtx().check(Swap).check(Swap).check(GetState()(StateA)) } } @@ -206,6 +241,18 @@ class BehaviorSpec extends TypedSpec { } } + /** + * This targets behavior wrappers to ensure that the wrapper does not + * hold on to the changed behavior. Wrappers must be immutable. + */ + trait Reuse extends Common { + def `must be reusable`(): Unit = { + val i = init() + i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB)) + i.mkCtx().check(GetState()(StateA)).check(Swap).check(GetState()(StateB)) + } + } + private def mkFull(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = { import ScalaDSL.{ Full, Msg, Sig, Same, Unhandled, Stopped } Full { @@ -227,21 +274,21 @@ class BehaviorSpec extends TypedSpec { case Msg(ctx, Swap) ⇒ monitor ! Swapped mkFull(monitor, state.next) - case Msg(ctx, GetState(replyTo)) ⇒ - replyTo ! state + case Msg(ctx, GetState()) ⇒ + monitor ! state Same case Msg(ctx, Stop) ⇒ Stopped } } trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = mkFull(monitor) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) → null } object `A Full Behavior (native)` extends FullBehavior with NativeSystem object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem trait FullTotalBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null private def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = { import ScalaDSL.{ FullTotal, Msg, Sig, Same, Unhandled, Stopped } FullTotal { @@ -263,8 +310,8 @@ class BehaviorSpec extends TypedSpec { case Msg(_, Swap) ⇒ monitor ! Swapped behv(monitor, state.next) - case Msg(_, GetState(replyTo)) ⇒ - replyTo ! state + case Msg(_, GetState()) ⇒ + monitor ! state Same case Msg(_, Stop) ⇒ Stopped case Msg(_, _: AuxPing) ⇒ Unhandled @@ -275,36 +322,36 @@ class BehaviorSpec extends TypedSpec { object `A FullTotal Behavior (adapted)` extends FullTotalBehavior with AdaptedSystem trait WidenedBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Widened(mkFull(monitor), { case x ⇒ x }) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.Widened(mkFull(monitor), { case x ⇒ x }), null) } object `A Widened Behavior (native)` extends WidenedBehavior with NativeSystem object `A Widened Behavior (adapted)` extends WidenedBehavior with AdaptedSystem trait ContextAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.ContextAware(ctx ⇒ mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.ContextAware(ctx ⇒ mkFull(monitor)), null) } object `A ContextAware Behavior (native)` extends ContextAwareBehavior with NativeSystem object `A ContextAware Behavior (adapted)` extends ContextAwareBehavior with AdaptedSystem trait SelfAwareBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.SelfAware(self ⇒ mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.SelfAware(self ⇒ mkFull(monitor)), null) } object `A SelfAware Behavior (native)` extends SelfAwareBehavior with NativeSystem object `A SelfAware Behavior (adapted)` extends SelfAwareBehavior with AdaptedSystem trait NonMatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Tap({ case null ⇒ }, mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.Tap({ case null ⇒ }, mkFull(monitor)), null) } object `A non-matching Tap Behavior (native)` extends NonMatchingTapBehavior with NativeSystem object `A non-matching Tap Behavior (adapted)` extends NonMatchingTapBehavior with AdaptedSystem trait MatchingTapBehavior extends Messages with BecomeWithLifecycle with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Tap({ case _ ⇒ }, mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.Tap({ case _ ⇒ }, mkFull(monitor)), null) } object `A matching Tap Behavior (native)` extends MatchingTapBehavior with NativeSystem object `A matching Tap Behavior (adapted)` extends MatchingTapBehavior with AdaptedSystem @@ -312,12 +359,13 @@ class BehaviorSpec extends TypedSpec { trait SynchronousSelfBehavior extends Messages with BecomeWithLifecycle with Stoppable { import ScalaDSL._ - implicit private val inbox = Inbox[Command]("syncself") + type Aux = Inbox[Command] - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - SynchronousSelf(self ⇒ mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (SynchronousSelf(self ⇒ mkFull(monitor)), null) - private def behavior2(monitor: ActorRef[Event]): Behavior[Command] = { + private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[Command]("syncself") def first(self: ActorRef[Command]) = Tap.monitor(inbox.ref, Partial[Command] { case AuxPing(id) ⇒ { self ! AuxPing(0); second(self) } }) @@ -330,46 +378,51 @@ class BehaviorSpec extends TypedSpec { case AuxPing(3) ⇒ { self ! Ping; Same } case AuxPing(4) ⇒ { self ! Stop; Stopped } } - SynchronousSelf(self ⇒ Or(mkFull(monitor), first(self))) + (SynchronousSelf(self ⇒ Or(mkFull(monitor), first(self))), inbox) } + override def checkAux(cmd: Command, aux: Aux) = + (cmd, aux) match { + case (AuxPing(42), i: Inbox[_]) ⇒ i.receiveAll() should ===(Seq(42, 0, 1, 2, 3) map AuxPing: Seq[Command]) + case (AuxPing(4), i: Inbox[_]) ⇒ i.receiveAll() should ===(AuxPing(4) :: Nil) + case _ ⇒ // ignore + } + def `must send messages to itself and stop correctly`(): Unit = { - val Setup(ctx, _) = mkCtx(factory = behavior2).check[Command](AuxPing(42), Seq(42, 0, 1, 2, 3) map AuxPing: _*) + val Setup(ctx, _, _) = init(behavior2).mkCtx().check(AuxPing(42)) ctx.run(AuxPing(4)) - inbox.receiveAll() should ===(AuxPing(4) :: Nil) ctx.currentBehavior should ===(Stopped[Command]) } } - object `A SynchronourSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem + object `A SynchronousSelf Behavior (native)` extends SynchronousSelfBehavior with NativeSystem object `A SynchronousSelf Behavior (adapted)` extends SynchronousSelfBehavior with AdaptedSystem trait And extends Common { - private implicit val inbox = Inbox[State]("and") - private def behavior2(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.And(mkFull(monitor), mkFull(monitor)) + private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.And(mkFull(monitor), mkFull(monitor)) → null def `must pass message to both parts`(): Unit = { - mkCtx(factory = behavior2).check2(Swap).check2[State](GetState(), StateB) + init(behavior2).mkCtx().check2(Swap).check2(GetState()(StateB)) } def `must half-terminate`(): Unit = { - val Setup(ctx, inbox) = mkCtx() + val Setup(ctx, inbox, _) = mkCtx() ctx.run(Stop) ctx.currentBehavior should ===(ScalaDSL.Empty[Command]) } } trait BehaviorAndLeft extends Messages with BecomeWithLifecycle with And { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.And(mkFull(monitor), ScalaDSL.Empty) → null } object `A Behavior combined with And (left, native)` extends BehaviorAndLeft with NativeSystem object `A Behavior combined with And (left, adapted)` extends BehaviorAndLeft with NativeSystem trait BehaviorAndRight extends Messages with BecomeWithLifecycle with And { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.And(ScalaDSL.Empty, mkFull(monitor)) → null } object `A Behavior combined with And (right, native)` extends BehaviorAndRight with NativeSystem object `A Behavior combined with And (right, adapted)` extends BehaviorAndRight with NativeSystem @@ -382,43 +435,43 @@ class BehaviorSpec extends TypedSpec { ScalaDSL.Unhandled } - private def behavior2(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Or(mkFull(monitor), strange(monitor)) + private def behavior2(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.Or(mkFull(monitor), strange(monitor)) → null - private def behavior3(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Or(strange(monitor), mkFull(monitor)) + private def behavior3(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.Or(strange(monitor), mkFull(monitor)) → null def `must pass message only to first interested party`(): Unit = { - mkCtx(factory = behavior2).check(Ping).check(AuxPing(0)) + init(behavior2).mkCtx().check(Ping).check(AuxPing(0)) } def `must pass message through both if first is uninterested`(): Unit = { - mkCtx(factory = behavior3).check2(Ping).check(AuxPing(0)) + init(behavior3).mkCtx().check2(Ping).check(AuxPing(0)) } def `must half-terminate`(): Unit = { - val Setup(ctx, inbox) = mkCtx() + val Setup(ctx, inbox, _) = mkCtx() ctx.run(Stop) ctx.currentBehavior should ===(ScalaDSL.Empty[Command]) } } trait BehaviorOrLeft extends Messages with BecomeWithLifecycle with Or { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.Or(mkFull(monitor), ScalaDSL.Empty) → null } object `A Behavior combined with Or (left, native)` extends BehaviorOrLeft with NativeSystem object `A Behavior combined with Or (left, adapted)` extends BehaviorOrLeft with NativeSystem trait BehaviorOrRight extends Messages with BecomeWithLifecycle with Or { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor)) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + ScalaDSL.Or(ScalaDSL.Empty, mkFull(monitor)) → null } object `A Behavior combined with Or (right, native)` extends BehaviorOrRight with NativeSystem object `A Behavior combined with Or (right, adapted)` extends BehaviorOrRight with NativeSystem trait PartialBehavior extends Messages with Become with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = ScalaDSL.Partial { case Ping ⇒ @@ -433,8 +486,8 @@ class BehaviorSpec extends TypedSpec { case Swap ⇒ monitor ! Swapped behv(monitor, state.next) - case GetState(replyTo) ⇒ - replyTo ! state + case GetState() ⇒ + monitor ! state ScalaDSL.Same case Stop ⇒ ScalaDSL.Stopped } @@ -443,7 +496,7 @@ class BehaviorSpec extends TypedSpec { object `A Partial Behavior (adapted)` extends PartialBehavior with AdaptedSystem trait TotalBehavior extends Messages with Become with Stoppable { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = behv(monitor, StateA) + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = ScalaDSL.Total { case Ping ⇒ @@ -459,8 +512,8 @@ class BehaviorSpec extends TypedSpec { case Swap ⇒ monitor ! Swapped behv(monitor, state.next) - case GetState(replyTo) ⇒ - replyTo ! state + case GetState() ⇒ + monitor ! state ScalaDSL.Same case Stop ⇒ ScalaDSL.Stopped case _: AuxPing ⇒ ScalaDSL.Unhandled @@ -470,18 +523,317 @@ class BehaviorSpec extends TypedSpec { object `A Total Behavior (adapted)` extends TotalBehavior with AdaptedSystem trait StaticBehavior extends Messages { - override def behavior(monitor: ActorRef[Event]): Behavior[Command] = - ScalaDSL.Static { - case Ping ⇒ monitor ! Pong - case Miss ⇒ monitor ! Missed - case Ignore ⇒ monitor ! Ignored - case GetSelf ⇒ - case Swap ⇒ - case GetState(_) ⇒ - case Stop ⇒ - case _: AuxPing ⇒ - } + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (ScalaDSL.Static { + case Ping ⇒ monitor ! Pong + case Miss ⇒ monitor ! Missed + case Ignore ⇒ monitor ! Ignored + case GetSelf ⇒ + case Swap ⇒ + case GetState() ⇒ + case Stop ⇒ + case _: AuxPing ⇒ + }, null) } object `A Static Behavior (native)` extends StaticBehavior with NativeSystem object `A Static Behavior (adapted)` extends StaticBehavior with AdaptedSystem + + trait SignalOrMessageScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null + def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = + SActor.SignalOrMessage((ctx, sig) ⇒ { + monitor ! GotSignal(sig) + SActor.Same + }, (ctx, msg) ⇒ msg match { + case GetSelf ⇒ + monitor ! Self(ctx.self) + SActor.Same + case Miss ⇒ + monitor ! Missed + SActor.Unhandled + case Ignore ⇒ + monitor ! Ignored + SActor.Same + case Ping ⇒ + monitor ! Pong + behv(monitor, state) + case Swap ⇒ + monitor ! Swapped + behv(monitor, state.next) + case GetState() ⇒ + monitor ! state + SActor.Same + case Stop ⇒ SActor.Stopped + case _: AuxPing ⇒ SActor.Unhandled + }) + } + object `A SignalOrMessage Behavior (scala,native)` extends SignalOrMessageScalaBehavior with NativeSystem + object `A SignalOrMessage Behavior (scala,adapted)` extends SignalOrMessageScalaBehavior with AdaptedSystem + + trait StatefulScalaBehavior extends Messages with Become with Stoppable { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null + def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = + SActor.Stateful { (ctx, msg) ⇒ + msg match { + case GetSelf ⇒ + monitor ! Self(ctx.self) + SActor.Same + case Miss ⇒ + monitor ! Missed + SActor.Unhandled + case Ignore ⇒ + monitor ! Ignored + SActor.Same + case Ping ⇒ + monitor ! Pong + behv(monitor, state) + case Swap ⇒ + monitor ! Swapped + behv(monitor, state.next) + case GetState() ⇒ + monitor ! state + SActor.Same + case Stop ⇒ SActor.Stopped + case _: AuxPing ⇒ SActor.Unhandled + } + } + } + object `A Stateful Behavior (scala,native)` extends StatefulScalaBehavior with NativeSystem + object `A Stateful Behavior (scala,adapted)` extends StatefulScalaBehavior with AdaptedSystem + + trait StatelessScalaBehavior extends Messages { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (SActor.Stateless { (ctx, msg) ⇒ + msg match { + case GetSelf ⇒ monitor ! Self(ctx.self) + case Miss ⇒ monitor ! Missed + case Ignore ⇒ monitor ! Ignored + case Ping ⇒ monitor ! Pong + case Swap ⇒ monitor ! Swapped + case GetState() ⇒ monitor ! StateA + case Stop ⇒ + case _: AuxPing ⇒ + } + }, null) + } + object `A Stateless Behavior (scala,native)` extends StatelessScalaBehavior with NativeSystem + object `A Stateless Behavior (scala,adapted)` extends StatelessScalaBehavior with AdaptedSystem + + trait WidenedScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with Siphon { + import SActor.BehaviorDecorators + + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[Command]("widenedListener") + super.behavior(monitor)._1.widen[Command] { case c ⇒ inbox.ref ! c; c } → inbox + } + } + object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem + object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem + + trait DeferredScalaBehavior extends SignalOrMessageScalaBehavior { + override type Aux = Inbox[PreStart] + + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[PreStart]("deferredListener") + (SActor.Deferred(ctx ⇒ { + inbox.ref ! PreStart + super.behavior(monitor)._1 + }), inbox) + } + + override def checkAux(signal: Signal, aux: Aux): Unit = + signal match { + case PreStart ⇒ aux.receiveAll() should ===(PreStart :: Nil) + case _ ⇒ aux.receiveAll() should ===(Nil) + } + } + object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem + object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem + + trait TapScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with SignalSiphon { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[Either[Signal, Command]]("tapListener") + (SActor.Tap( + (_, sig) ⇒ inbox.ref ! Left(sig), + (_, msg) ⇒ inbox.ref ! Right(msg), + super.behavior(monitor)._1 + ), inbox) + } + } + object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem + object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem + + trait RestarterScalaBehavior extends SignalOrMessageScalaBehavior with Reuse { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + SActor.Restarter[Exception]().wrap(super.behavior(monitor)._1) → null + } + } + object `A restarter Behavior (scala,native)` extends RestarterScalaBehavior with NativeSystem + object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem + + /* + * function converters for Java, to ease the pain on Scala 2.11 + */ + def fs(f: (JActorContext[Command], Signal) ⇒ Behavior[Command]) = + new F2[JActorContext[Command], Signal, Behavior[Command]] { + override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig) + } + def fc(f: (JActorContext[Command], Command) ⇒ Behavior[Command]) = + new F2[JActorContext[Command], Command, Behavior[Command]] { + override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command) + } + def ps(f: (JActorContext[Command], Signal) ⇒ Unit) = + new P2[JActorContext[Command], Signal] { + override def apply(ctx: JActorContext[Command], sig: Signal) = f(ctx, sig) + } + def pc(f: (JActorContext[Command], Command) ⇒ Unit) = + new P2[JActorContext[Command], Command] { + override def apply(ctx: JActorContext[Command], command: Command) = f(ctx, command) + } + def pf(f: PFBuilder[Command, Command] ⇒ PFBuilder[Command, Command]) = + new F1[PFBuilder[Command, Command], PFBuilder[Command, Command]] { + override def apply(in: PFBuilder[Command, Command]) = f(in) + } + def fi(f: Command ⇒ Command) = + new FI.Apply[Command, Command] { + override def apply(in: Command) = f(in) + } + def df(f: JActorContext[Command] ⇒ Behavior[Command]) = + new F1e[JActorContext[Command], Behavior[Command]] { + override def apply(in: JActorContext[Command]) = f(in) + } + + trait SignalOrMessageJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) → null + def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] = + JActor.signalOrMessage(fs((ctx, sig) ⇒ { + monitor ! GotSignal(sig) + SActor.Same + }), fc((ctx, msg) ⇒ msg match { + case GetSelf ⇒ + monitor ! Self(ctx.getSelf) + SActor.Same + case Miss ⇒ + monitor ! Missed + SActor.Unhandled + case Ignore ⇒ + monitor ! Ignored + SActor.Same + case Ping ⇒ + monitor ! Pong + behv(monitor, state) + case Swap ⇒ + monitor ! Swapped + behv(monitor, state.next) + case GetState() ⇒ + monitor ! state + SActor.Same + case Stop ⇒ SActor.Stopped + case _: AuxPing ⇒ SActor.Unhandled + })) + } + object `A SignalOrMessage Behavior (java,native)` extends SignalOrMessageJavaBehavior with NativeSystem + object `A SignalOrMessage Behavior (java,adapted)` extends SignalOrMessageJavaBehavior with AdaptedSystem + + trait StatefulJavaBehavior extends Messages with Become with Stoppable { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) → null + def behv(monitor: ActorRef[Event], state: State): Behavior[Command] = + JActor.stateful { + fc((ctx, msg) ⇒ + msg match { + case GetSelf ⇒ + monitor ! Self(ctx.getSelf) + SActor.Same + case Miss ⇒ + monitor ! Missed + SActor.Unhandled + case Ignore ⇒ + monitor ! Ignored + SActor.Same + case Ping ⇒ + monitor ! Pong + behv(monitor, state) + case Swap ⇒ + monitor ! Swapped + behv(monitor, state.next) + case GetState() ⇒ + monitor ! state + SActor.Same + case Stop ⇒ SActor.Stopped + case _: AuxPing ⇒ SActor.Unhandled + }) + } + } + object `A Stateful Behavior (java,native)` extends StatefulJavaBehavior with NativeSystem + object `A Stateful Behavior (java,adapted)` extends StatefulJavaBehavior with AdaptedSystem + + trait StatelessJavaBehavior extends Messages { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = + (JActor.stateless { + pc((ctx, msg) ⇒ + msg match { + case GetSelf ⇒ monitor ! Self(ctx.getSelf) + case Miss ⇒ monitor ! Missed + case Ignore ⇒ monitor ! Ignored + case Ping ⇒ monitor ! Pong + case Swap ⇒ monitor ! Swapped + case GetState() ⇒ monitor ! StateA + case Stop ⇒ + case _: AuxPing ⇒ + }) + }, null) + } + object `A Stateless Behavior (java,native)` extends StatelessJavaBehavior with NativeSystem + object `A Stateless Behavior (java,adapted)` extends StatelessJavaBehavior with AdaptedSystem + + trait WidenedJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with Siphon { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[Command]("widenedListener") + JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { inbox.ref ! x; x })))) → inbox + } + } + object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem + object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem + + trait DeferredJavaBehavior extends SignalOrMessageJavaBehavior { + override type Aux = Inbox[PreStart] + + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[PreStart]("deferredListener") + (JActor.deferred(df(ctx ⇒ { + inbox.ref ! PreStart + super.behavior(monitor)._1 + })), inbox) + } + + override def checkAux(signal: Signal, aux: Aux): Unit = + signal match { + case PreStart ⇒ aux.receiveAll() should ===(PreStart :: Nil) + case _ ⇒ aux.receiveAll() should ===(Nil) + } + } + object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem + object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem + + trait TapJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with SignalSiphon { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + val inbox = Inbox[Either[Signal, Command]]("tapListener") + (JActor.tap( + ps((_, sig) ⇒ inbox.ref ! Left(sig)), + pc((_, msg) ⇒ inbox.ref ! Right(msg)), + super.behavior(monitor)._1 + ), inbox) + } + } + object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem + object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem + + trait RestarterJavaBehavior extends SignalOrMessageJavaBehavior with Reuse { + override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { + JActor.restarter(classOf[Exception], JActor.OnFailure.RESTART, super.behavior(monitor)._1) → null + } + } + object `A restarter Behavior (java,native)` extends RestarterJavaBehavior with NativeSystem + object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem + } diff --git a/akka-typed/src/test/scala/akka/typed/StepWise.scala b/akka-typed/src/test/scala/akka/typed/StepWise.scala index a85769d0b1..dbdaba83d8 100644 --- a/akka-typed/src/test/scala/akka/typed/StepWise.scala +++ b/akka-typed/src/test/scala/akka/typed/StepWise.scala @@ -106,7 +106,7 @@ object StepWise { def withKeepTraces(b: Boolean): StartWith[T] = new StartWith(b) } - def apply[T](f: (ActorContext[T], StartWith[T]) ⇒ Steps[T, _]): Behavior[T] = + def apply[T](f: (scaladsl.ActorContext[T], StartWith[T]) ⇒ Steps[T, _]): Behavior[T] = Full[Any] { case Sig(ctx, PreStart) ⇒ run(ctx, f(ctx.asInstanceOf[ActorContext[T]], new StartWith(keepTraces = false)).ops.reverse, ()) }.narrow @@ -127,7 +127,7 @@ object StepWise { } } - private def run[T](ctx: ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] = + private def run[T](ctx: scaladsl.ActorContext[Any], ops: List[AST], value: Any): Behavior[Any] = ops match { case Thunk(f) :: tail ⇒ run(ctx, tail, f()) case ThunkV(f) :: tail ⇒ run(ctx, tail, f(value)) diff --git a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala index d451cf45f8..039e09e45d 100644 --- a/akka-typed/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/TypedSpec.scala @@ -25,6 +25,7 @@ import org.junit.runner.RunWith import scala.util.control.NonFatal import org.scalatest.exceptions.TestFailedException import akka.util.TypedMultiMap +import akka.typed.scaladsl.AskPattern /** * Helper class for writing tests for typed Actors with ScalaTest. diff --git a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala index e872e57edc..5466b778de 100644 --- a/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/internal/ActorSystemSpec.scala @@ -108,7 +108,7 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca def `must start system actors and mangle their names`(): Unit = { withSystem("systemActorOf", Empty[String]) { sys ⇒ - import akka.typed.AskPattern._ + import akka.typed.scaladsl.AskPattern._ implicit val timeout = Timeout(1.second) implicit val sched = sys.scheduler diff --git a/akka-typed/src/test/scala/akka/typed/internal/EventStreamSpec.scala b/akka-typed/src/test/scala/akka/typed/internal/EventStreamSpec.scala index ab8fdd93c2..4aedbea9b1 100644 --- a/akka-typed/src/test/scala/akka/typed/internal/EventStreamSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/internal/EventStreamSpec.scala @@ -8,7 +8,7 @@ import scala.concurrent.duration._ import akka.Done import akka.event.Logging._ import akka.typed.ScalaDSL._ -import akka.typed.AskPattern._ +import akka.typed.scaladsl.AskPattern._ import com.typesafe.config.ConfigFactory import java.util.concurrent.Executor import org.scalatest.concurrent.Eventually diff --git a/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala b/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala deleted file mode 100644 index fd4f1a11e7..0000000000 --- a/akka-typed/src/test/scala/akka/typed/patterns/ReceiverSpec.scala +++ /dev/null @@ -1,254 +0,0 @@ -package akka.typed.patterns - -import akka.typed._ -import scala.concurrent.duration._ -import akka.typed.Effect.{ ReceiveTimeoutSet, Scheduled } -import Receiver._ - -object ReceiverSpec { - case class Msg(x: Int) - case class Setup(name: String, creator: ActorContext[Command[Msg]] ⇒ Behavior[Command[Msg]], messages: Int, effects: Int) -} - -class ReceiverSpec extends TypedSpec { - import ReceiverSpec._ - - private val dummyInbox = Inbox[Replies[Msg]]("dummy") - - private val startingPoints: Seq[Setup] = Seq( - Setup("initial", ctx ⇒ behavior[Msg], 0, 0), - Setup("afterGetOneFirst", afterGetOneFirst, 1, 0), - Setup("afterGetOneLater", afterGetOneLater, 1, 2), - Setup("afterGetOneTimeout", afterGetOneTimeout, 1, 2), - Setup("afterGetAll", afterGetAll, 1, 1), - Setup("afterGetAllTimeout", afterGetAllTimeout, 1, 1)) - - private def afterGetOneFirst(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = - behavior[Msg] - .management(ctx, PreStart) - .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] - .message(ctx, GetOne(Duration.Zero)(dummyInbox.ref)) - - private def afterGetOneLater(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = - behavior[Msg] - .management(ctx, PreStart) - .message(ctx, GetOne(1.second)(dummyInbox.ref)) - .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] - - private def afterGetOneTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = - behavior[Msg] - .management(ctx, PreStart) - .message(ctx, GetOne(1.nano)(dummyInbox.ref)) - .asInstanceOf[Behavior[InternalCommand[Msg]]].message(ctx.asInstanceOf[ActorContext[InternalCommand[Msg]]], ReceiveTimeout()).asInstanceOf[Behavior[Command[Msg]]] - - private def afterGetAll(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = - behavior[Msg] - .management(ctx, PreStart) - .message(ctx, GetAll(1.nano)(dummyInbox.ref)) - .asInstanceOf[Behavior[Msg]].message(ctx.asInstanceOf[ActorContext[Msg]], Msg(1)).asInstanceOf[Behavior[Command[Msg]]] - .message(ctx, GetAll(Duration.Zero)(dummyInbox.ref)) - - private def afterGetAllTimeout(ctx: ActorContext[Command[Msg]]): Behavior[Command[Msg]] = - behavior[Msg] - .management(ctx, PreStart) - .message(ctx, GetAll(1.nano)(dummyInbox.ref)) - .message(ctx, GetAll(Duration.Zero)(dummyInbox.ref)) - - private def setup(name: String, behv: Behavior[Command[Msg]] = behavior[Msg])( - proc: (EffectfulActorContext[Command[Msg]], EffectfulActorContext[Msg], Inbox[Replies[Msg]]) ⇒ Unit): Unit = - for (Setup(description, behv, messages, effects) ← startingPoints) { - val ctx = new EffectfulActorContext("ctx", ScalaDSL.ContextAware(behv), 1000, nativeSystem) - withClue(s"[running for starting point '$description' (${ctx.currentBehavior})]: ") { - dummyInbox.receiveAll() should have size messages - ctx.getAllEffects() should have size effects - proc(ctx, ctx.asInstanceOf[EffectfulActorContext[Msg]], Inbox[Replies[Msg]](name)) - } - } - - object `A Receiver` { - - /* - * This test suite assumes that the Receiver is only one actor with two - * sides that share the same ActorRef. - */ - def `must return "self" as external address`(): Unit = - setup("") { (int, ext, _) ⇒ - val inbox = Inbox[ActorRef[Msg]]("extAddr") - int.run(ExternalAddress(inbox.ref)) - int.hasEffects should be(false) - inbox.receiveAll() should be(List(int.self)) - } - - def `must receive one message which arrived first`(): Unit = - setup("getOne") { (int, ext, inbox) ⇒ - // first with zero timeout - ext.run(Msg(1)) - int.run(GetOne(Duration.Zero)(inbox.ref)) - int.getAllEffects() should be(Nil) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil) - // then with positive timeout - ext.run(Msg(2)) - int.run(GetOne(1.second)(inbox.ref)) - int.getAllEffects() should be(Nil) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil) - // then with negative timeout - ext.run(Msg(3)) - int.run(GetOne(-1.second)(inbox.ref)) - int.getAllEffects() should be(Nil) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: Nil) - } - - def `must receive one message which arrives later`(): Unit = - setup("getOneLater") { (int, ext, inbox) ⇒ - int.run(GetOne(1.second)(inbox.ref)) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ d > Duration.Zero should be(true) - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.hasMessages should be(false) - ext.run(Msg(1)) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ d should be theSameInstanceAs (Duration.Undefined) - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil) - } - - def `must reply with no message when asked for immediate value`(): Unit = - setup("getNone") { (int, ext, inbox) ⇒ - int.run(GetOne(Duration.Zero)(inbox.ref)) - int.getAllEffects() should be(Nil) - inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil) - int.run(GetOne(-1.second)(inbox.ref)) - int.getAllEffects() should be(Nil) - inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil) - } - - def `must reply with no message after a timeout`(): Unit = - setup("getNoneTimeout") { (int, ext, inbox) ⇒ - int.run(GetOne(1.nano)(inbox.ref)) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ // okay - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.hasMessages should be(false) - // currently this all takes >1ns, but who knows what the future brings - Thread.sleep(1) - int.asInstanceOf[EffectfulActorContext[InternalCommand[Msg]]].run(ReceiveTimeout()) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ d should be theSameInstanceAs (Duration.Undefined) - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.receiveAll() should be(GetOneResult(int.self, None) :: Nil) - } - - def `must reply with messages which arrived first in the same order (GetOne)`(): Unit = - setup("getMoreOrderOne") { (int, ext, inbox) ⇒ - ext.run(Msg(1)) - ext.run(Msg(2)) - ext.run(Msg(3)) - ext.run(Msg(4)) - int.run(GetOne(Duration.Zero)(inbox.ref)) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(1))) :: Nil) - int.run(GetOne(Duration.Zero)(inbox.ref)) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(2))) :: Nil) - int.run(GetOne(Duration.Zero)(inbox.ref)) - int.run(GetOne(Duration.Zero)(inbox.ref)) - inbox.receiveAll() should be(GetOneResult(int.self, Some(Msg(3))) :: GetOneResult(int.self, Some(Msg(4))) :: Nil) - int.hasEffects should be(false) - } - - def `must reply with messages which arrived first in the same order (GetAll)`(): Unit = - setup("getMoreOrderAll") { (int, ext, inbox) ⇒ - ext.run(Msg(1)) - ext.run(Msg(2)) - int.run(GetAll(Duration.Zero)(inbox.ref)) - inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil) - // now with negative timeout - ext.run(Msg(3)) - ext.run(Msg(4)) - int.run(GetAll(-1.second)(inbox.ref)) - inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(3), Msg(4))) :: Nil) - int.hasEffects should be(false) - } - - private def assertScheduled[T, U](s: Scheduled[T], target: ActorRef[U]): U = { - s.target should be(target) - // unfortunately Scala cannot automatically transfer the hereby established type knowledge - s.msg.asInstanceOf[U] - } - - def `must reply to GetAll with messages which arrived first`(): Unit = - setup("getAllFirst") { (int, ext, inbox) ⇒ - ext.run(Msg(1)) - ext.run(Msg(2)) - int.run(GetAll(1.second)(inbox.ref)) - val msg = int.getAllEffects() match { - case (s: Scheduled[_]) :: Nil ⇒ assertScheduled(s, int.self) - } - int.run(msg) - inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil) - int.hasEffects should be(false) - } - - def `must reply to GetAll with messages which arrived first and later`(): Unit = - setup("getAllFirstAndLater") { (int, ext, inbox) ⇒ - ext.run(Msg(1)) - ext.run(Msg(2)) - int.run(GetAll(1.second)(inbox.ref)) - val msg = int.getAllEffects() match { - case (s: Scheduled[_]) :: Nil ⇒ assertScheduled(s, int.self) - } - inbox.hasMessages should be(false) - ext.run(Msg(3)) - int.run(msg) - inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2), Msg(3))) :: Nil) - int.hasEffects should be(false) - } - - def `must reply to GetAll with messages which arrived later`(): Unit = - setup("getAllLater") { (int, ext, inbox) ⇒ - int.run(GetAll(1.second)(inbox.ref)) - val msg = int.getAllEffects() match { - case (s: Scheduled[_]) :: Nil ⇒ assertScheduled(s, int.self) - } - ext.run(Msg(1)) - ext.run(Msg(2)) - inbox.hasMessages should be(false) - int.run(msg) - inbox.receiveAll() should be(GetAllResult(int.self, List(Msg(1), Msg(2))) :: Nil) - int.hasEffects should be(false) - } - - def `must reply to GetAll immediately while GetOne is pending`(): Unit = - setup("getAllWhileGetOne") { (int, ext, inbox) ⇒ - int.run(GetOne(1.second)(inbox.ref)) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ // okay - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.hasMessages should be(false) - int.run(GetAll(Duration.Zero)(inbox.ref)) - int.getAllEffects() should have size 1 - inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil) - } - - def `must reply to GetAll later while GetOne is pending`(): Unit = - setup("getAllWhileGetOne") { (int, ext, inbox) ⇒ - int.run(GetOne(1.second)(inbox.ref)) - int.getAllEffects() match { - case ReceiveTimeoutSet(d, _) :: Nil ⇒ // okay - case other ⇒ fail(s"$other was not List(ReceiveTimeoutSet(_))") - } - inbox.hasMessages should be(false) - int.run(GetAll(1.nano)(inbox.ref)) - val msg = int.getAllEffects() match { - case (s: Scheduled[_]) :: ReceiveTimeoutSet(_, _) :: Nil ⇒ assertScheduled(s, int.self) - } - inbox.receiveAll() should be(Nil) - int.run(msg) - inbox.receiveAll() should be(GetAllResult(int.self, Nil) :: Nil) - } - } - -} diff --git a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala index a4c24c3c43..93ac31ea51 100644 --- a/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala +++ b/akka-typed/src/test/scala/akka/typed/patterns/ReceptionistSpec.scala @@ -5,7 +5,7 @@ package akka.typed.patterns import Receptionist._ import akka.typed.ScalaDSL._ -import akka.typed.AskPattern._ +import akka.typed.scaladsl.AskPattern._ import scala.concurrent.duration._ import akka.typed._