Merge pull request #527 from akka/wip-2203-always-deadletters-√

#2203 - publish failed message deliveries to DeadLetters when bounded or...
This commit is contained in:
viktorklang 2012-06-11 05:08:48 -07:00
commit f4eaeab43e
3 changed files with 32 additions and 64 deletions

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.{ Await, MessageQueueAppendFailedException, BoundedDequeBasedMailbox }
import akka.dispatch.{ Await, BoundedDequeBasedMailbox }
import akka.pattern.ask
import akka.util.duration._
import akka.actor.ActorSystem.Settings
@ -17,16 +17,8 @@ object ActorWithBoundedStashSpec {
class StashingActor(implicit sys: ActorSystem) extends Actor with Stash {
def receive = {
case "hello"
stash()
sender ! "OK"
case "world"
try {
unstashAll()
} catch {
case e: MessageQueueAppendFailedException
expectedException.open()
}
case "hello" stash()
case "world" unstashAll()
}
}
@ -36,18 +28,10 @@ object ActorWithBoundedStashSpec {
def receive = {
case "hello"
numStashed += 1
try {
stash()
} catch {
case e: StashOverflowException
if (numStashed == 21) stashOverflow.open()
}
try stash() catch { case e: StashOverflowException if (numStashed == 21) sender ! "STASHOVERFLOW" }
}
}
@volatile var expectedException: TestLatch = null
@volatile var stashOverflow: TestLatch = null
val testConf: Config = ConfigFactory.parseString("""
my-dispatcher {
mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded"
@ -56,47 +40,42 @@ object ActorWithBoundedStashSpec {
""")
// bounded deque-based mailbox with capacity 10
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 5 seconds)
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 1 seconds)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach {
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach with ImplicitSender {
import ActorWithBoundedStashSpec._
implicit val sys = system
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) }
def myProps(creator: Actor): Props = Props(creator).withDispatcher("my-dispatcher")
"An Actor with Stash and BoundedDequeBasedMailbox" must {
"throw a MessageQueueAppendFailedException in case of a capacity violation" in {
ActorWithBoundedStashSpec.expectedException = new TestLatch
"end up in DeadLetters in case of a capacity violation" in {
system.eventStream.subscribe(testActor, classOf[DeadLetter])
val stasher = system.actorOf(myProps(new StashingActor))
// fill up stash
val futures = for (_ 1 to 11) yield { stasher ? "hello" }
futures foreach { Await.ready(_, 10 seconds) }
(1 to 11) foreach { _ stasher ! "hello" }
// cause unstashAll with capacity violation
stasher ! "world"
Await.ready(ActorWithBoundedStashSpec.expectedException, 10 seconds)
expectMsg(DeadLetter("hello", testActor, stasher))
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
}
}
"An Actor with bounded Stash" must {
"throw a StashOverflowException in case of a stash capacity violation" in {
ActorWithBoundedStashSpec.stashOverflow = new TestLatch
val stasher = system.actorOf(myProps(new StashingActorWithOverflow))
// fill up stash
for (_ 1 to 21) { stasher ! "hello" }
Await.ready(ActorWithBoundedStashSpec.stashOverflow, 10 seconds)
(1 to 21) foreach { _ stasher ! "hello" }
expectMsg("STASHOVERFLOW")
}
}
}

View file

@ -6,9 +6,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.actor._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -39,9 +38,10 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
q.numberOfMessages must be === config.capacity
q.hasMessages must be === true
intercept[MessageQueueAppendFailedException] {
q.enqueue(null, exampleMessage)
}
system.eventStream.subscribe(testActor, classOf[DeadLetter])
q.enqueue(testActor, exampleMessage)
expectMsg(DeadLetter(exampleMessage.message, system.deadLetters, testActor))
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
q.dequeue must be === exampleMessage
q.numberOfMessages must be(config.capacity - 1)

View file

@ -6,18 +6,11 @@ package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
import akka.actor.ActorContext
import com.typesafe.config.Config
import akka.actor.ActorSystem
/**
* This exception normally is thrown when a bounded mailbox is over capacity
*/
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
import akka.actor._
/**
* INTERNAL API
@ -401,13 +394,11 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope) {
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue put handle
}
def dequeue(): Envelope = queue.poll()
}
@ -439,18 +430,16 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
override def queue: BlockingDeque[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue put handle
if (pushTimeOut.length > 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue putFirst handle
if (pushTimeOut.length > 0) {
if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue putFirst handle
def dequeue(): Envelope = queue.poll()
}