Merge branch 'master' into wip-1735-fix-pipe-to-√
This commit is contained in:
commit
0c02adaa75
26 changed files with 173 additions and 251 deletions
|
|
@ -7,7 +7,6 @@ package akka.actor
|
||||||
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import TestEvent.Mute
|
import TestEvent.Mute
|
||||||
import FSM._
|
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.event._
|
import akka.event._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
@ -52,7 +51,7 @@ object FSMActorSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case Event("hello", _) ⇒ stay replying "world"
|
case Event("hello", _) ⇒ stay replying "world"
|
||||||
case Event("bye", _) ⇒ stop(Shutdown)
|
case Event("bye", _) ⇒ stop(FSM.Shutdown)
|
||||||
}
|
}
|
||||||
|
|
||||||
when(Open) {
|
when(Open) {
|
||||||
|
|
@ -63,7 +62,7 @@ object FSMActorSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
whenUnhandled {
|
whenUnhandled {
|
||||||
case Ev(msg) ⇒ {
|
case Event(msg, _) ⇒ {
|
||||||
log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
|
log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData)
|
||||||
unhandledLatch.open
|
unhandledLatch.open
|
||||||
stay
|
stay
|
||||||
|
|
@ -82,7 +81,7 @@ object FSMActorSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
onTermination {
|
onTermination {
|
||||||
case StopEvent(Shutdown, Locked, _) ⇒
|
case StopEvent(FSM.Shutdown, Locked, _) ⇒
|
||||||
// stop is called from lockstate with shutdown as reason...
|
// stop is called from lockstate with shutdown as reason...
|
||||||
terminatedLatch.open
|
terminatedLatch.open
|
||||||
}
|
}
|
||||||
|
|
@ -110,6 +109,8 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
|
|
||||||
"unlock the lock" in {
|
"unlock the lock" in {
|
||||||
|
|
||||||
|
import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack }
|
||||||
|
|
||||||
val latches = new Latches
|
val latches = new Latches
|
||||||
import latches._
|
import latches._
|
||||||
|
|
||||||
|
|
@ -163,7 +164,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
val fsm = TestActorRef(new Actor with FSM[Int, Null] {
|
val fsm = TestActorRef(new Actor with FSM[Int, Null] {
|
||||||
startWith(1, null)
|
startWith(1, null)
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev("go") ⇒ goto(2)
|
case Event("go", _) ⇒ goto(2)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
val name = fsm.path.toString
|
val name = fsm.path.toString
|
||||||
|
|
@ -182,7 +183,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
lazy val fsm = new Actor with FSM[Int, Null] {
|
lazy val fsm = new Actor with FSM[Int, Null] {
|
||||||
override def preStart = { started.countDown }
|
override def preStart = { started.countDown }
|
||||||
startWith(1, null)
|
startWith(1, null)
|
||||||
when(1) { NullFunction }
|
when(1) { FSM.NullFunction }
|
||||||
onTermination {
|
onTermination {
|
||||||
case x ⇒ testActor ! x
|
case x ⇒ testActor ! x
|
||||||
}
|
}
|
||||||
|
|
@ -190,7 +191,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
val ref = system.actorOf(Props(fsm))
|
val ref = system.actorOf(Props(fsm))
|
||||||
Await.ready(started, timeout.duration)
|
Await.ready(started, timeout.duration)
|
||||||
system.stop(ref)
|
system.stop(ref)
|
||||||
expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null))
|
expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null))
|
||||||
}
|
}
|
||||||
|
|
||||||
"log events and transitions if asked to do so" in {
|
"log events and transitions if asked to do so" in {
|
||||||
|
|
@ -204,12 +205,12 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
|
val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] {
|
||||||
startWith(1, null)
|
startWith(1, null)
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev("go") ⇒
|
case Event("go", _) ⇒
|
||||||
setTimer("t", Shutdown, 1.5 seconds, false)
|
setTimer("t", FSM.Shutdown, 1.5 seconds, false)
|
||||||
goto(2)
|
goto(2)
|
||||||
}
|
}
|
||||||
when(2) {
|
when(2) {
|
||||||
case Ev("stop") ⇒
|
case Event("stop", _) ⇒
|
||||||
cancelTimer("t")
|
cancelTimer("t")
|
||||||
stop
|
stop
|
||||||
}
|
}
|
||||||
|
|
@ -230,7 +231,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
|
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
|
||||||
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
|
case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true
|
||||||
}
|
}
|
||||||
expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), Normal)
|
expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), FSM.Normal)
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1 second)
|
||||||
system.eventStream.unsubscribe(testActor)
|
system.eventStream.unsubscribe(testActor)
|
||||||
}
|
}
|
||||||
|
|
@ -251,6 +252,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
|
||||||
})
|
})
|
||||||
fsmref ! "log"
|
fsmref ! "log"
|
||||||
val fsm = fsmref.underlyingActor
|
val fsm = fsmref.underlyingActor
|
||||||
|
import FSM.LogEntry
|
||||||
expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log")))
|
expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log")))
|
||||||
fsmref ! "count"
|
fsmref ! "count"
|
||||||
fsmref ! "log"
|
fsmref ! "log"
|
||||||
|
|
|
||||||
|
|
@ -160,37 +160,37 @@ object FSMTimingSpec {
|
||||||
|
|
||||||
startWith(Initial, 0)
|
startWith(Initial, 0)
|
||||||
when(Initial) {
|
when(Initial) {
|
||||||
case Ev(TestSingleTimer) ⇒
|
case Event(TestSingleTimer, _) ⇒
|
||||||
setTimer("tester", Tick, 500 millis, false)
|
setTimer("tester", Tick, 500 millis, false)
|
||||||
goto(TestSingleTimer)
|
goto(TestSingleTimer)
|
||||||
case Ev(TestRepeatedTimer) ⇒
|
case Event(TestRepeatedTimer, _) ⇒
|
||||||
setTimer("tester", Tick, 100 millis, true)
|
setTimer("tester", Tick, 100 millis, true)
|
||||||
goto(TestRepeatedTimer) using 4
|
goto(TestRepeatedTimer) using 4
|
||||||
case Ev(TestStateTimeoutOverride) ⇒
|
case Event(TestStateTimeoutOverride, _) ⇒
|
||||||
goto(TestStateTimeout) forMax (Duration.Inf)
|
goto(TestStateTimeout) forMax (Duration.Inf)
|
||||||
case Ev(x: FSMTimingSpec.State) ⇒ goto(x)
|
case Event(x: FSMTimingSpec.State, _) ⇒ goto(x)
|
||||||
}
|
}
|
||||||
when(TestStateTimeout, stateTimeout = 500 millis) {
|
when(TestStateTimeout, stateTimeout = 500 millis) {
|
||||||
case Ev(StateTimeout) ⇒ goto(Initial)
|
case Event(StateTimeout, _) ⇒ goto(Initial)
|
||||||
case Ev(Cancel) ⇒ goto(Initial) replying (Cancel)
|
case Event(Cancel, _) ⇒ goto(Initial) replying (Cancel)
|
||||||
}
|
}
|
||||||
when(TestSingleTimer) {
|
when(TestSingleTimer) {
|
||||||
case Ev(Tick) ⇒
|
case Event(Tick, _) ⇒
|
||||||
tester ! Tick
|
tester ! Tick
|
||||||
goto(Initial)
|
goto(Initial)
|
||||||
}
|
}
|
||||||
when(TestCancelTimer) {
|
when(TestCancelTimer) {
|
||||||
case Ev(Tick) ⇒
|
case Event(Tick, _) ⇒
|
||||||
setTimer("hallo", Tock, 1 milli, false)
|
setTimer("hallo", Tock, 1 milli, false)
|
||||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||||
cancelTimer("hallo")
|
cancelTimer("hallo")
|
||||||
sender ! Tick
|
sender ! Tick
|
||||||
setTimer("hallo", Tock, 500 millis, false)
|
setTimer("hallo", Tock, 500 millis, false)
|
||||||
stay
|
stay
|
||||||
case Ev(Tock) ⇒
|
case Event(Tock, _) ⇒
|
||||||
tester ! Tock
|
tester ! Tock
|
||||||
stay
|
stay
|
||||||
case Ev(Cancel) ⇒
|
case Event(Cancel, _) ⇒
|
||||||
cancelTimer("hallo")
|
cancelTimer("hallo")
|
||||||
goto(Initial)
|
goto(Initial)
|
||||||
}
|
}
|
||||||
|
|
@ -206,29 +206,29 @@ object FSMTimingSpec {
|
||||||
}
|
}
|
||||||
when(TestCancelStateTimerInNamedTimerMessage) {
|
when(TestCancelStateTimerInNamedTimerMessage) {
|
||||||
// FSM is suspended after processing this message and resumed 500ms later
|
// FSM is suspended after processing this message and resumed 500ms later
|
||||||
case Ev(Tick) ⇒
|
case Event(Tick, _) ⇒
|
||||||
suspend(self)
|
suspend(self)
|
||||||
setTimer("named", Tock, 1 millis, false)
|
setTimer("named", Tock, 1 millis, false)
|
||||||
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second)
|
||||||
stay forMax (1 millis) replying Tick
|
stay forMax (1 millis) replying Tick
|
||||||
case Ev(Tock) ⇒
|
case Event(Tock, _) ⇒
|
||||||
goto(TestCancelStateTimerInNamedTimerMessage2)
|
goto(TestCancelStateTimerInNamedTimerMessage2)
|
||||||
}
|
}
|
||||||
when(TestCancelStateTimerInNamedTimerMessage2) {
|
when(TestCancelStateTimerInNamedTimerMessage2) {
|
||||||
case Ev(StateTimeout) ⇒
|
case Event(StateTimeout, _) ⇒
|
||||||
goto(Initial)
|
goto(Initial)
|
||||||
case Ev(Cancel) ⇒
|
case Event(Cancel, _) ⇒
|
||||||
goto(Initial) replying Cancel
|
goto(Initial) replying Cancel
|
||||||
}
|
}
|
||||||
when(TestUnhandled) {
|
when(TestUnhandled) {
|
||||||
case Ev(SetHandler) ⇒
|
case Event(SetHandler, _) ⇒
|
||||||
whenUnhandled {
|
whenUnhandled {
|
||||||
case Ev(Tick) ⇒
|
case Event(Tick, _) ⇒
|
||||||
tester ! Unhandled(Tick)
|
tester ! Unhandled(Tick)
|
||||||
stay
|
stay
|
||||||
}
|
}
|
||||||
stay
|
stay
|
||||||
case Ev(Cancel) ⇒
|
case Event(Cancel, _) ⇒
|
||||||
whenUnhandled(NullFunction)
|
whenUnhandled(NullFunction)
|
||||||
goto(Initial)
|
goto(Initial)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ package akka.actor
|
||||||
|
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import FSM._
|
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
|
|
||||||
object FSMTransitionSpec {
|
object FSMTransitionSpec {
|
||||||
|
|
@ -17,13 +16,13 @@ object FSMTransitionSpec {
|
||||||
class MyFSM(target: ActorRef) extends Actor with FSM[Int, Unit] {
|
class MyFSM(target: ActorRef) extends Actor with FSM[Int, Unit] {
|
||||||
startWith(0, Unit)
|
startWith(0, Unit)
|
||||||
when(0) {
|
when(0) {
|
||||||
case Ev("tick") ⇒ goto(1)
|
case Event("tick", _) ⇒ goto(1)
|
||||||
}
|
}
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev("tick") ⇒ goto(0)
|
case Event("tick", _) ⇒ goto(0)
|
||||||
}
|
}
|
||||||
whenUnhandled {
|
whenUnhandled {
|
||||||
case Ev("reply") ⇒ stay replying "reply"
|
case Event("reply", _) ⇒ stay replying "reply"
|
||||||
}
|
}
|
||||||
initialize
|
initialize
|
||||||
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
|
override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" }
|
||||||
|
|
@ -32,10 +31,10 @@ object FSMTransitionSpec {
|
||||||
class OtherFSM(target: ActorRef) extends Actor with FSM[Int, Int] {
|
class OtherFSM(target: ActorRef) extends Actor with FSM[Int, Int] {
|
||||||
startWith(0, 0)
|
startWith(0, 0)
|
||||||
when(0) {
|
when(0) {
|
||||||
case Ev("tick") ⇒ goto(1) using (1)
|
case Event("tick", _) ⇒ goto(1) using (1)
|
||||||
}
|
}
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev(_) ⇒ stay
|
case _ ⇒ stay
|
||||||
}
|
}
|
||||||
onTransition {
|
onTransition {
|
||||||
case 0 -> 1 ⇒ target ! ((stateData, nextStateData))
|
case 0 -> 1 ⇒ target ! ((stateData, nextStateData))
|
||||||
|
|
@ -56,6 +55,8 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
||||||
"A FSM transition notifier" must {
|
"A FSM transition notifier" must {
|
||||||
|
|
||||||
"notify listeners" in {
|
"notify listeners" in {
|
||||||
|
import FSM.{ SubscribeTransitionCallBack, CurrentState, Transition }
|
||||||
|
|
||||||
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
val fsm = system.actorOf(Props(new MyFSM(testActor)))
|
||||||
within(1 second) {
|
within(1 second) {
|
||||||
fsm ! SubscribeTransitionCallBack(testActor)
|
fsm ! SubscribeTransitionCallBack(testActor)
|
||||||
|
|
@ -77,8 +78,8 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender {
|
||||||
}))
|
}))
|
||||||
|
|
||||||
within(300 millis) {
|
within(300 millis) {
|
||||||
fsm ! SubscribeTransitionCallBack(forward)
|
fsm ! FSM.SubscribeTransitionCallBack(forward)
|
||||||
expectMsg(CurrentState(fsm, 0))
|
expectMsg(FSM.CurrentState(fsm, 0))
|
||||||
system.stop(forward)
|
system.stop(forward)
|
||||||
fsm ! "tick"
|
fsm ! "tick"
|
||||||
expectNoMsg
|
expectNoMsg
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,6 @@ import java.io.ObjectInputStream;
|
||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
import org.omg.CORBA.portable.IDLEntity;
|
|
||||||
|
|
||||||
import com.eaio.util.lang.Hex;
|
import com.eaio.util.lang.Hex;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -1,86 +0,0 @@
|
||||||
package com.eaio.uuid;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* com/eaio/uuid/UUIDHelper.java .
|
|
||||||
* Generated by the IDL-to-Java compiler (portable), version "3.1"
|
|
||||||
* from uuid.idl
|
|
||||||
* Sonntag, 7. März 2004 21.35 Uhr CET
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The UUID struct.
|
|
||||||
*/
|
|
||||||
abstract public class UUIDHelper
|
|
||||||
{
|
|
||||||
private static String _id = "IDL:com/eaio/uuid/UUID:1.0";
|
|
||||||
|
|
||||||
public static void insert (org.omg.CORBA.Any a, com.eaio.uuid.UUID that)
|
|
||||||
{
|
|
||||||
org.omg.CORBA.portable.OutputStream out = a.create_output_stream ();
|
|
||||||
a.type (type ());
|
|
||||||
write (out, that);
|
|
||||||
a.read_value (out.create_input_stream (), type ());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static com.eaio.uuid.UUID extract (org.omg.CORBA.Any a)
|
|
||||||
{
|
|
||||||
return read (a.create_input_stream ());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static org.omg.CORBA.TypeCode __typeCode = null;
|
|
||||||
private static boolean __active = false;
|
|
||||||
synchronized public static org.omg.CORBA.TypeCode type ()
|
|
||||||
{
|
|
||||||
if (__typeCode == null)
|
|
||||||
{
|
|
||||||
synchronized (org.omg.CORBA.TypeCode.class)
|
|
||||||
{
|
|
||||||
if (__typeCode == null)
|
|
||||||
{
|
|
||||||
if (__active)
|
|
||||||
{
|
|
||||||
return org.omg.CORBA.ORB.init().create_recursive_tc ( _id );
|
|
||||||
}
|
|
||||||
__active = true;
|
|
||||||
org.omg.CORBA.StructMember[] _members0 = new org.omg.CORBA.StructMember [2];
|
|
||||||
org.omg.CORBA.TypeCode _tcOf_members0 = null;
|
|
||||||
_tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong);
|
|
||||||
_members0[0] = new org.omg.CORBA.StructMember (
|
|
||||||
"time",
|
|
||||||
_tcOf_members0,
|
|
||||||
null);
|
|
||||||
_tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong);
|
|
||||||
_members0[1] = new org.omg.CORBA.StructMember (
|
|
||||||
"clockSeqAndNode",
|
|
||||||
_tcOf_members0,
|
|
||||||
null);
|
|
||||||
__typeCode = org.omg.CORBA.ORB.init ().create_struct_tc (com.eaio.uuid.UUIDHelper.id (), "UUID", _members0);
|
|
||||||
__active = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return __typeCode;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String id ()
|
|
||||||
{
|
|
||||||
return _id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static com.eaio.uuid.UUID read (org.omg.CORBA.portable.InputStream istream)
|
|
||||||
{
|
|
||||||
com.eaio.uuid.UUID value = new com.eaio.uuid.UUID ();
|
|
||||||
value.time = istream.read_longlong ();
|
|
||||||
value.clockSeqAndNode = istream.read_longlong ();
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void write (org.omg.CORBA.portable.OutputStream ostream, com.eaio.uuid.UUID value)
|
|
||||||
{
|
|
||||||
ostream.write_longlong (value.time);
|
|
||||||
ostream.write_longlong (value.clockSeqAndNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -1,42 +0,0 @@
|
||||||
package com.eaio.uuid;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* com/eaio/uuid/UUIDHolder.java .
|
|
||||||
* Generated by the IDL-to-Java compiler (portable), version "3.1"
|
|
||||||
* from uuid.idl
|
|
||||||
* Sonntag, 7. März 2004 21.35 Uhr CET
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The UUID struct.
|
|
||||||
*/
|
|
||||||
public final class UUIDHolder implements org.omg.CORBA.portable.Streamable
|
|
||||||
{
|
|
||||||
public com.eaio.uuid.UUID value = null;
|
|
||||||
|
|
||||||
public UUIDHolder ()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public UUIDHolder (com.eaio.uuid.UUID initialValue)
|
|
||||||
{
|
|
||||||
value = initialValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void _read (org.omg.CORBA.portable.InputStream i)
|
|
||||||
{
|
|
||||||
value = com.eaio.uuid.UUIDHelper.read (i);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void _write (org.omg.CORBA.portable.OutputStream o)
|
|
||||||
{
|
|
||||||
com.eaio.uuid.UUIDHelper.write (o, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public org.omg.CORBA.TypeCode _type ()
|
|
||||||
{
|
|
||||||
return com.eaio.uuid.UUIDHelper.type ();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -57,7 +57,7 @@ import akka.event.LoggingAdapter
|
||||||
*
|
*
|
||||||
* } else if (o instanceof Request3) {
|
* } else if (o instanceof Request3) {
|
||||||
* val msg = ((Request3) o).getMsg();
|
* val msg = ((Request3) o).getMsg();
|
||||||
* getSender().tell(other.ask(msg, 5000)); // reply with Future for holding the other’s reply (timeout 5 seconds)
|
* getSender().tell(ask(other, msg, 5000)); // reply with Future for holding the other’s reply (timeout 5 seconds)
|
||||||
*
|
*
|
||||||
* } else {
|
* } else {
|
||||||
* unhandled(o);
|
* unhandled(o);
|
||||||
|
|
|
||||||
|
|
@ -48,6 +48,14 @@ object FSM {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This extractor is just convenience for matching a (S, S) pair, including a
|
||||||
|
* reminder what the new state is.
|
||||||
|
*/
|
||||||
|
object -> {
|
||||||
|
def unapply[S](in: (S, S)) = Some(in)
|
||||||
|
}
|
||||||
|
|
||||||
case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
|
case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
|
||||||
|
|
||||||
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
|
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
|
||||||
|
|
@ -174,6 +182,10 @@ trait FSM[S, D] extends Listeners {
|
||||||
type Timeout = Option[Duration]
|
type Timeout = Option[Duration]
|
||||||
type TransitionHandler = PartialFunction[(S, S), Unit]
|
type TransitionHandler = PartialFunction[(S, S), Unit]
|
||||||
|
|
||||||
|
// “import” so that it is visible without an import
|
||||||
|
val -> = FSM.->
|
||||||
|
val StateTimeout = FSM.StateTimeout
|
||||||
|
|
||||||
val log = Logging(context.system, this)
|
val log = Logging(context.system, this)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -284,14 +296,6 @@ trait FSM[S, D] extends Listeners {
|
||||||
*/
|
*/
|
||||||
protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
|
protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
|
||||||
|
|
||||||
/**
|
|
||||||
* This extractor is just convenience for matching a (S, S) pair, including a
|
|
||||||
* reminder what the new state is.
|
|
||||||
*/
|
|
||||||
object -> {
|
|
||||||
def unapply[S](in: (S, S)) = Some(in)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set handler which is called upon each state transition, i.e. not when
|
* Set handler which is called upon each state transition, i.e. not when
|
||||||
* staying in the same state. This may use the pair extractor defined in the
|
* staying in the same state. This may use the pair extractor defined in the
|
||||||
|
|
@ -533,9 +537,6 @@ trait FSM[S, D] extends Listeners {
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Event(event: Any, stateData: D)
|
case class Event(event: Any, stateData: D)
|
||||||
object Ev {
|
|
||||||
def unapply[D](e: Event): Option[Any] = Some(e.event)
|
|
||||||
}
|
|
||||||
|
|
||||||
case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
|
case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -47,36 +47,36 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int =
|
||||||
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒
|
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction).
|
* Implicit conversion from `Seq` of Cause-Directive pairs to a `Decider`. See makeDecider(causeDirective).
|
||||||
*/
|
*/
|
||||||
implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit)
|
implicit def seqCauseDirective2Decider(trapExit: Iterable[CauseDirective]): Decider = makeDecider(trapExit)
|
||||||
// the above would clash with seqThrowable2Decider for empty lists
|
// the above would clash with seqThrowable2Decider for empty lists
|
||||||
}
|
}
|
||||||
|
|
||||||
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
sealed trait Action
|
sealed trait Directive
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resumes message processing for the failed Actor
|
* Resumes message processing for the failed Actor
|
||||||
*/
|
*/
|
||||||
case object Resume extends Action
|
case object Resume extends Directive
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Discards the old Actor instance and replaces it with a new,
|
* Discards the old Actor instance and replaces it with a new,
|
||||||
* then resumes message processing.
|
* then resumes message processing.
|
||||||
*/
|
*/
|
||||||
case object Restart extends Action
|
case object Restart extends Directive
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stops the Actor
|
* Stops the Actor
|
||||||
*/
|
*/
|
||||||
case object Stop extends Action
|
case object Stop extends Directive
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Escalates the failure to the supervisor of the supervisor,
|
* Escalates the failure to the supervisor of the supervisor,
|
||||||
* by rethrowing the cause of the failure.
|
* by rethrowing the cause of the failure.
|
||||||
*/
|
*/
|
||||||
case object Escalate extends Action
|
case object Escalate extends Directive
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resumes message processing for the failed Actor
|
* Resumes message processing for the failed Actor
|
||||||
|
|
@ -127,9 +127,9 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
*/
|
*/
|
||||||
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit)
|
||||||
|
|
||||||
type Decider = PartialFunction[Throwable, Action]
|
type Decider = PartialFunction[Throwable, Directive]
|
||||||
type JDecider = akka.japi.Function[Throwable, Action]
|
type JDecider = akka.japi.Function[Throwable, Directive]
|
||||||
type CauseAction = (Class[_ <: Throwable], Action)
|
type CauseDirective = (Class[_ <: Throwable], Directive)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decider builder which just checks whether one of
|
* Decider builder which just checks whether one of
|
||||||
|
|
@ -152,14 +152,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
|
def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decider builder for Iterables of cause-action pairs, e.g. a map obtained
|
* Decider builder for Iterables of cause-directive pairs, e.g. a map obtained
|
||||||
* from configuration; will sort the pairs so that the most specific type is
|
* from configuration; will sort the pairs so that the most specific type is
|
||||||
* checked before all its subtypes, allowing carving out subtrees of the
|
* checked before all its subtypes, allowing carving out subtrees of the
|
||||||
* Throwable hierarchy.
|
* Throwable hierarchy.
|
||||||
*/
|
*/
|
||||||
def makeDecider(flat: Iterable[CauseAction]): Decider = {
|
def makeDecider(flat: Iterable[CauseDirective]): Decider = {
|
||||||
val actions = sort(flat)
|
val directives = sort(flat)
|
||||||
return { case x ⇒ actions find (_._1 isInstance x) map (_._2) getOrElse Escalate }
|
return { case x ⇒ directives find (_._1 isInstance x) map (_._2) getOrElse Escalate }
|
||||||
}
|
}
|
||||||
|
|
||||||
def makeDecider(func: JDecider): Decider = {
|
def makeDecider(func: JDecider): Decider = {
|
||||||
|
|
@ -170,8 +170,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
||||||
* Sort so that subtypes always precede their supertypes, but without
|
* Sort so that subtypes always precede their supertypes, but without
|
||||||
* obeying any order between unrelated subtypes (insert sort).
|
* obeying any order between unrelated subtypes (insert sort).
|
||||||
*/
|
*/
|
||||||
def sort(in: Iterable[CauseAction]): Seq[CauseAction] =
|
def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] =
|
||||||
(new ArrayBuffer[CauseAction](in.size) /: in) { (buf, ca) ⇒
|
(new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) ⇒
|
||||||
buf.indexWhere(_._1 isAssignableFrom ca._1) match {
|
buf.indexWhere(_._1 isAssignableFrom ca._1) match {
|
||||||
case -1 ⇒ buf append ca
|
case -1 ⇒ buf append ca
|
||||||
case x ⇒ buf insert (x, ca)
|
case x ⇒ buf insert (x, ca)
|
||||||
|
|
@ -215,8 +215,8 @@ abstract class SupervisorStrategy {
|
||||||
* Returns whether it processed the failure or not
|
* Returns whether it processed the failure or not
|
||||||
*/
|
*/
|
||||||
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
||||||
val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
|
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate
|
||||||
action match {
|
directive match {
|
||||||
case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true
|
case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true
|
||||||
case Restart ⇒ processFailure(context, true, child, cause, stats, children); true
|
case Restart ⇒ processFailure(context, true, child, cause, stats, children); true
|
||||||
case Stop ⇒ processFailure(context, false, child, cause, stats, children); true
|
case Stop ⇒ processFailure(context, false, child, cause, stats, children); true
|
||||||
|
|
@ -227,10 +227,13 @@ abstract class SupervisorStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart all child actors when one fails
|
* Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
|
||||||
|
* to all children when one fails, as opposed to [[akka.actor.OneForOneStrategy]] that applies
|
||||||
|
* it only to the child actor that failed.
|
||||||
|
*
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||||
|
|
@ -270,10 +273,13 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restart a child actor when it fails
|
* Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
|
||||||
|
* to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies
|
||||||
|
* it to all children.
|
||||||
|
*
|
||||||
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
* @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit
|
||||||
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
* @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
|
||||||
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a
|
* @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
|
||||||
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
* `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates.
|
||||||
*/
|
*/
|
||||||
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider)
|
||||||
|
|
|
||||||
|
|
@ -37,9 +37,9 @@ import akka.japi.{ Creator }
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
* private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
* new Function<Throwable, Action>() {
|
* new Function<Throwable, Directive>() {
|
||||||
* @Override
|
* @Override
|
||||||
* public Action apply(Throwable t) {
|
* public Directive apply(Throwable t) {
|
||||||
* if (t instanceof ArithmeticException) {
|
* if (t instanceof ArithmeticException) {
|
||||||
* return resume();
|
* return resume();
|
||||||
* } else if (t instanceof NullPointerException) {
|
* } else if (t instanceof NullPointerException) {
|
||||||
|
|
|
||||||
|
|
@ -40,9 +40,9 @@ public class FaultHandlingTestBase {
|
||||||
|
|
||||||
//#strategy
|
//#strategy
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
new Function<Throwable, Action>() {
|
new Function<Throwable, Directive>() {
|
||||||
@Override
|
@Override
|
||||||
public Action apply(Throwable t) {
|
public Directive apply(Throwable t) {
|
||||||
if (t instanceof ArithmeticException) {
|
if (t instanceof ArithmeticException) {
|
||||||
return resume();
|
return resume();
|
||||||
} else if (t instanceof NullPointerException) {
|
} else if (t instanceof NullPointerException) {
|
||||||
|
|
@ -78,9 +78,9 @@ public class FaultHandlingTestBase {
|
||||||
|
|
||||||
//#strategy2
|
//#strategy2
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
|
||||||
new Function<Throwable, Action>() {
|
new Function<Throwable, Directive>() {
|
||||||
@Override
|
@Override
|
||||||
public Action apply(Throwable t) {
|
public Directive apply(Throwable t) {
|
||||||
if (t instanceof ArithmeticException) {
|
if (t instanceof ArithmeticException) {
|
||||||
return resume();
|
return resume();
|
||||||
} else if (t instanceof NullPointerException) {
|
} else if (t instanceof NullPointerException) {
|
||||||
|
|
|
||||||
|
|
@ -118,9 +118,9 @@ public class FaultHandlingDocSample {
|
||||||
|
|
||||||
// Stop the CounterService child if it throws ServiceUnavailable
|
// Stop the CounterService child if it throws ServiceUnavailable
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
|
private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
|
||||||
new Function<Throwable, Action>() {
|
new Function<Throwable, Directive>() {
|
||||||
@Override
|
@Override
|
||||||
public Action apply(Throwable t) {
|
public Directive apply(Throwable t) {
|
||||||
if (t instanceof ServiceUnavailable) {
|
if (t instanceof ServiceUnavailable) {
|
||||||
return stop();
|
return stop();
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -229,9 +229,9 @@ public class FaultHandlingDocSample {
|
||||||
// Restart the storage child when StorageException is thrown.
|
// Restart the storage child when StorageException is thrown.
|
||||||
// After 3 restarts within 5 seconds it will be stopped.
|
// After 3 restarts within 5 seconds it will be stopped.
|
||||||
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
|
private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
|
||||||
new Function<Throwable, Action>() {
|
new Function<Throwable, Directive>() {
|
||||||
@Override
|
@Override
|
||||||
public Action apply(Throwable t) {
|
public Directive apply(Throwable t) {
|
||||||
if (t instanceof StorageException) {
|
if (t instanceof StorageException) {
|
||||||
return restart();
|
return restart();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
|
||||||
:include: strategy
|
:include: strategy
|
||||||
|
|
||||||
I have chosen a few well-known exception types in order to demonstrate the
|
I have chosen a few well-known exception types in order to demonstrate the
|
||||||
application of the fault handling actions described in :ref:`supervision`.
|
application of the fault handling directives described in :ref:`supervision`.
|
||||||
First off, it is a one-for-one strategy, meaning that each child is treated
|
First off, it is a one-for-one strategy, meaning that each child is treated
|
||||||
separately (an all-for-one strategy works very similarly, the only difference
|
separately (an all-for-one strategy works very similarly, the only difference
|
||||||
is that any decision is applied to all children of the supervisor, not only the
|
is that any decision is applied to all children of the supervisor, not only the
|
||||||
|
|
@ -71,7 +71,7 @@ in the same way as the default strategy defined above.
|
||||||
Test Application
|
Test Application
|
||||||
----------------
|
----------------
|
||||||
|
|
||||||
The following section shows the effects of the different actions in practice,
|
The following section shows the effects of the different directives in practice,
|
||||||
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
|
@ -93,13 +93,13 @@ Let us create actors:
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
:include: create
|
:include: create
|
||||||
|
|
||||||
The first test shall demonstrate the ``Resume`` action, so we try it out by
|
The first test shall demonstrate the ``Resume`` directive, so we try it out by
|
||||||
setting some non-initial state in the actor and have it fail:
|
setting some non-initial state in the actor and have it fail:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
:include: resume
|
:include: resume
|
||||||
|
|
||||||
As you can see the value 42 survives the fault handling action. Now, if we
|
As you can see the value 42 survives the fault handling directive. Now, if we
|
||||||
change the failure to a more serious ``NullPointerException``, that will no
|
change the failure to a more serious ``NullPointerException``, that will no
|
||||||
longer be the case:
|
longer be the case:
|
||||||
|
|
||||||
|
|
@ -113,7 +113,7 @@ terminated by the supervisor:
|
||||||
:include: stop
|
:include: stop
|
||||||
|
|
||||||
Up to now the supervisor was completely unaffected by the child’s failure,
|
Up to now the supervisor was completely unaffected by the child’s failure,
|
||||||
because the actions set did handle it. In case of an ``Exception``, this is not
|
because the directives set did handle it. In case of an ``Exception``, this is not
|
||||||
true anymore and the supervisor escalates the failure.
|
true anymore and the supervisor escalates the failure.
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
.. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java
|
||||||
|
|
@ -123,7 +123,7 @@ The supervisor itself is supervised by the top-level actor provided by the
|
||||||
:class:`ActorSystem`, which has the default policy to restart in case of all
|
:class:`ActorSystem`, which has the default policy to restart in case of all
|
||||||
``Exception`` cases (with the notable exceptions of
|
``Exception`` cases (with the notable exceptions of
|
||||||
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
||||||
default action in case of a restart is to kill all children, we expected our poor
|
default directive in case of a restart is to kill all children, we expected our poor
|
||||||
child not to survive this failure.
|
child not to survive this failure.
|
||||||
|
|
||||||
In case this is not desired (which depends on the use case), we need to use a
|
In case this is not desired (which depends on the use case), we need to use a
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ command (on a unix-based system):
|
||||||
|
|
||||||
.. code-block:: none
|
.. code-block:: none
|
||||||
|
|
||||||
bin/start sample.kernel.hello.HelloKernel
|
bin/akka sample.kernel.hello.HelloKernel
|
||||||
|
|
||||||
Use ``Ctrl-C`` to interrupt and exit the microkernel.
|
Use ``Ctrl-C`` to interrupt and exit the microkernel.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -657,3 +657,7 @@ extend that, either through inheritance or delegation, is to use
|
||||||
``PartialFunction.orElse`` chaining.
|
``PartialFunction.orElse`` chaining.
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse
|
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse
|
||||||
|
|
||||||
|
Or:
|
||||||
|
|
||||||
|
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse2
|
||||||
|
|
@ -133,6 +133,29 @@ class SpecificActor extends GenericActor {
|
||||||
case class MyMsg(subject: String)
|
case class MyMsg(subject: String)
|
||||||
//#receive-orElse
|
//#receive-orElse
|
||||||
|
|
||||||
|
//#receive-orElse2
|
||||||
|
trait ComposableActor extends Actor {
|
||||||
|
private var receives: List[Receive] = List()
|
||||||
|
protected def registerReceive(receive: Receive) {
|
||||||
|
receives = receive :: receives
|
||||||
|
}
|
||||||
|
|
||||||
|
def receive = receives reduce { _ orElse _ }
|
||||||
|
}
|
||||||
|
|
||||||
|
class MyComposableActor extends ComposableActor {
|
||||||
|
override def preStart() {
|
||||||
|
registerReceive({
|
||||||
|
case "foo" ⇒ /* Do something */
|
||||||
|
})
|
||||||
|
|
||||||
|
registerReceive({
|
||||||
|
case "bar" ⇒ /* Do something */
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//#receive-orElse2
|
||||||
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
|
|
||||||
"import context" in {
|
"import context" in {
|
||||||
|
|
|
||||||
|
|
@ -89,10 +89,10 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
||||||
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
|
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
|
||||||
startWith(1, "")
|
startWith(1, "")
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev("go") ⇒ goto(2) using "go"
|
case Event("go", _) ⇒ goto(2) using "go"
|
||||||
}
|
}
|
||||||
when(2) {
|
when(2) {
|
||||||
case Ev("back") ⇒ goto(1) using "back"
|
case Event("back", _) ⇒ goto(1) using "back"
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy:
|
||||||
:include: strategy
|
:include: strategy
|
||||||
|
|
||||||
I have chosen a few well-known exception types in order to demonstrate the
|
I have chosen a few well-known exception types in order to demonstrate the
|
||||||
application of the fault handling actions described in :ref:`supervision`.
|
application of the fault handling directives described in :ref:`supervision`.
|
||||||
First off, it is a one-for-one strategy, meaning that each child is treated
|
First off, it is a one-for-one strategy, meaning that each child is treated
|
||||||
separately (an all-for-one strategy works very similarly, the only difference
|
separately (an all-for-one strategy works very similarly, the only difference
|
||||||
is that any decision is applied to all children of the supervisor, not only the
|
is that any decision is applied to all children of the supervisor, not only the
|
||||||
|
|
@ -53,8 +53,8 @@ that the respective limit does not apply, leaving the possibility to specify an
|
||||||
absolute upper limit on the restarts or to make the restarts work infinitely.
|
absolute upper limit on the restarts or to make the restarts work infinitely.
|
||||||
|
|
||||||
The match statement which forms the bulk of the body is of type ``Decider``,
|
The match statement which forms the bulk of the body is of type ``Decider``,
|
||||||
which is a ``PartialFunction[Throwable, Action]``. This
|
which is a ``PartialFunction[Throwable, Directive]``. This
|
||||||
is the piece which maps child failure types to their corresponding actions.
|
is the piece which maps child failure types to their corresponding directives.
|
||||||
|
|
||||||
Default Supervisor Strategy
|
Default Supervisor Strategy
|
||||||
---------------------------
|
---------------------------
|
||||||
|
|
@ -76,7 +76,7 @@ in the same way as the default strategy defined above.
|
||||||
Test Application
|
Test Application
|
||||||
----------------
|
----------------
|
||||||
|
|
||||||
The following section shows the effects of the different actions in practice,
|
The following section shows the effects of the different directives in practice,
|
||||||
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
wherefor a test setup is needed. First off, we need a suitable supervisor:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||||
|
|
@ -99,13 +99,13 @@ Let us create actors:
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||||
:include: create
|
:include: create
|
||||||
|
|
||||||
The first test shall demonstrate the ``Resume`` action, so we try it out by
|
The first test shall demonstrate the ``Resume`` directive, so we try it out by
|
||||||
setting some non-initial state in the actor and have it fail:
|
setting some non-initial state in the actor and have it fail:
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||||
:include: resume
|
:include: resume
|
||||||
|
|
||||||
As you can see the value 42 survives the fault handling action. Now, if we
|
As you can see the value 42 survives the fault handling directive. Now, if we
|
||||||
change the failure to a more serious ``NullPointerException``, that will no
|
change the failure to a more serious ``NullPointerException``, that will no
|
||||||
longer be the case:
|
longer be the case:
|
||||||
|
|
||||||
|
|
@ -119,7 +119,7 @@ terminated by the supervisor:
|
||||||
:include: stop
|
:include: stop
|
||||||
|
|
||||||
Up to now the supervisor was completely unaffected by the child’s failure,
|
Up to now the supervisor was completely unaffected by the child’s failure,
|
||||||
because the actions set did handle it. In case of an ``Exception``, this is not
|
because the directives set did handle it. In case of an ``Exception``, this is not
|
||||||
true anymore and the supervisor escalates the failure.
|
true anymore and the supervisor escalates the failure.
|
||||||
|
|
||||||
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
.. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala
|
||||||
|
|
@ -129,7 +129,7 @@ The supervisor itself is supervised by the top-level actor provided by the
|
||||||
:class:`ActorSystem`, which has the default policy to restart in case of all
|
:class:`ActorSystem`, which has the default policy to restart in case of all
|
||||||
``Exception`` cases (with the notable exceptions of
|
``Exception`` cases (with the notable exceptions of
|
||||||
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
``ActorInitializationException`` and ``ActorKilledException``). Since the
|
||||||
default action in case of a restart is to kill all children, we expected our poor
|
default directive in case of a restart is to kill all children, we expected our poor
|
||||||
child not to survive this failure.
|
child not to survive this failure.
|
||||||
|
|
||||||
In case this is not desired (which depends on the use case), we need to use a
|
In case this is not desired (which depends on the use case), we need to use a
|
||||||
|
|
|
||||||
|
|
@ -178,7 +178,7 @@ demonstrated below:
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
when(Idle) {
|
when(Idle) {
|
||||||
case Ev(Start(msg)) => // convenience extractor when state data not needed
|
case Event(Start(msg), _) =>
|
||||||
goto(Timer) using (msg, sender)
|
goto(Timer) using (msg, sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -188,9 +188,8 @@ demonstrated below:
|
||||||
goto(Idle)
|
goto(Idle)
|
||||||
}
|
}
|
||||||
|
|
||||||
The :class:`Event(msg, data)` case class may be used directly in the pattern as
|
The :class:`Event(msg: Any, data: D)` case class is parameterized with the data
|
||||||
shown in state Idle, or you may use the extractor :obj:`Ev(msg)` when the state
|
type held by the FSM for convenient pattern matching.
|
||||||
data are not needed.
|
|
||||||
|
|
||||||
Defining the Initial State
|
Defining the Initial State
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
@ -216,7 +215,7 @@ do something else in this case you can specify that with
|
||||||
case Event(x : X, data) =>
|
case Event(x : X, data) =>
|
||||||
log.info(this, "Received unhandled event: " + x)
|
log.info(this, "Received unhandled event: " + x)
|
||||||
stay
|
stay
|
||||||
case Ev(msg) =>
|
case Event(msg, _) =>
|
||||||
log.warn(this, "Received unknown event: " + x)
|
log.warn(this, "Received unknown event: " + x)
|
||||||
goto(Error)
|
goto(Error)
|
||||||
}
|
}
|
||||||
|
|
@ -259,7 +258,7 @@ All modifier can be chained to achieve a nice and concise description:
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
when(State) {
|
when(State) {
|
||||||
case Ev(msg) =>
|
case Event(msg, _) =>
|
||||||
goto(Processing) using (msg) forMax (5 seconds) replying (WillDo)
|
goto(Processing) using (msg) forMax (5 seconds) replying (WillDo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -396,7 +395,7 @@ state data which is available during termination handling.
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
when(A) {
|
when(A) {
|
||||||
case Ev(Stop) =>
|
case Event(Stop, _) =>
|
||||||
doCleanup()
|
doCleanup()
|
||||||
stop()
|
stop()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
2
akka-kernel/src/main/dist/bin/akka
vendored
2
akka-kernel/src/main/dist/bin/akka
vendored
|
|
@ -19,6 +19,6 @@ declare AKKA_HOME="$(cd "$(cd "$(dirname "$0")"; pwd -P)"/..; pwd)"
|
||||||
|
|
||||||
[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx1024M -Xms1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC -XX:OnOutOfMemoryError=\"kill -9 %p\""
|
[ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx1024M -Xms1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC -XX:OnOutOfMemoryError=\"kill -9 %p\""
|
||||||
|
|
||||||
[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*:$AKKA_HOME/config"
|
[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/config:$AKKA_HOME/lib/akka/*"
|
||||||
|
|
||||||
java "$JAVA_OPTS" -cp "$AKKA_CLASSPATH" -Dakka.home="$AKKA_HOME" -Dakka.kernel.quiet=$quiet akka.kernel.Main "$@"
|
java "$JAVA_OPTS" -cp "$AKKA_CLASSPATH" -Dakka.home="$AKKA_HOME" -Dakka.kernel.quiet=$quiet akka.kernel.Main "$@"
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ class TestActorRef[T <: Actor](
|
||||||
_supervisor.path / name,
|
_supervisor.path / name,
|
||||||
false) {
|
false) {
|
||||||
|
|
||||||
private case object InternalGetActor extends AutoReceivedMessage
|
import TestActorRef.InternalGetActor
|
||||||
|
|
||||||
override def newActorCell(
|
override def newActorCell(
|
||||||
system: ActorSystemImpl,
|
system: ActorSystemImpl,
|
||||||
|
|
@ -98,6 +98,8 @@ class TestActorRef[T <: Actor](
|
||||||
|
|
||||||
object TestActorRef {
|
object TestActorRef {
|
||||||
|
|
||||||
|
private case object InternalGetActor extends AutoReceivedMessage
|
||||||
|
|
||||||
private val number = new AtomicLong
|
private val number = new AtomicLong
|
||||||
private[testkit] def randomName: String = {
|
private[testkit] def randomName: String = {
|
||||||
val l = number.getAndIncrement()
|
val l = number.getAndIncrement()
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.testkit;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import akka.actor.Props;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class TestActorRefJavaSpec {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldBeAbleToUseApply() {
|
||||||
|
//Just a dummy call to make sure it compiles
|
||||||
|
TestActorRef ref = TestActorRef.apply(new Props(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -29,14 +29,11 @@ object AkkaSpec {
|
||||||
stdout-loglevel = "WARNING"
|
stdout-loglevel = "WARNING"
|
||||||
actor {
|
actor {
|
||||||
default-dispatcher {
|
default-dispatcher {
|
||||||
executor = "thread-pool-executor"
|
executor = "fork-join-executor"
|
||||||
thread-pool-executor {
|
fork-join-executor {
|
||||||
core-pool-size-factor = 2
|
parallelism-min = 8
|
||||||
core-pool-size-min = 8
|
parallelism-factor = 2.0
|
||||||
core-pool-size-max = 8
|
parallelism-max = 8
|
||||||
max-pool-size-factor = 2
|
|
||||||
max-pool-size-min = 8
|
|
||||||
max-pool-size-max = 8
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,19 +12,17 @@ import akka.util.duration._
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class TestFSMRefSpec extends AkkaSpec {
|
class TestFSMRefSpec extends AkkaSpec {
|
||||||
|
|
||||||
import FSM._
|
|
||||||
|
|
||||||
"A TestFSMRef" must {
|
"A TestFSMRef" must {
|
||||||
|
|
||||||
"allow access to state data" in {
|
"allow access to state data" in {
|
||||||
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
|
val fsm = TestFSMRef(new Actor with FSM[Int, String] {
|
||||||
startWith(1, "")
|
startWith(1, "")
|
||||||
when(1) {
|
when(1) {
|
||||||
case Ev("go") ⇒ goto(2) using "go"
|
case Event("go", _) ⇒ goto(2) using "go"
|
||||||
case Ev(StateTimeout) ⇒ goto(2) using "timeout"
|
case Event(StateTimeout, _) ⇒ goto(2) using "timeout"
|
||||||
}
|
}
|
||||||
when(2) {
|
when(2) {
|
||||||
case Ev("back") ⇒ goto(1) using "back"
|
case Event("back", _) ⇒ goto(1) using "back"
|
||||||
}
|
}
|
||||||
}, "test-fsm-ref-1")
|
}, "test-fsm-ref-1")
|
||||||
fsm.stateName must be(1)
|
fsm.stateName must be(1)
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,6 @@ AKKA_HOME="$(cd "$SAMPLE"/../../../..; pwd)"
|
||||||
|
|
||||||
[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*"
|
[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*"
|
||||||
|
|
||||||
SAMPLE_CLASSPATH="$AKKA_CLASSPATH:$SAMPLE/lib/*:$SAMPLE/config"
|
SAMPLE_CLASSPATH="$SAMPLE/config:$AKKA_CLASSPATH:$SAMPLE/lib/*"
|
||||||
|
|
||||||
java $JAVA_OPTS -cp "$SAMPLE_CLASSPATH" -Dakka.home="$SAMPLE" akka.kernel.Main
|
java $JAVA_OPTS -cp "$SAMPLE_CLASSPATH" -Dakka.home="$SAMPLE" akka.kernel.Main
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,6 @@ set SAMPLE=%~dp0..
|
||||||
set AKKA_HOME=%SAMPLE%\..\..\..\..
|
set AKKA_HOME=%SAMPLE%\..\..\..\..
|
||||||
set JAVA_OPTS=-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC
|
set JAVA_OPTS=-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC
|
||||||
set AKKA_CLASSPATH=%AKKA_HOME%\lib\scala-library.jar;%AKKA_HOME%\lib\akka\*
|
set AKKA_CLASSPATH=%AKKA_HOME%\lib\scala-library.jar;%AKKA_HOME%\lib\akka\*
|
||||||
set SAMPLE_CLASSPATH=%AKKA_CLASSPATH%;%SAMPLE%\lib\*;%SAMPLE%\config
|
set SAMPLE_CLASSPATH=%SAMPLE%\config;%AKKA_CLASSPATH%;%SAMPLE%\lib\*
|
||||||
|
|
||||||
java %JAVA_OPTS% -cp "%SAMPLE_CLASSPATH%" -Dakka.home="%SAMPLE%" akka.kernel.Main
|
java %JAVA_OPTS% -cp "%SAMPLE_CLASSPATH%" -Dakka.home="%SAMPLE%" akka.kernel.Main
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue