implement javadsl in Scala, #22749

* and separate scaladsl api and impl, #22749
This commit is contained in:
Patrik Nordwall 2017-04-27 08:14:42 +02:00
parent aceca4cc24
commit 2eb41b910a
17 changed files with 692 additions and 822 deletions

View file

@ -92,7 +92,7 @@ class EffectfulActorContext[T](_name: String, _initialBehavior: Behavior[T], _ma
effectQueue.offer(Spawned(name))
super.spawn(behavior, name)
}
override def stop(child: ActorRef[_]): Boolean = {
override def stop[U](child: ActorRef[U]): Boolean = {
effectQueue.offer(Stopped(child.path.name))
super.stop(child)
}

View file

@ -47,7 +47,7 @@ class StubbedActorContext[T](
* Do not actually stop the child inbox, only simulate the liveness check.
* Removal is asynchronous, explicit removeInbox is needed from outside afterwards.
*/
override def stop(child: ActorRef[_]): Boolean = {
override def stop[U](child: ActorRef[U]): Boolean = {
_children.get(child.path.name) match {
case None false
case Some(inbox) inbox.ref == child

View file

@ -279,7 +279,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
if (system eq nativeSystem) suite + "Native"
else suite + "Adapted"
def setup(name: String, wrapper: Option[Actor.Restarter.Apply[_]] = None)(
def setup(name: String, wrapper: Option[Actor.Restarter[_]] = None)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
runTest(s"$mySuite-$name")(StepWise[Event] { (ctx, startWith)
val props = wrapper.map(_.wrap(behavior(ctx))).getOrElse(behavior(ctx))

View file

@ -601,8 +601,8 @@ class BehaviorSpec extends TypedSpec {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
ps((_, sig) inbox.ref ! Left(sig)),
pc((_, msg) inbox.ref ! Right(msg)),
ps((_, sig) inbox.ref ! Left(sig)),
super.behavior(monitor)._1), inbox)
}
}

View file

@ -1,392 +0,0 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import static akka.typed.scaladsl.Actor.Empty;
import static akka.typed.scaladsl.Actor.Ignore;
import static akka.typed.scaladsl.Actor.Same;
import static akka.typed.scaladsl.Actor.Stopped;
import static akka.typed.scaladsl.Actor.Unhandled;
import java.util.function.Function;
import akka.japi.function.Function2;
import akka.japi.function.Procedure2;
import akka.japi.pf.PFBuilder;
import akka.typed.*;
import akka.typed.internal.Restarter;
import akka.typed.scaladsl.Actor.Widened;
import scala.reflect.ClassTag;
public abstract class Actor {
/*
* This DSL is implemented in Java in order to ensure consistent usability from Java,
* taking possible Scala oddities out of the equation. There is some duplication in
* the behavior implementations, but that is unavoidable if both DSLs shall offer the
* same runtime performance (especially concerning allocations for function converters).
*/
private static class Deferred<T> extends Behavior.DeferredBehavior<T> {
final akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer;
public Deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
this.producer = producer;
}
@Override
public Behavior<T> apply(akka.typed.ActorContext<T> ctx) throws Exception {
return producer.apply(ctx);
}
}
private static class Immutable<T> extends ExtensibleBehavior<T> {
final Function2<ActorContext<T>, T, Behavior<T>> message;
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
public Immutable(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
this.signal = signal;
this.message = message;
}
@Override
public Behavior<T> receiveSignal(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return signal.apply(ctx, msg);
}
@Override
public Behavior<T> receiveMessage(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
return message.apply(ctx, msg);
}
}
private static class Tap<T> extends ExtensibleBehavior<T> {
final Procedure2<ActorContext<T>, Signal> signal;
final Procedure2<ActorContext<T>, T> message;
final Behavior<T> behavior;
public Tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
this.signal = signal;
this.message = message;
this.behavior = behavior;
}
private Behavior<T> canonicalize(Behavior<T> behv) {
if (Behavior.isUnhandled(behv))
return Unhandled();
else if (behv == Same() || behv == this)
return Same();
else if (Behavior.isAlive(behv))
return new Tap<T>(signal, message, behv);
else
return Stopped();
}
@Override
public Behavior<T> receiveSignal(akka.typed.ActorContext<T> ctx, Signal signal) throws Exception {
this.signal.apply(ctx, signal);
return canonicalize(Behavior.interpretSignal(behavior, ctx, signal));
}
@Override
public Behavior<T> receiveMessage(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
message.apply(ctx, msg);
return canonicalize(Behavior.interpretMessage(behavior, ctx, msg));
}
}
private static Function2<Object, Object, Object> _unhandledFun = (ctx, msg) -> Unhandled();
@SuppressWarnings("unchecked")
private static <T> Function2<ActorContext<T>, Signal, Behavior<T>> unhandledFun() {
return (Function2<ActorContext<T>, Signal, Behavior<T>>) (Object) _unhandledFun;
}
private static Procedure2<Object, Object> _doNothing = (ctx, msg) -> {
};
@SuppressWarnings("unchecked")
private static <T> Procedure2<ActorContext<T>, Signal> doNothing() {
return (Procedure2<ActorContext<T>, Signal>) (Object) _doNothing;
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called immutable because the behavior instance doesn't
* have or close over any mutable state. Processing the next message
* results in a new behavior that can potentially be different from this one.
* State is updated by returning a new behavior that holds the new immutable
* state. If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> immutable(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Immutable<T>(message, unhandledFun());
}
/**
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called immutable because the behavior instance doesn't
* have or close over any mutable state. Processing the next message
* results in a new behavior that can potentially be different from this one.
* State is updated by returning a new behavior that holds the new immutable
* state. If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @return the behavior
*/
static public <T> Behavior<T> immutable(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
return new Immutable<T>(message, signal);
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior. This is provided in order to avoid the
* allocation overhead of recreating the current behavior where that is not
* necessary.
*
* @return pseudo-behavior marking no change
*/
static public <T> Behavior<T> same() {
return Same();
}
/**
* Return this behavior from message processing in order to advise the system
* to reuse the previous behavior, including the hint that the message has not
* been handled. This hint may be used by composite behaviors that delegate
* (partial) handling to other behaviors.
*
* @return pseudo-behavior marking unhandled
*/
static public <T> Behavior<T> unhandled() {
return Unhandled();
}
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*
* @return the inert behavior
*/
static public <T> Behavior<T> stopped() {
return Stopped();
}
/**
* A behavior that treats every incoming message as unhandled.
*
* @return the empty behavior
*/
static public <T> Behavior<T> empty() {
return Empty();
}
/**
* A behavior that ignores every incoming message and returns same.
*
* @return the inert behavior
*/
static public <T> Behavior<T> ignore() {
return Ignore();
}
/**
* Behavior decorator that allows you to perform any side-effect before a
* signal or message is delivered to the wrapped behavior. The wrapped
* behavior can evolve (i.e. be immutable) without needing to be wrapped in a
* <code>tap(...)</code> call again.
*
* @param signal
* procedure to invoke with the {@link ActorContext} and the
* {@link akka.typed.Signal} as arguments before delivering the signal to
* the wrapped behavior
* @param message
* procedure to invoke with the {@link ActorContext} and the received
* message as arguments before delivering the signal to the wrapped
* behavior
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> tap(Procedure2<ActorContext<T>, Signal> signal, Procedure2<ActorContext<T>, T> message,
Behavior<T> behavior) {
return new Tap<T>(signal, message, behavior);
}
/**
* Behavior decorator that copies all received message to the designated
* monitor {@link akka.typed.ActorRef} before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a <code>monitor(...)</code> call again.
*
* @param monitor
* ActorRef to which to copy all received messages
* @param behavior
* initial behavior to be wrapped
* @return the decorated behavior
*/
static public <T> Behavior<T> monitor(ActorRef<T> monitor, Behavior<T> behavior) {
return new Tap<T>(doNothing(), (ctx, msg) -> monitor.tell(msg), behavior);
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of <code>Thr</code> will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify different supervisor strategies, such as restart,
* resume, backoff.
*
* @param clazz
* the type of exceptions that shall be caught
* @param strategy
* whether to restart, resume, or backoff the actor upon a caught failure
* @param initialBehavior
* the initial behavior, that is also restored during a restart
* @return the wrapped behavior
*/
static public <T, Thr extends Throwable> Behavior<T> restarter(Class<Thr> clazz, SupervisorStrategy strategy,
Behavior<T> initialBehavior) {
final ClassTag<Thr> catcher = akka.japi.Util.classTag(clazz);
return Restarter.apply(Behavior.validateAsInitial(initialBehavior), strategy, catcher);
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* <code><pre>
* Behavior&lt;String> s = immutable((ctx, msg) -> {
* System.out.println(msg);
* return same();
* });
* Behavior&lt;Number> n = widened(s, pf -> pf.
* match(BigInteger.class, i -> "BigInteger(" + i + ")").
* match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
* // drop all other kinds of Number
* );
* </pre></code>
*
* @param behavior
* the behavior that will receive the selected messages
* @param selector
* a partial function builder for describing the selection and
* transformation
* @return a behavior of the widened type
*/
static public <T, U> Behavior<U> widened(Behavior<T> behavior, Function<PFBuilder<U, T>, PFBuilder<U, T>> selector) {
return new Widened<T, U>(behavior, selector.apply(new PFBuilder<>()).build());
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior
* creation is deferred to the child actor instead of running within the
* parent.
*
* @param producer
* behavior factory that takes the child actors context as argument
* @return the deferred behavior
*/
static public <T> Behavior<T> deferred(akka.japi.function.Function<ActorContext<T>, Behavior<T>> producer) {
return new Deferred<T>(producer);
}
/**
* Factory for creating a <code>MutableBehavior</code> that typically holds mutable state as
* instance variables in the concrete <code>MutableBehavior</code> implementation class.
*
* Creation of the behavior instance is deferred, i.e. it is created via the <code>producer</code>
* function. The reason for the deferred creation is to avoid sharing the same instance in
* multiple actors, and to create a new instance when the actor is restarted.
*
* @param producer
* behavior factory that takes the child actors context as argument
* @return the deferred behavior
*/
static public <T> Behavior<T> mutable(akka.japi.function.Function<ActorContext<T>, MutableBehavior<T>> producer) {
return deferred(ctx -> producer.apply(ctx));
}
/**
* Mutable behavior can be implemented by extending this class and implement the
* abstract method {@link MutableBehavior#onMessage} and optionally override
* {@link MutableBehavior#onSignal}.
*
* Instances of this behavior should be created via {@link Actor#mutable} and if
* the {@link ActorContext} is needed it can be passed as a constructor parameter
* from the factory function.
*
* @see Actor#mutable
*/
static public abstract class MutableBehavior<T> extends ExtensibleBehavior<T> {
@Override
final public Behavior<T> receiveMessage(akka.typed.ActorContext<T> ctx, T msg) throws Exception {
return onMessage(msg);
}
/**
* Implement this method to process an incoming message and return the next behavior.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
* <ul>
* <li>returning `stopped` will terminate this Behavior</li>
* <li>returning `this` or `same` designates to reuse the current Behavior</li>
* <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
* </ul>
*
*/
public abstract Behavior<T> onMessage(T msg) throws Exception;
@Override
final public Behavior<T> receiveSignal(akka.typed.ActorContext<T> ctx, Signal msg) throws Exception {
return onSignal(msg);
}
/**
* Override this method to process an incoming {@link akka.typed.Signal} and return the next behavior.
* This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages
* can initiate a behavior change.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
* <ul>
* <li>returning `stopped` will terminate this Behavior</li>
* <li>returning `this` or `same` designates to reuse the current Behavior</li>
* <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
* </ul>
*
* By default, this method returns `unhandled`.
*/
public Behavior<T> onSignal(Signal msg) throws Exception {
return unhandled();
}
}
}

View file

@ -1,159 +0,0 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.javadsl;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import akka.actor.Cancellable;
import akka.typed.ActorRef;
import akka.typed.ActorSystem;
import akka.typed.Behavior;
import akka.typed.DeploymentConfig;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;
/**
* An Actor is given by the combination of a {@link akka.typed.Behavior} and a context in which
* this behavior is executed. As per the Actor Model an Actor can perform the
* following actions when processing a message:
*
* <ul>
* <li>send a finite number of messages to other Actors it knows</li>
* <li>create a finite number of Actors</li>
* <li>designate the behavior for the next message</li>
* </ul>
*
* In Akka the first capability is accessed by using the {@link akka.typed.ActorRef#tell}
* method, the second is provided by {@link akka.typed.ActorContext#spawn} and the
* third is implicit in the signature of {@link akka.typed.Behavior} in that the next behavior
* is always returned from the message processing logic.
*
* An <code>ActorContext</code> in addition provides access to the Actors own identity
* ({@link #getSelf self}), the {@link akka.typed.ActorSystem} it is part of, methods for querying the list
* of child Actors it created, access to {@link akka.typed.Terminated DeathWatch} and timed
* message scheduling.
*/
public interface ActorContext<T> {
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different {@link akka.typed.ActorRef}.
*/
public ActorRef<T> getSelf();
/**
* Return the mailbox capacity that was configured by the parent for this
* actor.
*/
public int getMailboxCapacity();
/**
* The {@link akka.typed.ActorSystem} to which this Actor belongs.
*/
public ActorSystem<Void> getSystem();
/**
* The list of child Actors created by this Actor during its lifetime that are
* still alive, in no particular order.
*/
public List<ActorRef<Void>> getChildren();
/**
* The named child Actor if it is alive.
*/
public Optional<ActorRef<Void>> getChild(String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
public <U> ActorRef<U> spawnAnonymous(Behavior<U> behavior, DeploymentConfig deployment);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name);
/**
* Create a child Actor from the given {@link akka.typed.Behavior} and with the given name.
*/
public <U> ActorRef<U> spawn(Behavior<U> behavior, String name, DeploymentConfig deployment);
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in {@link akka.typed.ActorRef} points to a current child Actor
*/
public boolean stop(ActorRef<?> child);
/**
* Register for {@link akka.typed.Terminated} notification once the Actor identified by the
* given {@link akka.typed.ActorRef} terminates. This notification is also generated when the
* {@link akka.typed.ActorSystem} to which the referenced Actor belongs is declared as failed
* (e.g. in reaction to being unreachable).
*/
public <U> void watch(ActorRef<U> other);
/**
* Revoke the registration established by {@link #watch}. A {@link akka.typed.Terminated}
* notification will not subsequently be received for the referenced Actor.
*/
public <U> void unwatch(ActorRef<U> other);
/**
* Schedule the sending of a notification in case no other message is received
* during the given period of time. The timeout starts anew with each received
* message. Provide <code>scala.concurrent.duration.Duration.Undefined</code> to switch off this mechanism.
*/
public void setReceiveTimeout(FiniteDuration d, T msg);
/**
* Cancel the sending of receive timeout notifications.
*/
public void cancelReceiveTimeout();
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled by
* invoking {@link akka.actor.Cancellable#cancel} on the returned handle.
*/
public <U> Cancellable schedule(FiniteDuration delay, ActorRef<U> target, U msg);
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like <code>scala.concurrent.Future</code> combinators.
*/
public ExecutionContextExecutor getExecutionContext();
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given <code>name</code> argument is appended,
* with an inserted hyphen between these two parts. Therefore the given <code>name</code>
* argument does not need to be unique within the scope of the parent actor.
*/
public <U> ActorRef<U> createAdapter(Function<U, T> f, String name);
/**
* Create an anonymous child actor that will wrap messages such that other
* Actors protocols can be ingested by this Actor. You are strongly advised
* to cache these ActorRefs or to stop them when no longer needed.
*/
public <U> ActorRef<U> createAdapter(Function<U, T> f);
}

View file

@ -26,7 +26,7 @@ trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext
case None Optional.empty()
}
override def getChildren(): java.util.List[akka.typed.ActorRef[Void]] = {
override def getChildren: java.util.List[akka.typed.ActorRef[Void]] = {
val c = children
val a = new ArrayList[ActorRef[Void]](c.size)
val i = c.iterator
@ -34,16 +34,16 @@ trait ActorContext[T] extends javadsl.ActorContext[T] with scaladsl.ActorContext
a
}
override def getExecutionContext(): ExecutionContextExecutor =
override def getExecutionContext: ExecutionContextExecutor =
executionContext
override def getMailboxCapacity(): Int =
override def getMailboxCapacity: Int =
mailboxCapacity
override def getSelf(): akka.typed.ActorRef[T] =
override def getSelf: akka.typed.ActorRef[T] =
self
override def getSystem(): akka.typed.ActorSystem[Void] =
override def getSystem: akka.typed.ActorSystem[Void] =
system.asInstanceOf[ActorSystem[Void]]
override def spawn[U](behavior: akka.typed.Behavior[U], name: String): akka.typed.ActorRef[U] =

View file

@ -69,7 +69,7 @@ abstract class ActorRef[-T](_path: a.ActorPath) extends java.lang.Comparable[Act
object ActorRef {
implicit final class ActorRefScalaTell[-T](val ref: ActorRef[T]) extends AnyVal {
implicit final class ActorRefOps[-T](val ref: ActorRef[T]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.

View file

@ -3,6 +3,7 @@
*/
package akka.typed
import akka.util.LineNumbers
import akka.annotation.{ DoNotInherit, InternalApi }
import scala.annotation.tailrec
@ -145,12 +146,16 @@ object Behavior {
/**
* INTERNAL API.
* Not placed in internal.BehaviorImpl because Behavior is sealed.
*/
@InternalApi
private[akka] abstract class DeferredBehavior[T] extends Behavior[T] {
private[akka] final case class DeferredBehavior[T](factory: ActorContext[T] Behavior[T]) extends Behavior[T] {
/** "undefer" the deferred behavior */
@throws(classOf[Exception])
def apply(ctx: ActorContext[T]): Behavior[T]
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
/**

View file

@ -122,7 +122,7 @@ private[typed] class ActorCell[T](
spawn(behavior, name, deployment)
}
override def stop(child: ActorRef[_]): Boolean = {
override def stop[U](child: ActorRef[U]): Boolean = {
val name = child.path.name
childrenMap.get(name) match {
case None false

View file

@ -0,0 +1,77 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed
package internal
import akka.util.LineNumbers
import akka.annotation.InternalApi
import akka.typed.{ ActorContext AC }
import akka.typed.scaladsl.Actor
/**
* INTERNAL API
*/
@InternalApi private[akka] object BehaviorImpl {
import Behavior._
private val _nullFun = (_: Any) null
private def nullFun[T] = _nullFun.asInstanceOf[Any T]
implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
def as[U] = ctx.asInstanceOf[AC[U]]
}
final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] {
private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] =
if (isUnhandled(behv)) unhandled
else if (isAlive(behv)) {
val next = canonicalize(behv, behavior, ctx)
if (next eq behavior) same else Widened(next, matcher)
} else stopped
override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] =
postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] =
matcher.applyOrElse(msg, nullFun) match {
case null unhandled
case transformed postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
}
class ImmutableBehavior[T](
val onMessage: (ActorContext[T], T) Behavior[T],
onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]])
extends ExtensibleBehavior[T] {
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
onSignal.applyOrElse((ctx, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]])
override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx, msg)
override def toString = s"Immutable(${LineNumbers(onMessage)})"
}
final case class Tap[T](
onMessage: Function2[ActorContext[T], T, _],
onSignal: Function2[ActorContext[T], Signal, _],
behavior: Behavior[T]) extends ExtensibleBehavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) unhandled
else if ((behv eq SameBehavior) || (behv eq this)) same
else if (isAlive(behv)) Tap(onMessage, onSignal, behv)
else stopped
override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = {
onSignal(ctx, signal)
canonical(Behavior.interpretSignal(behavior, ctx, signal))
}
override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = {
onMessage(ctx, msg)
canonical(Behavior.interpretMessage(behavior, ctx, msg))
}
override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)"
}
}

View file

@ -44,25 +44,26 @@ private[typed] class EventStreamImpl(private val debug: Boolean)(implicit privat
import scaladsl.Actor
Actor.Deferred[Command] { _
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $this"))
Actor.Immutable[Command] {
case (ctx, Register(actor))
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
ctx.watch(actor)
Actor.Same
Actor.Immutable[Command] { (ctx, msg)
msg match {
case Register(actor)
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
ctx.watch(actor)
Actor.Same
case (ctx, UnregisterIfNoMoreSubscribedChannels(actor)) if hasSubscriptions(actor) Actor.Same
// hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream
case UnregisterIfNoMoreSubscribedChannels(actor) if hasSubscriptions(actor) Actor.Same
// hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream
case (ctx, UnregisterIfNoMoreSubscribedChannels(actor))
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
ctx.unwatch(actor)
Actor.Same
case UnregisterIfNoMoreSubscribedChannels(actor)
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
ctx.unwatch(actor)
Actor.Same
}
} onSignal {
case (_, Terminated(actor))
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $this, because it was terminated"))
unsubscribe(actor)
Actor.Same
case (_, _) Actor.Unhandled
}
}
}

View file

@ -27,7 +27,7 @@ import akka.annotation.InternalApi
ActorContextAdapter.spawnAnonymous(untyped, behavior, deployment)
override def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig) =
ActorContextAdapter.spawn(untyped, behavior, name, deployment)
override def stop(child: ActorRef[_]) =
override def stop[U](child: ActorRef[U]) =
toUntyped(child) match {
case f: akka.actor.FunctionRef
val cell = untyped.asInstanceOf[akka.actor.ActorCell]

View file

@ -0,0 +1,243 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.javadsl
import java.util.function.{ Function JFunction }
import akka.japi.function.{ Function2 JapiFunction2 }
import akka.japi.function.Procedure2
import akka.typed.Behavior
import akka.typed.ExtensibleBehavior
import akka.typed.Signal
import akka.typed.internal.BehaviorImpl
import akka.typed.ActorRef
import akka.typed.SupervisorStrategy
import scala.reflect.ClassTag
import akka.typed.internal.Restarter
import akka.japi.pf.PFBuilder
object Actor {
private val _unitFunction = (_: ActorContext[Any], _: Any) ()
private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
def deferred[T](factory: akka.japi.function.Function[ActorContext[T], Behavior[T]]): Behavior[T] =
Behavior.DeferredBehavior(ctx factory.apply(ctx))
/**
* Factory for creating a [[MutableBehavior]] that typically holds mutable state as
* instance variables in the concrete [[MutableBehavior]] implementation class.
*
* Creation of the behavior instance is deferred, i.e. it is created via the `factory`
* function. The reason for the deferred creation is to avoid sharing the same instance in
* multiple actors, and to create a new instance when the actor is restarted.
*
* @param producer
* behavior factory that takes the child actors context as argument
* @return the deferred behavior
*/
def mutable[T](factory: akka.japi.function.Function[ActorContext[T], MutableBehavior[T]]): Behavior[T] =
deferred(factory)
/**
* Mutable behavior can be implemented by extending this class and implement the
* abstract method [[MutableBehavior#onMessage]] and optionally override
* [[MutableBehavior#onSignal]].
*
* Instances of this behavior should be created via [[Actor#mutable]] and if
* the [[ActorContext]] is needed it can be passed as a constructor parameter
* from the factory function.
*
* @see [[Actor#mutable]]
*/
abstract class MutableBehavior[T] extends ExtensibleBehavior[T] {
@throws(classOf[Exception])
override final def receiveMessage(ctx: akka.typed.ActorContext[T], msg: T): Behavior[T] =
onMessage(msg)
/**
* Implement this method to process an incoming message and return the next behavior.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
* <ul>
* <li>returning `stopped` will terminate this Behavior</li>
* <li>returning `this` or `same` designates to reuse the current Behavior</li>
* <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
* </ul>
*
*/
@throws(classOf[Exception])
def onMessage(msg: T): Behavior[T]
@throws(classOf[Exception])
override final def receiveSignal(ctx: akka.typed.ActorContext[T], msg: Signal): Behavior[T] =
onSignal(msg)
/**
* Override this method to process an incoming [[akka.typed.Signal]] and return the next behavior.
* This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages
* can initiate a behavior change.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
*
* * returning `stopped` will terminate this Behavior
* * returning `this` or `Same` designates to reuse the current Behavior
* * returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
*
* By default, this method returns `unhandled`.
*/
@throws(classOf[Exception])
def onSignal(msg: Signal): Behavior[T] =
unhandled
}
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior. This is provided in order to
* avoid the allocation overhead of recreating the current behavior where
* that is not necessary.
*/
def same[T]: Behavior[T] = Behavior.same
/**
* Return this behavior from message processing in order to advise the
* system to reuse the previous behavior, including the hint that the
* message has not been handled. This hint may be used by composite
* behaviors that delegate (partial) handling to other behaviors.
*/
def unhandled[T]: Behavior[T] = Behavior.unhandled
/**
* Return this behavior from message processing to signal that this actor
* shall terminate voluntarily. If this actor has created child actors then
* these will be stopped as part of the shutdown procedure. The PostStop
* signal that results from stopping this actor will NOT be passed to the
* current behavior, it will be effectively ignored.
*/
def stopped[T]: Behavior[T] = Behavior.stopped
/**
* A behavior that treats every incoming message as unhandled.
*/
def empty[T]: Behavior[T] = Behavior.empty
/**
* A behavior that ignores every incoming message and returns same.
*/
def ignore[T]: Behavior[T] = Behavior.ignore
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called immutable because the behavior instance doesn't
* have or close over any mutable state. Processing the next message
* results in a new behavior that can potentially be different from this one.
* State is updated by returning a new behavior that holds the new immutable
* state.
*/
def immutable[T](onMessage: JapiFunction2[ActorContext[T], T, Behavior[T]]): Behavior[T] =
new BehaviorImpl.ImmutableBehavior((ctx, msg) onMessage.apply(ctx, msg))
/**
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called immutable because the behavior instance doesn't
* have or close over any mutable state. Processing the next message
* results in a new behavior that can potentially be different from this one.
* State is updated by returning a new behavior that holds the new immutable
* state.
*/
def immutable[T](
onMessage: JapiFunction2[ActorContext[T], T, Behavior[T]],
onSignal: JapiFunction2[ActorContext[T], Signal, Behavior[T]]): Behavior[T] = {
new BehaviorImpl.ImmutableBehavior(
(ctx, msg) onMessage.apply(ctx, msg),
{ case (ctx, sig) onSignal.apply(ctx, sig) })
}
/**
* This type of Behavior wraps another Behavior while allowing you to perform
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
def tap[T](
onMessage: Procedure2[ActorContext[T], T],
onSignal: Procedure2[ActorContext[T], Signal],
behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.Tap(
(ctx, msg) onMessage.apply(ctx, msg),
(ctx, sig) onSignal.apply(ctx, sig),
behavior)
}
/**
* Behavior decorator that copies all received message to the designated
* monitor [[akka.typed.ActorRef]] before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.Tap(
(ctx, msg) monitor ! msg,
unitFunction,
behavior)
}
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
* initial state) whenever it throws an exception of the given class or a
* subclass thereof. Exceptions that are not subtypes of `Thr` will not be
* caught and thus lead to the termination of the actor.
*
* It is possible to specify different supervisor strategies, such as restart,
* resume, backoff.
*/
def restarter[T, Thr <: Throwable](
clazz: Class[Thr],
strategy: SupervisorStrategy,
initialBehavior: Behavior[T]): Behavior[T] = {
Restarter(Behavior.validateAsInitial(initialBehavior), strategy)(ClassTag(clazz))
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* Behavior&lt;String> s = immutable((ctx, msg) -> {
* System.out.println(msg);
* return same();
* });
* Behavior&lt;Number> n = widened(s, pf -> pf.
* match(BigInteger.class, i -> "BigInteger(" + i + ")").
* match(BigDecimal.class, d -> "BigDecimal(" + d + ")")
* // drop all other kinds of Number
* );
* }}}
*
* @param behavior
* the behavior that will receive the selected messages
* @param selector
* a partial function builder for describing the selection and
* transformation
* @return a behavior of the widened type
*/
def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] =
BehaviorImpl.Widened(behavior, selector.apply(new PFBuilder).build())
}

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.javadsl
import java.util.function.{ Function JFunction }
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.typed.ActorRef
import akka.typed.ActorSystem
import java.util.Optional
import akka.typed.Behavior
import akka.typed.DeploymentConfig
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`getSelf`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] {
// this must be a pure interface, i.e. only abstract methods
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def getSelf: ActorRef[T]
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def getMailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def getSystem: ActorSystem[Void]
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def getChildren: java.util.List[ActorRef[Void]]
/**
* The named child Actor if it is alive.
*/
def getChild(name: String): Optional[ActorRef[Void]]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U]): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop[U](child: ActorRef[U]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): Unit
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): Unit
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable#cancel]] on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
def getExecutionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def createAdapter[U](f: JFunction[U, T], name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def createAdapter[U](f: JFunction[U, T]): ActorRef[U]
}

