Let typed Stateful optionally handle signals also, #22293

* and thereby no need for MessageOrSignal
This commit is contained in:
Patrik Nordwall 2017-03-16 14:44:03 +01:00
parent b2b4f64d97
commit f485be2bf5
8 changed files with 122 additions and 128 deletions

View file

@ -114,7 +114,8 @@ class IntroSpec extends TypedSpec {
//#chatroom-main
val main: Behavior[akka.NotUsed] =
SignalOrMessage(
Stateful(
behavior = (_, _) Unhandled,
signal = { (ctx, sig)
sig match {
case PreStart
@ -128,8 +129,7 @@ class IntroSpec extends TypedSpec {
case _
Unhandled
}
},
mesg = (_, _) Unhandled)
})
val system = ActorSystem("ChatRoomDemo", main)
Await.result(system.whenTerminated, 1.second)

View file

@ -248,7 +248,7 @@ Actor will perform its job on its own accord, we do not need to send messages
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
only external messages, they also are notified of certain system events,
so-called Signals. In order to get access to those we choose to implement this
particular one using the :class:`SignalOrMessage` behavior decorator. The
particular one using the :class:`Stateful` behavior decorator. The
provided ``signal`` function will be invoked for signals (subclasses of :class:`Signal`)
or the ``mesg`` function for user messages.

View file

@ -52,11 +52,11 @@ public abstract class Actor {
}
private static class Stateful<T> extends Behavior<T> {
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
final Function2<ActorContext<T>, T, Behavior<T>> message;
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
public Stateful(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
public Stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
this.signal = signal;
this.message = message;
}
@ -157,29 +157,6 @@ public abstract class Actor {
return (Procedure2<ActorContext<T>, Signal>) (Object) _doNothing;
}
/**
* Construct an actor behavior that can react both to lifecycle signals and
* incoming messages. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* In either casesignal or messagethe next behavior must be returned. If no
* change is desired, use {@link #same}.
*
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> signalOrMessage(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(signal, message);
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
@ -197,7 +174,31 @@ public abstract class Actor {
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(unhandledFun(), message);
return new Stateful<T>(message, unhandledFun());
}
/**
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
* If no change is desired, use {@link #same}.
*
* @param message
* the function that describes how this actor reacts to the next
* message
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
return new Stateful<T>(message, signal);
}
/**

View file

@ -3,6 +3,8 @@
*/
package akka.typed
import akka.annotation.InternalApi
/**
* The behavior of an actor defines how it reacts to the messages that it
* receives. The message may either be of the type that the Actor declares
@ -93,6 +95,12 @@ object Behavior {
override def toString = "Unhandled"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] val unhandledSignal: (ActorContext[Nothing], Signal) Behavior[Nothing] =
(_, _) unhandledBehavior
/**
* INTERNAL API.
*/

View file

@ -263,25 +263,7 @@ object Actor {
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]
/**
* Construct an actor behavior that can react both to lifecycle signals and
* incoming messages. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* In either casesignal or messagethe next behavior must be returned. If no
* change is desired, use `Actor.same()`.
*/
final case class SignalOrMessage[T](
signal: (ActorContext[T], Signal) Behavior[T],
mesg: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T): Behavior[T] = mesg(ctx, msg)
override def toString = s"SignalOrMessage(${LineNumbers(signal)},${LineNumbers(mesg)})"
}
/**
* Construct an actor behavior that can react to incoming messages but not to
* Construct an actor behavior that can react to both incoming messages and
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
@ -290,8 +272,11 @@ object Actor {
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
*/
final case class Stateful[T](behavior: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
final case class Stateful[T](
behavior: (ActorContext[T], T) Behavior[T],
signal: (ActorContext[T], Signal) Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) Behavior[T]])
extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T) = behavior(ctx, msg)
override def toString = s"Stateful(${LineNumbers(behavior)})"
}

View file

@ -26,7 +26,7 @@ public class ActorCompile {
}
}
Behavior<MyMsg> actor1 = signalOrMessage((ctx, signal) -> same(), (ctx, msg) -> stopped());
Behavior<MyMsg> actor1 = stateful((ctx, msg) -> stopped(), (ctx, signal) -> same());
Behavior<MyMsg> actor2 = stateful((ctx, msg) -> unhandled());
Behavior<MyMsg> actor3 = stateless((ctx, msg) -> {});
Behavior<MyMsg> actor4 = empty();

View file

@ -5,7 +5,7 @@ import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.DeadLetterSuppression
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.SignalOrMessage
import akka.typed.scaladsl.Actor.Stateful
object ActorContextSpec {
@ -74,8 +74,7 @@ object ActorContextSpec {
final case class Adapter(a: ActorRef[Command]) extends Event
def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
Actor.SignalOrMessage(
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same },
Actor.Stateful(
(ctx, message) message match {
case ReceiveTimeout
monitor ! GotReceiveTimeout
@ -142,20 +141,19 @@ object ActorContextSpec {
}
case BecomeCareless(replyTo)
replyTo ! BecameCareless
Actor.SignalOrMessage(
Actor.Stateful(
(ctx, message) Actor.Unhandled,
(ctx, signal) signal match {
case Terminated(_) Actor.Unhandled
case sig
monitor ! GotSignal(sig)
Actor.Same
},
(ctx, message) Actor.Unhandled
)
})
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
Actor.Same
}
)
},
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same })
def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = {
import ScalaDSL._

View file

@ -538,37 +538,39 @@ class BehaviorSpec extends TypedSpec {
object `A Static Behavior (native)` extends StaticBehavior with NativeSystem
object `A Static Behavior (adapted)` extends StaticBehavior with AdaptedSystem
trait SignalOrMessageScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
trait StatefulWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.SignalOrMessage((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}, (ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Stateful(
(ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
},
(ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
})
})
}
object `A SignalOrMessage Behavior (scala,native)` extends SignalOrMessageScalaBehavior with NativeSystem
object `A SignalOrMessage Behavior (scala,adapted)` extends SignalOrMessageScalaBehavior with AdaptedSystem
object `A StatefulWithSignal Behavior (scala,native)` extends StatefulWithSignalScalaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (scala,adapted)` extends StatefulWithSignalScalaBehavior with AdaptedSystem
trait StatefulScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
@ -619,7 +621,7 @@ class BehaviorSpec extends TypedSpec {
object `A Stateless Behavior (scala,native)` extends StatelessScalaBehavior with NativeSystem
object `A Stateless Behavior (scala,adapted)` extends StatelessScalaBehavior with AdaptedSystem
trait WidenedScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with Siphon {
trait WidenedScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with Siphon {
import SActor.BehaviorDecorators
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
@ -630,7 +632,7 @@ class BehaviorSpec extends TypedSpec {
object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem
trait DeferredScalaBehavior extends SignalOrMessageScalaBehavior {
trait DeferredScalaBehavior extends StatefulWithSignalScalaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
@ -650,20 +652,19 @@ class BehaviorSpec extends TypedSpec {
object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem
trait TapScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with SignalSiphon {
trait TapScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.Tap(
(_, sig) inbox.ref ! Left(sig),
(_, msg) inbox.ref ! Right(msg),
super.behavior(monitor)._1
), inbox)
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem
trait RestarterScalaBehavior extends SignalOrMessageScalaBehavior with Reuse {
trait RestarterScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.Restarter[Exception]().wrap(super.behavior(monitor)._1) null
}
@ -703,37 +704,39 @@ class BehaviorSpec extends TypedSpec {
override def apply(in: JActorContext[Command]) = f(in)
}
trait SignalOrMessageJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
trait StatefulWithSignalJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.signalOrMessage(fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}), fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
JActor.stateful(
fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}),
fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}))
}))
}
object `A SignalOrMessage Behavior (java,native)` extends SignalOrMessageJavaBehavior with NativeSystem
object `A SignalOrMessage Behavior (java,adapted)` extends SignalOrMessageJavaBehavior with AdaptedSystem
object `A StatefulWithSignal Behavior (java,native)` extends StatefulWithSignalJavaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (java,adapted)` extends StatefulWithSignalJavaBehavior with AdaptedSystem
trait StatefulJavaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
@ -786,7 +789,7 @@ class BehaviorSpec extends TypedSpec {
object `A Stateless Behavior (java,native)` extends StatelessJavaBehavior with NativeSystem
object `A Stateless Behavior (java,adapted)` extends StatelessJavaBehavior with AdaptedSystem
trait WidenedJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with Siphon {
trait WidenedJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
@ -795,7 +798,7 @@ class BehaviorSpec extends TypedSpec {
object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem
trait DeferredJavaBehavior extends SignalOrMessageJavaBehavior {
trait DeferredJavaBehavior extends StatefulWithSignalJavaBehavior {
override type Aux = Inbox[PreStart]
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
@ -815,20 +818,19 @@ class BehaviorSpec extends TypedSpec {
object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem
trait TapJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with SignalSiphon {
trait TapJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
ps((_, sig) inbox.ref ! Left(sig)),
pc((_, msg) inbox.ref ! Right(msg)),
super.behavior(monitor)._1
), inbox)
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem
trait RestarterJavaBehavior extends SignalOrMessageJavaBehavior with Reuse {
trait RestarterJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.restarter(classOf[Exception], JActor.OnFailure.RESTART, super.behavior(monitor)._1) null
}