!act,doc #3831 Adding more Java with Lambda documentation and support

* The Java with Lambda support documentation for AbstractActor and AbstractFSM are now on par with Scala
* Many small fixes and additions of missing things
* Added an AbstractActorContext that has convenience functions for getChild and getChildren
This commit is contained in:
Björn Antonsson 2014-02-21 12:43:30 +01:00
parent 8396e923cf
commit 0dcb6d6654
34 changed files with 2494 additions and 211 deletions

View file

@ -25,7 +25,7 @@ public final class FI {
* @param i an instance that the application is performed on
* @return the result of the application
*/
public abstract R apply(I i);
public R apply(I i) throws Exception;
}
/**
@ -43,7 +43,7 @@ public final class FI {
* @param i2 an instance that the application is performed on
* @return the result of the application
*/
public abstract R apply(I1 i1, I2 i2);
public R apply(I1 i1, I2 i2) throws Exception;
}
/**
@ -58,7 +58,7 @@ public final class FI {
* @param t an instance that the predicate is evaluated on.
* @return the result of the predicate
*/
public abstract boolean defined(T t);
public boolean defined(T t);
}
/**
@ -72,7 +72,7 @@ public final class FI {
*
* @param i an instance that the application is performed on
*/
public abstract void apply(I i);
public void apply(I i) throws Exception;
}
/**
@ -88,7 +88,7 @@ public final class FI {
* @param i1 an instance that the application is performed on
* @param i2 an instance that the application is performed on
*/
public abstract void apply(I1 i1, I2 i2);
public void apply(I1 i1, I2 i2) throws Exception;
}
/**
@ -106,7 +106,7 @@ public final class FI {
* @param i2 an instance that the application is performed on
* @param i3 an instance that the application is performed on
*/
public abstract void apply(I1 i1, I2 i2, I3 i3);
public void apply(I1 i1, I2 i2, I3 i3) throws Exception;
}
/**
@ -116,7 +116,7 @@ public final class FI {
/**
* The application to perform.
*/
public abstract void apply();
public void apply() throws Exception;
}
/**
@ -129,7 +129,7 @@ public final class FI {
* @param o an instance that the predicate is evaluated on.
* @return the result of the predicate
*/
public abstract boolean defined(Object o);
public boolean defined(Object o);
}

View file

@ -7,6 +7,7 @@ package akka.japi.pf;
import akka.actor.FSM;
import scala.PartialFunction;
import java.util.Arrays;
import java.util.List;
/**
@ -42,7 +43,7 @@ public class FSMStateFunctionBuilder<S, D> {
}
},
new FI.Apply<FSM.Event, FSM.State<S, D>>() {
public FSM.State<S, D> apply(FSM.Event e) {
public FSM.State<S, D> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
P p = (P) e.event();
@SuppressWarnings("unchecked")
@ -91,7 +92,7 @@ public class FSMStateFunctionBuilder<S, D> {
}
},
new FI.Apply<FSM.Event, FSM.State<S, D>>() {
public FSM.State<S, D> apply(FSM.Event e) {
public FSM.State<S, D> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
Q q = (Q) e.stateData();
return apply.apply(q);
@ -102,6 +103,74 @@ public class FSMStateFunctionBuilder<S, D> {
return this;
}
/**
* Add a case statement that matches on the data type and if the event compares equal.
*
* @param event an event to compare equal against
* @param dataType the data type to match on
* @param apply an action to apply to the event and state data if there is a match
* @param <Q> the data type to match on
* @return the builder with the case statement added
*/
public <Q> FSMStateFunctionBuilder<S, D> eventEquals(final Object event,
final Class<Q> dataType,
final FI.Apply<Q, FSM.State<S, D>> apply) {
return event(Arrays.asList(event), dataType, apply);
}
/**
* Add a case statement that matches if any of the event types in the list match or
* any of the event instances in the list compares equal.
*
* @param eventMatches a list of types or instances to match against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public FSMStateFunctionBuilder<S, D> event(final List<Object> eventMatches,
final FI.Apply<D, FSM.State<S, D>> apply) {
builder.match(FSM.Event.class,
new FI.TypedPredicate<FSM.Event>() {
@Override
public boolean defined(FSM.Event e) {
boolean emMatch = false;
Object event = e.event();
for (Object em : eventMatches) {
if (em instanceof Class) {
Class emc = (Class) em;
emMatch = emc.isInstance(event);
} else {
emMatch = event.equals(em);
}
if (emMatch)
break;
}
return emMatch;
}
},
new FI.Apply<FSM.Event, FSM.State<S, D>>() {
public FSM.State<S, D> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
D d = (D) e.stateData();
return apply.apply(d);
}
}
);
return this;
}
/**
* Add a case statement that matches if event compares equal.
*
* @param event an event to compare equal against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
public FSMStateFunctionBuilder<S, D> eventEquals(final Object event,
final FI.Apply<D, FSM.State<S, D>> apply) {
return event(Arrays.asList(event), apply);
}
/**
* Add a case statement that matches on any type of event.
*
@ -111,7 +180,7 @@ public class FSMStateFunctionBuilder<S, D> {
public FSMStateFunctionBuilder<S, D> anyEvent(final FI.Apply2<Object, D, FSM.State<S, D>> apply) {
builder.match(FSM.Event.class,
new FI.Apply<FSM.Event, FSM.State<S, D>>() {
public FSM.State<S, D> apply(FSM.Event e) {
public FSM.State<S, D> apply(FSM.Event e) throws Exception {
@SuppressWarnings("unchecked")
D d = (D) e.stateData();
return apply.apply(e.event(), d);

View file

@ -36,7 +36,7 @@ public class FSMStopBuilder<S, D> {
}
},
new FI.UnitApply<FSM.StopEvent>() {
public void apply(FSM.StopEvent e) {
public void apply(FSM.StopEvent e) throws Exception {
@SuppressWarnings("unchecked")
S s = (S) e.currentState();
@SuppressWarnings("unchecked")
@ -94,7 +94,7 @@ public class FSMStopBuilder<S, D> {
}
},
new FI.UnitApply<FSM.StopEvent>() {
public void apply(FSM.StopEvent e) {
public void apply(FSM.StopEvent e) throws Exception {
@SuppressWarnings("unchecked")
P p = (P) e.reason();
@SuppressWarnings("unchecked")

View file

@ -21,8 +21,8 @@ public class FSMTransitionHandlerBuilder<S> {
/**
* Add a case statement that matches on a from state and a to state.
*
* @param fromState the from state to match on
* @param toState the to state to match on
* @param fromState the from state to match on, or null for any
* @param toState the to state to match on, or null for any
* @param apply an action to apply when the states match
* @return the builder with the case statement added
*/
@ -33,12 +33,13 @@ public class FSMTransitionHandlerBuilder<S> {
new FI.TypedPredicate<Tuple2>() {
@Override
public boolean defined(Tuple2 t) {
return fromState.equals(t._1()) && toState.equals(t._2());
return (fromState == null || fromState.equals(t._1()))
&& (toState == null || toState.equals(t._2()));
}
},
new FI.UnitApply<Tuple2>() {
@Override
public void apply(Tuple2 t) {
public void apply(Tuple2 t) throws Exception {
apply.apply();
}
}
@ -46,6 +47,39 @@ public class FSMTransitionHandlerBuilder<S> {
return this;
}
/**
* Add a case statement that matches on a from state and a to state.
*
* @param fromState the from state to match on, or null for any
* @param toState the to state to match on, or null for any
* @param apply an action to apply when the states match
* @return the builder with the case statement added
*/
public FSMTransitionHandlerBuilder<S> state(final S fromState,
final S toState,
final FI.UnitApply2<S, S> apply) {
builder.match(Tuple2.class,
new FI.TypedPredicate<Tuple2>() {
@Override
public boolean defined(Tuple2 t) {
return (fromState == null || fromState.equals(t._1()))
&& (toState == null || toState.equals(t._2()));
}
},
new FI.UnitApply<Tuple2>() {
@Override
public void apply(Tuple2 t) throws Exception {
@SuppressWarnings("unchecked")
S sf = (S) t._1();
@SuppressWarnings("unchecked")
S st = (S) t._2();
apply.apply(sf, st);
}
}
);
return this;
}
/**
* Build a {@link scala.PartialFunction} from this builder.
* After this call the builder will be reset.

View file

@ -46,6 +46,20 @@ public class Match<I, R> extends AbstractMatch<I, R> {
return new PFBuilder<F, T>().match(type, predicate, apply);
}
/**
* Convenience function to create a {@link PFBuilder} with the first
* case statement added.
*
* @param object the object to compare equals with
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
* @see PFBuilder#matchEquals(Object, FI.Apply)
*/
public static <F, T, P> PFBuilder<F, T> matchEquals(final P object,
final FI.Apply<P, T> apply) {
return new PFBuilder<F, T>().matchEquals(object, apply);
}
/**
* Create a {@link Match} from the builder.
*

View file

@ -63,6 +63,25 @@ public final class PFBuilder<I, R> extends AbstractPFBuilder<I, R> {
return this;
}
/**
* Add a new case statement to this builder.
*
* @param object the object to compare equals with
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
*/
public <P> PFBuilder<I, R> matchEquals(final P object,
final FI.Apply<P, R> apply) {
addStatement(new CaseStatement<I, P, R>(
new FI.Predicate() {
@Override
public boolean defined(Object o) {
return object.equals(o);
}
}, apply));
return this;
}
/**
* Add a new case statement to this builder, that matches any argument.
* @param apply an action to apply to the argument

View file

@ -9,7 +9,9 @@ package akka.japi.pf;
*
* There is both a match on type only, and a match on type and predicate.
*
* Inside an actor you can use it like this with Java 8 to define your receive method:
* Inside an actor you can use it like this with Java 8 to define your receive method.
* <p/>
* Example:
* <pre>
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
@ -55,4 +57,26 @@ public class ReceiveBuilder {
FI.UnitApply<P> apply) {
return UnitMatch.match(type, predicate, apply);
}
/**
* Return a new {@link UnitPFBuilder} with a case statement added.
*
* @param object the object to compare equals with
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
*/
public static <P> UnitPFBuilder<Object> matchEquals(P object, FI.UnitApply<P> apply) {
return UnitMatch.matchEquals(object, apply);
}
/**
* Return a new {@link UnitPFBuilder} with a case statement added.
*
* @param apply an action to apply to the argument
* @return a builder with the case statement added
*/
public static UnitPFBuilder<Object> matchAny(FI.UnitApply<Object> apply) {
return UnitMatch.matchAny(apply);
}
}

View file

@ -47,6 +47,32 @@ public class UnitMatch<I> extends AbstractMatch<I, BoxedUnit> {
return new UnitPFBuilder<F>().match(type, predicate, apply);
}
/**
* Convenience function to create a {@link UnitPFBuilder} with the first
* case statement added.
*
* @param object the object to compare equals with
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
* @see UnitPFBuilder#matchEquals(Object, FI.UnitApply)
*/
public static <F, P> UnitPFBuilder<F> matchEquals(final P object,
final FI.UnitApply<P> apply) {
return new UnitPFBuilder<F>().matchEquals(object, apply);
}
/**
* Convenience function to create a {@link UnitPFBuilder} with the first
* case statement added.
*
* @param apply an action to apply to the argument
* @return a builder with the case statement added
* @see UnitPFBuilder#matchAny(FI.UnitApply)
*/
public static <F> UnitPFBuilder<F> matchAny(final FI.UnitApply<Object> apply) {
return new UnitPFBuilder<F>().matchAny(apply);
}
/**
* Create a {@link UnitMatch} from the builder.
*

View file

@ -67,6 +67,24 @@ public final class UnitPFBuilder<I> extends AbstractPFBuilder<I, BoxedUnit> {
return this;
}
/**
* Add a new case statement to this builder.
*
* @param object the object to compare equals with
* @param apply an action to apply to the argument if the object compares equal
* @return a builder with the case statement added
*/
public <P> UnitPFBuilder<I> matchEquals(final P object,
final FI.UnitApply<P> apply) {
addStatement(new UnitCaseStatement<I, P>(
new FI.Predicate() {
@Override
public boolean defined(Object o) {
return object.equals(o);
}
}, apply));
return this;
}
/**
* Add a new case statement to this builder, that matches any argument.
* @param apply an action to apply to the argument

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
object AbstractActor {
/**
* emptyBehavior is a Receive-expression that matches no messages at all, ever.
*/
final val emptyBehavior = Actor.emptyBehavior
}
/**
* Actor base class that should be extended to create Java actors that use lambdas.
* <p/>
* Example:
* <pre>
* public class MyActor extends AbstractActor {
* int count = 0;
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.
* match(Double.class, d -> {
* sender().tell(d.isNaN() ? 0 : d, self());
* }).
* match(Integer.class, i -> {
* sender().tell(i * 10, self());
* }).
* match(String.class, s -> s.startsWith("foo"), s -> {
* sender().tell(s.toUpperCase(), self());
* }).build();
* }
* }
* </pre>
*/
abstract class AbstractActor extends Actor {
/**
* Returns this AbstractActor's AbstractActorContext
* The AbstractActorContext is not thread safe so do not expose it outside of the
* AbstractActor.
*/
def getContext(): AbstractActorContext = context.asInstanceOf[AbstractActorContext]
}
/**
* Actor base class that should be extended to create an actor with a stash.
*
* The stash enables an actor to temporarily stash away messages that can not or
* should not be handled using the actor's current behavior.
* <p/>
* Example:
* <pre>
* public class MyActorWithStash extends AbstractActorWithStash {
* int count = 0;
* @Override
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.match(String.class, s -> {
* if (count < 0) {
* sender().tell(new Integer(s.length()), self());
* } else if (count == 2) {
* count = -1;
* unstashAll();
* } else {
* count += 1;
* stash();
* }
* }).build();
* }
* }
* </pre>
* Note that the subclasses of `AbstractActorWithStash` by default request a Deque based mailbox since this class
* implements the `RequiresMessageQueue<DequeBasedMessageQueueSemantics>` marker interface.
* You can override the default mailbox provided when `DequeBasedMessageQueueSemantics` are requested via config:
* <pre>
* akka.actor.mailbox.requirements {
* "akka.dispatch.BoundedDequeBasedMessageQueueSemantics" = your-custom-mailbox
* }
* </pre>
* Alternatively, you can add your own requirement marker to the actor and configure a mailbox type to be used
* for your marker.
*
* For a `Stash` based actor that enforces unbounded deques see [[akka.actor.AbstractActorWithUnboundedStash]].
* There is also an unrestricted version [[akka.actor.AbstractActorWithUnrestrictedStash]] that does not
* enforce the mailbox type.
*/
abstract class AbstractActorWithStash extends AbstractActor with Stash
/**
* Actor base class with `Stash` that enforces an unbounded deque for the actor. The proper mailbox has to be configured
* manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait.
* See [[akka.actor.AbstractActorWithStash]] for details on how `Stash` works.
*/
abstract class AbstractActorWithUnboundedStash extends AbstractActor with UnboundedStash
/**
* Actor base class with `Stash` that does not enforce any mailbox type. The mailbox of the actor has to be configured
* manually. See [[akka.actor.AbstractActorWithStash]] for details on how `Stash` works.
*/
abstract class AbstractActorWithUnrestrictedStash extends AbstractActor with UnrestrictedStash

View file

@ -563,11 +563,3 @@ trait Actor {
}
}
}
/**
* Java API
*
* Abstract base class for Java Actors.
*
*/
abstract class AbstractActor extends akka.actor.Actor

View file

@ -70,6 +70,12 @@ trait ActorContext extends ActorRefFactory {
*/
def setReceiveTimeout(timeout: Duration): Unit
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Replaces the current behavior on the top of the behavior stack.
*/
def become(behavior: Actor.Receive): Unit = become(behavior, true)
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* This method acts upon the behavior stack as follows:
@ -77,14 +83,14 @@ trait ActorContext extends ActorRefFactory {
* - if `discardOld = true` it will replace the top element (i.e. the current behavior)
* - if `discardOld = false` it will keep the current behavior and push the given one atop
*
* The default of replacing the current behavior has been chosen to avoid memory leaks in
* case client code is written without consulting this documentation first (i.e. always pushing
* new closures and never issuing an `unbecome()`)
* The default of replacing the current behavior on the stack has been chosen to avoid memory
* leaks in case client code is written without consulting this documentation first (i.e.
* always pushing new behaviors and never issuing an `unbecome()`)
*/
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit
def become(behavior: Actor.Receive, discardOld: Boolean): Unit
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
* Reverts the Actor behavior to the previous one on the behavior stack.
*/
def unbecome(): Unit
@ -149,6 +155,25 @@ trait ActorContext extends ActorRefFactory {
throw new NotSerializableException("ActorContext is not serializable!")
}
/**
* AbstractActorContext is the AbstractActor equivalent of ActorContext,
* containing the Java API
*/
trait AbstractActorContext extends ActorContext {
/**
* Returns an unmodifiable Java Collection containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getChildren(): java.lang.Iterable[ActorRef]
/**
* Returns a reference to the named child or null if no child with
* that name exists.
*/
def getChild(name: String): ActorRef
}
/**
* UntypedActorContext is the UntypedActor equivalent of ActorContext,
* containing the Java API
@ -169,7 +194,7 @@ trait UntypedActorContext extends ActorContext {
/**
* Changes the Actor's behavior to become the new 'Procedure' handler.
* Replaces the current behavior at the top of the hotswap stack.
* Replaces the current behavior on the top of the behavior stack.
*/
def become(behavior: Procedure[Any]): Unit
@ -180,9 +205,9 @@ trait UntypedActorContext extends ActorContext {
* - if `discardOld = true` it will replace the top element (i.e. the current behavior)
* - if `discardOld = false` it will keep the current behavior and push the given one atop
*
* The default of replacing the current behavior has been chosen to avoid memory leaks in
* case client code is written without consulting this documentation first (i.e. always pushing
* new closures and never issuing an `unbecome()`)
* The default of replacing the current behavior on the stack has been chosen to avoid memory
* leaks in case client code is written without consulting this documentation first (i.e.
* always pushing new behaviors and never issuing an `unbecome()`)
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit
@ -347,7 +372,7 @@ private[akka] class ActorCell(
final val props: Props, // Must be final so that it can be properly cleared in clearActorCellFields
val dispatcher: MessageDispatcher,
val parent: InternalActorRef)
extends UntypedActorContext with Cell
extends UntypedActorContext with AbstractActorContext with Cell
with dungeon.ReceiveTimeout
with dungeon.Children
with dungeon.Dispatch

View file

@ -266,12 +266,11 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
type Timeout = Option[FiniteDuration]
type TransitionHandler = PartialFunction[(S, S), Unit]
val Event: FSM.Event.type = FSM.Event
val StopEvent: FSM.StopEvent.type = FSM.StopEvent
/*
* import so that these are visible without an import
*/
val Event: FSM.Event.type = FSM.Event
val StopEvent: FSM.StopEvent.type = FSM.StopEvent
/**
* This extractor is just convenience for matching a (S, S) pair, including a
@ -755,6 +754,21 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
}
/**
* Java API
*/
object AbstractFSM {
/**
* A partial function value which does not match anything and can be used to
* reset `whenUnhandled` and `onTermination` handlers.
*
* {{{
* onTermination(FSM.NullFunction())
* }}}
*/
def NullFunction[S, D]: PartialFunction[S, D] = FSM.NullFunction
}
/**
* Java API
*
@ -834,6 +848,16 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
final def onTransition(transitionHandlerBuilder: FSMTransitionHandlerBuilder[S]): Unit =
onTransition(transitionHandlerBuilder.build().asInstanceOf[TransitionHandler])
/**
* Add a handler which is called upon each state transition, i.e. not when
* staying in the same state.
*
* <b>Multiple handlers may be installed, and every one of them will be
* called, not only the first one matching.</b>
*/
final def onTransition(transitionHandler: UnitApply2[S, S]): Unit =
onTransition(transitionHandler)
/**
* Set handler which is called upon reception of unhandled messages. Calling
* this method again will overwrite the previous contents.
@ -877,6 +901,44 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
final def matchEvent[DT <: D](eventMatches: JList[AnyRef], dataType: Class[DT], apply: Apply[DT, State]): FSMStateFunctionBuilder[S, D] =
new FSMStateFunctionBuilder[S, D]().event(eventMatches, dataType, apply);
/**
* Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set.
*
* A case statement that matches on the data type and if the event compares equal.
*
* @param event an event to compare equal against
* @param dataType the data type to match on
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
final def matchEventEquals[DT <: D](event: AnyRef, dataType: Class[DT], apply: Apply[DT, State]): FSMStateFunctionBuilder[S, D] =
new FSMStateFunctionBuilder[S, D]().eventEquals(event, dataType, apply);
/**
* Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set.
*
* A case statement that matches if any of the event types in the list match or any
* of the event instances in the list compares equal.
*
* @param eventMatches a list of types or instances to match against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
final def matchEvent(eventMatches: JList[AnyRef], apply: Apply[D, State]): FSMStateFunctionBuilder[S, D] =
new FSMStateFunctionBuilder[S, D]().event(eventMatches, apply);
/**
* Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set.
*
* A case statement that matches if the event compares equal.
*
* @param event an event to compare equal against
* @param apply an action to apply to the event and state data if there is a match
* @return the builder with the case statement added
*/
final def matchEventEquals(event: AnyRef, apply: Apply[D, State]): FSMStateFunctionBuilder[S, D] =
new FSMStateFunctionBuilder[S, D]().eventEquals(event, apply);
/**
* Create an [[akka.japi.pf.FSMStateFunctionBuilder]] with the first case statement set.
*
@ -901,6 +963,19 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
final def matchState(fromState: S, toState: S, apply: UnitApplyVoid): FSMTransitionHandlerBuilder[S] =
new FSMTransitionHandlerBuilder[S]().state(fromState, toState, apply)
/**
* Create an [[akka.japi.pf.FSMTransitionHandlerBuilder]] with the first case statement set.
*
* A case statement that matches on a from state and a to state.
*
* @param fromState the from state to match on
* @param toState the to state to match on
* @param apply an action to apply when the states match
* @return the builder with the case statement added
*/
final def matchState(fromState: S, toState: S, apply: UnitApply2[S, S]): FSMTransitionHandlerBuilder[S] =
new FSMTransitionHandlerBuilder[S]().state(fromState, toState, apply)
/**
* Create an [[akka.japi.pf.FSMStopBuilder]] with the first case statement set.
*
@ -968,6 +1043,18 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
*/
final def goTo(nextStateName: S): State = goto(nextStateName)
/**
* Schedule named timer to deliver message after given delay, possibly repeating.
* Any existing timer with the same name will automatically be canceled before
* adding the new timer.
* @param name identifier to be used with cancelTimer()
* @param msg message to be delivered
* @param timeout delay of first message delivery and between subsequent messages
* @return current state descriptor
*/
final def setTimer(name: String, msg: Any, timeout: FiniteDuration): Unit =
setTimer(name, msg, timeout, false);
/**
* Default reason if calling `stop()`.
*/
@ -978,23 +1065,6 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
* also applies to `Stop` supervision directive.
*/
val Shutdown: FSM.Reason = FSM.Shutdown
/**
* Signifies that the [[akka.actor.FSM]] is shutting itself down because of
* an error, e.g. if the state to transition into does not exist. You can use
* this to match on a Failure in the [[akka.japi.pf.FSMStopBuilder]].
*/
def Failure: Class[_ <: FSM.Reason] = classOf[FSM.Failure]
/**
* A partial function value which does not match anything and can be used to
* reset `whenUnhandled` and `onTermination` handlers.
*
* {{{
* onTermination(FSM.NullFunction)
* }}}
*/
val NullFunction: PartialFunction[Any, Nothing] = FSM.NullFunction
}
/**

View file

@ -10,8 +10,9 @@ import akka.actor.UntypedActor;
import scala.concurrent.duration.Duration;
public class MyReceiveTimeoutUntypedActor extends UntypedActor {
//#receive-timeout
ActorRef target = getContext().system().deadLetters();
//#receive-timeout
public MyReceiveTimeoutUntypedActor() {
// To set an initial delay
@ -22,12 +23,16 @@ public class MyReceiveTimeoutUntypedActor extends UntypedActor {
if (message.equals("Hello")) {
// To set in a response to a message
getContext().setReceiveTimeout(Duration.create("1 second"));
//#receive-timeout
target = getSender();
target.tell("Hello world", getSelf());
//#receive-timeout
} else if (message instanceof ReceiveTimeout) {
// To turn it off
getContext().setReceiveTimeout(Duration.Undefined());
//#receive-timeout
target.tell("timeout", getSelf());
//#receive-timeout
} else {
unhandled(message);
}

View file

@ -1,14 +1,37 @@
.. _lambda-actors-java:
###################################
Actors (Java8 with Lambda Support)
Actors (Java with Lambda Support)
###################################
The `Actor Model`_ provides a higher level of abstraction for writing concurrent
and distributed systems. It alleviates the developer from having to deal with
explicit locking and thread management, making it easier to write correct
concurrent and parallel systems. Actors were defined in the 1973 paper by Carl
Hewitt but have been popularized by the Erlang language, and used for example at
Ericsson with great success to build highly concurrent and reliable telecom
systems.
The API of Akkas Actors is similar to Scala Actors which has borrowed some of
its syntax from Erlang.
.. _Actor Model: http://en.wikipedia.org/wiki/Actor_model
Creating Actors
===============
.. note::
Since Akka enforces parental supervision every actor is supervised and
(potentially) the supervisor of its children, it is advisable that you
familiarize yourself with :ref:`actor-systems` and :ref:`supervision` and it
may also help to read :ref:`addressing`.
Defining an Actor class
-----------------------
Actor classes are implemented by extending the :class:AbstractActor class and implementing
Actor classes are implemented by extending the :class:`AbstractActor` class and implementing
the :meth:`receive` method. The :meth:`receive` method should define a series of match
statements (which has the type ``PartialFunction<Object, BoxedUnit>``) that defines
which messages your Actor can handle, along with the implementation of how the
@ -19,7 +42,7 @@ function there is a builder named ``ReceiveBuilder`` that you can use.
Here is an example:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/MyActor.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java
:include: imports,my-actor
Please note that the Akka Actor ``receive`` message loop is exhaustive, which
@ -27,20 +50,891 @@ is different compared to Erlang and the late Scala Actors. This means that you
need to provide a pattern match for all messages that it can accept and if you
want to be able to handle unknown messages then you need to have a default case
as in the example above. Otherwise an ``akka.actor.UnhandledMessage(message,
sender, recipient)`` will be published to the ``ActorSystem``'s ``EventStream``.
sender, recipient)`` will be published to the ``ActorSystem``'s
``EventStream``.
Note further that the return type of the behavior defined above is ``Unit``; if
the actor shall reply to the received message then this must be done explicitly
as explained below.
The result of the :meth:`receive` method is a partial function object, which is
stored within the actor as its “initial behavior”.
stored within the actor as its “initial behavior”, see `Become/Unbecome`_ for
further information on changing the behavior of an actor after its
construction.
Here is s slightly bigger example:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/SampleActor.java
TODO:
Props
-----
Lots of doc missing here
:class:`Props` is a configuration class to specify options for the creation
of actors, think of it as an immutable and thus freely shareable recipe for
creating an actor including associated deployment information (e.g. which
dispatcher to use, see more below). Here are some examples of how to create a
:class:`Props` instance.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-props
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#creating-props
The second variant shows how to pass constructor arguments to the
:class:`Actor` being created, but it should only be used outside of actors as
explained below.
The last line shows a possibility to pass constructor arguments regardless of
the context it is being used in. The presence of a matching constructor is
verified during construction of the :class:`Props` object, resulting in an
:class:`IllegalArgumentEception` if no or multiple matching constructors are
found.
Dangerous Variants
^^^^^^^^^^^^^^^^^^
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#creating-props-deprecated
This method is not recommended to be used within another actor because it
encourages to close over the enclosing scope, resulting in non-serializable
:class:`Props` and possibly race conditions (breaking the actor encapsulation).
On the other hand using this variant in a :class:`Props` factory in the actors
companion object as documented under “Recommended Practices” below is completely
fine.
There were two use-cases for these methods: passing constructor arguments to
the actor—which is solved by the newly introduced
:meth:`Props.create(clazz, args)` method above or the recommended practice
below—and creating actors “on the spot” as anonymous classes. The latter should
be solved by making these actors named classes instead (if they are not
declared within a top-level ``object`` then the enclosing instances ``this``
reference needs to be passed as the first argument).
.. warning::
Declaring one actor within another is very dangerous and breaks actor
encapsulation. Never pass an actors ``this`` reference into :class:`Props`!
Recommended Practices
^^^^^^^^^^^^^^^^^^^^^
It is a good idea to provide factory methods on the companion object of each
:class:`Actor` which help keeping the creation of suitable :class:`Props` as
close to the actor definition as possible. This also avoids the pitfalls
associated with using the ``Props.create(...)`` method which takes a by-name
argument, since within a companion object the given code block will not retain
a reference to its enclosing scope:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#props-factory
Creating Actors with Props
--------------------------
Actors are created by passing a :class:`Props` instance into the
:meth:`actorOf` factory method which is available on :class:`ActorSystem` and
:class:`ActorContext`.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-actorRef
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#system-actorOf
Using the :class:`ActorSystem` will create top-level actors, supervised by the
actor systems provided guardian actor, while using an actors context will
create a child actor.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#context-actorOf
:exclude: plus-some-behavior
It is recommended to create a hierarchy of children, grand-children and so on
such that it fits the logical failure-handling structure of the application,
see :ref:`actor-systems`.
The call to :meth:`actorOf` returns an instance of :class:`ActorRef`. This is a
handle to the actor instance and the only way to interact with it. The
:class:`ActorRef` is immutable and has a one to one relationship with the Actor
it represents. The :class:`ActorRef` is also serializable and network-aware.
This means that you can serialize it, send it over the wire and use it on a
remote host and it will still be representing the same Actor on the original
node, across the network.
The name parameter is optional, but you should preferably name your actors,
since that is used in log messages and for identifying actors. The name must
not be empty or start with ``$``, but it may contain URL encoded characters
(eg. ``%20`` for a blank space). If the given name is already in use by
another child to the same parent an `InvalidActorNameException` is thrown.
Actors are automatically started asynchronously when created.
.. _actor-create-factory-lambda:
Dependency Injection
--------------------
If your UntypedActor has a constructor that takes parameters then those need to
be part of the :class:`Props` as well, as described `above`__. But there
are cases when a factory method must be used, for example when the actual
constructor arguments are determined by a dependency injection framework.
__ Props_
.. includecode:: code/docs/actor/UntypedActorDocTest.java#import-indirect
.. includecode:: code/docs/actor/UntypedActorDocTest.java
:include: creating-indirectly
:exclude: obtain-fresh-Actor-instance-from-DI-framework
.. warning::
You might be tempted at times to offer an :class:`IndirectActorProducer`
which always returns the same instance, e.g. by using a static field. This is
not supported, as it goes against the meaning of an actor restart, which is
described here: :ref:`supervision-restart`.
When using a dependency injection framework, actor beans *MUST NOT* have
singleton scope.
Techniques for dependency injection and integration with dependency injection frameworks
are described in more depth in the
`Using Akka with Dependency Injection <http://letitcrash.com/post/55958814293/akka-dependency-injection>`_
guideline and the `Akka Java Spring <http://typesafe.com/activator/template/akka-java-spring>`_ tutorial
in Typesafe Activator.
The Inbox
---------
When writing code outside of actors which shall communicate with actors, the
``ask`` pattern can be a solution (see below), but there are two thing it
cannot do: receiving multiple replies (e.g. by subscribing an :class:`ActorRef`
to a notification service) and watching other actors lifecycle. For these
purposes there is the :class:`Inbox` class:
.. includecode:: code/docs/actor/InboxDocTest.java#inbox
The :meth:`send` method wraps a normal :meth:`tell` and supplies the internal
actors reference as the sender. This allows the reply to be received on the
last line. Watching an actor is quite simple as well:
.. includecode:: code/docs/actor/InboxDocTest.java#watch
Actor API
=========
The :class:`AbstractActor` class defines only one abstract method, the above mentioned
:meth:`receive`, which implements the behavior of the actor.
If the current actor behavior does not match a received message,
:meth:`unhandled` is called, which by default publishes an
``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor
systems event stream (set configuration item
``akka.actor.debug.unhandled`` to ``on`` to have them converted into
actual Debug messages).
In addition, it offers:
* :meth:`self()` reference to the :class:`ActorRef` of the actor
* :meth:`sender()` reference sender Actor of the last received message, typically used as described in :ref:`LambdaActor.Reply`
* :meth:`supervisorStrategy()` user overridable definition the strategy to use for supervising child actors
This strategy is typically declared inside the actor in order to have access
to the actors internal state within the decider function: since failure is
communicated as a message sent to the supervisor and processed like other
messages (albeit outside of the normal behavior), all values and variables
within the actor are available, as is the ``sender`` reference (which will
be the immediate child reporting the failure; if the original failure
occurred within a distant descendant it is still reported one level up at a
time).
* :meth:`context()` exposes contextual information for the actor and the current message, such as:
* factory methods to create child actors (:meth:`actorOf`)
* system that the actor belongs to
* parent supervisor
* supervised children
* lifecycle monitoring
* hotswap behavior stack as described in :ref:`actor-hotswap-lambda`
The remaining visible methods are user-overridable life-cycle hooks which are
described in the following:
.. includecode:: code/docs/actor/UntypedActorDocTest.java#lifecycle-callbacks
The implementations shown above are the defaults provided by the :class:`AbstractActor`
class.
Actor Lifecycle
---------------
.. image:: ../images/actor_lifecycle.png
:align: center
:width: 680
A path in an actor system represents a "place" which might be occupied
by a living actor. Initially (apart from system initialized actors) a path is
empty. When ``actorOf()`` is called it assigns an *incarnation* of the actor
described by the passed ``Props`` to the given path. An actor incarnation is
identified by the path *and a UID*. A restart only swaps the ``Actor``
instance defined by the ``Props`` but the incarnation and hence the UID remains
the same.
The lifecycle of an incarnation ends when the actor is stopped. At
that point the appropriate lifecycle events are called and watching actors
are notified of the termination. After the incarnation is stopped, the path can
be reused again by creating an actor with ``actorOf()``. In this case the
name of the new incarnation will be the same as the previous one but the
UIDs will differ.
An ``ActorRef`` always represents an incarnation (path and UID) not just a
given path. Therefore if an actor is stopped and a new one with the same
name is created an ``ActorRef`` of the old incarnation will not point
to the new one.
``ActorSelection`` on the other hand points to the path (or multiple paths
if wildcards are used) and is completely oblivious to which incarnation is currently
occupying it. ``ActorSelection`` cannot be watched for this reason. It is
possible to resolve the current incarnation's ``ActorRef`` living under the
path by sending an ``Identify`` message to the ``ActorSelection`` which
will be replied to with an ``ActorIdentity`` containing the correct reference
(see :ref:`actorSelection-lambda`). This can also be done with the ``resolveOne``
method of the :class:`ActorSelection`, which returns a ``Future`` of the matching
:class:`ActorRef`.
.. _deathwatch-lambda:
Lifecycle Monitoring aka DeathWatch
-----------------------------------
In order to be notified when another actor terminates (i.e. stops permanently,
not temporary failure and restart), an actor may register itself for reception
of the :class:`Terminated` message dispatched by the other actor upon
termination (see `Stopping Actors`_). This service is provided by the
:class:`DeathWatch` component of the actor system.
Registering a monitor is easy:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#watch
It should be noted that the :class:`Terminated` message is generated
independent of the order in which registration and termination occur.
In particular, the watching actor will receive a :class:`Terminated` message
even if the watched actor has already been terminated at the time of registration.
Registering multiple times does not necessarily lead to multiple messages being
generated, but there is no guarantee that only exactly one such message is
received: if termination of the watched actor has generated and queued the
message, and another registration is done before this message has been
processed, then a second message will be queued, because registering for
monitoring of an already terminated actor leads to the immediate generation of
the :class:`Terminated` message.
It is also possible to deregister from watching another actors liveliness
using ``context.unwatch(target)``. This works even if the :class:`Terminated`
message has already been enqueued in the mailbox; after calling :meth:`unwatch`
no :class:`Terminated` message for that actor will be processed anymore.
.. _start-hook-lambda:
Start Hook
----------
Right after starting the actor, its :meth:`preStart` method is invoked.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#preStart
This method is called when the actor is first created. During restarts it is
called by the default implementation of :meth:`postRestart`, which means that
by overriding that method you can choose whether the initialization code in
this method is called only exactly once for this actor or for every restart.
Initialization code which is part of the actors constructor will always be
called when an instance of the actor class is created, which happens at every
restart.
.. _restart-hook-lambda:
Restart Hooks
-------------
All actors are supervised, i.e. linked to another actor with a fault
handling strategy. Actors may be restarted in case an exception is thrown while
processing a message (see :ref:`supervision`). This restart involves the hooks
mentioned above:
1. The old actor is informed by calling :meth:`preRestart` with the exception
which caused the restart and the message which triggered that exception; the
latter may be ``None`` if the restart was not caused by processing a
message, e.g. when a supervisor does not trap the exception and is restarted
in turn by its supervisor, or if an actor is restarted due to a siblings
failure. If the message is available, then that messages sender is also
accessible in the usual way (i.e. by calling ``sender``).
This method is the best place for cleaning up, preparing hand-over to the
fresh actor instance, etc. By default it stops all children and calls
:meth:`postStop`.
2. The initial factory from the ``actorOf`` call is used
to produce the fresh instance.
3. The new actors :meth:`postRestart` method is invoked with the exception
which caused the restart. By default the :meth:`preStart`
is called, just as in the normal start-up case.
An actor restart replaces only the actual actor object; the contents of the
mailbox is unaffected by the restart, so processing of messages will resume
after the :meth:`postRestart` hook returns. The message
that triggered the exception will not be received again. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
.. warning::
Be aware that the ordering of failure notifications relative to user messages
is not deterministic. In particular, a parent might restart its child before
it has processed the last messages sent by the child before the failure.
See :ref:`message-ordering` for details.
.. _stop-hook-lambda:
Stop Hook
---------
After stopping an actor, its :meth:`postStop` hook is called, which may be used
e.g. for deregistering this actor from other services. This hook is guaranteed
to run after message queuing has been disabled for this actor, i.e. messages
sent to a stopped actor will be redirected to the :obj:`deadLetters` of the
:obj:`ActorSystem`.
.. _actorSelection-lambda:
Identifying Actors via Actor Selection
======================================
As described in :ref:`addressing`, each actor has a unique logical path, which
is obtained by following the chain of actors from child to parent until
reaching the root of the actor system, and it has a physical path, which may
differ if the supervision chain includes any remote supervisors. These paths
are used by the system to look up actors, e.g. when a remote message is
received and the recipient is searched, but they are also useful more directly:
actors may look up other actors by specifying absolute or relative
paths—logical or physical—and receive back an :class:`ActorSelection` with the
result:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-local
The supplied path is parsed as a :class:`java.net.URI`, which basically means
that it is split on ``/`` into path elements. If the path starts with ``/``, it
is absolute and the look-up starts at the root guardian (which is the parent of
``"/user"``); otherwise it starts at the current actor. If a path element equals
``..``, the look-up will take a step “up” towards the supervisor of the
currently traversed actor, otherwise it will step “down” to the named child.
It should be noted that the ``..`` in actor paths here always means the logical
structure, i.e. the supervisor.
The path elements of an actor selection may contain wildcard patterns allowing for
broadcasting of messages to that section:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-wildcard
Messages can be sent via the :class:`ActorSelection` and the path of the
:class:`ActorSelection` is looked up when delivering each message. If the selection
does not match any actors the message will be dropped.
To acquire an :class:`ActorRef` for an :class:`ActorSelection` you need to send
a message to the selection and use the ``sender()`` reference of the reply from
the actor. There is a built-in ``Identify`` message that all Actors will
understand and automatically reply to with a ``ActorIdentity`` message
containing the :class:`ActorRef`. This message is handled specially by the
actors which are traversed in the sense that if a concrete name lookup fails
(i.e. a non-wildcard path element does not correspond to a live actor) then a
negative result is generated. Please note that this does not mean that delivery
of that reply is guaranteed, it still is a normal message.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#import-identify
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#identify
You can also acquire an :class:`ActorRef` for an :class:`ActorSelection` with
the ``resolveOne`` method of the :class:`ActorSelection`. It returns a ``Future``
of the matching :class:`ActorRef` if such an actor exists. It is completed with
failure [[akka.actor.ActorNotFound]] if no such actor exists or the identification
didn't complete within the supplied `timeout`.
Remote actor addresses may also be looked up, if :ref:`remoting <remoting-java>` is enabled:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#selection-remote
An example demonstrating actor look-up is given in :ref:`remote-sample-java`.
.. note::
``actorFor`` is deprecated in favor of ``actorSelection`` because actor references
acquired with ``actorFor`` behaves different for local and remote actors.
In the case of a local actor reference, the named actor needs to exist before the
lookup, or else the acquired reference will be an :class:`EmptyLocalActorRef`.
This will be true even if an actor with that exact path is created after acquiring
the actor reference. For remote actor references acquired with `actorFor` the
behaviour is different and sending messages to such a reference will under the hood
look up the actor by path on the remote system for every message send.
Messages and immutability
=========================
**IMPORTANT**: Messages can be any kind of object but have to be
immutable. Akka cant enforce immutability (yet) so this has to be by
convention.
Here is an example of an immutable message:
.. includecode:: code/docs/actor/ImmutableMessage.java#immutable-message
Send messages
=============
Messages are sent to an Actor through one of the following methods.
* ``tell`` means “fire-and-forget”, e.g. send a message asynchronously and return
immediately.
* ``ask`` sends a message asynchronously and returns a :class:`Future`
representing a possible reply.
Message ordering is guaranteed on a per-sender basis.
.. note::
There are performance implications of using ``ask`` since something needs to
keep track of when it times out, there needs to be something that bridges
a ``Promise`` into an ``ActorRef`` and it also needs to be reachable through
remoting. So always prefer ``tell`` for performance, and only ``ask`` if you must.
In all these methods you have the option of passing along your own ``ActorRef``.
Make it a practice of doing so because it will allow the receiver actors to be able to respond
to your message, since the sender reference is sent along with the message.
.. _actors-tell-sender-lambda:
Tell: Fire-forget
-----------------
This is the preferred way of sending messages. No blocking waiting for a
message. This gives the best concurrency and scalability characteristics.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#tell
The sender reference is passed along with the message and available within the
receiving actor via its :meth:`sender()` method while processing this
message. Inside of an actor it is usually :meth:`self()` who shall be the
sender, but there can be cases where replies shall be routed to some other
actor—e.g. the parent—in which the second argument to :meth:`tell` would be a
different one. Outside of an actor and if no reply is needed the second
argument can be ``null``; if a reply is needed outside of an actor you can use
the ask-pattern described next..
Ask: Send-And-Receive-Future
----------------------------
The ``ask`` pattern involves actors as well as futures, hence it is offered as
a use pattern rather than a method on :class:`ActorRef`:
.. includecode:: code/docs/actor/UntypedActorDocTest.java#import-ask
.. includecode:: code/docs/actor/UntypedActorDocTest.java#ask-pipe
This example demonstrates ``ask`` together with the ``pipe`` pattern on
futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ``ask`` produces
a :class:`Future`, two of which are composed into a new future using the
:meth:`Futures.sequence` and :meth:`map` methods and then ``pipe`` installs
an ``onComplete``-handler on the future to effect the submission of the
aggregated :class:`Result` to another actor.
Using ``ask`` will send a message to the receiving Actor as with ``tell``, and
the receiving actor must reply with ``sender().tell(reply, self())`` in order to
complete the returned :class:`Future` with a value. The ``ask`` operation
involves creating an internal actor for handling this reply, which needs to
have a timeout after which it is destroyed in order not to leak resources; see
more below.
.. warning::
To complete the future with an exception you need send a Failure message to the sender.
This is *not done automatically* when an actor throws an exception while processing a message.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#reply-exception
If the actor does not complete the future, it will expire after the timeout period,
specified as parameter to the ``ask`` method; this will complete the
:class:`Future` with an :class:`AskTimeoutException`.
See :ref:`futures-java` for more information on how to await or query a
future.
The ``onComplete``, ``onSuccess``, or ``onFailure`` methods of the ``Future`` can be
used to register a callback to get a notification when the Future completes.
Gives you a way to avoid blocking.
.. warning::
When using future callbacks, inside actors you need to carefully avoid closing over
the containing actors reference, i.e. do not call methods or access mutable state
on the enclosing actor from within the callback. This would break the actor
encapsulation and may introduce synchronization bugs and race conditions because
the callback will be scheduled concurrently to the enclosing actor. Unfortunately
there is not yet a way to detect these illegal accesses at compile time. See also:
:ref:`jmm-shared-state`
Forward message
---------------
You can forward a message from one actor to another. This means that the
original sender address/reference is maintained even though the message is going
through a 'mediator'. This can be useful when writing actors that work as
routers, load-balancers, replicators etc.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#forward
Receive messages
================
An Actor has to implement the ``receive`` method to receive messages:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive
The :meth:`receive` method should define a series of match statements (which has the type
``PartialFunction<Object, BoxedUnit>``) that defines which messages your Actor can handle,
along with the implementation of how the messages should be processed.
Don't let the type signature scare you. To allow you to easily build up a partial
function there is a builder named ``ReceiveBuilder`` that you can use.
Here is an example:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java
:include: imports,my-actor
.. _LambdaActor.Reply:
Reply to messages
=================
If you want to have a handle for replying to a message, you can use
``sender()``, which gives you an ActorRef. You can reply by sending to
that ActorRef with ``sender().tell(replyMsg, self())``. You can also store the ActorRef
for replying later, or passing on to other actors. If there is no sender (a
message was sent without an actor or future context) then the sender
defaults to a 'dead-letter' actor ref.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/MyActor.java#reply
Receive timeout
===============
The `ActorContext` :meth:`setReceiveTimeout` defines the inactivity timeout after which
the sending of a `ReceiveTimeout` message is triggered.
When specified, the receive function should be able to handle an `akka.actor.ReceiveTimeout` message.
1 millisecond is the minimum supported timeout.
Please note that the receive timeout might fire and enqueue the `ReceiveTimeout` message right after
another message was enqueued; hence it is **not guaranteed** that upon reception of the receive
timeout there must have been an idle period beforehand as configured via this method.
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity
periods). Pass in `Duration.Undefined` to switch off this feature.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#receive-timeout
.. _stopping-actors-lambda:
Stopping actors
===============
Actors are stopped by invoking the :meth:`stop` method of a ``ActorRefFactory``,
i.e. ``ActorContext`` or ``ActorSystem``. Typically the context is used for stopping
child actors and the system for stopping top level actors. The actual termination of
the actor is performed asynchronously, i.e. :meth:`stop` may return before the actor is
stopped.
Processing of the current message, if any, will continue before the actor is stopped,
but additional messages in the mailbox will not be processed. By default these
messages are sent to the :obj:`deadLetters` of the :obj:`ActorSystem`, but that
depends on the mailbox implementation.
Termination of an actor proceeds in two steps: first the actor suspends its
mailbox processing and sends a stop command to all its children, then it keeps
processing the internal termination notifications from its children until the last one is
gone, finally terminating itself (invoking :meth:`postStop`, dumping mailbox,
publishing :class:`Terminated` on the :ref:`DeathWatch <deathwatch-lambda>`, telling
its supervisor). This procedure ensures that actor system sub-trees terminate
in an orderly fashion, propagating the stop command to the leaves and
collecting their confirmation back to the stopped supervisor. If one of the
actors does not respond (i.e. processing a message for extended periods of time
and therefore not receiving the stop command), this whole process will be
stuck.
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
stopped, and the aforementioned process will ensure proper termination of the
whole system.
The :meth:`postStop()` hook is invoked after an actor is fully stopped. This
enables cleaning up of resources:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#postStop
:exclude: clean-up-some-resources
.. note::
Since stopping an actor is asynchronous, you cannot immediately reuse the
name of the child you just stopped; this will result in an
:class:`InvalidActorNameException`. Instead, :meth:`watch()` the terminating
actor and create its replacement in response to the :class:`Terminated`
message which will eventually arrive.
.. _poison-pill-lambda:
PoisonPill
----------
You can also send an actor the ``akka.actor.PoisonPill`` message, which will
stop the actor when the message is processed. ``PoisonPill`` is enqueued as
ordinary messages and will be handled after messages that were already queued
in the mailbox.
Graceful Stop
-------------
:meth:`gracefulStop` is useful if you need to wait for termination or compose ordered
termination of several actors:
.. includecode:: code/docs/actor/UntypedActorDocTest.java#import-gracefulStop
.. includecode:: code/docs/actor/UntypedActorDocTest.java#gracefulStop
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#gracefulStop-actor
When ``gracefulStop()`` returns successfully, the actors ``postStop()`` hook
will have been executed: there exists a happens-before edge between the end of
``postStop()`` and the return of ``gracefulStop()``.
In the above example a custom ``Manager.Shutdown`` message is sent to the target
actor to initiate the process of stopping the actor. You can use ``PoisonPill`` for
this, but then you have limited possibilities to perform interactions with other actors
before stopping the target actor. Simple cleanup tasks can be handled in ``postStop``.
.. warning::
Keep in mind that an actor stopping and its name being deregistered are
separate events which happen asynchronously from each other. Therefore it may
be that you will find the name still in use after ``gracefulStop()``
returned. In order to guarantee proper deregistration, only reuse names from
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
.. _actor-hotswap-lambda:
Become/Unbecome
===============
Upgrade
-------
Akka supports hotswapping the Actors message loop (e.g. its implementation) at
runtime: invoke the ``context.become`` method from within the Actor.
:meth:`become` takes a ``PartialFunction<Object, BoxedUnit>`` that implements the new
message handler. The hotswapped code is kept in a Stack which can be pushed and
popped.
.. warning::
Please note that the actor will revert to its original behavior when restarted by its Supervisor.
To hotswap the Actor behavior using ``become``:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#hot-swap-actor
This variant of the :meth:`become` method is useful for many different things,
such as to implement a Finite State Machine (FSM, for an example see `Dining
Hakkers`_). It will replace the current behavior (i.e. the top of the behavior
stack), which means that you do not use :meth:`unbecome`, instead always the
next behavior is explicitly installed.
.. _Dining Hakkers: http://typesafe.com/activator/template/akka-sample-fsm-java-lambda
The other way of using :meth:`become` does not replace but add to the top of
the behavior stack. In this case care must be taken to ensure that the number
of “pop” operations (i.e. :meth:`unbecome`) matches the number of “push” ones
in the long run, otherwise this amounts to a memory leak (which is why this
behavior is not the default).
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#swapper
Stash
=====
Stash
=====
The ``AbstractActorWithStash`` class enables an actor to temporarily stash away messages
that can not or should not be handled using the actor's current
behavior. Upon changing the actor's message handler, i.e., right
before invoking ``context().become()`` or ``context().unbecome()``, all
stashed messages can be "unstashed", thereby prepending them to the actor's
mailbox. This way, the stashed messages can be processed in the same
order as they have been received originally. An actor that extends
``AbstractActorWithStash`` will automatically get a deque-based mailbox.
.. note::
The abstract class ``AbstractActorWithStash`` implements the marker
interface ``RequiresMessageQueue<DequeBasedMessageQueueSemantics>``
which requests the system to automatically choose a deque based
mailbox implementation for the actor. If you want more
control over the mailbox, see the documentation on mailboxes: :ref:`mailboxes-java`.
Here is an example of the ``AbstractActorWithStash`` class in action:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/ActorDocTest.java#stash
Invoking ``stash()`` adds the current message (the message that the
actor received last) to the actor's stash. It is typically invoked
when handling the default case in the actor's message handler to stash
messages that aren't handled by the other cases. It is illegal to
stash the same message twice; to do so results in an
``IllegalStateException`` being thrown. The stash may also be bounded
in which case invoking ``stash()`` may lead to a capacity violation,
which results in a ``StashOverflowException``. The capacity of the
stash can be configured using the ``stash-capacity`` setting (an ``Int``) of the
mailbox's configuration.
Invoking ``unstashAll()`` enqueues messages from the stash to the
actor's mailbox until the capacity of the mailbox (if any) has been
reached (note that messages from the stash are prepended to the
mailbox). In case a bounded mailbox overflows, a
``MessageQueueAppendFailedException`` is thrown.
The stash is guaranteed to be empty after calling ``unstashAll()``.
The stash is backed by a ``scala.collection.immutable.Vector``. As a
result, even a very large number of messages may be stashed without a
major impact on performance.
Note that the stash is part of the ephemeral actor state, unlike the
mailbox. Therefore, it should be managed like other parts of the
actor's state which have the same property. The :class:`AbstractActorWithStash`
implementation of :meth:`preRestart` will call ``unstashAll()``, which is
usually the desired behavior.
.. note::
If you want to enforce that your actor can only work with an unbounded stash,
then you should use the ``AbstractActorWithUnboundedStash`` class instead.
.. _killing-actors-lambda:
Killing an Actor
================
You can kill an actor by sending a ``Kill`` message. This will cause the actor
to throw a :class:`ActorKilledException`, triggering a failure. The actor will
suspend operation and its supervisor will be asked how to handle the failure,
which may mean resuming the actor, restarting it or terminating it completely.
See :ref:`supervision-directives` for more information.
Use ``Kill`` like this:
.. includecode:: code/docs/actor/UntypedActorDocTest.java
:include: kill
Actors and exceptions
=====================
It can happen that while a message is being processed by an actor, that some
kind of exception is thrown, e.g. a database exception.
What happens to the Message
---------------------------
If an exception is thrown while a message is being processed (i.e. taken out of
its mailbox and handed over to the current behavior), then this message will be
lost. It is important to understand that it is not put back on the mailbox. So
if you want to retry processing of a message, you need to deal with it yourself
by catching the exception and retry your flow. Make sure that you put a bound
on the number of retries since you don't want a system to livelock (so
consuming a lot of cpu cycles without making progress). Another possibility
would be to have a look at the :ref:`PeekMailbox pattern <mailbox-acking>`.
What happens to the mailbox
---------------------------
If an exception is thrown while a message is being processed, nothing happens to
the mailbox. If the actor is restarted, the same mailbox will be there. So all
messages on that mailbox will be there as well.
What happens to the actor
-------------------------
If code within an actor throws an exception, that actor is suspended and the
supervision process is started (see :ref:`supervision`). Depending on the
supervisors decision the actor is resumed (as if nothing happened), restarted
(wiping out its internal state and starting from scratch) or terminated.
Initialization patterns
=======================
The rich lifecycle hooks of Actors provide a useful toolkit to implement various initialization patterns. During the
lifetime of an ``ActorRef``, an actor can potentially go through several restarts, where the old instance is replaced by
a fresh one, invisibly to the outside observer who only sees the ``ActorRef``.
One may think about the new instances as "incarnations". Initialization might be necessary for every incarnation
of an actor, but sometimes one needs initialization to happen only at the birth of the first instance when the
``ActorRef`` is created. The following sections provide patterns for different initialization needs.
Initialization via constructor
------------------------------
Using the constructor for initialization has various benefits. First of all, it makes it possible to use ``val`` fields to store
any state that does not change during the life of the actor instance, making the implementation of the actor more robust.
The constructor is invoked for every incarnation of the actor, therefore the internals of the actor can always assume
that proper initialization happened. This is also the drawback of this approach, as there are cases when one would
like to avoid reinitializing internals on restart. For example, it is often useful to preserve child actors across
restarts. The following section provides a pattern for this case.
Initialization via preStart
---------------------------
The method ``preStart()`` of an actor is only called once directly during the initialization of the first instance, that
is, at creation of its ``ActorRef``. In the case of restarts, ``preStart()`` is called from ``postRestart()``, therefore
if not overridden, ``preStart()`` is called on every incarnation. However, overriding ``postRestart()`` one can disable
this behavior, and ensure that there is only one call to ``preStart()``.
One useful usage of this pattern is to disable creation of new ``ActorRefs`` for children during restarts. This can be
achieved by overriding ``preRestart()``:
.. includecode:: code/docs/actor/InitializationDocSpecJava.java#preStartInit
Please note, that the child actors are *still restarted*, but no new ``ActorRef`` is created. One can recursively apply
the same principles for the children, ensuring that their ``preStart()`` method is called only at the creation of their
refs.
For more information see :ref:`supervision-restart`.
Initialization via message passing
----------------------------------
There are cases when it is impossible to pass all the information needed for actor initialization in the constructor,
for example in the presence of circular dependencies. In this case the actor should listen for an initialization message,
and use ``become()`` or a finite state-machine state transition to encode the initialized and uninitialized states
of the actor.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/InitializationDocTest.java#messageInit
If the actor may receive messages before it has been initialized, a useful tool can be the ``Stash`` to save messages
until the initialization finishes, and replaying them after the actor became initialized.
.. warning::
This pattern should be used with care, and applied only when none of the patterns above are applicable. One of
the potential issues is that messages might be lost when sent to remote actors. Also, publishing an ``ActorRef`` in
an uninitialized state might lead to the condition that it receives a user message before the initialization has been
done.
.. _actor-performance-lambda:
Lambdas and Performance
=======================
There is one big difference between the optimized partial functions created by the Scala compiler and the ones created by the
``ReceiveBuilder``. The partial functions created by the ``ReceiveBuilder`` consist of multiple lambda expressions for every match
statement, where each lambda is an object referencing the code to be run. This is something that the JVM can have problems
optimizing and the resulting code might not be as performant as the Scala equivalent or the corresponding
:ref:`untyped actor <untyped-actors-java>` version.

View file

@ -1,7 +1,7 @@
.. _lambda-fsm-java:
.. _lambda-fsm:
################################
FSM (Java 8 with Lambda Support)
FSM (Java with Lambda Support)
################################
@ -30,11 +30,11 @@ send them on after the burst ended or a flush request is received.
First, consider all of the below to use these import statements:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java#simple-imports
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#simple-imports
The contract of our “Buncher” actor is that it accepts or produces the following messages:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Events.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Events.java
:include: simple-events
:exclude: boilerplate
@ -42,11 +42,10 @@ The contract of our “Buncher” actor is that it accepts or produces the follo
``Batches`` to be passed on; ``Queue`` will add to the internal queue while
``Flush`` will mark the end of a burst.
The actor can be in two states: no message queued (aka ``Idle``) or some
message queued (aka ``Active``). The states and the state data is defined like this:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
:include: simple-state
:exclude: boilerplate
@ -57,9 +56,9 @@ reference to send the batches to and the actual queue of messages.
Now lets take a look at the skeleton for our FSM actor:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
:include: simple-fsm
:exclude: transition-elided,unhandled-elided,termination-elided
:exclude: transition-elided,unhandled-elided
The basic strategy is to declare the actor, by inheriting the :class:`AbstractFSM` class
and specifying the possible states and data values as type parameters. Within
@ -86,7 +85,7 @@ shall work identically in both states, we make use of the fact that any event
which is not handled by the ``when()`` block is passed to the
``whenUnhandled()`` block:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java#unhandled-elided
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#unhandled-elided
The first case handled here is adding ``Queue()`` requests to the internal
queue and going to the ``Active`` state (this does the obvious thing of staying
@ -100,17 +99,18 @@ target, for which we use the ``onTransition`` mechanism: you can declare
multiple such blocks and all of them will be tried for matching behavior in
case a state transition occurs (i.e. only when the state actually changes).
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java#transition-elided
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java#transition-elided
The transition callback is a partial function which takes as input a pair of
states—the current and the next state. During the state change, the old state
data is available via ``stateData`` as shown, and the new state data would be
available as ``nextStateData``.
TODO
----
To verify that this buncher actually works, it is quite easy to write a test
using the :ref:`akka-testkit`, here using JUnit as an example:
Add the test here
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/BuncherTest.java
:include: test-code
Reference
=========
@ -121,7 +121,7 @@ The AbstractFSM Class
The :class:`AbstractFSM` abstract class is the base class used to implement an FSM. It implements
Actor since an Actor is created to drive the FSM.
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
:include: simple-fsm
:exclude: fsm-body
@ -170,9 +170,10 @@ may be changed during action processing with
e.g. via external message.
The :meth:`stateFunction` argument is a :class:`PartialFunction[Event, State]`,
which is conveniently given using state function builder syntax as demonstrated below:
which is conveniently given using the state function builder syntax as
demonstrated below:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/Buncher.java
:include: when-syntax
.. warning::
@ -184,14 +185,14 @@ It is recommended practice to declare the states as an enum and then verify that
``when`` clause for each of the states. If you want to leave the handling of a state
“unhandled” (more below), it still needs to be declared like this:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java#NullFunction
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java#NullFunction
Defining the Initial State
--------------------------
Each FSM needs a starting point, which is declared using
:func:`startWith(state, data[, timeout]);`
:func:`startWith(state, data[, timeout])`
The optionally given timeout argument overrides any specification given for the
desired initial state. If you want to cancel a default timeout, use
@ -204,7 +205,7 @@ If a state doesn't handle a received event a warning is logged. If you want to
do something else in this case you can specify that with
:func:`whenUnhandled(stateFunction)`:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: unhandled-syntax
Within this handler the state of the FSM may be queried using the
@ -246,9 +247,9 @@ of the modifiers described in the following:
This modifier sends a reply to the currently processed message and otherwise
does not modify the state transition.
All modifier can be chained to achieve a nice and concise description:
All modifiers can be chained to achieve a nice and concise description:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: modifier-syntax
The parentheses are not actually needed in all cases, but they visually
@ -285,14 +286,15 @@ The handler is a partial function which takes a pair of states as input; no
resulting state is needed as it is not possible to modify the transition in
progress.
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: transition-syntax
The convenience extractor :obj:`->` enables decomposition of the pair of states
with a clear visual reminder of the transition's direction. As usual in pattern
matches, an underscore may be used for irrelevant parts; alternatively you
could bind the unconstrained state to a variable, e.g. for logging as shown in
the last case.
It is also possible to pass a function object accepting two states to
:func:`onTransition`, in case your transition handling logic is implemented as
a method:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: alt-transition-syntax
The handlers registered with this method are stacked, so you can intersperse
:func:`onTransition` blocks with :func:`when` blocks as suits your design. It
@ -367,14 +369,14 @@ state data which is available during termination handling.
the same way as a state transition (but note that the ``return`` statement
may not be used within a :meth:`when` block).
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: stop-syntax
You can use :func:`onTermination(handler)` to specify custom code that is
executed when the FSM is stopped. The handler is a partial function which takes
a :class:`StopEvent(reason, stateName, stateData)` as argument:
.. includecode:: ../../../akka-samples/akka-sample-java8/src/main/java/sample/java8/buncher/Buncher.java
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: termination-syntax
As for the :func:`whenUnhandled` case, this handler is not stacked, so each
@ -398,5 +400,57 @@ implementation by the :class:`AbstractFSM` class is to execute the
Testing and Debugging Finite State Machines
===========================================
TODO
----
During development and for trouble shooting FSMs need care just as any other
actor. There are specialized tools available as described in :ref:`TestFSMRef`
and in the following.
Event Tracing
-------------
The setting ``akka.actor.debug.fsm`` in :ref:`configuration` enables logging of an
event trace by :class:`LoggingFSM` instances:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: logging-fsm
:exclude: body-elided
This FSM will log at DEBUG level:
* all processed events, including :obj:`StateTimeout` and scheduled timer
messages
* every setting and cancellation of named timers
* all state transitions
Life cycle changes and special messages can be logged as described for
:ref:`Actors <actor.logging-scala>`.
Rolling Event Log
-----------------
The :class:`AbstractLoggingFSM` class adds one more feature to the FSM: a rolling event
log which may be used during debugging (for tracing how the FSM entered a
certain failure state) or for other creative uses:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/actor/fsm/FSMDocTest.java
:include: logging-fsm
The :meth:`logDepth` defaults to zero, which turns off the event log.
.. warning::
The log buffer is allocated during actor creation, which is why the
configuration is done using a virtual method call. If you want to override
with a ``val``, make sure that its initialization happens before the
initializer of :class:`LoggingFSM` runs, and do not change the value returned
by ``logDepth`` after the buffer has been allocated.
The contents of the event log are available using method :meth:`getLog`, which
returns an :class:`IndexedSeq[LogEntry]` where the oldest entry is at index
zero.
Examples
========
A bigger FSM example contrasted with Actor's :meth:`become`/:meth:`unbecome` can be found in
the `Typesafe Activator <http://typesafe.com/platform/getstarted>`_ template named
`Akka FSM in Scala <http://typesafe.com/activator/template/akka-sample-fsm-java-lambda>`_

View file

@ -185,11 +185,11 @@ to have them converted into actual Debug messages).
In addition, it offers:
* :obj:`getSelf()` reference to the :class:`ActorRef` of the actor
* :meth:`getSelf()` reference to the :class:`ActorRef` of the actor
* :obj:`getSender()` reference sender Actor of the last received message, typically used as described in :ref:`UntypedActor.Reply`
* :meth:`getSender()` reference sender Actor of the last received message, typically used as described in :ref:`UntypedActor.Reply`
* :obj:`supervisorStrategy()` user overridable definition the strategy to use for supervising child actors
* :meth:`supervisorStrategy()` user overridable definition the strategy to use for supervising child actors
This strategy is typically declared inside the actor in order to have access
to the actors internal state within the decider function: since failure is
@ -200,7 +200,7 @@ In addition, it offers:
occurred within a distant descendant it is still reported one level up at a
time).
* :obj:`getContext()` exposes contextual information for the actor and the current message, such as:
* :meth:`getContext()` exposes contextual information for the actor and the current message, such as:
* factory methods to create child actors (:meth:`actorOf`)
* system that the actor belongs to
@ -273,8 +273,8 @@ the whole functionality):
It should be noted that the :class:`Terminated` message is generated
independent of the order in which registration and termination occur.
In particular, the watching actor will receive a :class:`Terminated` message even if the
watched actor has already been terminated at the time of registration.
In particular, the watching actor will receive a :class:`Terminated` message
even if the watched actor has already been terminated at the time of registration.
Registering multiple times does not necessarily lead to multiple messages being
generated, but there is no guarantee that only exactly one such message is
@ -402,8 +402,8 @@ actors which are traversed in the sense that if a concrete name lookup fails
negative result is generated. Please note that this does not mean that delivery
of that reply is guaranteed, it still is a normal message.
.. includecode:: code/docs/actor/UntypedActorDocTest.java
:include: import-identify,identify
.. includecode:: code/docs/actor/UntypedActorDocTest.java#import-identify
.. includecode:: code/docs/actor/UntypedActorDocTest.java#identify
You can also acquire an :class:`ActorRef` for an :class:`ActorSelection` with
the ``resolveOne`` method of the :class:`ActorSelection`. It returns a ``Future``
@ -772,7 +772,7 @@ major impact on performance.
Note that the stash is part of the ephemeral actor state, unlike the
mailbox. Therefore, it should be managed like other parts of the
actor's state which have the same property. The :class:`Stash` traits
actor's state which have the same property. The :class:`UntypedActorWithStash`
implementation of :meth:`preRestart` will call ``unstashAll()``, which is
usually the desired behavior.

View file

@ -82,23 +82,6 @@ class DemoActorWrapper extends Actor {
def receive = Actor.emptyBehavior
}
class AnonymousActor extends Actor {
//#anonymous-actor
def receive = {
case m: DoIt =>
context.actorOf(Props(new Actor {
def receive = {
case DoIt(msg) =>
val replyMsg = doSomeDangerousWork(msg)
sender() ! replyMsg
context.stop(self)
}
def doSomeDangerousWork(msg: ImmutableMessage): String = { "done" }
})) forward m
}
//#anonymous-actor
}
class Hook extends Actor {
var child: ActorRef = _
//#preStart
@ -283,15 +266,6 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.stop(myActor)
}
"creating actor with constructor" in {
//#creating-constructor
// allows passing in arguments to the MyActor constructor
val myActor = system.actorOf(Props[MyActor], name = "myactor")
//#creating-constructor
system.stop(myActor)
}
"creating a Props config" in {
//#creating-props
import akka.actor.Props

View file

@ -251,7 +251,7 @@ of the modifiers described in the following:
This modifier sends a reply to the currently processed message and otherwise
does not modify the state transition.
All modifier can be chained to achieve a nice and concise description:
All modifiers can be chained to achieve a nice and concise description:
.. includecode:: code/docs/actor/FSMDocSpec.scala
:include: modifier-syntax

View file

@ -1,6 +1,6 @@
name := "akka-sample-java8"
name := "akka-docs-java-lambda"
version := "0.0.1-SNAPSHOT"
version := "2.3-SNAPSHOT"
scalaVersion := "2.10.3"

View file

@ -1,4 +1,3 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
@ -10,9 +9,9 @@
</properties>
<groupId>sample</groupId>
<artifactId>akka-sample-java8</artifactId>
<artifactId>akka-docs-java-lambda</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<version>2.3-SNAPSHOT</version>
<dependencies>
<dependency>
@ -52,4 +51,3 @@
</build>
</project>

View file

@ -0,0 +1,561 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import akka.actor.*;
import akka.event.LoggingAdapter;
import akka.event.Logging;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actor.Messages.Swap.Swap;
import static docs.actor.Messages.*;
import java.util.concurrent.TimeUnit;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
//#import-props
import akka.actor.Props;
//#import-props
//#import-actorRef
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
//#import-actorRef
//#import-identify
import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
//#import-identify
//#import-graceFulStop
import akka.pattern.AskTimeoutException;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-graceFulStop
public class ActorDocTest {
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("ActorDocTest");
}
@AfterClass
public static void afterClass() {
system.shutdown();
system.awaitTermination(Duration.create("5 seconds"));
}
static
//#context-actorOf
public class FirstActor extends AbstractActor {
final ActorRef child = context().actorOf(Props.create(MyActor.class), "myChild");
//#plus-some-behavior
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchAny(x -> {
sender().tell(x, self());
}).build();
}
//#plus-some-behavior
}
//#context-actorOf
static public abstract class ReceiveActor extends AbstractActor {
@Override
//#receive
public abstract PartialFunction<Object, BoxedUnit> receive();
//#receive
}
static public class ActorWithArgs extends AbstractActor {
private final String args;
ActorWithArgs(String args) {
this.args = args;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchAny(x -> { }).build();
}
}
static
//#props-factory
public class DemoActor extends AbstractActor {
/**
* Create Props for an actor of this type.
* @param magicNumber The magic number to be passed to this actors constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
static Props props(Integer magicNumber) {
// You need to specify the actual type of the returned actor
// since Java 8 lambdas have some runtime type information erased
return Props.create(DemoActor.class, () -> new DemoActor(magicNumber));
}
private final Integer magicNumber;
DemoActor(Integer magicNumber) {
this.magicNumber = magicNumber;
}
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Integer.class, i -> {
sender().tell(i + magicNumber, self());
}).build();
}
}
//#props-factory
static
//#props-factory
public class SomeOtherActor extends AbstractActor {
// Props(new DemoActor(42)) would not be safe
ActorRef demoActor = context().actorOf(DemoActor.props(42), "demo");
// ...
//#props-factory
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
}
//#props-factory
}
//#props-factory
public static class Hook extends AbstractActor {
ActorRef target = null;
//#preStart
@Override
public void preStart() {
target = context().actorOf(Props.create(MyActor.class, "target"));
}
//#preStart
public PartialFunction<Object, BoxedUnit> receive() {
return emptyBehavior();
}
//#postStop
@Override
public void postStop() {
//#clean-up-some-resources
final String message = "stopped";
//#tell
// dont forget to think about who is the sender (2nd argument)
target.tell(message, self());
//#tell
final Object result = "";
//#forward
target.forward(result, context());
//#forward
target = null;
//#clean-up-some-resources
}
//#postStop
// compilation test only
public void compileSelections() {
//#selection-local
// will look up this absolute path
context().actorSelection("/user/serviceA/actor");
// will look up sibling beneath same supervisor
context().actorSelection("../joe");
//#selection-local
//#selection-wildcard
// will look all children to serviceB with names starting with worker
context().actorSelection("/user/serviceB/worker*");
// will look up all siblings beneath same supervisor
context().actorSelection("../*");
//#selection-wildcard
//#selection-remote
context().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
//#selection-remote
}
}
public static class ReplyException extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchAny(x -> {
//#reply-exception
try {
String result = operation();
sender().tell(result, self());
} catch (Exception e) {
sender().tell(new akka.actor.Status.Failure(e), self());
throw e;
}
//#reply-exception
}).build();
}
private String operation() {
return "Hi";
}
}
static
//#gracefulStop-actor
public class Manager extends AbstractActor {
private static enum Shutdown {
Shutdown
}
public static final Shutdown SHUTDOWN = Shutdown.Shutdown;
private ActorRef worker =
context().watch(context().actorOf(Props.create(Cruncher.class), "worker"));
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("job", s -> {
worker.tell("crunch", self());
}).
matchEquals(SHUTDOWN, x -> {
worker.tell(PoisonPill.getInstance(), self());
context().become(shuttingDown);
}).build();
}
public PartialFunction<Object, BoxedUnit> shuttingDown =
ReceiveBuilder.
matchEquals("job", s -> {
sender().tell("service unavailable, shutting down", self());
}).
match(Terminated.class, t -> t.actor().equals(worker), t -> {
context().stop(self());
}).build();
}
//#gracefulStop-actor
@Test
public void usePatternsGracefulStop() throws Exception {
ActorRef actorRef = system.actorOf(Props.create(Manager.class));
//#gracefulStop
try {
Future<Boolean> stopped =
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
//#gracefulStop
}
public static class Cruncher extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("crunch", s -> { }).build();
}
}
static
//#swapper
public class Swapper extends AbstractActor {
final LoggingAdapter log = Logging.getLogger(context().system(), this);
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals(Swap, s -> {
log.info("Hi");
context().become(ReceiveBuilder.
matchEquals(Swap, x -> {
log.info("Ho");
context().unbecome(); // resets the latest 'become' (just for fun)
}).build(), false); // push on top instead of replace
}).build();
}
}
//#swapper
static
//#swapper
public class SwapperApp {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("SwapperSystem");
ActorRef swapper = system.actorOf(Props.create(Swapper.class), "swapper");
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
system.shutdown();
}
}
//#swapper
@Test
public void creatingActorWithSystemActorOf() {
//#system-actorOf
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem");
final ActorRef myActor = system.actorOf(Props.create(MyActor.class), "myactor");
//#system-actorOf
try {
new JavaTestKit(system) {
{
myActor.tell("hello", getRef());
expectMsgEquals("hello");
}
};
} finally {
JavaTestKit.shutdownActorSystem(system);
}
}
@Test
public void creatingPropsConfig() {
//#creating-props
Props props1 = Props.create(MyActor.class);
Props props2 = Props.create(ActorWithArgs.class,
() -> new ActorWithArgs("arg")); // careful, see below
Props props3 = Props.create(ActorWithArgs.class, "arg");
//#creating-props
//#creating-props-deprecated
// NOT RECOMMENDED within another actor:
// encourages to close over enclosing class
Props props7 = Props.create(ActorWithArgs.class,
() -> new ActorWithArgs("arg"));
//#creating-props-deprecated
}
@Test(expected=IllegalArgumentException.class)
public void creatingPropsIllegal() {
//#creating-props-illegal
// This will throw an IllegalArgumentException since some runtime
// type information of the lambda is erased.
// Use Props.create(actorClass, Creator) instead.
Props props = Props.create(() -> new ActorWithArgs("arg"));
//#creating-props-illegal
}
static
//#receive-timeout
public class ReceiveTimeoutActor extends AbstractActor {
public ReceiveTimeoutActor() {
// To set an initial delay
context().setReceiveTimeout(Duration.create("10 seconds"));
}
//#receive-timeout
ActorRef target = context().system().deadLetters();
//#receive-timeout
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("Hello", s -> {
// To set in a response to a message
context().setReceiveTimeout(Duration.create("1 second"));
//#receive-timeout
target = sender();
target.tell("Hello world", self());
//#receive-timeout
}).
match(ReceiveTimeout.class, r -> {
// To turn it off
context().setReceiveTimeout(Duration.Undefined());
//#receive-timeout
target.tell("timeout", self());
//#receive-timeout
}).build();
}
}
//#receive-timeout
@Test
public void using_receiveTimeout() {
final ActorRef myActor = system.actorOf(Props.create(ReceiveTimeoutActor.class));
new JavaTestKit(system) {
{
myActor.tell("Hello", getRef());
expectMsgEquals("Hello world");
expectMsgEquals("timeout");
}
};
}
static
//#hot-swap-actor
public class HotSwapActor extends AbstractActor {
private PartialFunction<Object, BoxedUnit> angry;
private PartialFunction<Object, BoxedUnit> happy;
{
angry =
ReceiveBuilder.
matchEquals("foo", s -> {
sender().tell("I am already angry?", self());
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
happy = ReceiveBuilder.
matchEquals("bar", s -> {
sender().tell("I am already happy :-)", self());
}).
matchEquals("foo", s -> {
context().become(angry);
}).build();
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("foo", s -> {
context().become(angry);
}).
matchEquals("bar", s -> {
context().become(happy);
}).build();
}
}
//#hot-swap-actor
@Test
public void using_hot_swap() {
final ActorRef actor = system.actorOf(Props.create(HotSwapActor.class), "hot");
new JavaTestKit(system) {
{
actor.tell("foo", getRef());
actor.tell("foo", getRef());
expectMsgEquals("I am already angry?");
actor.tell("bar", getRef());
actor.tell("bar", getRef());
expectMsgEquals("I am already happy :-)");
actor.tell("foo", getRef());
actor.tell("foo", getRef());
expectMsgEquals("I am already angry?");
expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
}
};
}
static
//#stash
public class ActorWithProtocol extends AbstractActorWithStash {
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("open", s -> {
context().become(ReceiveBuilder.
matchEquals("write", ws -> { /* do writing */ }).
matchEquals("close", cs -> {
unstashAll();
context().unbecome();
}).
matchAny(msg -> stash()).build(), false);
}).
matchAny(msg -> stash()).build();
}
}
//#stash
@Test
public void using_Stash() {
final ActorRef actor = system.actorOf(Props.create(ActorWithProtocol.class), "stash");
}
static
//#watch
public class WatchActor extends AbstractActor {
private final ActorRef child = context().actorOf(Props.empty(), "target");
{
context().watch(child); // <-- this is the only call needed for registration
}
private ActorRef lastSender = system.deadLetters();
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("kill", s -> {
context().stop(child);
lastSender = sender();
}).
match(Terminated.class, t -> t.actor().equals(child), t -> {
lastSender.tell("finished", self());
}).build();
}
}
//#watch
@Test
public void using_watch() {
ActorRef actor = system.actorOf(Props.create(WatchActor.class));
new JavaTestKit(system) {
{
actor.tell("kill", getRef());
expectMsgEquals("finished");
}
};
}
static
//#identify
public class Follower extends AbstractActor {
final Integer identifyId = 1;
{
ActorSelection selection = context().actorSelection("/user/another");
selection.tell(new Identify(identifyId), self());
}
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ActorIdentity.class, id -> id.getRef() != null, id -> {
ActorRef ref = id.getRef();
context().watch(ref);
context().become(active(ref));
}).
match(ActorIdentity.class, id -> id.getRef() == null, id -> {
context().stop(self());
}).build();
}
final PartialFunction<Object, BoxedUnit> active(final ActorRef another) {
return ReceiveBuilder.
match(Terminated.class, t -> t.actor().equals(another), t -> {
context().stop(self());
}).build();
}
}
//#identify
@Test
public void using_Identify() {
ActorRef a = system.actorOf(Props.empty());
ActorRef b = system.actorOf(Props.create(Follower.class));
new JavaTestKit(system) {
{
watch(b);
system.stop(a);
assertEquals(expectMsgClass(Duration.create(2, TimeUnit.SECONDS), Terminated.class).actor(), b);
}
};
}
}

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class InitializationDocTest {
static ActorSystem system = null;
@BeforeClass
public static void beforeClass() {
system = ActorSystem.create("InitializationDocTest");
}
@AfterClass
public static void afterClass() {
system.shutdown();
system.awaitTermination(Duration.create("5 seconds"));
}
public static class MessageInitExample extends AbstractActor {
//#messageInit
private String initializeMe = null;
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
matchEquals("init", m1 -> {
initializeMe = "Up and running";
context().become(ReceiveBuilder.
matchEquals("U OK?", m2 -> {
sender().tell(initializeMe, self());
}).build());
}).build();
}
//#messageInit
}
@Test
public void testIt() {
new JavaTestKit(system) {{
ActorRef testactor = system.actorOf(Props.create(MessageInitExample.class), "testactor");
String msg = "U OK?";
testactor.tell(msg, getRef());
expectNoMsg(Duration.create(1, TimeUnit.SECONDS));
testactor.tell("init", getRef());
testactor.tell(msg, getRef());
expectMsgEquals("Up and running");
}};
}
}

View file

@ -0,0 +1,149 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class Messages {
static
//#immutable-message
public class ImmutableMessage {
private final int sequenceNumber;
private final List<String> values;
public ImmutableMessage(int sequenceNumber, List<String> values) {
this.sequenceNumber = sequenceNumber;
this.values = Collections.unmodifiableList(new ArrayList<String>(values));
}
public int getSequenceNumber() {
return sequenceNumber;
}
public List<String> getValues() {
return values;
}
}
//#immutable-message
public static class DoIt {
private final ImmutableMessage msg;
DoIt(ImmutableMessage msg) {
this.msg = msg;
}
public ImmutableMessage getMsg() {
return msg;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DoIt doIt = (DoIt) o;
if (!msg.equals(doIt.msg)) return false;
return true;
}
@Override
public int hashCode() {
return msg.hashCode();
}
@Override
public String toString() {
return "DoIt{" +
"msg=" + msg +
'}';
}
}
public static class Message {
final String str;
Message(String str) {
this.str = str;
}
public String getStr() {
return str;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message = (Message) o;
if (!str.equals(message.str)) return false;
return true;
}
@Override
public int hashCode() {
return str.hashCode();
}
@Override
public String toString() {
return "Message{" +
"str='" + str + '\'' +
'}';
}
}
public static enum Swap {
Swap
}
public static class Result {
final String x;
final String s;
public Result(String x, String s) {
this.x = x;
this.s = s;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((s == null) ? 0 : s.hashCode());
result = prime * result + ((x == null) ? 0 : x.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Result other = (Result) obj;
if (s == null) {
if (other.s != null)
return false;
} else if (!s.equals(other.s))
return false;
if (x == null) {
if (other.x != null)
return false;
} else if (!x.equals(other.x))
return false;
return true;
}
}
}

View file

@ -1,4 +1,8 @@
package sample.java8;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#imports
import akka.actor.AbstractActor;
@ -17,7 +21,14 @@ public class MyActor extends AbstractActor {
@Override
public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(String.class, s -> s.equals("test"), s -> log.info("received test")).
match(String.class, s -> {
log.info("Received String message: {}", s);
//#my-actor
//#reply
sender().tell(s, self());
//#reply
//#my-actor
}).
matchAny(o -> log.info("received unknown message")).build();
}
}