View file

@ -4,152 +4,19 @@
package akka.typed
package scaladsl
import akka.util.LineNumbers
import scala.reflect.ClassTag
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import scala.deprecatedInheritance
import akka.typed.{ ActorContext AC }
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T]
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop(child: ActorRef[_]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): Unit
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): Unit
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable#cancel]] on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
}
import akka.typed.internal.BehaviorImpl
@ApiMayChange
object Actor {
import Behavior._
private val _unitFunction = (_: ActorContext[Any], _: Any) ()
private def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
@ -157,62 +24,25 @@ object Actor {
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* see also [[Actor.Widened]]
* Example:
* {{{
* Immutable[String] { (ctx, msg) => println(msg); Same }.widen[Number] {
* case b: BigDecimal => s"BigDecimal(&dollar;b)"
* case i: BigInteger => s"BigInteger(&dollar;i)"
* // drop all other kinds of Number
* }
* }}}
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = Widened(behavior, matcher)
}
private val _nullFun = (_: Any) null
private def nullFun[T] = _nullFun.asInstanceOf[Any T]
private implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
def as[U] = ctx.asInstanceOf[AC[U]]
}
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
* PartialFunction decides which message to pull in (those that it is defined
* at) and may transform the incoming message to place them into the wrapped
* Behaviors type hierarchy. Signals are not transformed.
*
* Example:
* {{{
* Immutable[String] { (ctx, msg) => println(msg); Same }.widen[Number] {
* case b: BigDecimal => s"BigDecimal(&dollar;b)"
* case i: BigInteger => s"BigInteger(&dollar;i)"
* // drop all other kinds of Number
* }
* }}}
*/
final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] {
private def postProcess(behv: Behavior[T], ctx: AC[T]): Behavior[U] =
if (isUnhandled(behv)) Unhandled
else if (isAlive(behv)) {
val next = canonicalize(behv, behavior, ctx)
if (next eq behavior) Same else Widened(next, matcher)
} else Stopped
override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] =
postProcess(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] =
matcher.applyOrElse(msg, nullFun) match {
case null Unhandled
case transformed postProcess(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
}
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
BehaviorImpl.Widened(behavior, matcher)
}
/**
* Wrap a behavior factory so that it runs upon PreStart, i.e. behavior creation
* is deferred to the child actor instead of running within the parent.
*/
final case class Deferred[T](factory: ActorContext[T] Behavior[T]) extends DeferredBehavior[T] {
/** "undefer" the deferred behavior */
override def apply(ctx: AC[T]): Behavior[T] = factory(ctx)
override def toString: String = s"Deferred(${LineNumbers(factory)})"
}
def Deferred[T](factory: ActorContext[T] Behavior[T]): Behavior[T] =
Behavior.DeferredBehavior(factory)
/**
* Factory for creating a [[MutableBehavior]] that typically holds mutable state as
@ -328,21 +158,14 @@ object Actor {
* State is updated by returning a new behavior that holds the new immutable
* state.
*/
final class Immutable[T] private (
onMessage: (ActorContext[T], T) Behavior[T],
onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]])
extends ExtensibleBehavior[T] {
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = onSignal.applyOrElse((ctx, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(ActorContext[T], Signal), Behavior[T]]])
override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx, msg)
override def toString = s"Immutable(${LineNumbers(onMessage)})"
def Immutable[T](onMessage: (ActorContext[T], T) Behavior[T]): Immutable[T] =
new Immutable(onMessage)
def onSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Immutable[T] =
new Immutable(onMessage, onSignal)
}
final class Immutable[T](onMessage: (ActorContext[T], T) Behavior[T])
extends BehaviorImpl.ImmutableBehavior[T](onMessage) {
object Immutable {
def apply[T](onMessage: (ActorContext[T], T) Behavior[T]) =
new Immutable(onMessage)
def onSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] =
new BehaviorImpl.ImmutableBehavior(onMessage, onSignal)
}
/**
@ -350,26 +173,11 @@ object Actor {
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
final case class Tap[T](
def Tap[T](
onMessage: Function2[ActorContext[T], T, _],
onSignal: Function2[ActorContext[T], Signal, _],
behavior: Behavior[T]) extends ExtensibleBehavior[T] {
private def canonical(behv: Behavior[T]): Behavior[T] =
if (isUnhandled(behv)) Unhandled
else if ((behv eq SameBehavior) || (behv eq this)) Same
else if (isAlive(behv)) Tap(onMessage, onSignal, behv)
else Stopped
override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = {
onSignal(ctx, signal)
canonical(Behavior.interpretSignal(behavior, ctx, signal))
}
override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = {
onMessage(ctx, msg)
canonical(Behavior.interpretMessage(behavior, ctx, msg))
}
override def toString = s"Tap(${LineNumbers(onSignal)},${LineNumbers(onMessage)},$behavior)"
}
behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.Tap(onMessage, onSignal, behavior)
/**
* Behavior decorator that copies all received message to the designated
@ -377,9 +185,8 @@ object Actor {
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*/
object Monitor {
def apply[T](monitor: ActorRef[T], behavior: Behavior[T]): Tap[T] = Tap((_, msg) monitor ! msg, unitFunction, behavior)
}
def Monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
Tap((_, msg) monitor ! msg, unitFunction, behavior)
/**
* Wrap the given behavior such that it is restarted (i.e. reset to its
@ -396,34 +203,14 @@ object Actor {
* val dbRestarts = Restarter[DbException]().wrap(dbConnector)
* }}}
*/
object Restarter {
class Apply[Thr <: Throwable](c: ClassTag[Thr], strategy: SupervisorStrategy) {
def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c)
}
def Restarter[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy = SupervisorStrategy.restart): Restarter[Thr] =
new Restarter(implicitly, strategy)
def apply[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy = SupervisorStrategy.restart): Apply[Thr] =
new Apply(implicitly, strategy)
final class Restarter[Thr <: Throwable: ClassTag](c: ClassTag[Thr], strategy: SupervisorStrategy) {
def wrap[T](b: Behavior[T]): Behavior[T] = akka.typed.internal.Restarter(Behavior.validateAsInitial(b), strategy)(c)
}
// TODO
// final case class Selective[T](timeout: FiniteDuration, selector: PartialFunction[T, Behavior[T]], onTimeout: () Behavior[T])
/**
* INTERNAL API.
*/
private[akka] val _unhandledFunction = (_: Any) Unhandled[Nothing]
/**
* INTERNAL API.
*/
private[akka] def unhandledFunction[T, U] = _unhandledFunction.asInstanceOf[(T Behavior[U])]
/**
* INTERNAL API.
*/
private[akka] val _unitFunction = (_: ActorContext[Any], _: Any) ()
/**
* INTERNAL API.
*/
private[akka] def unitFunction[T] = _unitFunction.asInstanceOf[((ActorContext[T], Signal) Unit)]
}

