From e7f3945776369cf6fbff1e0dee76981c0bede343 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Jun 2011 17:40:30 +0200 Subject: [PATCH 01/17] add TestKit.expectMsgType - plus docs - and replace softTimeout hack with better solution --- akka-docs/scala/testing.rst | 21 +++++++--- .../src/main/scala/akka/testkit/TestKit.scala | 39 ++++++++++++------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index f3bb07c5bb..e8b20aa830 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -288,6 +288,13 @@ assertions concerning received messages. Here is the full list: does a conformance check; if you need the class to be equal, have a look at :meth:`expectMsgAllClassOf` with a single given class argument. + * :meth:`expectMsgType[T: Manifest](d: Duration)` + + An object which is an instance of the given type (after erasure) must be + received within the allotted time frame; the object will be returned. This + method is approximately equivalent to + ``expectMsgClass(manifest[T].erasure)``. + * :meth:`expectMsgAnyOf[T](d: Duration, obj: T*): T` An object must be received within the given time, and it must be equal ( @@ -398,21 +405,25 @@ is between :obj:`min` and :obj:`max`, where the former defaults to zero. The deadline calculated by adding the :obj:`max` parameter to the block's start time is implicitly available within the block to all examination methods, if you do not specify it, is is inherited from the innermost enclosing -:meth:`within` block. It should be noted that using :meth:`expectNoMsg` will -terminate upon reception of a message or at the deadline, whichever occurs -first; it follows that this examination usually is the last statement in a :meth:`within` block. +It should be noted that if the last message-receiving assertion of the block is +:meth:`expectNoMsg` or :meth:`receiveWhile`, the final check of the +:meth:`within` is skipped in order to avoid false positives due to wake-up +latencies. This means that while individual contained assertions still use the +maximum time bound, the overall block may take arbitrarily longer in this case. + .. code-block:: scala class SomeSpec extends WordSpec with MustMatchers with TestKit { "A Worker" must { "send timely replies" in { val worker = actorOf(...) - within (50 millis) { + within (500 millis) { worker ! "some work" expectMsg("some result") - expectNoMsg + expectNoMsg // will block for the rest of the 500ms + Thread.sleep(1000) // will NOT make this block fail } } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index bb9f69168b..10727ffc7c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -110,13 +110,12 @@ trait TestKitLight { val senderOption = Some(testActor) private var end: Duration = Duration.Inf - /* - * THIS IS A HACK: expectNoMsg and receiveWhile are bounded by `end`, but - * running them should not trigger an AssertionError, so mark their end - * time here and do not fail at the end of `within` if that time is not - * long gone. + + /** + * if last assertion was expectNoMsg, disable timing failure upon within() + * block end. */ - private var lastSoftTimeout: Duration = now - 5.millis + private var lastWasNoMsg = false /** * Stop test actor. Should be done at the end of the test unless relying on @@ -211,6 +210,8 @@ trait TestKitLight { val rem = end - start assert(rem >= min, "required min time " + min + " not possible, only " + format(min.unit, rem) + " left") + lastWasNoMsg = false + val max_diff = _max min rem val prev_end = end end = start + max_diff @@ -219,13 +220,8 @@ trait TestKitLight { val diff = now - start assert(min <= diff, "block took " + format(min.unit, diff) + ", should at least have been " + min) - /* - * caution: HACK AHEAD - */ - if (now - lastSoftTimeout > 5.millis) { + if (!lastWasNoMsg) { assert(diff <= max_diff, "block took " + format(_max.unit, diff) + ", exceeding " + format(_max.unit, max_diff)) - } else { - lastSoftTimeout -= 5.millis } ret @@ -302,6 +298,20 @@ trait TestKitLight { f(o) } + /** + * Same as `expectMsgType[T](remaining)`, but correctly treating the timeFactor. + */ + def expectMsgType[T](implicit m: Manifest[T]): T = expectMsgClass_internal(remaining, m.erasure.asInstanceOf[Class[T]]) + + /** + * Receive one message from the test actor and assert that it conforms to the + * given type (after erasure). Wait time is bounded by the given duration, + * with an AssertionFailure being thrown in case of timeout. + * + * @return the received object + */ + def expectMsgType[T](max: Duration)(implicit m: Manifest[T]): T = expectMsgClass_internal(max.dilated, m.erasure.asInstanceOf[Class[T]]) + /** * Same as `expectMsgClass(remaining, c)`, but correctly treating the timeFactor. */ @@ -454,7 +464,7 @@ trait TestKitLight { private def expectNoMsg_internal(max: Duration) { val o = receiveOne(max) assert(o eq null, "received unexpected message " + o) - lastSoftTimeout = now + lastWasNoMsg = true } /** @@ -503,7 +513,7 @@ trait TestKitLight { } val ret = doit(Nil) - lastSoftTimeout = now + lastWasNoMsg = true ret } @@ -543,6 +553,7 @@ trait TestKitLight { } else { queue.takeFirst } + lastWasNoMsg = false message match { case null ⇒ lastMessage = NullMessage From 48feec0bbb1d81d74e7706f553638a217933da53 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Jun 2011 21:33:27 +0200 Subject: [PATCH 02/17] add debug output to investigate cause of RoutingSpec sporadic failures "smallest mailbox" test fails randomly on my notebook --- .../src/test/scala/akka/routing/RoutingSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 3544767453..52ceb098bd 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -10,6 +10,7 @@ import akka.util.duration._ import akka.actor._ import akka.actor.Actor._ import akka.routing._ +import akka.event.EventHandler import java.util.concurrent.atomic.AtomicInteger import akka.dispatch.{ KeptPromise, Future } @@ -122,9 +123,11 @@ class RoutingSpec extends WordSpec with MustMatchers { latch.await(10 seconds) } finally { // because t1 is much slower and thus has a bigger mailbox all the time - t1Count.get must be < (t2Count.get) + EventHandler.info(this, "t1=" + t1Count + " t2=" + t2Count) } + t1Count.get must be < t2Count.get + for (a ← List(t1, t2, d)) a.stop() } From 10256991cbd1e52fbebab5d426dee7331e786652 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 27 Jun 2011 19:09:09 +0200 Subject: [PATCH 03/17] add Actor.freshInstance hook, test #955 --- .../akka/actor/actor/ActorRestartSpec.scala | 148 ++++++++++++++++++ .../src/main/scala/akka/actor/Actor.scala | 12 ++ .../src/main/scala/akka/actor/ActorRef.scala | 15 +- akka-docs/scala/actors.rst | 116 +++++++++----- 4 files changed, 253 insertions(+), 38 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala new file mode 100644 index 0000000000..3f1398e596 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor + +import org.scalatest.{ WordSpec, BeforeAndAfterAll, BeforeAndAfterEach } +import org.scalatest.matchers.MustMatchers + +import Actor.actorOf +import akka.testkit._ +import akka.util.duration._ +import akka.config.Supervision.OneForOneStrategy + +import java.util.concurrent.atomic._ + +object ActorRestartSpec { + + private var _gen = new AtomicInteger(0) + def generation = _gen.incrementAndGet + def generation_=(x: Int) { _gen.set(x) } + + sealed trait RestartType + case object Normal extends RestartType + case object Nested extends RestartType + case object Handover extends RestartType + case object Fail extends RestartType + + class Restarter(val testActor: ActorRef) extends Actor { + val gen = generation + var xx = 0 + var restart: RestartType = Normal + def receive = { + case x: Int ⇒ xx = x + case t: RestartType ⇒ restart = t + case "get" ⇒ self reply xx + } + override def preStart { testActor ! (("preStart", gen)) } + override def postStop { testActor ! (("postStop", gen)) } + override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } + override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } + override def freshInstance() = { + restart match { + case Normal ⇒ None + case Nested ⇒ + val ref = TestActorRef(new Actor { + def receive = { case _ ⇒ } + override def preStart { testActor ! ((this, self)) } + }).start() + testActor ! ((ref.underlyingActor, ref)) + None + case Handover ⇒ + val fresh = new Restarter(testActor) + fresh.xx = xx + Some(fresh) + case Fail ⇒ + throw new IllegalActorStateException("expected") + } + } + } + + class Supervisor extends Actor { + self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), 5, 5000) + def receive = { + case _ ⇒ + } + } +} + +class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with BeforeAndAfterEach { + import ActorRestartSpec._ + + override def beforeEach { generation = 0 } + + "An Actor restart" must { + + "invoke preRestart, preStart, postRestart" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "support creation of nested actors in freshInstance()" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! Nested + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] + tRef.underlyingActor must be(tActor) + expectMsg((tActor, tRef)) + tRef.stop() + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + } + + "use freshInstance() if available" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! 42 + actor ! Handover + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 42) + } + + "fall back to default factory if freshInstance() fails" in { + val actor = actorOf(new Restarter(testActor)).start() + expectMsg(1 second, ("preStart", 1)) + val supervisor = actorOf[Supervisor].start() + supervisor link actor + actor ! 42 + actor ! Fail + actor ! Kill + within(1 second) { + expectMsg(("preRestart", 1)) + expectMsg(("preStart", 2)) + expectMsg(("postRestart", 2)) + expectNoMsg + } + actor ! "get" + expectMsg(1 second, 0) + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 0ec0616acb..9925c060a5 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -662,6 +662,18 @@ trait Actor { */ def preRestart(reason: Throwable) {} + /** + * User overridable callback. + *

+ * Is called on the crashed Actor to give it the option of producing the + * Actor's reincarnation. If it returns None, which is the default, the + * initially provided actor factory is used. + *

+ * Warning: Propagating state from a crashed actor carries the risk + * of proliferating the cause of the error. Consider let-it-crash first. + */ + def freshInstance(): Option[Actor] = None + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 17fa3139dd..5e3c5fd3cf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -845,7 +845,20 @@ class LocalActorRef private[akka] ( val stackBefore = refStack.get refStack.set(stackBefore.push(this)) try { - actorFactory() + if (_status == ActorRefInternals.BEING_RESTARTED) { + val a = actor + val fresh = try a.freshInstance catch { + case e ⇒ + EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory") + None + } + fresh match { + case Some(ref) ⇒ ref + case None ⇒ actorFactory() + } + } else { + actorFactory() + } } finally { val stackAfter = refStack.get if (stackAfter.nonEmpty) diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index eb25cb2d1a..ad9fbf2756 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -92,6 +92,83 @@ Here we create a light-weight actor-based thread, that can be used to spawn off ... // do stuff } +Actor Internal API +------------------ + +The :class:`Actor` trait defines only one abstract method, the abovementioned +:meth:`receive`. In addition, it offers two convenience methods +:meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as +described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s +:class:`ActorRef` object. The remaining visible methods are user-overridable +life-cycle hooks which are described in the following:: + + def preStart() {} + def preRestart(cause: Throwable) {} + def freshInstance(): Option[Actor] = None + def postRestart(cause: Throwable) {} + def postStop() {} + +The implementations shown above are the defaults provided by the :class:`Actor` +trait. + +Start Hook +^^^^^^^^^^ + +Right after starting the actor, its :meth:`preStart` method is invoked. This is +guaranteed to happen before the first message from external sources is queued +to the actor’s mailbox. + +:: + + override def preStart { + // e.g. send initial message to self + self ! GetMeStarted + // or do any other stuff, e.g. registering with other actors + someService ! Register(self) + } + +Restart Hooks +^^^^^^^^^^^^^ + +A supervised actor, i.e. one which is linked to another actor with a fault +handling strategy, will be restarted in case an exception is thrown while +processing a message. This restart involves four of the hooks mentioned above: + +1. The old actor is informed by calling :meth:`preRestart` with the exception + which caused the restart; this method is the best place for cleaning up, + preparing hand-over to the fresh actor instance, etc. +2. The old actor’s :meth:`freshInstance` factory method is invoked, which may + optionally produce the new actor instance which will replace this actor. If + this method returns :obj:`None` or throws an exception, the initial factory + from the ``Actor.actorOf`` call is used to produce the fresh instance. +3. The new actor’s :meth:`preStart` method is invoked, just as in the normal + start-up case. +4. The new actor’s :meth:`postRestart` method is called with the exception + which caused the restart. + +.. warning:: + + The :meth:`freshInstance` hook may be used to propagate (part of) the failed + actor’s state to the fresh instance. This carries the risk of proliferating + the cause for the crash which triggered the restart. If you are tempted to + take this route, it is strongly advised to step back and consider other + possible approaches, e.g. distributing the state in question using other + means or spawning short-lived worker actors for carrying out “risky” tasks. + +An actor restart replaces only the actual actor object; the contents of the +mailbox and the hotswap stack are unaffected by the restart, so processing of +messages will resume after the :meth:`postRestart` hook returns. Any message +sent to an actor which is being restarted will be queued to its mailbox as +usual. + +Stop Hook +^^^^^^^^^ + +After stopping an actor, its :meth:`postStop` hook is called, which may be used +e.g. for deregistering this actor from other services. This hook is guaranteed +to run after message queuing has been disabled for this actor, i.e. sending +messages would fail with an :class:`IllegalActorStateException`. + Identifying Actors ------------------ @@ -252,43 +329,6 @@ This method should return a ``PartialFunction``, e.g. a ‘match/case’ clause } } -Actor internal API ------------------- - -The Actor trait contains almost no member fields or methods to invoke, you just use the Actor trait to implement the: - -#. ``receive`` message handler -#. life-cycle callbacks: - - #. preStart - #. postStop - #. preRestart - #. postRestart - -The ``Actor`` trait has one single member field: - -.. code-block:: scala - - val self: ActorRef - -This ``self`` field holds a reference to its ``ActorRef`` and it is this reference you want to access the Actor's API. Here, for example, you find methods to reply to messages, send yourself messages, define timeouts, fault tolerance etc., start and stop etc. - -However, for convenience you can import these functions and fields like below, which will allow you do drop the ``self`` prefix: - -.. code-block:: scala - - class MyActor extends Actor { - import self._ - id = ... - dispatcher = ... - start - ... - } - -But in this documentation we will always prefix the calls with ``self`` for clarity. - -Let's start by looking how we can reply to messages in a convenient way using this ``ActorRef`` API. - Reply to messages ----------------- @@ -441,6 +481,8 @@ You can also send an actor the ``akka.actor.PoisonPill`` message, which will sto If the sender is a ``Future`` (e.g. the message is sent with ``?``), the ``Future`` will be completed with an ``akka.actor.ActorKilledException("PoisonPill")``. +.. _Actor.HotSwap: + HotSwap ------- From 956d055d872ec19f8944b217a89a60e220636bef Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 27 Jun 2011 22:20:09 +0200 Subject: [PATCH 04/17] make currentMessage available in preRestart, test #957 - use Option[Any] since currentMessage may be unavailable (supervisor being restarted) - plus docs --- .../akka/actor/actor/ActorRestartSpec.scala | 41 +++++++++++++++---- .../src/main/scala/akka/actor/Actor.scala | 13 +++++- .../src/main/scala/akka/actor/ActorRef.scala | 3 +- akka-docs/scala/actors.rst | 17 +++++--- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index 3f1398e596..2b52c5cd4f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -36,7 +36,6 @@ object ActorRestartSpec { case "get" ⇒ self reply xx } override def preStart { testActor ! (("preStart", gen)) } - override def postStop { testActor ! (("postStop", gen)) } override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } override def freshInstance() = { @@ -71,13 +70,21 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo import ActorRestartSpec._ override def beforeEach { generation = 0 } + override def afterEach { toStop foreach (_.stop()) } + + private var toStop = List.empty[ActorRef] + private def newActor(f: ⇒ Actor): ActorRef = { + val ref = actorOf(f) + toStop ::= ref + ref.start() + } "An Actor restart" must { "invoke preRestart, preStart, postRestart" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! Kill within(1 second) { @@ -89,9 +96,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "support creation of nested actors in freshInstance()" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! Nested actor ! Kill @@ -108,9 +115,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "use freshInstance() if available" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! 42 actor ! Handover @@ -126,9 +133,9 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo } "fall back to default factory if freshInstance() fails" in { - val actor = actorOf(new Restarter(testActor)).start() + val actor = newActor(new Restarter(testActor)) expectMsg(1 second, ("preStart", 1)) - val supervisor = actorOf[Supervisor].start() + val supervisor = newActor(new Supervisor) supervisor link actor actor ! 42 actor ! Fail @@ -143,6 +150,22 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo expectMsg(1 second, 0) } + "call preRestart(cause, currentMessage) if defined" in { + val actor = newActor(new Actor { + def receive = { case _ ⇒ } + override def preRestart(cause: Throwable, currentMessage: Option[Any]) { + testActor ! (("preRestart", currentMessage)) + } + }) + val supervisor = newActor(new Supervisor) + supervisor link actor + actor ! Kill + within(1 second) { + expectMsg(("preRestart", Some(Kill))) + expectNoMsg + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 9925c060a5..bb4bb47327 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -658,10 +658,21 @@ trait Actor { /** * User overridable callback. *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. + * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. Override either the variant + * with or without the currentMessage argument. */ def preRestart(reason: Throwable) {} + /** + * User overridable callback. + *

+ * Is called on a crashed Actor right BEFORE it is restarted to allow clean + * up of resources before Actor is terminated. Override either the variant + * with or without the currentMessage argument. + */ + def preRestart(reason: Throwable, message: Option[Any]) { preRestart(reason) } + /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5e3c5fd3cf..1287324288 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -766,7 +766,8 @@ class LocalActorRef private[akka] ( def performRestart() { val failedActor = actorInstance.get if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - failedActor.preRestart(reason) + val message = if (currentMessage ne null) Some(currentMessage.message) else None + failedActor.preRestart(reason, message) val freshActor = newActor setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index ad9fbf2756..b23d53385e 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -99,11 +99,15 @@ The :class:`Actor` trait defines only one abstract method, the abovementioned :meth:`receive`. In addition, it offers two convenience methods :meth:`become`/:meth:`unbecome` for modifying the hotswap behavior stack as described in :ref:`Actor.HotSwap` and the :obj:`self` reference to this actor’s -:class:`ActorRef` object. The remaining visible methods are user-overridable -life-cycle hooks which are described in the following:: +:class:`ActorRef` object. If the current actor behavior does not match a +received message, :meth:`unhandled` is called, which by default throws an +:class:`UnhandledMessageException`. + +The remaining visible methods are user-overridable life-cycle hooks which are +described in the following:: def preStart() {} - def preRestart(cause: Throwable) {} + def preRestart(cause: Throwable, message: Option[Any]) {} def freshInstance(): Option[Actor] = None def postRestart(cause: Throwable) {} def postStop() {} @@ -135,7 +139,10 @@ handling strategy, will be restarted in case an exception is thrown while processing a message. This restart involves four of the hooks mentioned above: 1. The old actor is informed by calling :meth:`preRestart` with the exception - which caused the restart; this method is the best place for cleaning up, + which caused the restart and the message which triggered that exception; the + latter may be ``None`` if the restart was not caused by processing a + message, e.g. when a supervisor does not trap the exception and is restarted + in turn by its supervisor. This method is the best place for cleaning up, preparing hand-over to the fresh actor instance, etc. 2. The old actor’s :meth:`freshInstance` factory method is invoked, which may optionally produce the new actor instance which will replace this actor. If @@ -158,7 +165,7 @@ processing a message. This restart involves four of the hooks mentioned above: An actor restart replaces only the actual actor object; the contents of the mailbox and the hotswap stack are unaffected by the restart, so processing of messages will resume after the :meth:`postRestart` hook returns. Any message -sent to an actor which is being restarted will be queued to its mailbox as +sent to an actor while it is being restarted will be queued to its mailbox as usual. Stop Hook From 00b9166b577ef6a270cfe4eb6bdfdb4049bc8f31 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 3 Jul 2011 20:38:06 +0200 Subject: [PATCH 05/17] make ActorRestartSpec thread safe --- .../scala/akka/actor/actor/ActorRestartSpec.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index 2b52c5cd4f..cdff70c461 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -70,12 +70,18 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo import ActorRestartSpec._ override def beforeEach { generation = 0 } - override def afterEach { toStop foreach (_.stop()) } + override def afterEach { + val it = toStop.iterator + while (it.hasNext) { + it.next.stop() + it.remove + } + } - private var toStop = List.empty[ActorRef] + private var toStop = new java.util.concurrent.ConcurrentSkipListSet[ActorRef] private def newActor(f: ⇒ Actor): ActorRef = { val ref = actorOf(f) - toStop ::= ref + toStop add ref ref.start() } From 5de2ca7aa55df94f7591c78f1a5b7cdd95f5068e Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 3 Jul 2011 22:19:34 +0200 Subject: [PATCH 06/17] remove Actor.preRestart(cause: Throwable) - adapt all internal uses (all tests green) - start migration guide for 2.0 with this change --- .../scala/akka/actor/actor/ActorRefSpec.scala | 2 +- .../akka/actor/actor/ActorRestartSpec.scala | 26 ++++--------------- .../akka/actor/actor/FSMTransitionSpec.scala | 2 +- .../actor/supervisor/SupervisorTreeSpec.scala | 2 +- .../akka/actor/supervisor/Ticket669Spec.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 14 ++-------- .../main/scala/akka/actor/UntypedActor.scala | 2 +- .../src/main/scala/akka/camel/Producer.scala | 2 +- .../scala/akka/camel/ConsumerScalaTest.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../project/migration-guide-1.1.x-1.2.x.rst | 6 +++++ .../project/migration-guide-1.2.x-2.0.x.rst | 20 ++++++++++++++ akka-docs/project/migration-guides.rst | 2 ++ .../akka/spring/RemoteTypedActorOneImpl.java | 4 ++- .../akka/spring/RemoteTypedActorTwoImpl.java | 4 ++- .../scala/akka/testkit/TestActorRefSpec.scala | 2 +- 16 files changed, 50 insertions(+), 44 deletions(-) create mode 100644 akka-docs/project/migration-guide-1.1.x-1.2.x.rst create mode 100644 akka-docs/project/migration-guide-1.2.x-2.0.x.rst diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala index 4bef71a74d..f11b762493 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRefSpec.scala @@ -334,7 +334,7 @@ class ActorRefSpec extends WordSpec with MustMatchers { val ref = Actor.actorOf( new Actor { def receive = { case _ ⇒ } - override def preRestart(reason: Throwable) = latch.countDown() + override def preRestart(reason: Throwable, msg: Option[Any]) = latch.countDown() override def postRestart(reason: Throwable) = latch.countDown() }).start() diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala index cdff70c461..d2b9a42ee5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ActorRestartSpec.scala @@ -36,7 +36,7 @@ object ActorRestartSpec { case "get" ⇒ self reply xx } override def preStart { testActor ! (("preStart", gen)) } - override def preRestart(cause: Throwable) { testActor ! (("preRestart", gen)) } + override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! (("preRestart", msg, gen)) } override def postRestart(cause: Throwable) { testActor ! (("postRestart", gen)) } override def freshInstance() = { restart match { @@ -94,7 +94,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo supervisor link actor actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -109,7 +109,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Nested actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) val (tActor, tRef) = expectMsgType[(Actor, TestActorRef[Actor])] tRef.underlyingActor must be(tActor) expectMsg((tActor, tRef)) @@ -129,7 +129,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Handover actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -147,7 +147,7 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo actor ! Fail actor ! Kill within(1 second) { - expectMsg(("preRestart", 1)) + expectMsg(("preRestart", Some(Kill), 1)) expectMsg(("preStart", 2)) expectMsg(("postRestart", 2)) expectNoMsg @@ -156,22 +156,6 @@ class ActorRestartSpec extends WordSpec with MustMatchers with TestKit with Befo expectMsg(1 second, 0) } - "call preRestart(cause, currentMessage) if defined" in { - val actor = newActor(new Actor { - def receive = { case _ ⇒ } - override def preRestart(cause: Throwable, currentMessage: Option[Any]) { - testActor ! (("preRestart", currentMessage)) - } - }) - val supervisor = newActor(new Supervisor) - supervisor link actor - actor ! Kill - within(1 second) { - expectMsg(("preRestart", Some(Kill))) - expectNoMsg - } - } - } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala index 0cbaddd8e6..1eba7a71c1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/FSMTransitionSpec.scala @@ -32,7 +32,7 @@ object FSMTransitionSpec { case Ev("reply") ⇒ stay replying "reply" } initialize - override def preRestart(reason: Throwable) { target ! "restarted" } + override def preRestart(reason: Throwable, msg: Option[Any]) { target ! "restarted" } } class Forwarder(target: ActorRef) extends Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala index d8aaa9d0e4..85b41c777c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorTreeSpec.scala @@ -25,7 +25,7 @@ class SupervisorTreeSpec extends WordSpec with MustMatchers { case Die ⇒ throw new Exception(self.address + " is dying...") } - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { log += self.address } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala index 0a70058c7b..41019fc0a8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/Ticket669Spec.scala @@ -59,7 +59,7 @@ object Ticket669Spec { case msg ⇒ throw new Exception("test") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.reply_?("failure1") } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index bb4bb47327..fecd776ca1 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -659,19 +659,9 @@ trait Actor { * User overridable callback. *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean - * up of resources before Actor is terminated. Override either the variant - * with or without the currentMessage argument. + * up of resources before Actor is terminated. */ - def preRestart(reason: Throwable) {} - - /** - * User overridable callback. - *

- * Is called on a crashed Actor right BEFORE it is restarted to allow clean - * up of resources before Actor is terminated. Override either the variant - * with or without the currentMessage argument. - */ - def preRestart(reason: Throwable, message: Option[Any]) { preRestart(reason) } + def preRestart(reason: Throwable, message: Option[Any]) {} /** * User overridable callback. diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index d561ce6221..fa546b2437 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -102,7 +102,7 @@ abstract class UntypedActor extends Actor { *

* Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated. */ - override def preRestart(reason: Throwable) {} + override def preRestart(reason: Throwable, lastMessage: Option[Any]) {} /** * User overridable callback. diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 041f3397ff..9ad6ce22d2 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -58,7 +58,7 @@ trait ProducerSupport { this: Actor ⇒ * Default implementation of Actor.preRestart for freeing resources needed * to actually send messages to endpointUri. */ - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { try { preRestartProducer(reason) } finally { processor.stop } } diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index d80e847efa..27bbc264d7 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -254,7 +254,7 @@ object ConsumerScalaTest { case "succeed" ⇒ self.reply("ok") } - override def preRestart(reason: scala.Throwable) { + override def preRestart(reason: scala.Throwable, msg: Option[Any]) { self.reply_?("pr") } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 48c77ce7db..afab58547a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1606,7 +1606,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = Dispatchers.newPinnedDispatcher(self) - override def preRestart(reason: Throwable) { + override def preRestart(reason: Throwable, msg: Option[Any]) { EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason)) } diff --git a/akka-docs/project/migration-guide-1.1.x-1.2.x.rst b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst new file mode 100644 index 0000000000..e4988b9460 --- /dev/null +++ b/akka-docs/project/migration-guide-1.1.x-1.2.x.rst @@ -0,0 +1,6 @@ +.. _migration-1.2: + +################################ + Migration Guide 1.1.x to 1.2.x +################################ + diff --git a/akka-docs/project/migration-guide-1.2.x-2.0.x.rst b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst new file mode 100644 index 0000000000..7eabcf2f10 --- /dev/null +++ b/akka-docs/project/migration-guide-1.2.x-2.0.x.rst @@ -0,0 +1,20 @@ +.. _migration-2.0: + +################################ + Migration Guide 1.2.x to 2.0.x +################################ + +Actors +====== + +The 2.0 release contains several new features which require source-level +changes in client code. This API cleanup is planned to be the last one for a +significant amount of time. + +Lifecycle Callbacks +------------------- + +The :meth:`preRestart(cause: Throwable)` method has been replaced by +:meth:`preRestart(cause: Throwable, lastMessage: Any)`, hence you must insert +the second argument in all overriding methods. The good news is that any missed +actor will not compile without error. diff --git a/akka-docs/project/migration-guides.rst b/akka-docs/project/migration-guides.rst index 7eb063811c..7af815f241 100644 --- a/akka-docs/project/migration-guides.rst +++ b/akka-docs/project/migration-guides.rst @@ -6,6 +6,8 @@ Migration Guides .. toctree:: :maxdepth: 1 + migration-guide-1.2.x-2.0.x + migration-guide-1.1.x-1.2.x migration-guide-1.0.x-1.1.x migration-guide-0.10.x-1.0.x migration-guide-0.9.x-0.10.x diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java index c7ad0f3de6..f6ab0e56e7 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorOneImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedActorOne { @@ -22,7 +24,7 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java index 1479db0d8c..2ee07195e2 100644 --- a/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java +++ b/akka-spring/src/test/java/akka/spring/RemoteTypedActorTwoImpl.java @@ -2,6 +2,8 @@ package akka.spring; import akka.actor.*; +import scala.Option; + import java.util.concurrent.CountDownLatch; public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedActorTwo { @@ -22,7 +24,7 @@ public class RemoteTypedActorTwoImpl extends TypedActor implements RemoteTypedAc } @Override - public void preRestart(Throwable e) { + public void preRestart(Throwable e, Option msg) { try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {} latch.countDown(); } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 3ef904fe88..6b4f2415b9 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -174,7 +174,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac self.faultHandler = OneForOneStrategy(List(classOf[Throwable]), Some(2), Some(1000)) val ref = TestActorRef(new TActor { def receiveT = { case _ ⇒ } - override def preRestart(reason: Throwable) { counter -= 1 } + override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 } }).start() self.dispatcher = CallingThreadDispatcher.global From 4e6dd9e74e13aba812299502e0d092f6360a2565 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jul 2011 15:30:35 +0200 Subject: [PATCH 07/17] The Unb0rkening --- .../cluster/api/registry/RegistryStoreMultiJvmSpec.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala index ae0eadd97f..349626445c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/api/registry/RegistryStoreMultiJvmSpec.scala @@ -57,16 +57,14 @@ class RegistryStoreMultiJvmNode1 extends MasterClusterTestNode { } barrier("store-1-in-node-1", NrOfNodes) { - val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - node.store("hello-world-1", classOf[HelloWorld1], serializer) + node.store("hello-world-1", classOf[HelloWorld1], Serialization.serializerFor(classOf[HelloWorld1])) } barrier("use-1-in-node-2", NrOfNodes) { } barrier("store-2-in-node-1", NrOfNodes) { - val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x ⇒ fail("No serializer found"), s ⇒ s) - node.store("hello-world-2", classOf[HelloWorld1], false, serializer) + node.store("hello-world-2", classOf[HelloWorld1], false, Serialization.serializerFor(classOf[HelloWorld1])) } barrier("use-2-in-node-2", NrOfNodes) { From bb558c03f4f003d3452e6c696780f0bb1cd36a7f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jul 2011 17:17:04 +0200 Subject: [PATCH 08/17] Switching to Serializer.Identifier for storing within ZK --- .../scala/akka/serialization/Serializer.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 25 +++++-------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 835cf9fa7a..44ecffc2e8 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -8,7 +8,7 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B import akka.util.ClassLoaderObjectInputStream object Serializer { - val defaultSerializerName = JavaSerializer.getClass.getName + val defaultSerializerName = classOf[JavaSerializer].getName type Identifier = Byte } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 9a26ad985b..5f5d4f4771 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -703,8 +703,6 @@ class DefaultClusterNode private[akka]( serializeMailbox: Boolean, serializer: Serializer): ClusterNode = if (isConnected.isOn) { - val serializerClassName = serializer.getClass.getName - EventHandler.debug(this, "Storing actor with address [%s] in cluster".format(actorAddress)) @@ -739,9 +737,9 @@ class DefaultClusterNode private[akka]( // create ADDRESS -> SERIALIZER CLASS NAME mapping try { - zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName) + zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString) } catch { - case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName) + case e: ZkNodeExistsException ⇒ zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializer.identifier.toString) } // create ADDRESS -> NODE mapping @@ -1084,21 +1082,10 @@ class DefaultClusterNode private[akka]( /** * Returns Serializer for actor with specific address. */ - def serializerForActor(actorAddress: String): Serializer = { - val serializerClassName = - try { - zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String] - } catch { - case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) - } - - ReflectiveAccess.getClassFor(serializerClassName) match { - // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess. - case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer] - case Left(error) ⇒ - EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString)) - throw error - } + def serializerForActor(actorAddress: String): Serializer = try { + Serialization.serializerByIdentity(zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String].toByte) + } catch { + case e: ZkNoNodeException ⇒ throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress)) } /** From fe1051af30ac2a34b040d0ec428d18242eaed462 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jul 2011 19:28:17 +0200 Subject: [PATCH 09/17] Adding some ScalaDoc to the Serializer --- .../main/scala/akka/serialization/Serializer.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 44ecffc2e8..aa57c4b47d 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -12,13 +12,24 @@ object Serializer { type Identifier = Byte } +/** + * A Serializer represents a bimap between an object and an array of bytes representing that object + */ trait Serializer extends scala.Serializable { /** * Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic * Values from 0 to 16 is reserved for Akka internal usage */ def identifier: Serializer.Identifier + + /** + * Serializes the given object into an Array of Byte + */ def toBinary(o: AnyRef): Array[Byte] + + /** + * Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into + */ def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef } From 3bc7db0ddeedaa83f698025dafc67e7ea86065ea Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 19 Jul 2011 19:28:46 +0200 Subject: [PATCH 10/17] Closing ticket #1030, removing lots of warnings --- .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../scala/akka/dispatch/Dispatchers.scala | 15 ++++++----- .../src/main/scala/akka/dispatch/Future.scala | 14 +++++----- .../main/scala/akka/event/EventHandler.scala | 4 +-- .../src/main/scala/akka/routing/Pool.scala | 2 +- .../src/main/scala/akka/cluster/Cluster.scala | 12 ++++----- .../remote/netty/NettyRemoteSupport.scala | 26 +++++++++---------- .../serialization/SerializationProtocol.scala | 4 +-- .../src/main/scala/akka/agent/Agent.scala | 8 +++--- .../scala/akka/testkit/TestActorRef.scala | 6 ++--- 10 files changed, 46 insertions(+), 47 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index efdf9dc269..948c90ecbe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1011,7 +1011,7 @@ private[akka] case class RemoteActorRef private[akka]( case _ ⇒ None } val chFuture = channel match { - case f: Promise[Any] ⇒ Some(f) + case f: Promise[_] ⇒ Some(f.asInstanceOf[Promise[Any]]) case _ ⇒ None } val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 9af29eed98..64c0c5afb2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -197,14 +197,15 @@ object Dispatchers { case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { - case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒ - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match { - case r: Right[Exception, MessageDispatcherConfigurator] ⇒ r.b - case l: Left[Exception, MessageDispatcherConfigurator] ⇒ - throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a) + case Right(clazz) ⇒ + ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match { + case Right(configurator) ⇒ configurator + case Left(exception)⇒ + throw new IllegalArgumentException( + "Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception) } - case l: Left[Exception, _] ⇒ - throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a) + case Left(exception) ⇒ + throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, exception) } } map { _ configure cfg diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 8705d3da4e..5c858a0905 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -90,8 +90,8 @@ object Futures { val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature? f.value.get match { - case r: Right[Throwable, T] ⇒ - val added = results add r.b + case Right(value) ⇒ + val added = results add value if (added && results.size == allDone) { //Only one thread can get here if (done.switchOn) { try { @@ -109,9 +109,9 @@ object Futures { } } } - case l: Left[Throwable, T] ⇒ + case Left(exception) ⇒ if (done.switchOn) { - result completeWithException l.a + result completeWithException exception results.clear } } @@ -148,10 +148,8 @@ object Futures { val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold f.value.get match { - case r: Right[Throwable, T] ⇒ - result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op)) - case l: Left[Throwable, T] ⇒ - result.completeWithException(l.a) + case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op)) + case Left(exception) ⇒ result.completeWithException(exception) } } } diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 134543e284..0a0e00e2cc 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -112,8 +112,8 @@ object EventHandler extends ListenerManagement { defaultListeners foreach { listenerName ⇒ try { ReflectiveAccess.getClassFor[Actor](listenerName) match { - case r: Right[_, Class[Actor]] ⇒ addListener(Actor.localActorOf(r.b).start()) - case l: Left[Exception, _] ⇒ throw l.a + case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start()) + case Left(exception) ⇒ throw exception } } catch { case e: Exception ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 6398ef2241..6bfd4c18b3 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -265,7 +265,7 @@ trait MailboxPressureCapacitor { */ trait ActiveFuturesPressureCapacitor { def pressure(delegates: Seq[ActorRef]): Int = - delegates count { _.channel.isInstanceOf[Promise[Any]] } + delegates count { _.channel.isInstanceOf[Promise[_]] } } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5f5d4f4771..25cb290f05 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1917,7 +1917,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case f: Function0[Unit] ⇒ try { + case f: Function0[_] ⇒ try { f() } finally { self.stop() @@ -1930,7 +1930,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case f: Function0[Any] ⇒ try { + case f: Function0[_] ⇒ try { self.reply(f()) } finally { self.stop() @@ -1943,8 +1943,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case (fun: Function[Any, Unit], param: Any) ⇒ try { - fun(param) + case (fun: Function[_, _], param: Any) ⇒ try { + fun.asInstanceOf[Any => Unit].apply(param) } finally { self.stop() } @@ -1956,8 +1956,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = computeGridDispatcher def receive = { - case (fun: Function[Any, Unit], param: Any) ⇒ try { - self.reply(fun(param)) + case (fun: Function[_, _], param: Any) ⇒ try { + self.reply(fun.asInstanceOf[Any => Any](param)) } finally { self.stop() } diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e79e426d94..a339a1b8b6 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -88,14 +88,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem lock.readLock.lock try { val c = remoteClients.get(key) match { - case s: Some[RemoteClient] ⇒ s.get + case Some(client) ⇒ client case None ⇒ lock.readLock.unlock lock.writeLock.lock //Lock upgrade, not supported natively try { try { remoteClients.get(key) match { //Recheck for addition, race between upgrades - case s: Some[RemoteClient] ⇒ s.get //If already populated by other writer + case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) client.connect() @@ -111,15 +111,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { remoteClients.remove(Address(address)) match { - case s: Some[RemoteClient] ⇒ s.get.shutdown() - case None ⇒ false + case Some(client) ⇒ client.shutdown() + case None ⇒ false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { remoteClients.get(Address(address)) match { - case s: Some[RemoteClient] ⇒ s.get.connect(reconnectIfAlreadyConnected = true) - case None ⇒ false + case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true) + case None ⇒ false } } @@ -632,12 +632,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) def address = currentServer.get match { - case s: Some[NettyRemoteServer] ⇒ s.get.address - case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress + case Some(server) ⇒ server.address + case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress } def name = currentServer.get match { - case s: Some[NettyRemoteServer] ⇒ s.get.name + case Some(server) ⇒ server.name case None ⇒ val a = ReflectiveAccess.RemoteModule.configDefaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort @@ -920,15 +920,15 @@ class RemoteServerHandler( request.getActorInfo.getTimeout, new ActorPromise(request.getActorInfo.getTimeout). onComplete(_.value.get match { - case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request)) - case r: Right[Throwable, Any] ⇒ + case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request)) + case r: Right[_,_] ⇒ val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( Some(actorRef), Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, - r, - true, + r.asInstanceOf[Either[Throwable,Any]], + isOneWay = true, Some(actorRef)) // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method diff --git a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala index e86665295b..d8b1293bc6 100644 --- a/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-cluster/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -95,10 +95,10 @@ object ActorSerialization { if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.") val messages = actorRef.mailbox match { - case q: java.util.Queue[MessageInvocation] ⇒ + case q: java.util.Queue[_] ⇒ val l = new scala.collection.mutable.ListBuffer[MessageInvocation] val it = q.iterator - while (it.hasNext == true) l += it.next + while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation] l } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 3a97263369..dca421c941 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -282,8 +282,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor { val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false) def receive = { - case update: Update[T] ⇒ - self.tryReply(atomic(txFactory) { agent.ref alter update.function }) + case update: Update[_] ⇒ + self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] }) case Get ⇒ self reply agent.get case _ ⇒ () } @@ -298,8 +298,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false) def receive = { - case update: Update[T] ⇒ try { - self.tryReply(atomic(txFactory) { agent.ref alter update.function }) + case update: Update[_] ⇒ try { + self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] }) } finally { agent.resume self.stop() diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 6920faea64..116450665a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -70,12 +70,12 @@ object TestActorRef { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[T](manifest[T].erasure, noParams, noArgs) match { - case r: Right[_, T] ⇒ r.b - case l: Left[Exception, _] ⇒ throw new ActorInitializationException( + case Right(value) ⇒ value + case Left(exception) ⇒ throw new ActorInitializationException( "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + - "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", l.a) + "\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", exception) } }, address) } From 9c5b789b83730e3b0855d05f747bf8b301ff2626 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Jul 2011 15:35:49 +0200 Subject: [PATCH 11/17] Ticket 1002: group conf settings --- akka-sbt-plugin/build.sbt | 3 -- .../src/main/scala/AkkaKernelPlugin.scala | 51 +++++++++++-------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/akka-sbt-plugin/build.sbt b/akka-sbt-plugin/build.sbt index e01a2e9809..b7fa421544 100644 --- a/akka-sbt-plugin/build.sbt +++ b/akka-sbt-plugin/build.sbt @@ -9,6 +9,3 @@ version := "2.0-SNAPSHOT" publishMavenStyle := true -publishTo := Some("Typesafe Publish Repo" at "http://repo.typesafe.com/typesafe/maven-releases/") - -credentials += Credentials(Path.userHome / ".ivy2" / "typesafe-credentials") diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index 65f24ae9d8..e91816a1bb 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -10,19 +10,29 @@ import java.io.File object AkkaMicrokernelPlugin extends Plugin { + case class DistConfig( + outputDirectory: File, + configSourceDirs: Seq[File], + distJvmOptions: String, + distMainClass: String, + libFilter: File ⇒ Boolean, + additionalLibs: Seq[File]) + val Dist = config("dist") extend (Runtime) val dist = TaskKey[File]("dist", "Builds an Akka microkernel directory") // TODO how to reuse keyword "clean" here instead (dist:clean) val distClean = TaskKey[File]("clean-dist", "Removes Akka microkernel directory") + val outputDirectory = SettingKey[File]("output-directory") val configSourceDirs = TaskKey[Seq[File]]("config-source-directories", "Configuration files are copied from these directories") - val distJvmOptions = SettingKey[String]("jvm-options", "JVM parameters to use in start script") + val distJvmOptions = SettingKey[String]("kernel-jvm-options", "JVM parameters to use in start script") val distMainClass = SettingKey[String]("kernel-main-class", "Kernel main class to use in start script") val libFilter = SettingKey[File ⇒ Boolean]("lib-filter", "Filter of dependency jar files") val additionalLibs = TaskKey[Seq[File]]("additional-libs", "Additional dependency jar files") + val distConfig = TaskKey[DistConfig]("dist-config") override lazy val settings = inConfig(Dist)(Seq( @@ -36,32 +46,29 @@ object AkkaMicrokernelPlugin extends Plugin { distJvmOptions := "-Xms1024M -Xmx1024M -Xss1M -XX:MaxPermSize=256M -XX:+UseParallelGC", distMainClass := "akka.kernel.Main", libFilter := { f ⇒ true }, - additionalLibs <<= defaultAdditionalLibs)) ++ + additionalLibs <<= defaultAdditionalLibs, + distConfig <<= (outputDirectory, configSourceDirs, distJvmOptions, distMainClass, libFilter, additionalLibs) map DistConfig)) ++ Seq( dist <<= (dist in Dist).identity) private def distTask: Initialize[Task[File]] = - (outputDirectory, sourceDirectory, crossTarget, dependencyClasspath, - configSourceDirs, distJvmOptions, distMainClass, libFilter, streams) map { - (outDir, src, tgt, cp, configSrc, jvmOptions, mainClass, libFilt, s) ⇒ - val log = s.log - val distBinPath = outDir / "bin" - val distConfigPath = outDir / "config" - val distDeployPath = outDir / "deploy" - val distLibPath = outDir / "lib" - // TODO how do I grab the additionalLibs setting? Can't add it in input tuple, limitation of number of elements in map of tuple. - val addLibs = Seq.empty[File] + (distConfig, sourceDirectory, crossTarget, dependencyClasspath, streams) map { (conf, src, tgt, cp, s) ⇒ + val log = s.log + val distBinPath = conf.outputDirectory / "bin" + val distConfigPath = conf.outputDirectory / "config" + val distDeployPath = conf.outputDirectory / "deploy" + val distLibPath = conf.outputDirectory / "lib" - log.info("Creating distribution %s ..." format outDir) - IO.createDirectory(outDir) - Scripts(jvmOptions, mainClass).writeScripts(distBinPath) - copyDirectories(configSrc, distConfigPath) - copyJars(tgt, distDeployPath) - copyFiles(libFiles(cp, libFilt), distLibPath) - copyFiles(addLibs, distLibPath) - log.info("Distribution created.") - outDir - } + log.info("Creating distribution %s ..." format conf.outputDirectory) + IO.createDirectory(conf.outputDirectory) + Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath) + copyDirectories(conf.configSourceDirs, distConfigPath) + copyJars(tgt, distDeployPath) + copyFiles(libFiles(cp, conf.libFilter), distLibPath) + copyFiles(conf.additionalLibs, distLibPath) + log.info("Distribution created.") + conf.outputDirectory + } private def distCleanTask: Initialize[Task[File]] = (outputDirectory, streams) map { (outDir, s) ⇒ From e0ae830fbb697c8a4ab6d06b20f6998c58d5672e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Jul 2011 15:45:45 +0200 Subject: [PATCH 12/17] Ticket 1002: Fixed dist:clean --- akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index e91816a1bb..033bdadc79 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -4,6 +4,7 @@ import sbt._ import sbt.Keys._ +import sbt.Keys._ import sbt.classpath.ClasspathUtilities import sbt.Project.Initialize import java.io.File @@ -21,7 +22,7 @@ object AkkaMicrokernelPlugin extends Plugin { val Dist = config("dist") extend (Runtime) val dist = TaskKey[File]("dist", "Builds an Akka microkernel directory") // TODO how to reuse keyword "clean" here instead (dist:clean) - val distClean = TaskKey[File]("clean-dist", "Removes Akka microkernel directory") + val distClean = TaskKey[Unit]("clean", "Removes Akka microkernel directory") val outputDirectory = SettingKey[File]("output-directory") val configSourceDirs = TaskKey[Seq[File]]("config-source-directories", @@ -70,12 +71,11 @@ object AkkaMicrokernelPlugin extends Plugin { conf.outputDirectory } - private def distCleanTask: Initialize[Task[File]] = + private def distCleanTask: Initialize[Task[Unit]] = (outputDirectory, streams) map { (outDir, s) ⇒ val log = s.log log.info("Cleaning " + outDir) IO.delete(outDir) - outDir } def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ From 741b8ccf95bd8e1d35f0e846cf3afdb63f65a794 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Jul 2011 16:06:06 +0200 Subject: [PATCH 13/17] Ticket 1002: Only dist for kernel projects --- .../src/main/scala/AkkaKernelPlugin.scala | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index 033bdadc79..db2ca5bf13 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -53,36 +53,46 @@ object AkkaMicrokernelPlugin extends Plugin { dist <<= (dist in Dist).identity) private def distTask: Initialize[Task[File]] = - (distConfig, sourceDirectory, crossTarget, dependencyClasspath, streams) map { (conf, src, tgt, cp, s) ⇒ - val log = s.log - val distBinPath = conf.outputDirectory / "bin" - val distConfigPath = conf.outputDirectory / "config" - val distDeployPath = conf.outputDirectory / "deploy" - val distLibPath = conf.outputDirectory / "lib" - - log.info("Creating distribution %s ..." format conf.outputDirectory) - IO.createDirectory(conf.outputDirectory) - Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath) - copyDirectories(conf.configSourceDirs, distConfigPath) - copyJars(tgt, distDeployPath) - copyFiles(libFiles(cp, conf.libFilter), distLibPath) - copyFiles(conf.additionalLibs, distLibPath) - log.info("Distribution created.") + (distConfig, sourceDirectory, crossTarget, dependencyClasspath, allDependencies, streams) map { (conf, src, tgt, cp, deps, s) ⇒ + + if (isKernelProject(deps)) { + val log = s.log + val distBinPath = conf.outputDirectory / "bin" + val distConfigPath = conf.outputDirectory / "config" + val distDeployPath = conf.outputDirectory / "deploy" + val distLibPath = conf.outputDirectory / "lib" + + log.info("Creating distribution %s ..." format conf.outputDirectory) + IO.createDirectory(conf.outputDirectory) + Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath) + copyDirectories(conf.configSourceDirs, distConfigPath) + copyJars(tgt, distDeployPath) + copyFiles(libFiles(cp, conf.libFilter), distLibPath) + copyFiles(conf.additionalLibs, distLibPath) + log.info("Distribution created.") + } conf.outputDirectory } private def distCleanTask: Initialize[Task[Unit]] = - (outputDirectory, streams) map { (outDir, s) ⇒ - val log = s.log - log.info("Cleaning " + outDir) - IO.delete(outDir) + (outputDirectory, allDependencies, streams) map { (outDir, deps, s) ⇒ + + if (isKernelProject(deps)) { + val log = s.log + log.info("Cleaning " + outDir) + IO.delete(outDir) + } } - def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ + def isKernelProject(dependencies: Seq[ModuleID]): Boolean = { + dependencies.exists(moduleId => moduleId.organization == "se.scalablesolutions.akka" && moduleId.name == "akka-kernel") + } + + private def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ Seq(src / "main" / "config") ++ resources } - def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒ + private def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒ Seq.empty[File] } From 48b772c2adf5ec0fd5ac6246b06230d3dac278e0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 19 Jul 2011 20:49:17 +0200 Subject: [PATCH 14/17] Ticket 1002: Include target jars from dependent subprojects --- .../src/main/scala/AkkaKernelPlugin.scala | 81 +++++++++++++++++-- 1 file changed, 75 insertions(+), 6 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala index db2ca5bf13..2f00de19dc 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaKernelPlugin.scala @@ -4,10 +4,12 @@ import sbt._ import sbt.Keys._ -import sbt.Keys._ +import sbt.Load.BuildStructure import sbt.classpath.ClasspathUtilities import sbt.Project.Initialize +import sbt.CommandSupport._ import java.io.File +import scala.collection.mutable.{ Set => MutableSet } object AkkaMicrokernelPlugin extends Plugin { @@ -21,7 +23,6 @@ object AkkaMicrokernelPlugin extends Plugin { val Dist = config("dist") extend (Runtime) val dist = TaskKey[File]("dist", "Builds an Akka microkernel directory") - // TODO how to reuse keyword "clean" here instead (dist:clean) val distClean = TaskKey[Unit]("clean", "Removes Akka microkernel directory") val outputDirectory = SettingKey[File]("output-directory") @@ -53,22 +54,29 @@ object AkkaMicrokernelPlugin extends Plugin { dist <<= (dist in Dist).identity) private def distTask: Initialize[Task[File]] = - (distConfig, sourceDirectory, crossTarget, dependencyClasspath, allDependencies, streams) map { (conf, src, tgt, cp, deps, s) ⇒ + (distConfig, sourceDirectory, crossTarget, dependencyClasspath, projectDependencies, allDependencies, buildStructure, state, streams) map { + (conf, src, tgt, cp, projDeps, allDeps, buildStruct, st, s) ⇒ - if (isKernelProject(deps)) { + if (isKernelProject(allDeps)) { val log = s.log val distBinPath = conf.outputDirectory / "bin" val distConfigPath = conf.outputDirectory / "config" val distDeployPath = conf.outputDirectory / "deploy" val distLibPath = conf.outputDirectory / "lib" - + + val subProjectDependencies: Set[SubProjectInfo] = allSubProjectDependencies(projDeps, buildStruct, st) + log.info("Creating distribution %s ..." format conf.outputDirectory) IO.createDirectory(conf.outputDirectory) Scripts(conf.distJvmOptions, conf.distMainClass).writeScripts(distBinPath) copyDirectories(conf.configSourceDirs, distConfigPath) copyJars(tgt, distDeployPath) + copyFiles(libFiles(cp, conf.libFilter), distLibPath) copyFiles(conf.additionalLibs, distLibPath) + for (subTarget <- subProjectDependencies.map(_.target)) { + copyJars(subTarget, distLibPath) + } log.info("Distribution created.") } conf.outputDirectory @@ -89,7 +97,7 @@ object AkkaMicrokernelPlugin extends Plugin { } private def defaultConfigSourceDirs = (sourceDirectory, unmanagedResourceDirectories) map { (src, resources) ⇒ - Seq(src / "main" / "config") ++ resources + Seq(src / "config", src / "main" / "config") ++ resources } private def defaultAdditionalLibs = (libraryDependencies) map { (libs) ⇒ @@ -163,6 +171,67 @@ object AkkaMicrokernelPlugin extends Plugin { val (libs, directories) = classpath.map(_.data).partition(ClasspathUtilities.isArchive) libs.map(_.asFile).filter(libFilter) } + + private def allSubProjectDependencies(projDeps: Seq[ModuleID], buildStruct: BuildStructure, state: State): Set[SubProjectInfo] = { + val buildUnit = buildStruct.units(buildStruct.root) + val uri = buildStruct.root + val allProjects = buildUnit.defined.map { + case (id, proj) => (ProjectRef(uri, id) -> proj) + } + + val projDepsNames = projDeps.map(_.name) + def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id) + val subProjects: Seq[SubProjectInfo] = allProjects.collect { + case (projRef, project) if include(project) => projectInfo(projRef, project, buildStruct, state, allProjects) + }.toList + + val allSubProjects = subProjects.map(_.recursiveSubProjects).flatten.toSet + allSubProjects +} + + private def projectInfo(projectRef: ProjectRef, project: ResolvedProject, buildStruct: BuildStructure, state: State, + allProjects: Map[ProjectRef, ResolvedProject]): SubProjectInfo = { + + def optionalSetting[A](key: ScopedSetting[A]) = key in projectRef get buildStruct.data + + def setting[A](key: ScopedSetting[A], errorMessage: => String) = { + optionalSetting(key) getOrElse { + logger(state).error(errorMessage); + throw new IllegalArgumentException() + } + } + + def evaluateTask[T](taskKey: sbt.Project.ScopedKey[sbt.Task[T]]) = { + EvaluateTask.evaluateTask(buildStruct, taskKey, state, projectRef, false, EvaluateTask.SystemProcessors) + } + + val projDeps: Seq[ModuleID] = evaluateTask(Keys.projectDependencies) match { + case Some(Value(moduleIds)) => moduleIds + case _ => Seq.empty + } + + val projDepsNames = projDeps.map(_.name) + def include(project: ResolvedProject): Boolean = projDepsNames.exists(_ == project.id) + val subProjects = allProjects.collect { + case (projRef, proj) if include(proj) => projectInfo(projRef, proj, buildStruct, state, allProjects) + }.toList + + val target = setting(Keys.crossTarget, "Missing crossTarget directory") + SubProjectInfo(project.id, target, subProjects) + } + + private case class SubProjectInfo(id: String, target: File, subProjects: Seq[SubProjectInfo]) { + + def recursiveSubProjects: Set[SubProjectInfo] = { + val flatSubProjects = for { + x <- subProjects + y <- x.recursiveSubProjects + } yield y + + flatSubProjects.toSet + this + } + + } } From df3d536b9cff7a1399ff1957276efdf5b52de5c2 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 19 Jul 2011 16:20:43 -0400 Subject: [PATCH 15/17] improve docs for dispatcher throughput --- akka-docs/scala/dispatchers.rst | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 4f385364a2..abcfd6d9bf 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -93,7 +93,7 @@ You can also set the rejection policy that should be used, e.g. what should be d * java.util.concurrent.ThreadPoolExecutor.DiscardPolicy - discards the message (throws it away) * java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy - discards the oldest message in the mailbox (throws it away) -You cane read more about these policies `here `_. +You can read more about these policies `here `_. Here is an example: @@ -104,7 +104,7 @@ Here is an example: import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy class MyActor extends Actor { - self.dispatcher = Dispatchers.newDispatcher(name) + self.dispatcher = Dispatchers.newDispatcher(name, throughput = 15) .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) .setCorePoolSize(16) .setMaxPoolSize(128) @@ -114,8 +114,14 @@ Here is an example: ... } -This 'Dispatcher' allows you to define the 'throughput' it should have. This defines the number of messages for a specific Actor the dispatcher should process in one single sweep. -Setting this to a higher number will increase throughput but lower fairness, and vice versa. If you don't specify it explicitly then it uses the default value defined in the 'akka.conf' configuration file: +The standard :class:`Dispatcher` allows you to define the ``throughput`` it +should have, as shown above. This defines the number of messages for a specific +Actor the dispatcher should process in one single sweep; in other words, the +dispatcher will bunch up to ``throughput`` message invocations together when +having elected an actor to run. Setting this to a higher number will increase +throughput but lower fairness, and vice versa. If you don't specify it +explicitly then it uses the default value defined in the 'akka.conf' +configuration file: .. code-block:: ruby From 0fe749f1af9547852cd8fee3e3c243cf931cb233 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 11 Jul 2011 22:00:17 +0200 Subject: [PATCH 16/17] add @experimental annotation in akka package --- .../src/main/scala/akka/experimental.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 akka-actor/src/main/scala/akka/experimental.scala diff --git a/akka-actor/src/main/scala/akka/experimental.scala b/akka-actor/src/main/scala/akka/experimental.scala new file mode 100644 index 0000000000..cfc976551a --- /dev/null +++ b/akka-actor/src/main/scala/akka/experimental.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka + +import annotation.target._ + +/** + * This annotation marks a feature which is not yet considered stable and may + * change or be removed in a future release. + * + * @author Roland Kuhn + * @since 1.2 + */ +@getter +@setter +@beanGetter +@beanSetter +final class experimental(since: String) extends annotation.StaticAnnotation From 6222e5db18e93518a5eaae7320850299a7b83086 Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 19 Jul 2011 17:09:45 -0400 Subject: [PATCH 17/17] mark Actor.freshInstance as @experimental --- akka-actor/src/main/scala/akka/actor/Actor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 3d2f49b9d1..b981c954ce 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -5,6 +5,7 @@ package akka.actor import DeploymentConfig._ +import akka.experimental import akka.dispatch._ import akka.config._ import Config._ @@ -677,6 +678,7 @@ trait Actor { * Warning: Propagating state from a crashed actor carries the risk * of proliferating the cause of the error. Consider let-it-crash first. */ + @experimental("1.2") def freshInstance(): Option[Actor] = None /**