From c0adc6fc22335d8e6a7bdc494dc211dff877670e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 29 Aug 2012 17:58:59 +0200 Subject: [PATCH 1/2] #2451 - Changing so that Stash overrides postStop instead so that stashed messages are reenqueued to mailbox on preRestart calling postStop and on stop, ending up in dead letters as they should. --- .../actor/ActorWithBoundedStashSpec.scala | 67 +++++++++++-------- .../src/main/scala/akka/actor/Stash.scala | 10 +-- .../scala/akka/actor/cell/FaultHandling.scala | 1 + 3 files changed, 42 insertions(+), 36 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala index a836572ad2..b3b9a57839 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorWithBoundedStashSpec.scala @@ -14,71 +14,82 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import akka.actor.ActorSystem.Settings import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.Assertions.intercept import org.scalatest.BeforeAndAfterEach object ActorWithBoundedStashSpec { - class StashingActor(implicit sys: ActorSystem) extends Actor with Stash { + class StashingActor extends Actor with Stash { def receive = { - case "hello" ⇒ stash() - case "world" ⇒ unstashAll() + case "hello1" ⇒ stash() + case "world" ⇒ unstashAll() } } - class StashingActorWithOverflow(implicit sys: ActorSystem) extends Actor with Stash { + class StashingActorWithOverflow extends Actor with Stash { var numStashed = 0 def receive = { - case "hello" ⇒ + case "hello2" ⇒ numStashed += 1 - try stash() catch { case e: StashOverflowException ⇒ if (numStashed == 21) sender ! "STASHOVERFLOW" } + try stash() catch { + case _: StashOverflowException ⇒ + if (numStashed == 21) { + sender ! "STASHOVERFLOW" + context stop self + } + } } } + // bounded deque-based mailbox with capacity 10 + class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 10 millis) + + val dispatcherId = "my-dispatcher" + val testConf: Config = ConfigFactory.parseString(""" - my-dispatcher { - mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded" + %s { + mailbox-type = "%s" stash-capacity = 20 } - """) - - // bounded deque-based mailbox with capacity 10 - class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 1 seconds) + """.format(dispatcherId, classOf[Bounded].getName)) } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach with ImplicitSender { +class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with BeforeAndAfterEach with DefaultTimeout with ImplicitSender { import ActorWithBoundedStashSpec._ - implicit val sys = system + override def atStartup: Unit = { + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello1"))) + system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello2"))) + } - override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) } + override def beforeEach(): Unit = + system.eventStream.subscribe(testActor, classOf[DeadLetter]) - def myProps(creator: ⇒ Actor): Props = Props(creator).withDispatcher("my-dispatcher") + override def afterEach(): Unit = + system.eventStream.unsubscribe(testActor, classOf[DeadLetter]) - "An Actor with Stash and BoundedDequeBasedMailbox" must { + "An Actor with Stash" must { "end up in DeadLetters in case of a capacity violation" in { - system.eventStream.subscribe(testActor, classOf[DeadLetter]) - - val stasher = system.actorOf(myProps(new StashingActor)) + val stasher = system.actorOf(Props[StashingActor].withDispatcher(dispatcherId)) // fill up stash - (1 to 11) foreach { _ ⇒ stasher ! "hello" } + (1 to 11) foreach { _ ⇒ stasher ! "hello1" } // cause unstashAll with capacity violation stasher ! "world" - expectMsg(DeadLetter("hello", testActor, stasher)) - system.eventStream.unsubscribe(testActor, classOf[DeadLetter]) + expectMsg(DeadLetter("hello1", testActor, stasher)) + system stop stasher + (1 to 10) foreach { _ ⇒ expectMsg(DeadLetter("hello1", testActor, stasher)) } } - } - - "An Actor with bounded Stash" must { "throw a StashOverflowException in case of a stash capacity violation" in { - val stasher = system.actorOf(myProps(new StashingActorWithOverflow)) + val stasher = system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId)) // fill up stash - (1 to 21) foreach { _ ⇒ stasher ! "hello" } + (1 to 21) foreach { _ ⇒ stasher ! "hello2" } expectMsg("STASHOVERFLOW") + (1 to 20) foreach { _ ⇒ expectMsg(DeadLetter("hello2", testActor, stasher)) } } } } diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 7076a996df..0e64c5acb5 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -114,15 +114,9 @@ An (unbounded) deque-based mailbox can be configured as follows: } /** - * Overridden callback. Prepends all messages in the stash to the mailbox, - * clears the stash, stops all children and invokes the postStop() callback of the superclass. + * Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash. */ - override def preRestart(reason: Throwable, message: Option[Any]) { - try unstashAll() finally { - context.children foreach context.stop - postStop() - } - } + override def postStop(): Unit = unstashAll() } diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 0c0f9bb9c0..749cf62bb8 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -155,6 +155,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ private def finishTerminate() { val a = actor + // The following order is crucial for things to work properly. Only cahnge this if you're very confident and lucky. try if (a ne null) a.postStop() finally try dispatcher.detach(this) finally try parent.sendSystemMessage(ChildTerminated(self)) From f9a58ea43034cf739b493cfeb0ec929e406a8e79 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 30 Aug 2012 12:18:51 +0200 Subject: [PATCH 2/2] Reintroduced preRestart override for Stash and added Stash-changes to the migration docs --- .../src/main/scala/akka/actor/Stash.scala | 19 +++++++++++++++---- .../scala/akka/actor/cell/FaultHandling.scala | 2 +- .../project/migration-guide-2.0.x-2.1.x.rst | 7 +++++++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 0e64c5acb5..05b618d03a 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -101,20 +101,31 @@ An (unbounded) deque-based mailbox can be configured as follows: * `MessageQueueAppendFailedException` is thrown. * * The stash is guaranteed to be empty after calling `unstashAll()`. - * - * @throws MessageQueueAppendFailedException in case of a capacity violation when - * prepending the stash to a bounded mailbox */ def unstashAll(): Unit = { try { - for (msg ← theStash.reverseIterator) mailbox.enqueueFirst(self, msg) + val i = theStash.reverseIterator + while (i.hasNext) mailbox.enqueueFirst(self, i.next()) } finally { theStash = Vector.empty[Envelope] } } + /** + * Overridden callback. Prepends all messages in the stash to the mailbox, + * clears the stash, stops all children and invokes the postStop() callback. + */ + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + try unstashAll() finally { + context.children foreach context.stop + postStop() + } + } + /** * Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash. + * Must be called when overriding this method, otherwise stashed messages won't be propagated to DeadLetters + * when actor stops. */ override def postStop(): Unit = unstashAll() diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 749cf62bb8..97e85fe049 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -155,7 +155,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ private def finishTerminate() { val a = actor - // The following order is crucial for things to work properly. Only cahnge this if you're very confident and lucky. + // The following order is crucial for things to work properly. Only chnage this if you're very confident and lucky. try if (a ne null) a.postStop() finally try dispatcher.detach(this) finally try parent.sendSystemMessage(ChildTerminated(self)) diff --git a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst index 772e5ea784..ac4e530591 100644 --- a/akka-docs/project/migration-guide-2.0.x-2.1.x.rst +++ b/akka-docs/project/migration-guide-2.0.x-2.1.x.rst @@ -256,4 +256,11 @@ If you don't want these in the log you need to add this to your configuration:: akka.remote.log-remote-lifecycle-events = off +Stash postStop +============== + +Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sure that +stashed messages are put into the dead letters when the actor stops, make sure you call +super.postStop if you override it. +