Merge pull request #657 from akka/wip-2451-√
#2451 - Changing so that Stash overrides postStop instead so that stashe...
This commit is contained in:
commit
da7b6ef3f3
4 changed files with 58 additions and 34 deletions
|
|
@ -14,71 +14,82 @@ import scala.concurrent.Await
|
|||
import scala.concurrent.util.duration._
|
||||
import akka.actor.ActorSystem.Settings
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import org.scalatest.Assertions.intercept
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
object ActorWithBoundedStashSpec {
|
||||
|
||||
class StashingActor(implicit sys: ActorSystem) extends Actor with Stash {
|
||||
class StashingActor extends Actor with Stash {
|
||||
def receive = {
|
||||
case "hello" ⇒ stash()
|
||||
case "world" ⇒ unstashAll()
|
||||
case "hello1" ⇒ stash()
|
||||
case "world" ⇒ unstashAll()
|
||||
}
|
||||
}
|
||||
|
||||
class StashingActorWithOverflow(implicit sys: ActorSystem) extends Actor with Stash {
|
||||
class StashingActorWithOverflow extends Actor with Stash {
|
||||
var numStashed = 0
|
||||
|
||||
def receive = {
|
||||
case "hello" ⇒
|
||||
case "hello2" ⇒
|
||||
numStashed += 1
|
||||
try stash() catch { case e: StashOverflowException ⇒ if (numStashed == 21) sender ! "STASHOVERFLOW" }
|
||||
try stash() catch {
|
||||
case _: StashOverflowException ⇒
|
||||
if (numStashed == 21) {
|
||||
sender ! "STASHOVERFLOW"
|
||||
context stop self
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// bounded deque-based mailbox with capacity 10
|
||||
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 10 millis)
|
||||
|
||||
val dispatcherId = "my-dispatcher"
|
||||
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
my-dispatcher {
|
||||
mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded"
|
||||
%s {
|
||||
mailbox-type = "%s"
|
||||
stash-capacity = 20
|
||||
}
|
||||
""")
|
||||
|
||||
// bounded deque-based mailbox with capacity 10
|
||||
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 1 seconds)
|
||||
""".format(dispatcherId, classOf[Bounded].getName))
|
||||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach with ImplicitSender {
|
||||
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with BeforeAndAfterEach with DefaultTimeout with ImplicitSender {
|
||||
import ActorWithBoundedStashSpec._
|
||||
|
||||
implicit val sys = system
|
||||
override def atStartup: Unit = {
|
||||
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello1")))
|
||||
system.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*hello2")))
|
||||
}
|
||||
|
||||
override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) }
|
||||
override def beforeEach(): Unit =
|
||||
system.eventStream.subscribe(testActor, classOf[DeadLetter])
|
||||
|
||||
def myProps(creator: ⇒ Actor): Props = Props(creator).withDispatcher("my-dispatcher")
|
||||
override def afterEach(): Unit =
|
||||
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
|
||||
|
||||
"An Actor with Stash and BoundedDequeBasedMailbox" must {
|
||||
"An Actor with Stash" must {
|
||||
|
||||
"end up in DeadLetters in case of a capacity violation" in {
|
||||
system.eventStream.subscribe(testActor, classOf[DeadLetter])
|
||||
|
||||
val stasher = system.actorOf(myProps(new StashingActor))
|
||||
val stasher = system.actorOf(Props[StashingActor].withDispatcher(dispatcherId))
|
||||
// fill up stash
|
||||
(1 to 11) foreach { _ ⇒ stasher ! "hello" }
|
||||
(1 to 11) foreach { _ ⇒ stasher ! "hello1" }
|
||||
|
||||
// cause unstashAll with capacity violation
|
||||
stasher ! "world"
|
||||
expectMsg(DeadLetter("hello", testActor, stasher))
|
||||
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
|
||||
expectMsg(DeadLetter("hello1", testActor, stasher))
|
||||
system stop stasher
|
||||
(1 to 10) foreach { _ ⇒ expectMsg(DeadLetter("hello1", testActor, stasher)) }
|
||||
}
|
||||
}
|
||||
|
||||
"An Actor with bounded Stash" must {
|
||||
|
||||
"throw a StashOverflowException in case of a stash capacity violation" in {
|
||||
val stasher = system.actorOf(myProps(new StashingActorWithOverflow))
|
||||
val stasher = system.actorOf(Props[StashingActorWithOverflow].withDispatcher(dispatcherId))
|
||||
// fill up stash
|
||||
(1 to 21) foreach { _ ⇒ stasher ! "hello" }
|
||||
(1 to 21) foreach { _ ⇒ stasher ! "hello2" }
|
||||
expectMsg("STASHOVERFLOW")
|
||||
(1 to 20) foreach { _ ⇒ expectMsg(DeadLetter("hello2", testActor, stasher)) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,13 +101,11 @@ An (unbounded) deque-based mailbox can be configured as follows:
|
|||
* `MessageQueueAppendFailedException` is thrown.
|
||||
*
|
||||
* The stash is guaranteed to be empty after calling `unstashAll()`.
|
||||
*
|
||||
* @throws MessageQueueAppendFailedException in case of a capacity violation when
|
||||
* prepending the stash to a bounded mailbox
|
||||
*/
|
||||
def unstashAll(): Unit = {
|
||||
try {
|
||||
for (msg ← theStash.reverseIterator) mailbox.enqueueFirst(self, msg)
|
||||
val i = theStash.reverseIterator
|
||||
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
|
||||
} finally {
|
||||
theStash = Vector.empty[Envelope]
|
||||
}
|
||||
|
|
@ -115,15 +113,22 @@ An (unbounded) deque-based mailbox can be configured as follows:
|
|||
|
||||
/**
|
||||
* Overridden callback. Prepends all messages in the stash to the mailbox,
|
||||
* clears the stash, stops all children and invokes the postStop() callback of the superclass.
|
||||
* clears the stash, stops all children and invokes the postStop() callback.
|
||||
*/
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
try unstashAll() finally {
|
||||
context.children foreach context.stop
|
||||
postStop()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash.
|
||||
* Must be called when overriding this method, otherwise stashed messages won't be propagated to DeadLetters
|
||||
* when actor stops.
|
||||
*/
|
||||
override def postStop(): Unit = unstashAll()
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -155,6 +155,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
|
||||
private def finishTerminate() {
|
||||
val a = actor
|
||||
// The following order is crucial for things to work properly. Only chnage this if you're very confident and lucky.
|
||||
try if (a ne null) a.postStop()
|
||||
finally try dispatcher.detach(this)
|
||||
finally try parent.sendSystemMessage(ChildTerminated(self))
|
||||
|
|
|
|||
|
|
@ -267,4 +267,11 @@ If you don't want these in the log you need to add this to your configuration::
|
|||
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
|
||||
Stash postStop
|
||||
==============
|
||||
|
||||
Both Actors and UntypedActors using ``Stash`` now overrides postStop to make sure that
|
||||
stashed messages are put into the dead letters when the actor stops, make sure you call
|
||||
super.postStop if you override it.
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue