diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala index 165d8f8506..03d06fd72f 100644 --- a/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/Effects.scala @@ -92,7 +92,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma effectQueue.offer(Spawned(name)) super.spawn(behavior, name) } - override def stop(child: ActorRef[_]): Boolean = { + override def stop[U](child: ActorRef[U]): Boolean = { effectQueue.offer(Stopped(child.path.name)) super.stop(child) } diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/StubbedActorContext.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/StubbedActorContext.scala index 8a9daed51f..952d0b5521 100644 --- a/akka-typed-testkit/src/main/scala/akka/typed/testkit/StubbedActorContext.scala +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/StubbedActorContext.scala @@ -47,7 +47,7 @@ class StubbedActorContext[T]( * Do not actually stop the child inbox, only simulate the liveness check. * Removal is asynchronous, explicit removeInbox is needed from outside afterwards. */ - override def stop(child: ActorRef[_]): Boolean = { + override def stop[U](child: ActorRef[U]): Boolean = { _children.get(child.path.name) match { case None ⇒ false case Some(inbox) ⇒ inbox.ref == child diff --git a/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala index 82b6b382af..3275fbcde3 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/ActorContextSpec.scala @@ -279,7 +279,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( if (system eq nativeSystem) suite + "Native" else suite + "Adapted" - def setup(name: String, wrapper: Option[Actor.Restarter.Apply[_]] = None)( + def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None)( proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith) ⇒ val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx)) diff --git a/akka-typed-tests/src/test/scala/akka/typed/BehaviorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/BehaviorSpec.scala index 5d1a345950..a144bdb630 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/BehaviorSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/BehaviorSpec.scala @@ -601,8 +601,8 @@ class BehaviorSpec extends TypedSpec { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = Inbox[Either[Signal, Command]]("tapListener") (JActor.tap( - ps((_, sig) ⇒ inbox.ref ! Left(sig)), pc((_, msg) ⇒ inbox.ref ! Right(msg)), + ps((_, sig) ⇒ inbox.ref ! Left(sig)), super.behavior(monitor)._1), inbox) } } diff --git a/akka-typed/src/main/java/akka/typed/javadsl/Actor.java b/akka-typed/src/main/java/akka/typed/javadsl/Actor.java deleted file mode 100644 index ef87d79445..0000000000 --- a/akka-typed/src/main/java/akka/typed/javadsl/Actor.java +++ /dev/null @@ -1,392 +0,0 @@ -/** - * Copyright (C) 2017 Lightbend Inc. - */ -package akka.typed.javadsl; - -import static akka.typed.scaladsl.Actor.Empty; -import static akka.typed.scaladsl.Actor.Ignore; -import static akka.typed.scaladsl.Actor.Same; -import static akka.typed.scaladsl.Actor.Stopped; -import static akka.typed.scaladsl.Actor.Unhandled; - -import java.util.function.Function; - -import akka.japi.function.Function2; -import akka.japi.function.Procedure2; -import akka.japi.pf.PFBuilder; -import akka.typed.*; -import akka.typed.internal.Restarter; -import akka.typed.scaladsl.Actor.Widened; -import scala.reflect.ClassTag; - -public abstract class Actor { - /* - * This DSL is implemented in Java in order to ensure consistent usability from Java, - * taking possible Scala oddities out of the equation. There is some duplication in - * the behavior implementations, but that is unavoidable if both DSLs shall offer the - * same runtime performance (especially concerning allocations for function converters). - */ - - private static class Deferred extends Behavior.DeferredBehavior { - final akka.japi.function.Function, Behavior> producer; - - public Deferred(akka.japi.function.Function, Behavior> producer) { - this.producer = producer; - } - - @Override - public Behavior apply(akka.typed.ActorContext ctx) throws Exception { - return producer.apply(ctx); - } - } - - private static class Immutable extends ExtensibleBehavior { - final Function2, T, Behavior> message; - final Function2, Signal, Behavior> signal; - - public Immutable(Function2, T, Behavior> message, - Function2, Signal, Behavior> signal) { - this.signal = signal; - this.message = message; - } - - @Override - public Behavior receiveSignal(akka.typed.ActorContext ctx, Signal msg) throws Exception { - return signal.apply(ctx, msg); - } - - @Override - public Behavior receiveMessage(akka.typed.ActorContext ctx, T msg) throws Exception { - return message.apply(ctx, msg); - } - } - - private static class Tap extends ExtensibleBehavior { - final Procedure2, Signal> signal; - final Procedure2, T> message; - final Behavior behavior; - - public Tap(Procedure2, Signal> signal, Procedure2, T> message, - Behavior behavior) { - this.signal = signal; - this.message = message; - this.behavior = behavior; - } - - private Behavior canonicalize(Behavior behv) { - if (Behavior.isUnhandled(behv)) - return Unhandled(); - else if (behv == Same() || behv == this) - return Same(); - else if (Behavior.isAlive(behv)) - return new Tap(signal, message, behv); - else - return Stopped(); - } - - @Override - public Behavior receiveSignal(akka.typed.ActorContext ctx, Signal signal) throws Exception { - this.signal.apply(ctx, signal); - return canonicalize(Behavior.interpretSignal(behavior, ctx, signal)); - } - - @Override - public Behavior receiveMessage(akka.typed.ActorContext ctx, T msg) throws Exception { - message.apply(ctx, msg); - return canonicalize(Behavior.interpretMessage(behavior, ctx, msg)); - } - } - - private static Function2 _unhandledFun = (ctx, msg) -> Unhandled(); - - @SuppressWarnings("unchecked") - private static Function2, Signal, Behavior> unhandledFun() { - return (Function2, Signal, Behavior>) (Object) _unhandledFun; - } - - private static Procedure2 _doNothing = (ctx, msg) -> { - }; - - @SuppressWarnings("unchecked") - private static Procedure2, Signal> doNothing() { - return (Procedure2, Signal>) (Object) _doNothing; - } - - /** - * Construct an actor behavior that can react to incoming messages but not to - * lifecycle signals. After spawning this actor from another actor (or as the - * guardian of an {@link akka.typed.ActorSystem}) it will be executed within an - * {@link ActorContext} that allows access to the system, spawning and watching - * other actors, etc. - * - * This constructor is called immutable because the behavior instance doesn't - * have or close over any mutable state. Processing the next message - * results in a new behavior that can potentially be different from this one. - * State is updated by returning a new behavior that holds the new immutable - * state. If no change is desired, use {@link #same}. - * - * @param message - * the function that describes how this actor reacts to the next - * message - * @return the behavior - */ - static public Behavior immutable(Function2, T, Behavior> message) { - return new Immutable(message, unhandledFun()); - } - - /** - * Construct an actor behavior that can react to both incoming messages and - * lifecycle signals. After spawning this actor from another actor (or as the - * guardian of an {@link akka.typed.ActorSystem}) it will be executed within an - * {@link ActorContext} that allows access to the system, spawning and watching - * other actors, etc. - * - * This constructor is called immutable because the behavior instance doesn't - * have or close over any mutable state. Processing the next message - * results in a new behavior that can potentially be different from this one. - * State is updated by returning a new behavior that holds the new immutable - * state. If no change is desired, use {@link #same}. - * - * @param message - * the function that describes how this actor reacts to the next - * message - * @param signal - * the function that describes how this actor reacts to the given - * signal - * @return the behavior - */ - static public Behavior immutable(Function2, T, Behavior> message, - Function2, Signal, Behavior> signal) { - return new Immutable(message, signal); - } - - /** - * Return this behavior from message processing in order to advise the system - * to reuse the previous behavior. This is provided in order to avoid the - * allocation overhead of recreating the current behavior where that is not - * necessary. - * - * @return pseudo-behavior marking “no change” - */ - static public Behavior same() { - return Same(); - } - - /** - * Return this behavior from message processing in order to advise the system - * to reuse the previous behavior, including the hint that the message has not - * been handled. This hint may be used by composite behaviors that delegate - * (partial) handling to other behaviors. - * - * @return pseudo-behavior marking “unhandled” - */ - static public Behavior unhandled() { - return Unhandled(); - } - - /** - * Return this behavior from message processing to signal that this actor - * shall terminate voluntarily. If this actor has created child actors then - * these will be stopped as part of the shutdown procedure. The PostStop - * signal that results from stopping this actor will NOT be passed to the - * current behavior, it will be effectively ignored. - * - * @return the inert behavior - */ - static public Behavior stopped() { - return Stopped(); - } - - /** - * A behavior that treats every incoming message as unhandled. - * - * @return the empty behavior - */ - static public Behavior empty() { - return Empty(); - } - - /** - * A behavior that ignores every incoming message and returns “same”. - * - * @return the inert behavior - */ - static public Behavior ignore() { - return Ignore(); - } - - /** - * Behavior decorator that allows you to perform any side-effect before a - * signal or message is delivered to the wrapped behavior. The wrapped - * behavior can evolve (i.e. be immutable) without needing to be wrapped in a - * tap(...) call again. - * - * @param signal - * procedure to invoke with the {@link ActorContext} and the - * {@link akka.typed.Signal} as arguments before delivering the signal to - * the wrapped behavior - * @param message - * procedure to invoke with the {@link ActorContext} and the received - * message as arguments before delivering the signal to the wrapped - * behavior - * @param behavior - * initial behavior to be wrapped - * @return the decorated behavior - */ - static public Behavior tap(Procedure2, Signal> signal, Procedure2, T> message, - Behavior behavior) { - return new Tap(signal, message, behavior); - } - - /** - * Behavior decorator that copies all received message to the designated - * monitor {@link akka.typed.ActorRef} before invoking the wrapped behavior. The - * wrapped behavior can evolve (i.e. return different behavior) without needing to be - * wrapped in a monitor(...) call again. - * - * @param monitor - * ActorRef to which to copy all received messages - * @param behavior - * initial behavior to be wrapped - * @return the decorated behavior - */ - static public Behavior monitor(ActorRef monitor, Behavior behavior) { - return new Tap(doNothing(), (ctx, msg) -> monitor.tell(msg), behavior); - } - - /** - * Wrap the given behavior such that it is restarted (i.e. reset to its - * initial state) whenever it throws an exception of the given class or a - * subclass thereof. Exceptions that are not subtypes of Thr will not be - * caught and thus lead to the termination of the actor. - * - * It is possible to specify different supervisor strategies, such as restart, - * resume, backoff. - * - * @param clazz - * the type of exceptions that shall be caught - * @param strategy - * whether to restart, resume, or backoff the actor upon a caught failure - * @param initialBehavior - * the initial behavior, that is also restored during a restart - * @return the wrapped behavior - */ - static public Behavior restarter(Class clazz, SupervisorStrategy strategy, - Behavior initialBehavior) { - final ClassTag catcher = akka.japi.Util.classTag(clazz); - return Restarter.apply(Behavior.validateAsInitial(initialBehavior), strategy, catcher); - } - - /** - * Widen the wrapped Behavior by placing a funnel in front of it: the supplied - * PartialFunction decides which message to pull in (those that it is defined - * at) and may transform the incoming message to place them into the wrapped - * Behavior’s type hierarchy. Signals are not transformed. - * - *
-   * Behavior<String> s = immutable((ctx, msg) -> {
-   *     System.out.println(msg);
-   *     return same();
-   *   });
-   * Behavior<Number> n = widened(s, pf -> pf.
-   *         match(BigInteger.class, i -> "BigInteger(" + i + ")").
-   *         match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
-   *         // drop all other kinds of Number
-   *     );
-   * 
- * - * @param behavior - * the behavior that will receive the selected messages - * @param selector - * a partial function builder for describing the selection and - * transformation - * @return a behavior of the widened type - */ - static public Behavior widened(Behavior behavior, Function, PFBuilder> selector) { - return new Widened(behavior, selector.apply(new PFBuilder<>()).build()); - } - - /** - * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior - * creation is deferred to the child actor instead of running within the - * parent. - * - * @param producer - * behavior factory that takes the child actor’s context as argument - * @return the deferred behavior - */ - static public Behavior deferred(akka.japi.function.Function, Behavior> producer) { - return new Deferred(producer); - } - - /** - * Factory for creating a MutableBehavior that typically holds mutable state as - * instance variables in the concrete MutableBehavior implementation class. - * - * Creation of the behavior instance is deferred, i.e. it is created via the producer - * function. The reason for the deferred creation is to avoid sharing the same instance in - * multiple actors, and to create a new instance when the actor is restarted. - * - * @param producer - * behavior factory that takes the child actor’s context as argument - * @return the deferred behavior - */ - static public Behavior mutable(akka.japi.function.Function, MutableBehavior> producer) { - return deferred(ctx -> producer.apply(ctx)); - } - - /** - * Mutable behavior can be implemented by extending this class and implement the - * abstract method {@link MutableBehavior#onMessage} and optionally override - * {@link MutableBehavior#onSignal}. - * - * Instances of this behavior should be created via {@link Actor#mutable} and if - * the {@link ActorContext} is needed it can be passed as a constructor parameter - * from the factory function. - * - * @see Actor#mutable - */ - static public abstract class MutableBehavior extends ExtensibleBehavior { - @Override - final public Behavior receiveMessage(akka.typed.ActorContext ctx, T msg) throws Exception { - return onMessage(msg); - } - - /** - * Implement this method to process an incoming message and return the next behavior. - * - * The returned behavior can in addition to normal behaviors be one of the canned special objects: - *
    - *
  • returning `stopped` will terminate this Behavior
  • - *
  • returning `this` or `same` designates to reuse the current Behavior
  • - *
  • returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
  • - *
- * - */ - public abstract Behavior onMessage(T msg) throws Exception; - - @Override - final public Behavior receiveSignal(akka.typed.ActorContext ctx, Signal msg) throws Exception { - return onSignal(msg); - } - - /** - * Override this method to process an incoming {@link akka.typed.Signal} and return the next behavior. - * This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages - * can initiate a behavior change. - * - * The returned behavior can in addition to normal behaviors be one of the canned special objects: - *
    - *
  • returning `stopped` will terminate this Behavior
  • - *
  • returning `this` or `same` designates to reuse the current Behavior
  • - *
  • returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
  • - *
- * - * By default, this method returns `unhandled`. - */ - public Behavior onSignal(Signal msg) throws Exception { - return unhandled(); - } - } - -} diff --git a/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java b/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java deleted file mode 100644 index dc92b15f73..0000000000 --- a/akka-typed/src/main/java/akka/typed/javadsl/ActorContext.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Copyright (C) 2017 Lightbend Inc. - */ -package akka.typed.javadsl; - -import java.util.List; -import java.util.Optional; -import java.util.function.Function; - -import akka.actor.Cancellable; -import akka.typed.ActorRef; -import akka.typed.ActorSystem; -import akka.typed.Behavior; -import akka.typed.DeploymentConfig; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.duration.FiniteDuration; - -/** - * An Actor is given by the combination of a {@link akka.typed.Behavior} and a context in which - * this behavior is executed. As per the Actor Model an Actor can perform the - * following actions when processing a message: - * - *
    - *
  • send a finite number of messages to other Actors it knows
  • - *
  • create a finite number of Actors
  • - *
  • designate the behavior for the next message
  • - *
- * - * In Akka the first capability is accessed by using the {@link akka.typed.ActorRef#tell} - * method, the second is provided by {@link akka.typed.ActorContext#spawn} and the - * third is implicit in the signature of {@link akka.typed.Behavior} in that the next behavior - * is always returned from the message processing logic. - * - * An ActorContext in addition provides access to the Actor’s own identity - * ({@link #getSelf “self”}), the {@link akka.typed.ActorSystem} it is part of, methods for querying the list - * of child Actors it created, access to {@link akka.typed.Terminated DeathWatch} and timed - * message scheduling. - */ -public interface ActorContext { - - /** - * The identity of this Actor, bound to the lifecycle of this Actor instance. - * An Actor with the same name that lives before or after this instance will - * have a different {@link akka.typed.ActorRef}. - */ - public ActorRef getSelf(); - - /** - * Return the mailbox capacity that was configured by the parent for this - * actor. - */ - public int getMailboxCapacity(); - - /** - * The {@link akka.typed.ActorSystem} to which this Actor belongs. - */ - public ActorSystem getSystem(); - - /** - * The list of child Actors created by this Actor during its lifetime that are - * still alive, in no particular order. - */ - public List> getChildren(); - - /** - * The named child Actor if it is alive. - */ - public Optional> getChild(String name); - - /** - * Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name. - * It is good practice to name Actors wherever practical. - */ - public ActorRef spawnAnonymous(Behavior behavior); - - /** - * Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name. - * It is good practice to name Actors wherever practical. - */ - public ActorRef spawnAnonymous(Behavior behavior, DeploymentConfig deployment); - - /** - * Create a child Actor from the given {@link akka.typed.Behavior} and with the given name. - */ - public ActorRef spawn(Behavior behavior, String name); - - /** - * Create a child Actor from the given {@link akka.typed.Behavior} and with the given name. - */ - public ActorRef spawn(Behavior behavior, String name, DeploymentConfig deployment); - - /** - * Force the child Actor under the given name to terminate after it finishes - * processing its current message. Nothing happens if the ActorRef does not - * refer to a current child actor. - * - * @return whether the passed-in {@link akka.typed.ActorRef} points to a current child Actor - */ - public boolean stop(ActorRef child); - - /** - * Register for {@link akka.typed.Terminated} notification once the Actor identified by the - * given {@link akka.typed.ActorRef} terminates. This notification is also generated when the - * {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed - * (e.g. in reaction to being unreachable). - */ - public void watch(ActorRef other); - - /** - * Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated} - * notification will not subsequently be received for the referenced Actor. - */ - public void unwatch(ActorRef other); - - /** - * Schedule the sending of a notification in case no other message is received - * during the given period of time. The timeout starts anew with each received - * message. Provide scala.concurrent.duration.Duration.Undefined to switch off this mechanism. - */ - public void setReceiveTimeout(FiniteDuration d, T msg); - - /** - * Cancel the sending of receive timeout notifications. - */ - public void cancelReceiveTimeout(); - - /** - * Schedule the sending of the given message to the given target Actor after - * the given time period has elapsed. The scheduled action can be cancelled by - * invoking {@link akka.actor.Cancellable#cancel} on the returned handle. - */ - public Cancellable schedule(FiniteDuration delay, ActorRef target, U msg); - - /** - * This Actor’s execution context. It can be used to run asynchronous tasks - * like scala.concurrent.Future combinators. - */ - public ExecutionContextExecutor getExecutionContext(); - - /** - * Create a child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - * - * The name of the child actor will be composed of a unique identifier - * starting with a dollar sign to which the given name argument is appended, - * with an inserted hyphen between these two parts. Therefore the given name - * argument does not need to be unique within the scope of the parent actor. - */ - public ActorRef createAdapter(Function f, String name); - - /** - * Create an anonymous child actor that will wrap messages such that other - * Actor’s protocols can be ingested by this Actor. You are strongly advised - * to cache these ActorRefs or to stop them when no longer needed. - */ - public ActorRef createAdapter(Function f); - -} diff --git a/akka-typed/src/main/scala/akka/typed/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/ActorContext.scala index f900cddc02..3565d8f29e 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorContext.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorContext.scala @@ -26,7 +26,7 @@ trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext case None ⇒ Optional.empty() } - override def getChildren(): java.util.List[akka.typed.ActorRef[Void]] = { + override def getChildren: java.util.List[akka.typed.ActorRef[Void]] = { val c = children val a = new ArrayList[ActorRef[Void]](c.size) val i = c.iterator @@ -34,16 +34,16 @@ trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext a } - override def getExecutionContext(): ExecutionContextExecutor = + override def getExecutionContext: ExecutionContextExecutor = executionContext - override def getMailboxCapacity(): Int = + override def getMailboxCapacity: Int = mailboxCapacity - override def getSelf(): akka.typed.ActorRef[T] = + override def getSelf: akka.typed.ActorRef[T] = self - override def getSystem(): akka.typed.ActorSystem[Void] = + override def getSystem: akka.typed.ActorSystem[Void] = system.asInstanceOf[ActorSystem[Void]] override def spawn[U](behavior: akka.typed.Behavior[U], name: String): akka.typed.ActorRef[U] = diff --git a/akka-typed/src/main/scala/akka/typed/ActorRef.scala b/akka-typed/src/main/scala/akka/typed/ActorRef.scala index 5fc333f71b..13d2cdbc2e 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorRef.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorRef.scala @@ -69,7 +69,7 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act object ActorRef { - implicit final class ActorRefScalaTell[-T](val ref: ActorRef[T]) extends AnyVal { + implicit final class ActorRefOps[-T](val ref: ActorRef[T]) extends AnyVal { /** * Send a message to the Actor referenced by this ActorRef using *at-most-once* * messaging semantics. diff --git a/akka-typed/src/main/scala/akka/typed/Behavior.scala b/akka-typed/src/main/scala/akka/typed/Behavior.scala index 77180872f2..8e8d8ba257 100644 --- a/akka-typed/src/main/scala/akka/typed/Behavior.scala +++ b/akka-typed/src/main/scala/akka/typed/Behavior.scala @@ -3,6 +3,7 @@ */ package akka.typed +import akka.util.LineNumbers import akka.annotation.{ DoNotInherit, InternalApi } import scala.annotation.tailrec @@ -145,12 +146,16 @@ object Behavior { /** * INTERNAL API. + * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi - private[akka] abstract class DeferredBehavior[T] extends Behavior[T] { + private[akka] final case class DeferredBehavior[T](factory: ActorContext[T] ⇒ Behavior[T]) extends Behavior[T] { + /** "undefer" the deferred behavior */ @throws(classOf[Exception]) - def apply(ctx: ActorContext[T]): Behavior[T] + def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx) + + override def toString: String = s"Deferred(${LineNumbers(factory)})" } /** diff --git a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala index 39d74d4e74..bba4660148 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/ActorCell.scala @@ -122,7 +122,7 @@ private[typed] class ActorCell[T]( spawn(behavior, name, deployment) } - override def stop(child: ActorRef[_]): Boolean = { + override def stop[U](child: ActorRef[U]): Boolean = { val name = child.path.name childrenMap.get(name) match { case None ⇒ false diff --git a/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala new file mode 100644 index 0000000000..073372c6cd --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/internal/BehaviorImpl.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed +package internal + +import akka.util.LineNumbers +import akka.annotation.InternalApi +import akka.typed.{ ActorContext ⇒ AC } +import akka.typed.scaladsl.Actor + +/** + * INTERNAL API + */ +@InternalApi private[akka] object BehaviorImpl { + import Behavior._ + + private val _nullFun = (_: Any) ⇒ null + private def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] + + implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal { + def as[U] = ctx.asInstanceOf[AC[U]] + } + + final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] { + private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] = + if (isUnhandled(behv)) unhandled + else if (isAlive(behv)) { + val next = canonicalize(behv, behavior, ctx) + if (next eq behavior) same else Widened(next, matcher) + } else stopped + + override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] = + postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) + + override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] = + matcher.applyOrElse(msg, nullFun) match { + case null ⇒ unhandled + case transformed ⇒ postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) + } + + override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" + } + + class ImmutableBehavior[T]( + val onMessage: (ActorContext[T], T) ⇒ Behavior[T], + onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]]) + extends ExtensibleBehavior[T] { + + override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = + onSignal.applyOrElse((ctx, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]]) + override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx, msg) + override def toString = s"Immutable(${LineNumbers(onMessage)})" + } + + final case class Tap[T]( + onMessage: Function2[ActorContext[T], T, _], + onSignal: Function2[ActorContext[T], Signal, _], + behavior: Behavior[T]) extends ExtensibleBehavior[T] { + + private def canonical(behv: Behavior[T]): Behavior[T] = + if (isUnhandled(behv)) unhandled + else if ((behv eq SameBehavior) || (behv eq this)) same + else if (isAlive(behv)) Tap(onMessage, onSignal, behv) + else stopped + override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = { + onSignal(ctx, signal) + canonical(Behavior.interpretSignal(behavior, ctx, signal)) + } + override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = { + onMessage(ctx, msg) + canonical(Behavior.interpretMessage(behavior, ctx, msg)) + } + override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)" + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala b/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala index 9292c5e0a2..2c4c1023b5 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/EventStreamImpl.scala @@ -44,25 +44,26 @@ private[typed] class EventStreamImpl(private val debug: Boolean)(implicit privat import scaladsl.Actor Actor.Deferred[Command] { _ ⇒ if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $this")) - Actor.Immutable[Command] { - case (ctx, Register(actor)) ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) - ctx.watch(actor) - Actor.Same + Actor.Immutable[Command] { (ctx, msg) ⇒ + msg match { + case Register(actor) ⇒ + if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) + ctx.watch(actor) + Actor.Same - case (ctx, UnregisterIfNoMoreSubscribedChannels(actor)) if hasSubscriptions(actor) ⇒ Actor.Same - // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream + case UnregisterIfNoMoreSubscribedChannels(actor) if hasSubscriptions(actor) ⇒ Actor.Same + // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream - case (ctx, UnregisterIfNoMoreSubscribedChannels(actor)) ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) - ctx.unwatch(actor) - Actor.Same + case UnregisterIfNoMoreSubscribedChannels(actor) ⇒ + if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) + ctx.unwatch(actor) + Actor.Same + } } onSignal { case (_, Terminated(actor)) ⇒ if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $this, because it was terminated")) unsubscribe(actor) Actor.Same - case (_, _) ⇒ Actor.Unhandled } } } diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala index 7000a685a2..e9c35e5d2c 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorContextAdapter.scala @@ -27,7 +27,7 @@ import akka.annotation.InternalApi ActorContextAdapter.spawnAnonymous(untyped, behavior, deployment) override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) = ActorContextAdapter.spawn(untyped, behavior, name, deployment) - override def stop(child: ActorRef[_]) = + override def stop[U](child: ActorRef[U]) = toUntyped(child) match { case f: akka.actor.FunctionRef ⇒ val cell = untyped.asInstanceOf[akka.actor.ActorCell] diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala new file mode 100644 index 0000000000..c707d3e968 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/javadsl/Actor.scala @@ -0,0 +1,243 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl + +import java.util.function.{ Function ⇒ JFunction } +import akka.japi.function.{ Function2 ⇒ JapiFunction2 } +import akka.japi.function.Procedure2 +import akka.typed.Behavior +import akka.typed.ExtensibleBehavior +import akka.typed.Signal +import akka.typed.internal.BehaviorImpl +import akka.typed.ActorRef +import akka.typed.SupervisorStrategy +import scala.reflect.ClassTag +import akka.typed.internal.Restarter +import akka.japi.pf.PFBuilder + +object Actor { + + private val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () + private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] + + /** + * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation + * is deferred to the child actor instead of running within the parent. + */ + def deferred[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] = + Behavior.DeferredBehavior(ctx ⇒ factory.apply(ctx)) + + /** + * Factory for creating a [[MutableBehavior]] that typically holds mutable state as + * instance variables in the concrete [[MutableBehavior]] implementation class. + * + * Creation of the behavior instance is deferred, i.e. it is created via the `factory` + * function. The reason for the deferred creation is to avoid sharing the same instance in + * multiple actors, and to create a new instance when the actor is restarted. + * + * @param producer + * behavior factory that takes the child actor’s context as argument + * @return the deferred behavior + */ + def mutable[T](factory: akka.japi.function.Function[ActorContext[T], MutableBehavior[T]]): Behavior[T] = + deferred(factory) + + /** + * Mutable behavior can be implemented by extending this class and implement the + * abstract method [[MutableBehavior#onMessage]] and optionally override + * [[MutableBehavior#onSignal]]. + * + * Instances of this behavior should be created via [[Actor#mutable]] and if + * the [[ActorContext]] is needed it can be passed as a constructor parameter + * from the factory function. + * + * @see [[Actor#mutable]] + */ + abstract class MutableBehavior[T] extends ExtensibleBehavior[T] { + @throws(classOf[Exception]) + override final def receiveMessage(ctx: akka.typed.ActorContext[T], msg: T): Behavior[T] = + onMessage(msg) + + /** + * Implement this method to process an incoming message and return the next behavior. + * + * The returned behavior can in addition to normal behaviors be one of the canned special objects: + *
    + *
  • returning `stopped` will terminate this Behavior
  • + *
  • returning `this` or `same` designates to reuse the current Behavior
  • + *
  • returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
  • + *
+ * + */ + @throws(classOf[Exception]) + def onMessage(msg: T): Behavior[T] + + @throws(classOf[Exception]) + override final def receiveSignal(ctx: akka.typed.ActorContext[T], msg: Signal): Behavior[T] = + onSignal(msg) + + /** + * Override this method to process an incoming [[akka.typed.Signal]] and return the next behavior. + * This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages + * can initiate a behavior change. + * + * The returned behavior can in addition to normal behaviors be one of the canned special objects: + * + * * returning `stopped` will terminate this Behavior + * * returning `this` or `Same` designates to reuse the current Behavior + * * returning `unhandled` keeps the same Behavior and signals that the message was not yet handled + * + * By default, this method returns `unhandled`. + */ + @throws(classOf[Exception]) + def onSignal(msg: Signal): Behavior[T] = + unhandled + } + + /** + * Return this behavior from message processing in order to advise the + * system to reuse the previous behavior. This is provided in order to + * avoid the allocation overhead of recreating the current behavior where + * that is not necessary. + */ + def same[T]: Behavior[T] = Behavior.same + + /** + * Return this behavior from message processing in order to advise the + * system to reuse the previous behavior, including the hint that the + * message has not been handled. This hint may be used by composite + * behaviors that delegate (partial) handling to other behaviors. + */ + def unhandled[T]: Behavior[T] = Behavior.unhandled + + /** + * Return this behavior from message processing to signal that this actor + * shall terminate voluntarily. If this actor has created child actors then + * these will be stopped as part of the shutdown procedure. The PostStop + * signal that results from stopping this actor will NOT be passed to the + * current behavior, it will be effectively ignored. + */ + def stopped[T]: Behavior[T] = Behavior.stopped + + /** + * A behavior that treats every incoming message as unhandled. + */ + def empty[T]: Behavior[T] = Behavior.empty + + /** + * A behavior that ignores every incoming message and returns “same”. + */ + def ignore[T]: Behavior[T] = Behavior.ignore + + /** + * Construct an actor behavior that can react to incoming messages but not to + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an [[akka.typed.ActorSystem]]) it will be executed within an + * [[ActorContext]] that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called immutable because the behavior instance doesn't + * have or close over any mutable state. Processing the next message + * results in a new behavior that can potentially be different from this one. + * State is updated by returning a new behavior that holds the new immutable + * state. + */ + def immutable[T](onMessage: JapiFunction2[ActorContext[T], T, Behavior[T]]): Behavior[T] = + new BehaviorImpl.ImmutableBehavior((ctx, msg) ⇒ onMessage.apply(ctx, msg)) + + /** + * Construct an actor behavior that can react to both incoming messages and + * lifecycle signals. After spawning this actor from another actor (or as the + * guardian of an [[akka.typed.ActorSystem]]) it will be executed within an + * [[ActorContext]] that allows access to the system, spawning and watching + * other actors, etc. + * + * This constructor is called immutable because the behavior instance doesn't + * have or close over any mutable state. Processing the next message + * results in a new behavior that can potentially be different from this one. + * State is updated by returning a new behavior that holds the new immutable + * state. + */ + def immutable[T]( + onMessage: JapiFunction2[ActorContext[T], T, Behavior[T]], + onSignal: JapiFunction2[ActorContext[T], Signal, Behavior[T]]): Behavior[T] = { + new BehaviorImpl.ImmutableBehavior( + (ctx, msg) ⇒ onMessage.apply(ctx, msg), + { case (ctx, sig) ⇒ onSignal.apply(ctx, sig) }) + } + + /** + * This type of Behavior wraps another Behavior while allowing you to perform + * some action upon each received message or signal. It is most commonly used + * for logging or tracing what a certain Actor does. + */ + def tap[T]( + onMessage: Procedure2[ActorContext[T], T], + onSignal: Procedure2[ActorContext[T], Signal], + behavior: Behavior[T]): Behavior[T] = { + BehaviorImpl.Tap( + (ctx, msg) ⇒ onMessage.apply(ctx, msg), + (ctx, sig) ⇒ onSignal.apply(ctx, sig), + behavior) + } + + /** + * Behavior decorator that copies all received message to the designated + * monitor [[akka.typed.ActorRef]] before invoking the wrapped behavior. The + * wrapped behavior can evolve (i.e. return different behavior) without needing to be + * wrapped in a `monitor` call again. + */ + def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = { + BehaviorImpl.Tap( + (ctx, msg) ⇒ monitor ! msg, + unitFunction, + behavior) + } + + /** + * Wrap the given behavior such that it is restarted (i.e. reset to its + * initial state) whenever it throws an exception of the given class or a + * subclass thereof. Exceptions that are not subtypes of `Thr` will not be + * caught and thus lead to the termination of the actor. + * + * It is possible to specify different supervisor strategies, such as restart, + * resume, backoff. + */ + def restarter[T, Thr <: Throwable]( + clazz: Class[Thr], + strategy: SupervisorStrategy, + initialBehavior: Behavior[T]): Behavior[T] = { + Restarter(Behavior.validateAsInitial(initialBehavior), strategy)(ClassTag(clazz)) + } + + /** + * Widen the wrapped Behavior by placing a funnel in front of it: the supplied + * PartialFunction decides which message to pull in (those that it is defined + * at) and may transform the incoming message to place them into the wrapped + * Behavior’s type hierarchy. Signals are not transformed. + * + * Example: + * {{{ + * Behavior<String> s = immutable((ctx, msg) -> { + * System.out.println(msg); + * return same(); + * }); + * Behavior<Number> n = widened(s, pf -> pf. + * match(BigInteger.class, i -> "BigInteger(" + i + ")"). + * match(BigDecimal.class, d -> "BigDecimal(" + d + ")") + * // drop all other kinds of Number + * ); + * }}} + * + * @param behavior + * the behavior that will receive the selected messages + * @param selector + * a partial function builder for describing the selection and + * transformation + * @return a behavior of the widened type + */ + def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] = + BehaviorImpl.Widened(behavior, selector.apply(new PFBuilder).build()) + +} diff --git a/akka-typed/src/main/scala/akka/typed/javadsl/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/javadsl/ActorContext.scala new file mode 100644 index 0000000000..b7559216c7 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/javadsl/ActorContext.scala @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.javadsl + +import java.util.function.{ Function ⇒ JFunction } +import akka.annotation.DoNotInherit +import akka.annotation.ApiMayChange +import akka.typed.ActorRef +import akka.typed.ActorSystem +import java.util.Optional +import akka.typed.Behavior +import akka.typed.DeploymentConfig +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.ExecutionContextExecutor + +/** + * An Actor is given by the combination of a [[Behavior]] and a context in + * which this behavior is executed. As per the Actor Model an Actor can perform + * the following actions when processing a message: + * + * - send a finite number of messages to other Actors it knows + * - create a finite number of Actors + * - designate the behavior for the next message + * + * In Akka the first capability is accessed by using the `tell` method + * on an [[ActorRef]], the second is provided by [[ActorContext#spawn]] + * and the third is implicit in the signature of [[Behavior]] in that the next + * behavior is always returned from the message processing logic. + * + * An `ActorContext` in addition provides access to the Actor’s own identity (“`getSelf`”), + * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it + * created, access to [[Terminated DeathWatch]] and timed message scheduling. + */ +@DoNotInherit +@ApiMayChange +trait ActorContext[T] { + // this must be a pure interface, i.e. only abstract methods + + /** + * The identity of this Actor, bound to the lifecycle of this Actor instance. + * An Actor with the same name that lives before or after this instance will + * have a different [[ActorRef]]. + */ + def getSelf: ActorRef[T] + + /** + * Return the mailbox capacity that was configured by the parent for this actor. + */ + def getMailboxCapacity: Int + + /** + * The [[ActorSystem]] to which this Actor belongs. + */ + def getSystem: ActorSystem[Void] + + /** + * The list of child Actors created by this Actor during its lifetime that + * are still alive, in no particular order. + */ + def getChildren: java.util.List[ActorRef[Void]] + + /** + * The named child Actor if it is alive. + */ + def getChild(name: String): Optional[ActorRef[Void]] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + def spawnAnonymous[U](behavior: Behavior[U]): ActorRef[U] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig): ActorRef[U] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] and with the given name. + */ + def spawn[U](behavior: Behavior[U], name: String): ActorRef[U] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] and with the given name. + */ + def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRef[U] + + /** + * Force the child Actor under the given name to terminate after it finishes + * processing its current message. Nothing happens if the ActorRef does not + * refer to a current child actor. + * + * @return whether the passed-in [[ActorRef]] points to a current child Actor + */ + def stop[U](child: ActorRef[U]): Boolean + + /** + * Register for [[Terminated]] notification once the Actor identified by the + * given [[ActorRef]] terminates. This notification is also generated when the + * [[ActorSystem]] to which the referenced Actor belongs is declared as + * failed (e.g. in reaction to being unreachable). + */ + def watch[U](other: ActorRef[U]): Unit + + /** + * Revoke the registration established by `watch`. A [[Terminated]] + * notification will not subsequently be received for the referenced Actor. + */ + def unwatch[U](other: ActorRef[U]): Unit + + /** + * Schedule the sending of a notification in case no other + * message is received during the given period of time. The timeout starts anew + * with each received message. Provide `Duration.Undefined` to switch off this + * mechanism. + */ + def setReceiveTimeout(d: FiniteDuration, msg: T): Unit + + /** + * Cancel the sending of receive timeout notifications. + */ + def cancelReceiveTimeout(): Unit + + /** + * Schedule the sending of the given message to the given target Actor after + * the given time period has elapsed. The scheduled action can be cancelled + * by invoking [[akka.actor.Cancellable#cancel]] on the returned + * handle. + */ + def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable + + /** + * This Actor’s execution context. It can be used to run asynchronous tasks + * like [[scala.concurrent.Future]] combinators. + */ + def getExecutionContext: ExecutionContextExecutor + + /** + * Create a child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + * + * The name of the child actor will be composed of a unique identifier + * starting with a dollar sign to which the given `name` argument is + * appended, with an inserted hyphen between these two parts. Therefore + * the given `name` argument does not need to be unique within the scope + * of the parent actor. + */ + def createAdapter[U](f: JFunction[U, T], name: String): ActorRef[U] + + /** + * Create an anonymous child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + */ + def createAdapter[U](f: JFunction[U, T]): ActorRef[U] + +} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala index 297be00742..6952c24d04 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala @@ -4,152 +4,19 @@ package akka.typed package scaladsl -import akka.util.LineNumbers import scala.reflect.ClassTag -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.ExecutionContextExecutor -import scala.deprecatedInheritance -import akka.typed.{ ActorContext ⇒ AC } + import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit - -/** - * An Actor is given by the combination of a [[Behavior]] and a context in - * which this behavior is executed. As per the Actor Model an Actor can perform - * the following actions when processing a message: - * - * - send a finite number of messages to other Actors it knows - * - create a finite number of Actors - * - designate the behavior for the next message - * - * In Akka the first capability is accessed by using the `!` or `tell` method - * on an [[ActorRef]], the second is provided by [[ActorContext#spawn]] - * and the third is implicit in the signature of [[Behavior]] in that the next - * behavior is always returned from the message processing logic. - * - * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), - * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it - * created, access to [[Terminated DeathWatch]] and timed message scheduling. - */ -@DoNotInherit -@ApiMayChange -trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T] ⇒ - - /** - * The identity of this Actor, bound to the lifecycle of this Actor instance. - * An Actor with the same name that lives before or after this instance will - * have a different [[ActorRef]]. - */ - def self: ActorRef[T] - - /** - * Return the mailbox capacity that was configured by the parent for this actor. - */ - def mailboxCapacity: Int - - /** - * The [[ActorSystem]] to which this Actor belongs. - */ - def system: ActorSystem[Nothing] - - /** - * The list of child Actors created by this Actor during its lifetime that - * are still alive, in no particular order. - */ - def children: Iterable[ActorRef[Nothing]] - - /** - * The named child Actor if it is alive. - */ - def child(name: String): Option[ActorRef[Nothing]] - - /** - * Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name. - * It is good practice to name Actors wherever practical. - */ - def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] - - /** - * Create a child Actor from the given [[akka.typed.Behavior]] and with the given name. - */ - def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] - - /** - * Force the child Actor under the given name to terminate after it finishes - * processing its current message. Nothing happens if the ActorRef does not - * refer to a current child actor. - * - * @return whether the passed-in [[ActorRef]] points to a current child Actor - */ - def stop(child: ActorRef[_]): Boolean - - /** - * Register for [[Terminated]] notification once the Actor identified by the - * given [[ActorRef]] terminates. This notification is also generated when the - * [[ActorSystem]] to which the referenced Actor belongs is declared as - * failed (e.g. in reaction to being unreachable). - */ - def watch[U](other: ActorRef[U]): Unit - - /** - * Revoke the registration established by `watch`. A [[Terminated]] - * notification will not subsequently be received for the referenced Actor. - */ - def unwatch[U](other: ActorRef[U]): Unit - - /** - * Schedule the sending of a notification in case no other - * message is received during the given period of time. The timeout starts anew - * with each received message. Provide `Duration.Undefined` to switch off this - * mechanism. - */ - def setReceiveTimeout(d: FiniteDuration, msg: T): Unit - - /** - * Cancel the sending of receive timeout notifications. - */ - def cancelReceiveTimeout(): Unit - - /** - * Schedule the sending of the given message to the given target Actor after - * the given time period has elapsed. The scheduled action can be cancelled - * by invoking [[akka.actor.Cancellable#cancel]] on the returned - * handle. - */ - def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable - - /** - * This Actor’s execution context. It can be used to run asynchronous tasks - * like [[scala.concurrent.Future]] combinators. - */ - implicit def executionContext: ExecutionContextExecutor - - /** - * Create a child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - * - * The name of the child actor will be composed of a unique identifier - * starting with a dollar sign to which the given `name` argument is - * appended, with an inserted hyphen between these two parts. Therefore - * the given `name` argument does not need to be unique within the scope - * of the parent actor. - */ - def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] - - /** - * Create an anonymous child actor that will wrap messages such that other Actor’s - * protocols can be ingested by this Actor. You are strongly advised to cache - * these ActorRefs or to stop them when no longer needed. - */ - def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAdapter(f, "") - -} +import akka.typed.internal.BehaviorImpl @ApiMayChange object Actor { import Behavior._ + private val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () + private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] + final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { /** * Widen the wrapped Behavior by placing a funnel in front of it: the supplied @@ -157,62 +24,25 @@ object Actor { * at) and may transform the incoming message to place them into the wrapped * Behavior’s type hierarchy. Signals are not transformed. * - * see also [[Actor.Widened]] + * Example: + * {{{ + * Immutable[String] { (ctx, msg) => println(msg); Same }.widen[Number] { + * case b: BigDecimal => s"BigDecimal($b)" + * case i: BigInteger => s"BigInteger($i)" + * // drop all other kinds of Number + * } + * }}} */ - def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher) - } - - private val _nullFun = (_: Any) ⇒ null - private def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] - private implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal { - def as[U] = ctx.asInstanceOf[AC[U]] - } - - /** - * Widen the wrapped Behavior by placing a funnel in front of it: the supplied - * PartialFunction decides which message to pull in (those that it is defined - * at) and may transform the incoming message to place them into the wrapped - * Behavior’s type hierarchy. Signals are not transformed. - * - * Example: - * {{{ - * Immutable[String] { (ctx, msg) => println(msg); Same }.widen[Number] { - * case b: BigDecimal => s"BigDecimal($b)" - * case i: BigInteger => s"BigInteger($i)" - * // drop all other kinds of Number - * } - * }}} - */ - final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] { - private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] = - if (isUnhandled(behv)) Unhandled - else if (isAlive(behv)) { - val next = canonicalize(behv, behavior, ctx) - if (next eq behavior) Same else Widened(next, matcher) - } else Stopped - - override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] = - postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) - - override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] = - matcher.applyOrElse(msg, nullFun) match { - case null ⇒ Unhandled - case transformed ⇒ postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) - } - - override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" + def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = + BehaviorImpl.Widened(behavior, matcher) } /** * Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation * is deferred to the child actor instead of running within the parent. */ - final case class Deferred[T](factory: ActorContext[T] ⇒ Behavior[T]) extends DeferredBehavior[T] { - /** "undefer" the deferred behavior */ - override def apply(ctx: AC[T]): Behavior[T] = factory(ctx) - - override def toString: String = s"Deferred(${LineNumbers(factory)})" - } + def Deferred[T](factory: ActorContext[T] ⇒ Behavior[T]): Behavior[T] = + Behavior.DeferredBehavior(factory) /** * Factory for creating a [[MutableBehavior]] that typically holds mutable state as @@ -328,21 +158,14 @@ object Actor { * State is updated by returning a new behavior that holds the new immutable * state. */ - final class Immutable[T] private ( - onMessage: (ActorContext[T], T) ⇒ Behavior[T], - onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]]) - extends ExtensibleBehavior[T] { - override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = onSignal.applyOrElse((ctx, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]]) - override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx, msg) - override def toString = s"Immutable(${LineNumbers(onMessage)})" + def Immutable[T](onMessage: (ActorContext[T], T) ⇒ Behavior[T]): Immutable[T] = + new Immutable(onMessage) - def onSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Immutable[T] = - new Immutable(onMessage, onSignal) - } + final class Immutable[T](onMessage: (ActorContext[T], T) ⇒ Behavior[T]) + extends BehaviorImpl.ImmutableBehavior[T](onMessage) { - object Immutable { - def apply[T](onMessage: (ActorContext[T], T) ⇒ Behavior[T]) = - new Immutable(onMessage) + def onSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] = + new BehaviorImpl.ImmutableBehavior(onMessage, onSignal) } /** @@ -350,26 +173,11 @@ object Actor { * some action upon each received message or signal. It is most commonly used * for logging or tracing what a certain Actor does. */ - final case class Tap[T]( + def Tap[T]( onMessage: Function2[ActorContext[T], T, _], onSignal: Function2[ActorContext[T], Signal, _], - behavior: Behavior[T]) extends ExtensibleBehavior[T] { - - private def canonical(behv: Behavior[T]): Behavior[T] = - if (isUnhandled(behv)) Unhandled - else if ((behv eq SameBehavior) || (behv eq this)) Same - else if (isAlive(behv)) Tap(onMessage, onSignal, behv) - else Stopped - override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = { - onSignal(ctx, signal) - canonical(Behavior.interpretSignal(behavior, ctx, signal)) - } - override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = { - onMessage(ctx, msg) - canonical(Behavior.interpretMessage(behavior, ctx, msg)) - } - override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)" - } + behavior: Behavior[T]): Behavior[T] = + BehaviorImpl.Tap(onMessage, onSignal, behavior) /** * Behavior decorator that copies all received message to the designated @@ -377,9 +185,8 @@ object Actor { * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. */ - object Monitor { - def apply[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap((_, msg) ⇒ monitor ! msg, unitFunction, behavior) - } + def Monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + Tap((_, msg) ⇒ monitor ! msg, unitFunction, behavior) /** * Wrap the given behavior such that it is restarted (i.e. reset to its @@ -396,34 +203,14 @@ object Actor { * val dbRestarts = Restarter[DbException]().wrap(dbConnector) * }}} */ - object Restarter { - class Apply[Thr <: Throwable](c: ClassTag[Thr], strategy: SupervisorStrategy) { - def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c) - } + def Restarter[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy = SupervisorStrategy.restart): Restarter[Thr] = + new Restarter(implicitly, strategy) - def apply[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy = SupervisorStrategy.restart): Apply[Thr] = - new Apply(implicitly, strategy) + final class Restarter[Thr <: Throwable: ClassTag](c: ClassTag[Thr], strategy: SupervisorStrategy) { + def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c) } // TODO // final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () ⇒ Behavior[T]) - /** - * INTERNAL API. - */ - private[akka] val _unhandledFunction = (_: Any) ⇒ Unhandled[Nothing] - /** - * INTERNAL API. - */ - private[akka] def unhandledFunction[T, U] = _unhandledFunction.asInstanceOf[(T ⇒ Behavior[U])] - - /** - * INTERNAL API. - */ - private[akka] val _unitFunction = (_: ActorContext[Any], _: Any) ⇒ () - /** - * INTERNAL API. - */ - private[akka] def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) ⇒ Unit)] - } diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/ActorContext.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/ActorContext.scala new file mode 100644 index 0000000000..90dc824178 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/ActorContext.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.scaladsl + +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration.FiniteDuration + +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.typed.ActorRef +import akka.typed.ActorSystem +import akka.typed.Behavior +import akka.typed.DeploymentConfig +import akka.typed.EmptyDeploymentConfig + +/** + * An Actor is given by the combination of a [[Behavior]] and a context in + * which this behavior is executed. As per the Actor Model an Actor can perform + * the following actions when processing a message: + * + * - send a finite number of messages to other Actors it knows + * - create a finite number of Actors + * - designate the behavior for the next message + * + * In Akka the first capability is accessed by using the `!` or `tell` method + * on an [[ActorRef]], the second is provided by [[ActorContext#spawn]] + * and the third is implicit in the signature of [[Behavior]] in that the next + * behavior is always returned from the message processing logic. + * + * An `ActorContext` in addition provides access to the Actor’s own identity (“`self`”), + * the [[ActorSystem]] it is part of, methods for querying the list of child Actors it + * created, access to [[Terminated DeathWatch]] and timed message scheduling. + */ +@DoNotInherit +@ApiMayChange +trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T] ⇒ + + /** + * The identity of this Actor, bound to the lifecycle of this Actor instance. + * An Actor with the same name that lives before or after this instance will + * have a different [[ActorRef]]. + */ + def self: ActorRef[T] + + /** + * Return the mailbox capacity that was configured by the parent for this actor. + */ + def mailboxCapacity: Int + + /** + * The [[ActorSystem]] to which this Actor belongs. + */ + def system: ActorSystem[Nothing] + + /** + * The list of child Actors created by this Actor during its lifetime that + * are still alive, in no particular order. + */ + def children: Iterable[ActorRef[Nothing]] + + /** + * The named child Actor if it is alive. + */ + def child(name: String): Option[ActorRef[Nothing]] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name. + * It is good practice to name Actors wherever practical. + */ + def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + + /** + * Create a child Actor from the given [[akka.typed.Behavior]] and with the given name. + */ + def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U] + + /** + * Force the child Actor under the given name to terminate after it finishes + * processing its current message. Nothing happens if the ActorRef does not + * refer to a current child actor. + * + * @return whether the passed-in [[ActorRef]] points to a current child Actor + */ + def stop[U](child: ActorRef[U]): Boolean + + /** + * Register for [[Terminated]] notification once the Actor identified by the + * given [[ActorRef]] terminates. This notification is also generated when the + * [[ActorSystem]] to which the referenced Actor belongs is declared as + * failed (e.g. in reaction to being unreachable). + */ + def watch[U](other: ActorRef[U]): Unit + + /** + * Revoke the registration established by `watch`. A [[Terminated]] + * notification will not subsequently be received for the referenced Actor. + */ + def unwatch[U](other: ActorRef[U]): Unit + + /** + * Schedule the sending of a notification in case no other + * message is received during the given period of time. The timeout starts anew + * with each received message. Provide `Duration.Undefined` to switch off this + * mechanism. + */ + def setReceiveTimeout(d: FiniteDuration, msg: T): Unit + + /** + * Cancel the sending of receive timeout notifications. + */ + def cancelReceiveTimeout(): Unit + + /** + * Schedule the sending of the given message to the given target Actor after + * the given time period has elapsed. The scheduled action can be cancelled + * by invoking [[akka.actor.Cancellable#cancel]] on the returned + * handle. + */ + def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable + + /** + * This Actor’s execution context. It can be used to run asynchronous tasks + * like [[scala.concurrent.Future]] combinators. + */ + implicit def executionContext: ExecutionContextExecutor + + /** + * Create a child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + * + * The name of the child actor will be composed of a unique identifier + * starting with a dollar sign to which the given `name` argument is + * appended, with an inserted hyphen between these two parts. Therefore + * the given `name` argument does not need to be unique within the scope + * of the parent actor. + */ + def spawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] + + /** + * Create an anonymous child actor that will wrap messages such that other Actor’s + * protocols can be ingested by this Actor. You are strongly advised to cache + * these ActorRefs or to stop them when no longer needed. + */ + def spawnAdapter[U](f: U ⇒ T): ActorRef[U] = spawnAdapter(f, "") + +}