Merge pull request #455 from akka/wip-2061-dmb-tests-patriknw

Create test-fixture for durable mailboxes. See #2061
This commit is contained in:
patriknw 2012-05-20 22:48:54 -07:00
commit d64020a2cc
11 changed files with 241 additions and 79 deletions

View file

@ -13,7 +13,7 @@ Akka can be used in different ways:
be put into ``WEB-INF/lib``
- As a stand alone application by instantiating ActorSystem in a main class or
using the :ref:`microkernel`
using the :ref:`microkernel-scala` / :ref:`microkernel-java`
Using Akka as library
@ -27,5 +27,6 @@ modules to the stack.
Using Akka as a stand alone microkernel
----------------------------------------
Akka can also be run as a stand-alone microkernel. See :ref:`microkernel` for
Akka can also be run as a stand-alone microkernel. See
:ref:`microkernel-scala` / :ref:`microkernel-java` for
more information.

View file

@ -67,7 +67,8 @@ The Akka distribution includes the microkernel. To run the microkernel put your
application jar in the ``deploy`` directory and use the scripts in the ``bin``
directory.
More information is available in the documentation of the :ref:`microkernel`.
More information is available in the documentation of the
:ref:`microkernel-scala` / :ref:`microkernel-java`.
Using a build tool
------------------

View file

@ -1,5 +1,5 @@
.. _microkernel:
.. _microkernel-java:
Microkernel (Java)
==================

View file

@ -33,8 +33,100 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor")
val myActor = system.actorOf(Props[MyActor].
withDispatcher("my-dispatcher"), name = "myactor")
//#dispatcher-config-use
}
}
//#custom-mailbox
import com.typesafe.config.Config
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.actor.mailbox.DurableMessageQueue
import akka.actor.mailbox.DurableMessageSerialization
class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config)
extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MyMessageQueue(o)
case None throw new IllegalArgumentException(
"requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class MyMessageQueue(_owner: ActorContext)
extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val storage = new QueueStorage
def enqueue(receiver: ActorRef, envelope: Envelope) {
val data: Array[Byte] = serialize(envelope)
storage.push(data)
}
def dequeue(): Envelope = {
val data: Option[Array[Byte]] = storage.pull()
data.map(deserialize).orNull
}
def hasMessages: Boolean = !storage.isEmpty
def numberOfMessages: Int = storage.size
/**
* Called when the mailbox is disposed.
* An ordinary mailbox would send remaining messages to deadLetters,
* but the purpose of a durable mailbox is to continue
* with the same message queue when the actor is started again.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
//#custom-mailbox
// dummy
class QueueStorage {
import java.util.concurrent.ConcurrentLinkedQueue
val queue = new ConcurrentLinkedQueue[Array[Byte]]
def push(data: Array[Byte]): Unit = queue.offer(data)
def pull(): Option[Array[Byte]] = Option(queue.poll())
def isEmpty: Boolean = queue.isEmpty
def size: Int = queue.size
}
//#custom-mailbox-test
import akka.actor.mailbox.DurableMailboxSpec
object MyMailboxSpec {
val config = """
MyStorage-dispatcher {
mailbox-type = akka.docs.actor.mailbox.MyMailboxType
}
"""
}
class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) {
override def atStartup() {
}
override def atTermination() {
}
"MyMailbox" must {
"deliver a message" in {
val actor = createMailboxTestActor()
implicit val sender = testActor
actor ! "hello"
expectMsg("hello")
}
// add more tests
}
}

View file

@ -4,9 +4,8 @@
package akka.docs.actor.mailbox;
//#imports
import akka.actor.UntypedActorFactory;
import akka.actor.UntypedActor;
import akka.actor.Props;
import akka.actor.ActorRef;
//#imports
@ -16,8 +15,8 @@ import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import static org.junit.Assert.*;
@ -39,12 +38,8 @@ public class DurableMailboxDocTestBase {
@Test
public void configDefinedDispatcher() {
//#dispatcher-config-use
ActorRef myActor = system.actorOf(
new Props().withDispatcher("my-dispatcher").withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
}), "myactor");
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).
withDispatcher("my-dispatcher"), "myactor");
//#dispatcher-config-use
myActor.tell("test");
}

View file

@ -9,40 +9,45 @@
Overview
========
Akka supports a set of durable mailboxes. A durable mailbox is a replacement for
the standard actor mailbox that is durable. What this means in practice is that
if there are pending messages in the actor's mailbox when the node of the actor
resides on crashes, then when you restart the node, the actor will be able to
continue processing as if nothing had happened; with all pending messages still
in its mailbox.
A durable mailbox is a mailbox which stores the messages on durable storage.
What this means in practice is that if there are pending messages in the actor's
mailbox when the node of the actor resides on crashes, then when you restart the
node, the actor will be able to continue processing as if nothing had happened;
with all pending messages still in its mailbox.
None of these mailboxes implements transactions for current message. It's possible
You configure durable mailboxes through the dispatcher. The actor is oblivious
to which type of mailbox it is using.
This gives you an excellent way of creating bulkheads in your application, where
groups of actors sharing the same dispatcher also share the same backing
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
One basic file based durable mailbox is provided by Akka out-of-the-box.
Other implementations can easily be added. Some are available as separate community
Open Source projects, such as:
* `AMQP Durable Mailbox <https://github.com/drexin/akka-amqp-mailbox>`_
A durable mailbox is like any other mailbox not likely to be transactional. It's possible
if the actor crashes after receiving a message, but before completing processing of
it, that the message could be lost.
.. warning:: **IMPORTANT**
.. warning::
None of these mailboxes work with blocking message send, i.e. the message
A durable mailbox typically doesn't work with blocking message send, i.e. the message
send operations that are relying on futures; ``?`` or ``ask``. If the node
has crashed and then restarted, the thread that was blocked waiting for the
reply is gone and there is no way we can deliver the message.
The durable mailboxes supported out-of-the-box are:
- ``FileBasedMailbox`` -- backed by a journaling transaction log on the local file system
File-based durable mailbox
==========================
You can easily implement your own mailbox. Look at the existing implementation for inspiration.
.. _DurableMailbox.General:
General Usage
-------------
The durable mailboxes and their configuration options reside in the
``akka.actor.mailbox`` package.
You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using.
This mailbox is backed by a journaling transaction log on the local file
system. It is the simplest to use since it does not require an extra
infrastructure piece to administer, but it is usually sufficient and just what
you need.
In the configuration of the dispatcher you specify the fully qualified class name
of the mailbox:
@ -60,32 +65,38 @@ Corresponding example in Java:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java
:include: imports,dispatcher-config-use
The actor is oblivious to which type of mailbox it is using.
This gives you an excellent way of creating bulkheads in your application, where
groups of actors sharing the same dispatcher also share the same backing
storage. Read more about that in the :ref:`dispatchers-scala` documentation.
File-based durable mailbox
==========================
This mailbox is backed by a journaling transaction log on the local file
system. It is the simplest to use since it does not require an extra
infrastructure piece to administer, but it is usually sufficient and just what
you need.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Config::
my-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
}
You can also configure and tune the file-based durable mailbox. This is done in
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
.. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf
:language: none
How to implement a durable mailbox
==================================
Here is an example of how to implement a custom durable mailbox. Essentially it consists of
a configurator (MailboxType) and a queue implementation (DurableMessageQueue).
The envelope contains the message sent to the actor, and information about sender. It is the
envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization
to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala`
mechanism. This optional and you may store the envelope data in any way you like.
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: custom-mailbox
To facilitate testing of a durable mailbox you may use ``DurableMailboxSpec`` as base class.
It implements a few basic tests and helps you setup the a fixture. More tests can be
added in concrete subclass like this:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: custom-mailbox-test
You find DurableMailboxDocSpec in ``akka-mailboxes-common-test-2.1-SNAPSHOT.jar``.
Add this dependency::
"com.typesafe.akka" % "akka-mailboxes-common-test" % "2.1-SNAPSHOT"
For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk,
and ZooKeeper, which can be found in Akka git repository tag
`v2.0.1 <https://github.com/akka/akka/tree/v2.0.1/akka-durable-mailboxes>`_.

View file

@ -1,5 +1,5 @@
.. _microkernel:
.. _microkernel-scala:
Microkernel (Scala)
===================

View file

@ -25,19 +25,17 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp
}
}
def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue]
def clean {
def clean() {
FileUtils.deleteDirectory(new java.io.File(queuePath))
}
override def atStartup() {
clean
clean()
super.atStartup()
}
override def atTermination() {
clean
clean()
super.atTermination()
}
}

View file

@ -3,14 +3,25 @@
*/
package akka.actor.mailbox
import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import akka.util.duration._
import java.io.InputStream
import scala.annotation.tailrec
import DurableMailboxSpecActorFactory.AccumulatorActor
import DurableMailboxSpecActorFactory.MailboxTestActor
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.LocalActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.dispatch.Mailbox
import akka.testkit.TestKit
import akka.util.duration.intToDurationInt
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.{ Mailbox, Await }
import com.typesafe.config.ConfigFactory
import java.io.InputStream
import java.util.concurrent.TimeoutException
import org.scalatest.BeforeAndAfterAll
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import scala.annotation.tailrec
object DurableMailboxSpecActorFactory {
@ -28,13 +39,62 @@ object DurableMailboxSpecActorFactory {
}
object DurableMailboxSpec {
def fallbackConfig: Config = ConfigFactory.parseString("""
akka {
event-handlers = ["akka.testkit.TestEventListener"]
loglevel = "WARNING"
stdout-loglevel = "WARNING"
}
""")
}
/**
* Reusable test fixture for durable mailboxes. Implements a few basic tests. More
* tests can be added in concrete subclass.
*
* Subclass must define dispatcher in the supplied config for the specific backend.
* The id of the dispatcher must be the same as the `<backendName>-dispatcher`.
*/
abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) {
abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String)
extends TestKit(system) with WordSpec with MustMatchers with BeforeAndAfterAll {
import DurableMailboxSpecActorFactory._
/**
* Subclass must define dispatcher in the supplied config for the specific backend.
* The id of the dispatcher must be the same as the `<backendName>-dispatcher`.
*/
def this(backendName: String, config: String) = {
this(ActorSystem(backendName + "BasedDurableMailboxSpec",
ConfigFactory.parseString(config).withFallback(DurableMailboxSpec.fallbackConfig)),
backendName)
}
final override def beforeAll {
atStartup()
}
/**
* May be implemented in concrete subclass to do additional things once before test
* cases are run.
*/
protected def atStartup() {}
final override def afterAll {
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()
}
/**
* May be implemented in concrete subclass to do additional things once after all
* test cases have been run.
*/
def atTermination() {}
protected def streamMustContain(in: InputStream, words: String): Unit = {
val output = new Array[Byte](8192)
@ -60,7 +120,8 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten
case some system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some)
}
def isDurableMailbox(m: Mailbox): Boolean
private def isDurableMailbox(m: Mailbox): Boolean =
m.messageQueue.isInstanceOf[DurableMessageQueue]
"A " + backendName + " based mailbox backed actor" must {

View file

@ -5,7 +5,7 @@ package akka.testkit
import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag }
import org.scalatest.matchers.MustMatchers
import akka.actor.{ ActorSystem, ActorSystemImpl }
import akka.actor.ActorSystem
import akka.actor.{ Actor, ActorRef, Props }
import akka.event.{ Logging, LoggingAdapter }
import akka.util.duration._
@ -72,7 +72,7 @@ abstract class AkkaSpec(_system: ActorSystem)
final override def afterAll {
system.shutdown()
try Await.ready(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch {
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
}
atTermination()

View file

@ -154,7 +154,9 @@ object AkkaBuild extends Build {
base = file("akka-durable-mailboxes/akka-mailboxes-common"),
dependencies = Seq(remote, testkit % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.mailboxes
libraryDependencies ++= Dependencies.mailboxes,
// DurableMailboxSpec published in akka-mailboxes-common-test
publishArtifact in Test := true
)
)
@ -257,7 +259,8 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel),
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test",
remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel),
settings = defaultSettings ++ Sphinx.settings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs,