Create test-fixture for durable mailboxes. See #2061

* Improved DurableMailboxSpec for stand alone usage
* Changed build to publish DurableMailboxSpec in akka-mailboxes-common-test
* Changed documentation of durable mailboxes and added full example of
  how to implement a durable mailbox, with test
This commit is contained in:
Patrik Nordwall 2012-05-15 16:01:32 +02:00
parent 2e248e4b49
commit 8924080017
11 changed files with 234 additions and 79 deletions

View file

@ -33,8 +33,94 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor")
val myActor = system.actorOf(Props[MyActor].
withDispatcher("my-dispatcher"), name = "myactor")
//#dispatcher-config-use
}
}
//#custom-mailbox
import com.typesafe.config.Config
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.actor.mailbox.DurableMessageQueue
import akka.actor.mailbox.DurableMessageSerialization
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MyMessageQueue(o)
case None throw new IllegalArgumentException(
"requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class MyMessageQueue(_owner: ActorContext)
extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val storage = new QueueStorage
def enqueue(receiver: ActorRef, envelope: Envelope) {
val data: Array[Byte] = serialize(envelope)
storage.push(data)
}
def dequeue(): Envelope = {
val data: Option[Array[Byte]] = storage.pull()
data.map(deserialize(_)).getOrElse(null)
}
def hasMessages: Boolean = !storage.isEmpty
def numberOfMessages: Int = storage.size
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
//#custom-mailbox
// dummy
class QueueStorage {
import java.util.concurrent.ConcurrentLinkedQueue
val queue = new ConcurrentLinkedQueue[Array[Byte]]
def push(data: Array[Byte]): Unit = queue.offer(data)
def pull(): Option[Array[Byte]] = Option(queue.poll())
def isEmpty: Boolean = queue.isEmpty
def size: Int = queue.size
}
//#custom-mailbox-test
import akka.actor.mailbox.DurableMailboxSpec
object MyMailboxSpec {
val config = """
MyStorage-dispatcher {
mailbox-type = akka.docs.actor.mailbox.MyMailboxType
}
"""
}
class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) {
override def atStartup() {
}
override def atTermination() {
}
"MyMailbox" must {
"deliver a message" in {
val actor = createMailboxTestActor()
implicit val sender = testActor
actor ! "hello"
expectMsg("hello")
}
// add more tests
}
}