View file

@ -0,0 +1,148 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.typed.ActorRef
import akka.typed.ActorSystem
import akka.typed.Behavior
import akka.typed.DeploymentConfig
import akka.typed.EmptyDeploymentConfig
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
* which this behavior is executed. As per the Actor Model an Actor can perform
* the following actions when processing a message:
*
* - send a finite number of messages to other Actors it knows
* - create a finite number of Actors
* - designate the behavior for the next message
*
* In Akka the first capability is accessed by using the `!` or `tell` method
* on an [[ActorRef]], the second is provided by [[ActorContext#spawn]]
* and the third is implicit in the signature of [[Behavior]] in that the next
* behavior is always returned from the message processing logic.
*
* An `ActorContext` in addition provides access to the Actors own identity (`self`),
* the [[ActorSystem]] it is part of, methods for querying the list of child Actors it
* created, access to [[Terminated DeathWatch]] and timed message scheduling.
*/
@DoNotInherit
@ApiMayChange
trait ActorContext[T] { this: akka.typed.javadsl.ActorContext[T]
/**
* The identity of this Actor, bound to the lifecycle of this Actor instance.
* An Actor with the same name that lives before or after this instance will
* have a different [[ActorRef]].
*/
def self: ActorRef[T]
/**
* Return the mailbox capacity that was configured by the parent for this actor.
*/
def mailboxCapacity: Int
/**
* The [[ActorSystem]] to which this Actor belongs.
*/
def system: ActorSystem[Nothing]
/**
* The list of child Actors created by this Actor during its lifetime that
* are still alive, in no particular order.
*/
def children: Iterable[ActorRef[Nothing]]
/**
* The named child Actor if it is alive.
*/
def child(name: String): Option[ActorRef[Nothing]]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*/
def spawnAnonymous[U](behavior: Behavior[U], deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Create a child Actor from the given [[akka.typed.Behavior]] and with the given name.
*/
def spawn[U](behavior: Behavior[U], name: String, deployment: DeploymentConfig = EmptyDeploymentConfig): ActorRef[U]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef does not
* refer to a current child actor.
*
* @return whether the passed-in [[ActorRef]] points to a current child Actor
*/
def stop[U](child: ActorRef[U]): Boolean
/**
* Register for [[Terminated]] notification once the Actor identified by the
* given [[ActorRef]] terminates. This notification is also generated when the
* [[ActorSystem]] to which the referenced Actor belongs is declared as
* failed (e.g. in reaction to being unreachable).
*/
def watch[U](other: ActorRef[U]): Unit
/**
* Revoke the registration established by `watch`. A [[Terminated]]
* notification will not subsequently be received for the referenced Actor.
*/
def unwatch[U](other: ActorRef[U]): Unit
/**
* Schedule the sending of a notification in case no other
* message is received during the given period of time. The timeout starts anew
* with each received message. Provide `Duration.Undefined` to switch off this
* mechanism.
*/
def setReceiveTimeout(d: FiniteDuration, msg: T): Unit
/**
* Cancel the sending of receive timeout notifications.
*/
def cancelReceiveTimeout(): Unit
/**
* Schedule the sending of the given message to the given target Actor after
* the given time period has elapsed. The scheduled action can be cancelled
* by invoking [[akka.actor.Cancellable#cancel]] on the returned
* handle.
*/
def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): akka.actor.Cancellable
/**
* This Actors execution context. It can be used to run asynchronous tasks
* like [[scala.concurrent.Future]] combinators.
*/
implicit def executionContext: ExecutionContextExecutor
/**
* Create a child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*
* The name of the child actor will be composed of a unique identifier
* starting with a dollar sign to which the given `name` argument is
* appended, with an inserted hyphen between these two parts. Therefore
* the given `name` argument does not need to be unique within the scope
* of the parent actor.
*/
def spawnAdapter[U](f: U T, name: String): ActorRef[U]
/**
* Create an anonymous child actor that will wrap messages such that other Actors
* protocols can be ingested by this Actor. You are strongly advised to cache
* these ActorRefs or to stop them when no longer needed.
*/
def spawnAdapter[U](f: U T): ActorRef[U] = spawnAdapter(f, "")
}