diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index ca6d90e721..3a2c1bb627 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -7,7 +7,6 @@ package akka.actor import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit._ import TestEvent.Mute -import FSM._ import akka.util.duration._ import akka.event._ import com.typesafe.config.ConfigFactory @@ -52,7 +51,7 @@ object FSMActorSpec { } } case Event("hello", _) ⇒ stay replying "world" - case Event("bye", _) ⇒ stop(Shutdown) + case Event("bye", _) ⇒ stop(FSM.Shutdown) } when(Open) { @@ -63,7 +62,7 @@ object FSMActorSpec { } whenUnhandled { - case Ev(msg) ⇒ { + case Event(msg, _) ⇒ { log.warning("unhandled event " + msg + " in state " + stateName + " with data " + stateData) unhandledLatch.open stay @@ -82,7 +81,7 @@ object FSMActorSpec { } onTermination { - case StopEvent(Shutdown, Locked, _) ⇒ + case StopEvent(FSM.Shutdown, Locked, _) ⇒ // stop is called from lockstate with shutdown as reason... terminatedLatch.open } @@ -110,6 +109,8 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im "unlock the lock" in { + import FSM.{ Transition, CurrentState, SubscribeTransitionCallBack } + val latches = new Latches import latches._ @@ -163,7 +164,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im val fsm = TestActorRef(new Actor with FSM[Int, Null] { startWith(1, null) when(1) { - case Ev("go") ⇒ goto(2) + case Event("go", _) ⇒ goto(2) } }) val name = fsm.path.toString @@ -182,7 +183,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im lazy val fsm = new Actor with FSM[Int, Null] { override def preStart = { started.countDown } startWith(1, null) - when(1) { NullFunction } + when(1) { FSM.NullFunction } onTermination { case x ⇒ testActor ! x } @@ -190,7 +191,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im val ref = system.actorOf(Props(fsm)) Await.ready(started, timeout.duration) system.stop(ref) - expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) + expectMsg(1 second, fsm.StopEvent(FSM.Shutdown, 1, null)) } "log events and transitions if asked to do so" in { @@ -204,12 +205,12 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im val fsm = TestActorRef(new Actor with LoggingFSM[Int, Null] { startWith(1, null) when(1) { - case Ev("go") ⇒ - setTimer("t", Shutdown, 1.5 seconds, false) + case Event("go", _) ⇒ + setTimer("t", FSM.Shutdown, 1.5 seconds, false) goto(2) } when(2) { - case Ev("stop") ⇒ + case Event("stop", _) ⇒ cancelTimer("t") stop } @@ -230,7 +231,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im expectMsgPF(1 second, hint = "processing Event(stop,null)") { case Logging.Debug(`name`, `fsmClass`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(name, fsmClass, "canceling timer 't'"), FSM.Normal) expectNoMsg(1 second) system.eventStream.unsubscribe(testActor) } @@ -251,6 +252,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im }) fsmref ! "log" val fsm = fsmref.underlyingActor + import FSM.LogEntry expectMsg(1 second, IndexedSeq(LogEntry(1, 0, "log"))) fsmref ! "count" fsmref ! "log" diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index bf5a1974ee..59468125eb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -160,37 +160,37 @@ object FSMTimingSpec { startWith(Initial, 0) when(Initial) { - case Ev(TestSingleTimer) ⇒ + case Event(TestSingleTimer, _) ⇒ setTimer("tester", Tick, 500 millis, false) goto(TestSingleTimer) - case Ev(TestRepeatedTimer) ⇒ + case Event(TestRepeatedTimer, _) ⇒ setTimer("tester", Tick, 100 millis, true) goto(TestRepeatedTimer) using 4 - case Ev(TestStateTimeoutOverride) ⇒ + case Event(TestStateTimeoutOverride, _) ⇒ goto(TestStateTimeout) forMax (Duration.Inf) - case Ev(x: FSMTimingSpec.State) ⇒ goto(x) + case Event(x: FSMTimingSpec.State, _) ⇒ goto(x) } when(TestStateTimeout, stateTimeout = 500 millis) { - case Ev(StateTimeout) ⇒ goto(Initial) - case Ev(Cancel) ⇒ goto(Initial) replying (Cancel) + case Event(StateTimeout, _) ⇒ goto(Initial) + case Event(Cancel, _) ⇒ goto(Initial) replying (Cancel) } when(TestSingleTimer) { - case Ev(Tick) ⇒ + case Event(Tick, _) ⇒ tester ! Tick goto(Initial) } when(TestCancelTimer) { - case Ev(Tick) ⇒ + case Event(Tick, _) ⇒ setTimer("hallo", Tock, 1 milli, false) TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) cancelTimer("hallo") sender ! Tick setTimer("hallo", Tock, 500 millis, false) stay - case Ev(Tock) ⇒ + case Event(Tock, _) ⇒ tester ! Tock stay - case Ev(Cancel) ⇒ + case Event(Cancel, _) ⇒ cancelTimer("hallo") goto(Initial) } @@ -206,29 +206,29 @@ object FSMTimingSpec { } when(TestCancelStateTimerInNamedTimerMessage) { // FSM is suspended after processing this message and resumed 500ms later - case Ev(Tick) ⇒ + case Event(Tick, _) ⇒ suspend(self) setTimer("named", Tock, 1 millis, false) TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1 second) stay forMax (1 millis) replying Tick - case Ev(Tock) ⇒ + case Event(Tock, _) ⇒ goto(TestCancelStateTimerInNamedTimerMessage2) } when(TestCancelStateTimerInNamedTimerMessage2) { - case Ev(StateTimeout) ⇒ + case Event(StateTimeout, _) ⇒ goto(Initial) - case Ev(Cancel) ⇒ + case Event(Cancel, _) ⇒ goto(Initial) replying Cancel } when(TestUnhandled) { - case Ev(SetHandler) ⇒ + case Event(SetHandler, _) ⇒ whenUnhandled { - case Ev(Tick) ⇒ + case Event(Tick, _) ⇒ tester ! Unhandled(Tick) stay } stay - case Ev(Cancel) ⇒ + case Event(Cancel, _) ⇒ whenUnhandled(NullFunction) goto(Initial) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index 8d8fc5e725..691be63a0b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -5,7 +5,6 @@ package akka.actor import akka.testkit._ import akka.util.duration._ -import FSM._ import akka.util.Duration object FSMTransitionSpec { @@ -17,13 +16,13 @@ object FSMTransitionSpec { class MyFSM(target: ActorRef) extends Actor with FSM[Int, Unit] { startWith(0, Unit) when(0) { - case Ev("tick") ⇒ goto(1) + case Event("tick", _) ⇒ goto(1) } when(1) { - case Ev("tick") ⇒ goto(0) + case Event("tick", _) ⇒ goto(0) } whenUnhandled { - case Ev("reply") ⇒ stay replying "reply" + case Event("reply", _) ⇒ stay replying "reply" } initialize override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } @@ -32,10 +31,10 @@ object FSMTransitionSpec { class OtherFSM(target: ActorRef) extends Actor with FSM[Int, Int] { startWith(0, 0) when(0) { - case Ev("tick") ⇒ goto(1) using (1) + case Event("tick", _) ⇒ goto(1) using (1) } when(1) { - case Ev(_) ⇒ stay + case _ ⇒ stay } onTransition { case 0 -> 1 ⇒ target ! ((stateData, nextStateData)) @@ -56,6 +55,8 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { "A FSM transition notifier" must { "notify listeners" in { + import FSM.{ SubscribeTransitionCallBack, CurrentState, Transition } + val fsm = system.actorOf(Props(new MyFSM(testActor))) within(1 second) { fsm ! SubscribeTransitionCallBack(testActor) @@ -77,8 +78,8 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { })) within(300 millis) { - fsm ! SubscribeTransitionCallBack(forward) - expectMsg(CurrentState(fsm, 0)) + fsm ! FSM.SubscribeTransitionCallBack(forward) + expectMsg(FSM.CurrentState(fsm, 0)) system.stop(forward) fsm ! "tick" expectNoMsg diff --git a/akka-actor/src/main/java/com/eaio/uuid/UUID.java b/akka-actor/src/main/java/com/eaio/uuid/UUID.java index 46bc867cc0..a578a68c6d 100644 --- a/akka-actor/src/main/java/com/eaio/uuid/UUID.java +++ b/akka-actor/src/main/java/com/eaio/uuid/UUID.java @@ -32,8 +32,6 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import org.omg.CORBA.portable.IDLEntity; - import com.eaio.util.lang.Hex; /** diff --git a/akka-actor/src/main/java/com/eaio/uuid/UUIDHelper.java b/akka-actor/src/main/java/com/eaio/uuid/UUIDHelper.java deleted file mode 100644 index 7abbe85895..0000000000 --- a/akka-actor/src/main/java/com/eaio/uuid/UUIDHelper.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.eaio.uuid; - - -/** -* com/eaio/uuid/UUIDHelper.java . -* Generated by the IDL-to-Java compiler (portable), version "3.1" -* from uuid.idl -* Sonntag, 7. März 2004 21.35 Uhr CET -*/ - - -/** - * The UUID struct. - */ -abstract public class UUIDHelper -{ - private static String _id = "IDL:com/eaio/uuid/UUID:1.0"; - - public static void insert (org.omg.CORBA.Any a, com.eaio.uuid.UUID that) - { - org.omg.CORBA.portable.OutputStream out = a.create_output_stream (); - a.type (type ()); - write (out, that); - a.read_value (out.create_input_stream (), type ()); - } - - public static com.eaio.uuid.UUID extract (org.omg.CORBA.Any a) - { - return read (a.create_input_stream ()); - } - - private static org.omg.CORBA.TypeCode __typeCode = null; - private static boolean __active = false; - synchronized public static org.omg.CORBA.TypeCode type () - { - if (__typeCode == null) - { - synchronized (org.omg.CORBA.TypeCode.class) - { - if (__typeCode == null) - { - if (__active) - { - return org.omg.CORBA.ORB.init().create_recursive_tc ( _id ); - } - __active = true; - org.omg.CORBA.StructMember[] _members0 = new org.omg.CORBA.StructMember [2]; - org.omg.CORBA.TypeCode _tcOf_members0 = null; - _tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong); - _members0[0] = new org.omg.CORBA.StructMember ( - "time", - _tcOf_members0, - null); - _tcOf_members0 = org.omg.CORBA.ORB.init ().get_primitive_tc (org.omg.CORBA.TCKind.tk_longlong); - _members0[1] = new org.omg.CORBA.StructMember ( - "clockSeqAndNode", - _tcOf_members0, - null); - __typeCode = org.omg.CORBA.ORB.init ().create_struct_tc (com.eaio.uuid.UUIDHelper.id (), "UUID", _members0); - __active = false; - } - } - } - return __typeCode; - } - - public static String id () - { - return _id; - } - - public static com.eaio.uuid.UUID read (org.omg.CORBA.portable.InputStream istream) - { - com.eaio.uuid.UUID value = new com.eaio.uuid.UUID (); - value.time = istream.read_longlong (); - value.clockSeqAndNode = istream.read_longlong (); - return value; - } - - public static void write (org.omg.CORBA.portable.OutputStream ostream, com.eaio.uuid.UUID value) - { - ostream.write_longlong (value.time); - ostream.write_longlong (value.clockSeqAndNode); - } - -} diff --git a/akka-actor/src/main/java/com/eaio/uuid/UUIDHolder.java b/akka-actor/src/main/java/com/eaio/uuid/UUIDHolder.java deleted file mode 100644 index d5531f5e00..0000000000 --- a/akka-actor/src/main/java/com/eaio/uuid/UUIDHolder.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.eaio.uuid; - -/** -* com/eaio/uuid/UUIDHolder.java . -* Generated by the IDL-to-Java compiler (portable), version "3.1" -* from uuid.idl -* Sonntag, 7. März 2004 21.35 Uhr CET -*/ - - -/** - * The UUID struct. - */ -public final class UUIDHolder implements org.omg.CORBA.portable.Streamable -{ - public com.eaio.uuid.UUID value = null; - - public UUIDHolder () - { - } - - public UUIDHolder (com.eaio.uuid.UUID initialValue) - { - value = initialValue; - } - - public void _read (org.omg.CORBA.portable.InputStream i) - { - value = com.eaio.uuid.UUIDHelper.read (i); - } - - public void _write (org.omg.CORBA.portable.OutputStream o) - { - com.eaio.uuid.UUIDHelper.write (o, value); - } - - public org.omg.CORBA.TypeCode _type () - { - return com.eaio.uuid.UUIDHelper.type (); - } - -} diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c8a07f9779..38e8ab679f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -57,7 +57,7 @@ import akka.event.LoggingAdapter * * } else if (o instanceof Request3) { * val msg = ((Request3) o).getMsg(); - * getSender().tell(other.ask(msg, 5000)); // reply with Future for holding the other’s reply (timeout 5 seconds) + * getSender().tell(ask(other, msg, 5000)); // reply with Future for holding the other’s reply (timeout 5 seconds) * * } else { * unhandled(o); diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 5660811c00..b277142e76 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -48,6 +48,14 @@ object FSM { } } + /** + * This extractor is just convenience for matching a (S, S) pair, including a + * reminder what the new state is. + */ + object -> { + def unapply[S](in: (S, S)) = Some(in) + } + case class LogEntry[S, D](stateName: S, stateData: D, event: Any) case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) { @@ -174,6 +182,10 @@ trait FSM[S, D] extends Listeners { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] + // “import” so that it is visible without an import + val -> = FSM.-> + val StateTimeout = FSM.StateTimeout + val log = Logging(context.system, this) /** @@ -284,14 +296,6 @@ trait FSM[S, D] extends Listeners { */ protected final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout - /** - * This extractor is just convenience for matching a (S, S) pair, including a - * reminder what the new state is. - */ - object -> { - def unapply[S](in: (S, S)) = Some(in) - } - /** * Set handler which is called upon each state transition, i.e. not when * staying in the same state. This may use the pair extractor defined in the @@ -533,9 +537,6 @@ trait FSM[S, D] extends Listeners { } case class Event(event: Any, stateData: D) - object Ev { - def unapply[D](e: Event): Option[Any] = Some(e.event) - } case class StopEvent[S, D](reason: Reason, currentState: S, stateData: D) } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 895268fb44..8a21f841bb 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -47,36 +47,36 @@ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒ /** - * Implicit conversion from `Seq` of Cause-Action pairs to a `Decider`. See makeDecider(causeAction). + * Implicit conversion from `Seq` of Cause-Directive pairs to a `Decider`. See makeDecider(causeDirective). */ - implicit def seqCauseAction2Decider(trapExit: Iterable[CauseAction]): Decider = makeDecider(trapExit) + implicit def seqCauseDirective2Decider(trapExit: Iterable[CauseDirective]): Decider = makeDecider(trapExit) // the above would clash with seqThrowable2Decider for empty lists } object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { - sealed trait Action + sealed trait Directive /** * Resumes message processing for the failed Actor */ - case object Resume extends Action + case object Resume extends Directive /** * Discards the old Actor instance and replaces it with a new, * then resumes message processing. */ - case object Restart extends Action + case object Restart extends Directive /** * Stops the Actor */ - case object Stop extends Action + case object Stop extends Directive /** * Escalates the failure to the supervisor of the supervisor, * by rethrowing the cause of the failure. */ - case object Escalate extends Action + case object Escalate extends Directive /** * Resumes message processing for the failed Actor @@ -127,9 +127,9 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { */ implicit def seqThrowable2Decider(trapExit: Seq[Class[_ <: Throwable]]): Decider = makeDecider(trapExit) - type Decider = PartialFunction[Throwable, Action] - type JDecider = akka.japi.Function[Throwable, Action] - type CauseAction = (Class[_ <: Throwable], Action) + type Decider = PartialFunction[Throwable, Directive] + type JDecider = akka.japi.Function[Throwable, Directive] + type CauseDirective = (Class[_ <: Throwable], Directive) /** * Decider builder which just checks whether one of @@ -152,14 +152,14 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { def makeDecider(trapExit: JIterable[Class[_ <: Throwable]]): Decider = makeDecider(trapExit.toSeq) /** - * Decider builder for Iterables of cause-action pairs, e.g. a map obtained + * Decider builder for Iterables of cause-directive pairs, e.g. a map obtained * from configuration; will sort the pairs so that the most specific type is * checked before all its subtypes, allowing carving out subtrees of the * Throwable hierarchy. */ - def makeDecider(flat: Iterable[CauseAction]): Decider = { - val actions = sort(flat) - return { case x ⇒ actions find (_._1 isInstance x) map (_._2) getOrElse Escalate } + def makeDecider(flat: Iterable[CauseDirective]): Decider = { + val directives = sort(flat) + return { case x ⇒ directives find (_._1 isInstance x) map (_._2) getOrElse Escalate } } def makeDecider(func: JDecider): Decider = { @@ -170,8 +170,8 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { * Sort so that subtypes always precede their supertypes, but without * obeying any order between unrelated subtypes (insert sort). */ - def sort(in: Iterable[CauseAction]): Seq[CauseAction] = - (new ArrayBuffer[CauseAction](in.size) /: in) { (buf, ca) ⇒ + def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] = + (new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) ⇒ buf.indexWhere(_._1 isAssignableFrom ca._1) match { case -1 ⇒ buf append ca case x ⇒ buf insert (x, ca) @@ -215,8 +215,8 @@ abstract class SupervisorStrategy { * Returns whether it processed the failure or not */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { - val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate - action match { + val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate + directive match { case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true case Stop ⇒ processFailure(context, false, child, cause, stats, children); true @@ -227,10 +227,13 @@ abstract class SupervisorStrategy { } /** - * Restart all child actors when one fails + * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider` + * to all children when one fails, as opposed to [[akka.actor.OneForOneStrategy]] that applies + * it only to the child actor that failed. + * * @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window - * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a + * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. */ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) @@ -270,10 +273,13 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration } /** - * Restart a child actor when it fails + * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider` + * to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies + * it to all children. + * * @param maxNrOfRetries the number of times an actor is allowed to be restarted, negative value means no limit * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window - * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Action]], you can also use a + * @param decider = mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a * `Seq` of Throwables which maps the given Throwables to restarts, otherwise escalates. */ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index 6dd4d8c2c5..daa7467196 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -37,9 +37,9 @@ import akka.japi.{ Creator } * } * * private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), - * new Function() { + * new Function() { * @Override - * public Action apply(Throwable t) { + * public Directive apply(Throwable t) { * if (t instanceof ArithmeticException) { * return resume(); * } else if (t instanceof NullPointerException) { diff --git a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java index abf2207a1d..bb8f11467c 100644 --- a/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/FaultHandlingTestBase.java @@ -40,9 +40,9 @@ public class FaultHandlingTestBase { //#strategy private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), - new Function() { + new Function() { @Override - public Action apply(Throwable t) { + public Directive apply(Throwable t) { if (t instanceof ArithmeticException) { return resume(); } else if (t instanceof NullPointerException) { @@ -78,9 +78,9 @@ public class FaultHandlingTestBase { //#strategy2 private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"), - new Function() { + new Function() { @Override - public Action apply(Throwable t) { + public Directive apply(Throwable t) { if (t instanceof ArithmeticException) { return resume(); } else if (t instanceof NullPointerException) { diff --git a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java index 2490e50794..4486450f43 100644 --- a/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/java/code/akka/docs/actor/japi/FaultHandlingDocSample.java @@ -118,9 +118,9 @@ public class FaultHandlingDocSample { // Stop the CounterService child if it throws ServiceUnavailable private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(), - new Function() { + new Function() { @Override - public Action apply(Throwable t) { + public Directive apply(Throwable t) { if (t instanceof ServiceUnavailable) { return stop(); } else { @@ -229,9 +229,9 @@ public class FaultHandlingDocSample { // Restart the storage child when StorageException is thrown. // After 3 restarts within 5 seconds it will be stopped. private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"), - new Function() { + new Function() { @Override - public Action apply(Throwable t) { + public Directive apply(Throwable t) { if (t instanceof StorageException) { return restart(); } else { diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst index 8e2dfe3cd3..17107b8a82 100644 --- a/akka-docs/java/fault-tolerance.rst +++ b/akka-docs/java/fault-tolerance.rst @@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy: :include: strategy I have chosen a few well-known exception types in order to demonstrate the -application of the fault handling actions described in :ref:`supervision`. +application of the fault handling directives described in :ref:`supervision`. First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly, the only difference is that any decision is applied to all children of the supervisor, not only the @@ -71,7 +71,7 @@ in the same way as the default strategy defined above. Test Application ---------------- -The following section shows the effects of the different actions in practice, +The following section shows the effects of the different directives in practice, wherefor a test setup is needed. First off, we need a suitable supervisor: .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java @@ -93,13 +93,13 @@ Let us create actors: .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java :include: create -The first test shall demonstrate the ``Resume`` action, so we try it out by +The first test shall demonstrate the ``Resume`` directive, so we try it out by setting some non-initial state in the actor and have it fail: .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java :include: resume -As you can see the value 42 survives the fault handling action. Now, if we +As you can see the value 42 survives the fault handling directive. Now, if we change the failure to a more serious ``NullPointerException``, that will no longer be the case: @@ -113,7 +113,7 @@ terminated by the supervisor: :include: stop Up to now the supervisor was completely unaffected by the child’s failure, -because the actions set did handle it. In case of an ``Exception``, this is not +because the directives set did handle it. In case of an ``Exception``, this is not true anymore and the supervisor escalates the failure. .. includecode:: code/akka/docs/actor/FaultHandlingTestBase.java @@ -123,7 +123,7 @@ The supervisor itself is supervised by the top-level actor provided by the :class:`ActorSystem`, which has the default policy to restart in case of all ``Exception`` cases (with the notable exceptions of ``ActorInitializationException`` and ``ActorKilledException``). Since the -default action in case of a restart is to kill all children, we expected our poor +default directive in case of a restart is to kill all children, we expected our poor child not to survive this failure. In case this is not desired (which depends on the use case), we need to use a diff --git a/akka-docs/modules/microkernel.rst b/akka-docs/modules/microkernel.rst index ec6eabe3ef..7600e1ebd2 100644 --- a/akka-docs/modules/microkernel.rst +++ b/akka-docs/modules/microkernel.rst @@ -24,7 +24,7 @@ command (on a unix-based system): .. code-block:: none - bin/start sample.kernel.hello.HelloKernel + bin/akka sample.kernel.hello.HelloKernel Use ``Ctrl-C`` to interrupt and exit the microkernel. diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index c6faa5e7e1..01ef6ade83 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -657,3 +657,7 @@ extend that, either through inheritance or delegation, is to use ``PartialFunction.orElse`` chaining. .. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse + +Or: + +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#receive-orElse2 \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 4698c22315..98fa19aba2 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -133,6 +133,29 @@ class SpecificActor extends GenericActor { case class MyMsg(subject: String) //#receive-orElse +//#receive-orElse2 +trait ComposableActor extends Actor { + private var receives: List[Receive] = List() + protected def registerReceive(receive: Receive) { + receives = receive :: receives + } + + def receive = receives reduce { _ orElse _ } +} + +class MyComposableActor extends ComposableActor { + override def preStart() { + registerReceive({ + case "foo" ⇒ /* Do something */ + }) + + registerReceive({ + case "bar" ⇒ /* Do something */ + }) + } +} + +//#receive-orElse2 class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "import context" in { diff --git a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala index 3a4608e840..2b2cb003a9 100644 --- a/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/testkit/TestkitDocSpec.scala @@ -89,10 +89,10 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val fsm = TestFSMRef(new Actor with FSM[Int, String] { startWith(1, "") when(1) { - case Ev("go") ⇒ goto(2) using "go" + case Event("go", _) ⇒ goto(2) using "go" } when(2) { - case Ev("back") ⇒ goto(1) using "back" + case Event("back", _) ⇒ goto(1) using "back" } }) diff --git a/akka-docs/scala/fault-tolerance.rst b/akka-docs/scala/fault-tolerance.rst index 8eaf9398b4..f8b9fe0631 100644 --- a/akka-docs/scala/fault-tolerance.rst +++ b/akka-docs/scala/fault-tolerance.rst @@ -43,7 +43,7 @@ For the sake of demonstration let us consider the following strategy: :include: strategy I have chosen a few well-known exception types in order to demonstrate the -application of the fault handling actions described in :ref:`supervision`. +application of the fault handling directives described in :ref:`supervision`. First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly, the only difference is that any decision is applied to all children of the supervisor, not only the @@ -53,8 +53,8 @@ that the respective limit does not apply, leaving the possibility to specify an absolute upper limit on the restarts or to make the restarts work infinitely. The match statement which forms the bulk of the body is of type ``Decider``, -which is a ``PartialFunction[Throwable, Action]``. This -is the piece which maps child failure types to their corresponding actions. +which is a ``PartialFunction[Throwable, Directive]``. This +is the piece which maps child failure types to their corresponding directives. Default Supervisor Strategy --------------------------- @@ -76,7 +76,7 @@ in the same way as the default strategy defined above. Test Application ---------------- -The following section shows the effects of the different actions in practice, +The following section shows the effects of the different directives in practice, wherefor a test setup is needed. First off, we need a suitable supervisor: .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -99,13 +99,13 @@ Let us create actors: .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala :include: create -The first test shall demonstrate the ``Resume`` action, so we try it out by +The first test shall demonstrate the ``Resume`` directive, so we try it out by setting some non-initial state in the actor and have it fail: .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala :include: resume -As you can see the value 42 survives the fault handling action. Now, if we +As you can see the value 42 survives the fault handling directive. Now, if we change the failure to a more serious ``NullPointerException``, that will no longer be the case: @@ -119,7 +119,7 @@ terminated by the supervisor: :include: stop Up to now the supervisor was completely unaffected by the child’s failure, -because the actions set did handle it. In case of an ``Exception``, this is not +because the directives set did handle it. In case of an ``Exception``, this is not true anymore and the supervisor escalates the failure. .. includecode:: code/akka/docs/actor/FaultHandlingDocSpec.scala @@ -129,7 +129,7 @@ The supervisor itself is supervised by the top-level actor provided by the :class:`ActorSystem`, which has the default policy to restart in case of all ``Exception`` cases (with the notable exceptions of ``ActorInitializationException`` and ``ActorKilledException``). Since the -default action in case of a restart is to kill all children, we expected our poor +default directive in case of a restart is to kill all children, we expected our poor child not to survive this failure. In case this is not desired (which depends on the use case), we need to use a diff --git a/akka-docs/scala/fsm.rst b/akka-docs/scala/fsm.rst index 618381901c..2b35d21f41 100644 --- a/akka-docs/scala/fsm.rst +++ b/akka-docs/scala/fsm.rst @@ -178,7 +178,7 @@ demonstrated below: .. code-block:: scala when(Idle) { - case Ev(Start(msg)) => // convenience extractor when state data not needed + case Event(Start(msg), _) => goto(Timer) using (msg, sender) } @@ -188,9 +188,8 @@ demonstrated below: goto(Idle) } -The :class:`Event(msg, data)` case class may be used directly in the pattern as -shown in state Idle, or you may use the extractor :obj:`Ev(msg)` when the state -data are not needed. +The :class:`Event(msg: Any, data: D)` case class is parameterized with the data +type held by the FSM for convenient pattern matching. Defining the Initial State -------------------------- @@ -216,7 +215,7 @@ do something else in this case you can specify that with case Event(x : X, data) => log.info(this, "Received unhandled event: " + x) stay - case Ev(msg) => + case Event(msg, _) => log.warn(this, "Received unknown event: " + x) goto(Error) } @@ -259,7 +258,7 @@ All modifier can be chained to achieve a nice and concise description: .. code-block:: scala when(State) { - case Ev(msg) => + case Event(msg, _) => goto(Processing) using (msg) forMax (5 seconds) replying (WillDo) } @@ -396,7 +395,7 @@ state data which is available during termination handling. .. code-block:: scala when(A) { - case Ev(Stop) => + case Event(Stop, _) => doCleanup() stop() } diff --git a/akka-kernel/src/main/dist/bin/akka b/akka-kernel/src/main/dist/bin/akka index 595bc6e34c..84ae2e5d78 100755 --- a/akka-kernel/src/main/dist/bin/akka +++ b/akka-kernel/src/main/dist/bin/akka @@ -19,6 +19,6 @@ declare AKKA_HOME="$(cd "$(cd "$(dirname "$0")"; pwd -P)"/..; pwd)" [ -n "$JAVA_OPTS" ] || JAVA_OPTS="-Xmx1024M -Xms1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC -XX:OnOutOfMemoryError=\"kill -9 %p\"" -[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*:$AKKA_HOME/config" +[ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/config:$AKKA_HOME/lib/akka/*" java "$JAVA_OPTS" -cp "$AKKA_CLASSPATH" -Dakka.home="$AKKA_HOME" -Dakka.kernel.quiet=$quiet akka.kernel.Main "$@" diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 3cfbf0ce1b..d42cfcf165 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -34,7 +34,7 @@ class TestActorRef[T <: Actor]( _supervisor.path / name, false) { - private case object InternalGetActor extends AutoReceivedMessage + import TestActorRef.InternalGetActor override def newActorCell( system: ActorSystemImpl, @@ -98,6 +98,8 @@ class TestActorRef[T <: Actor]( object TestActorRef { + private case object InternalGetActor extends AutoReceivedMessage + private val number = new AtomicLong private[testkit] def randomName: String = { val l = number.getAndIncrement() diff --git a/akka-testkit/src/test/java/akka/testkit/TestActorRefJavaSpec.java b/akka-testkit/src/test/java/akka/testkit/TestActorRefJavaSpec.java new file mode 100644 index 0000000000..73350f819a --- /dev/null +++ b/akka-testkit/src/test/java/akka/testkit/TestActorRefJavaSpec.java @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.testkit; + +import org.junit.Test; +import akka.actor.Props; + +import static org.junit.Assert.*; + +public class TestActorRefJavaSpec { + + @Test + public void shouldBeAbleToUseApply() { + //Just a dummy call to make sure it compiles + TestActorRef ref = TestActorRef.apply(new Props(), null); + } +} \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 20f7e8b16a..172bdc230f 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -29,14 +29,11 @@ object AkkaSpec { stdout-loglevel = "WARNING" actor { default-dispatcher { - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-factor = 2 - core-pool-size-min = 8 - core-pool-size-max = 8 - max-pool-size-factor = 2 - max-pool-size-min = 8 - max-pool-size-max = 8 + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 8 + parallelism-factor = 2.0 + parallelism-max = 8 } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala index d2ec767504..86c6a8c7c5 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestFSMRefSpec.scala @@ -12,19 +12,17 @@ import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class TestFSMRefSpec extends AkkaSpec { - import FSM._ - "A TestFSMRef" must { "allow access to state data" in { val fsm = TestFSMRef(new Actor with FSM[Int, String] { startWith(1, "") when(1) { - case Ev("go") ⇒ goto(2) using "go" - case Ev(StateTimeout) ⇒ goto(2) using "timeout" + case Event("go", _) ⇒ goto(2) using "go" + case Event(StateTimeout, _) ⇒ goto(2) using "timeout" } when(2) { - case Ev("back") ⇒ goto(1) using "back" + case Event("back", _) ⇒ goto(1) using "back" } }, "test-fsm-ref-1") fsm.stateName must be(1) diff --git a/scripts/samples/start b/scripts/samples/start index 491c617db2..21563159f0 100755 --- a/scripts/samples/start +++ b/scripts/samples/start @@ -8,6 +8,6 @@ AKKA_HOME="$(cd "$SAMPLE"/../../../..; pwd)" [ -n "$AKKA_CLASSPATH" ] || AKKA_CLASSPATH="$AKKA_HOME/lib/scala-library.jar:$AKKA_HOME/lib/akka/*" -SAMPLE_CLASSPATH="$AKKA_CLASSPATH:$SAMPLE/lib/*:$SAMPLE/config" +SAMPLE_CLASSPATH="$SAMPLE/config:$AKKA_CLASSPATH:$SAMPLE/lib/*" java $JAVA_OPTS -cp "$SAMPLE_CLASSPATH" -Dakka.home="$SAMPLE" akka.kernel.Main diff --git a/scripts/samples/start.bat b/scripts/samples/start.bat index 1bffae4e5b..a6a3ec5e33 100644 --- a/scripts/samples/start.bat +++ b/scripts/samples/start.bat @@ -3,6 +3,6 @@ set SAMPLE=%~dp0.. set AKKA_HOME=%SAMPLE%\..\..\..\.. set JAVA_OPTS=-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC set AKKA_CLASSPATH=%AKKA_HOME%\lib\scala-library.jar;%AKKA_HOME%\lib\akka\* -set SAMPLE_CLASSPATH=%AKKA_CLASSPATH%;%SAMPLE%\lib\*;%SAMPLE%\config +set SAMPLE_CLASSPATH=%SAMPLE%\config;%AKKA_CLASSPATH%;%SAMPLE%\lib\* java %JAVA_OPTS% -cp "%SAMPLE_CLASSPATH%" -Dakka.home="%SAMPLE%" akka.kernel.Main