View file

@ -1,4 +1,8 @@
package sample.java8;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
//#sample-actor
import akka.actor.AbstractActor;

View file

@ -1,10 +1,13 @@
package sample;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import sample.java8.SampleActor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

View file

@ -1,4 +1,8 @@
package sample.java8.buncher;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.fsm;
//#simple-imports
import akka.actor.AbstractFSM;
@ -10,15 +14,14 @@ import java.util.List;
import scala.concurrent.duration.Duration;
//#simple-imports
import static sample.java8.buncher.Buncher.State;
import static sample.java8.buncher.Buncher.Data;
import static sample.java8.buncher.Buncher.State.*;
import static sample.java8.buncher.Buncher.Uninitialized.*;
import static sample.java8.buncher.Events.*;
import static docs.actor.fsm.Buncher.Data;
import static docs.actor.fsm.Buncher.State.*;
import static docs.actor.fsm.Buncher.State;
import static docs.actor.fsm.Buncher.Uninitialized.*;
import static docs.actor.fsm.Events.*;
//#simple-fsm
public class Buncher extends AbstractFSM<State, Data> {
{
//#fsm-body
startWith(Idle, Uninitialized);
@ -26,7 +29,8 @@ public class Buncher extends AbstractFSM<State, Data> {
//#when-syntax
when(Idle,
matchEvent(SetTarget.class, Uninitialized.class,
(setTarget, uninitialized) -> stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
(setTarget, uninitialized) ->
stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
//#when-syntax
//#transition-elided
@ -50,25 +54,14 @@ public class Buncher extends AbstractFSM<State, Data> {
matchEvent(Queue.class, Todo.class,
(queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj()))).
anyEvent((event, state) -> {
log().warning("received unhandled request {} in state {}/{}", event, stateName(), state);
log().warning("received unhandled request {} in state {}/{}",
event, stateName(), state);
return stay();
}));
//#unhandled-elided
//#termination-elided
//#termination-syntax
onTermination(
matchStop(Normal(),
(state, data) -> {/* Do something here */}).
stop(Shutdown(),
(state, data) -> {/* Do something here */}).
stop(Failure(),
(reason, state, data) -> {/* Do something here */}));
//#termination-syntax
//#termination-elided
//#fsm-body
initialize();
//#fsm-body
}
//#simple-fsm
@ -141,15 +134,3 @@ public class Buncher extends AbstractFSM<State, Data> {
//#simple-fsm
}
//#simple-fsm
//#NullFunction
//#unhandled-syntax
//#modifier-syntax
//#transition-syntax
//#stop-syntax
// TODO add code example here
//#stop-syntax
//#transition-syntax
//#modifier-syntax
//#unhandled-syntax
//#NullFunction

View file

@ -1,4 +1,8 @@
package sample;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.fsm;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@ -9,12 +13,13 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.LinkedList;
import sample.java8.buncher.*;
import static sample.java8.buncher.Events.Batch;
import static sample.java8.buncher.Events.Queue;
import static sample.java8.buncher.Events.SetTarget;
import static sample.java8.buncher.Events.Flush.Flush;
import docs.actor.fsm.*;
import static docs.actor.fsm.Events.Batch;
import static docs.actor.fsm.Events.Queue;
import static docs.actor.fsm.Events.SetTarget;
import static docs.actor.fsm.Events.Flush.Flush;
//#test-code
public class BuncherTest {
static ActorSystem system;
@ -31,10 +36,11 @@ public class BuncherTest {
}
@Test
public void testBuncherActor()
public void testBuncherActorBatchesCorrectly()
{
new JavaTestKit(system) {{
final ActorRef buncher = system.actorOf(Props.create(Buncher.class), "buncher-actor");
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));
final ActorRef probe = getRef();
buncher.tell(new SetTarget(probe), probe);
@ -56,4 +62,19 @@ public class BuncherTest {
system.stop(buncher);
}};
}
@Test
public void testBuncherActorDoesntBatchUninitialized()
{
new JavaTestKit(system) {{
final ActorRef buncher =
system.actorOf(Props.create(Buncher.class));
final ActorRef probe = getRef();
buncher.tell(new Queue(42), probe);
expectNoMsg();
system.stop(buncher);
}};
}
}
//#test-code

View file

@ -1,4 +1,8 @@
package sample.java8.buncher;
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.fsm;
import akka.actor.ActorRef;
import java.util.List;

View file

@ -0,0 +1,179 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.actor.fsm;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import static org.junit.Assert.*;
import static docs.actor.fsm.FSMDocTest.StateType.*;
import static docs.actor.fsm.FSMDocTest.Messages.*;
import static java.util.concurrent.TimeUnit.*;
public class FSMDocTest {
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("FSMDocTest");
}
@AfterClass
public static void tearDown() {
JavaTestKit.shutdownActorSystem(system);
system = null;
}
public static enum StateType {
SomeState,
Processing,
Idle,
Active,
Error
}
public static enum Messages {
WillDo,
Tick
}
public static enum Data {
Foo,
Bar
};
public static interface X {};
public static class DummyFSM extends AbstractFSM<StateType, Integer> {
Integer newData = 42;
//#alt-transition-syntax
public void handler(StateType from, StateType to) {
// handle transition here
}
//#alt-transition-syntax
{
//#modifier-syntax
when(SomeState, matchAnyEvent((msg, data) -> {
return goTo(Processing).using(newData).
forMax(Duration.create(5, SECONDS)).replying(WillDo);
}));
//#modifier-syntax
//#NullFunction
when(SomeState, AbstractFSM.NullFunction());
//#NullFunction
//#transition-syntax
onTransition(
matchState(Active, Idle, () -> setTimer("timeout",
Tick, Duration.create(1, SECONDS), true)).
state(Active, null, () -> cancelTimer("timeout")).
state(null, Idle, (f, t) -> log().info("entering Idle from " + f)));
//#transition-syntax
//#alt-transition-syntax
onTransition(this::handler);
//#alt-transition-syntax
//#stop-syntax
when(Error, matchEventEquals("stop", (data) -> {
// do cleanup ...
return stop();
}));
//#stop-syntax
//#termination-syntax
onTermination(
matchStop(Normal(),
(state, data) -> {/* Do something here */}).
stop(Shutdown(),
(state, data) -> {/* Do something here */}).
stop(Failure.class,
(reason, state, data) -> {/* Do something here */}));
//#termination-syntax
//#unhandled-syntax
whenUnhandled(
matchEvent(X.class, null, (x, data) -> {
log().info("Received unhandled event: " + x);
return stay();
}).
anyEvent((event, data) -> {
log().warning("Received unknown event: " + event);
return goTo(Error);
}));
}
//#unhandled-syntax
}
static
//#logging-fsm
public class MyFSM extends AbstractLoggingFSM<StateType, Data> {
//#body-elided
//#logging-fsm
ActorRef target = null;
//#logging-fsm
@Override
public int logDepth() { return 12; }
{
onTermination(
matchStop(Failure.class, (reason, state, data) -> {
String lastEvents = getLog().mkString("\n\t");
log().warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t" + lastEvents);
//#logging-fsm
target.tell(reason.cause(), self());
target.tell(state, self());
target.tell(data, self());
target.tell(lastEvents, self());
//#logging-fsm
})
);
//...
//#logging-fsm
startWith(SomeState, Data.Foo);
when(SomeState, matchEvent(ActorRef.class, Data.class, (ref, data) -> {
target = ref;
target.tell("going active", self());
return goTo(Active);
}));
when(Active, matchEventEquals("stop", (data) -> {
target.tell("stopping", self());
return stop(new Failure("This is not the error you're looking for"));
}));
initialize();
//#logging-fsm
}
//#body-elided
}
//#logging-fsm
@Test
public void testLoggingFSM()
{
new JavaTestKit(system) {{
final ActorRef logger =
system.actorOf(Props.create(MyFSM.class));
final ActorRef probe = getRef();
logger.tell(probe, probe);
expectMsgEquals("going active");
logger.tell("stop", probe);
expectMsgEquals("stopping");
expectMsgEquals("This is not the error you're looking for");
expectMsgEquals(Active);
expectMsgEquals(Data.Foo);
String msg = expectMsgClass(String.class);
assertThat(msg, CoreMatchers.startsWith("LogEntry(SomeState,Foo,Actor[akka://FSMDocTest/system/"));
}};
}
}

View file

@ -1,13 +0,0 @@
Copyright 2014 Typesafe, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@ -88,7 +88,7 @@ check "$java8_path"
check mvn
# now do some work
tmp="$script_dir/../../akka-samples/akka-sample-java8"
tmp="$script_dir/../../akka-samples/akka-docs-java-lambda"
try cd "$tmp" "can't step into project directory: $tmp"
export JAVA_HOME="$java8_home"
try mvn clean test "mvn execution in akka-sample-java8 failed"
try mvn clean test "mvn execution in akka-docs-java-lambda failed"