diff --git a/akka-docs/intro/deployment-scenarios.rst b/akka-docs/intro/deployment-scenarios.rst index b2d0334c7d..fc3b38cbd2 100644 --- a/akka-docs/intro/deployment-scenarios.rst +++ b/akka-docs/intro/deployment-scenarios.rst @@ -13,7 +13,7 @@ Akka can be used in different ways: be put into ``WEB-INF/lib`` - As a stand alone application by instantiating ActorSystem in a main class or - using the :ref:`microkernel` + using the :ref:`microkernel-scala` / :ref:`microkernel-java` Using Akka as library @@ -27,5 +27,6 @@ modules to the stack. Using Akka as a stand alone microkernel ---------------------------------------- -Akka can also be run as a stand-alone microkernel. See :ref:`microkernel` for +Akka can also be run as a stand-alone microkernel. See +:ref:`microkernel-scala` / :ref:`microkernel-java` for more information. diff --git a/akka-docs/intro/getting-started.rst b/akka-docs/intro/getting-started.rst index b3bdbf70f3..9c76ee8edf 100644 --- a/akka-docs/intro/getting-started.rst +++ b/akka-docs/intro/getting-started.rst @@ -67,7 +67,8 @@ The Akka distribution includes the microkernel. To run the microkernel put your application jar in the ``deploy`` directory and use the scripts in the ``bin`` directory. -More information is available in the documentation of the :ref:`microkernel`. +More information is available in the documentation of the +:ref:`microkernel-scala` / :ref:`microkernel-java`. Using a build tool ------------------ diff --git a/akka-docs/java/microkernel.rst b/akka-docs/java/microkernel.rst index 551c118e94..d6652fe316 100644 --- a/akka-docs/java/microkernel.rst +++ b/akka-docs/java/microkernel.rst @@ -1,5 +1,5 @@ -.. _microkernel: +.. _microkernel-java: Microkernel (Java) ================== diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index 2f67c607ed..25f312cac3 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -33,8 +33,94 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { "configuration of dispatcher with durable mailbox" in { //#dispatcher-config-use - val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor") + val myActor = system.actorOf(Props[MyActor]. + withDispatcher("my-dispatcher"), name = "myactor") //#dispatcher-config-use } } + +//#custom-mailbox +import com.typesafe.config.Config +import akka.actor.ActorContext +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.dispatch.Envelope +import akka.dispatch.MailboxType +import akka.dispatch.MessageQueue +import akka.actor.mailbox.DurableMessageQueue +import akka.actor.mailbox.DurableMessageSerialization + +class MyMailboxType(systemSettings: ActorSystem.Settings, config: Config) + extends MailboxType { + + override def create(owner: Option[ActorContext]): MessageQueue = owner match { + case Some(o) ⇒ new MyMessageQueue(o) + case None ⇒ throw new IllegalArgumentException( + "requires an owner (i.e. does not work with BalancingDispatcher)") + } +} + +class MyMessageQueue(_owner: ActorContext) + extends DurableMessageQueue(_owner) with DurableMessageSerialization { + + val storage = new QueueStorage + + def enqueue(receiver: ActorRef, envelope: Envelope) { + val data: Array[Byte] = serialize(envelope) + storage.push(data) + } + + def dequeue(): Envelope = { + val data: Option[Array[Byte]] = storage.pull() + data.map(deserialize(_)).getOrElse(null) + } + + def hasMessages: Boolean = !storage.isEmpty + + def numberOfMessages: Int = storage.size + + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + +} +//#custom-mailbox + +// dummy +class QueueStorage { + import java.util.concurrent.ConcurrentLinkedQueue + val queue = new ConcurrentLinkedQueue[Array[Byte]] + def push(data: Array[Byte]): Unit = queue.offer(data) + def pull(): Option[Array[Byte]] = Option(queue.poll()) + def isEmpty: Boolean = queue.isEmpty + def size: Int = queue.size +} + +//#custom-mailbox-test +import akka.actor.mailbox.DurableMailboxSpec + +object MyMailboxSpec { + val config = """ + MyStorage-dispatcher { + mailbox-type = akka.docs.actor.mailbox.MyMailboxType + } + """ +} + +class MyMailboxSpec extends DurableMailboxSpec("MyStorage", MyMailboxSpec.config) { + override def atStartup() { + } + + override def atTermination() { + } + + "MyMailbox" must { + "deliver a message" in { + val actor = createMailboxTestActor() + implicit val sender = testActor + actor ! "hello" + expectMsg("hello") + } + + // add more tests + } +} \ No newline at end of file diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index 834dc6f0fb..25158446b6 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -4,9 +4,8 @@ package akka.docs.actor.mailbox; //#imports -import akka.actor.UntypedActorFactory; -import akka.actor.UntypedActor; import akka.actor.Props; +import akka.actor.ActorRef; //#imports @@ -16,8 +15,8 @@ import org.junit.Test; import akka.testkit.AkkaSpec; import com.typesafe.config.ConfigFactory; -import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.UntypedActor; import static org.junit.Assert.*; @@ -39,12 +38,8 @@ public class DurableMailboxDocTestBase { @Test public void configDefinedDispatcher() { //#dispatcher-config-use - ActorRef myActor = system.actorOf( - new Props().withDispatcher("my-dispatcher").withCreator(new UntypedActorFactory() { - public UntypedActor create() { - return new MyUntypedActor(); - } - }), "myactor"); + ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class). + withDispatcher("my-dispatcher"), "myactor"); //#dispatcher-config-use myActor.tell("test"); } diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 74618d978e..4de60ea12b 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -9,40 +9,45 @@ Overview ======== -Akka supports a set of durable mailboxes. A durable mailbox is a replacement for -the standard actor mailbox that is durable. What this means in practice is that -if there are pending messages in the actor's mailbox when the node of the actor -resides on crashes, then when you restart the node, the actor will be able to -continue processing as if nothing had happened; with all pending messages still -in its mailbox. +A durable mailbox is a replacement for the standard actor mailbox that is durable. +What this means in practice is that if there are pending messages in the actor's +mailbox when the node of the actor resides on crashes, then when you restart the +node, the actor will be able to continue processing as if nothing had happened; +with all pending messages still in its mailbox. -None of these mailboxes implements transactions for current message. It's possible +You configure durable mailboxes through the dispatcher. The actor is oblivious +to which type of mailbox it is using. + +This gives you an excellent way of creating bulkheads in your application, where +groups of actors sharing the same dispatcher also share the same backing +storage. Read more about that in the :ref:`dispatchers-scala` documentation. + +One basic file based durable mailbox is provided by Akka out-of-the-box. +Other implementations can easily be added. Some are available as separate community +Open Source projects, such as: + +* `AMQP Durable Mailbox `_ + + +A durable mailbox typically doesn't implements transactions for current message. It's possible if the actor crashes after receiving a message, but before completing processing of it, that the message could be lost. -.. warning:: **IMPORTANT** +.. warning:: - None of these mailboxes work with blocking message send, i.e. the message + A durable mailbox typically doesn't work with blocking message send, i.e. the message send operations that are relying on futures; ``?`` or ``ask``. If the node has crashed and then restarted, the thread that was blocked waiting for the reply is gone and there is no way we can deliver the message. -The durable mailboxes supported out-of-the-box are: - - ``FileBasedMailbox`` -- backed by a journaling transaction log on the local file system +File-based durable mailbox +========================== -You can easily implement your own mailbox. Look at the existing implementation for inspiration. - -.. _DurableMailbox.General: - -General Usage -------------- - -The durable mailboxes and their configuration options reside in the -``akka.actor.mailbox`` package. - -You configure durable mailboxes through the dispatcher. The -actor is oblivious to which type of mailbox it is using. +This mailbox is backed by a journaling transaction log on the local file +system. It is the simplest to use since it does not require an extra +infrastructure piece to administer, but it is usually sufficient and just what +you need. In the configuration of the dispatcher you specify the fully qualified class name of the mailbox: @@ -60,32 +65,37 @@ Corresponding example in Java: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java :include: imports,dispatcher-config-use -The actor is oblivious to which type of mailbox it is using. - -This gives you an excellent way of creating bulkheads in your application, where -groups of actors sharing the same dispatcher also share the same backing -storage. Read more about that in the :ref:`dispatchers-scala` documentation. - -File-based durable mailbox -========================== - -This mailbox is backed by a journaling transaction log on the local file -system. It is the simplest to use since it does not require an extra -infrastructure piece to administer, but it is usually sufficient and just what -you need. - -You configure durable mailboxes through the dispatcher, as described in -:ref:`DurableMailbox.General` with the following mailbox type. - -Config:: - - my-dispatcher { - mailbox-type = akka.actor.mailbox.FileBasedMailboxType - } - You can also configure and tune the file-based durable mailbox. This is done in the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. .. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/reference.conf :language: none +How to implement a durable mailbox +================================== + +Here is an example of how to implement a custom durable mailbox. Essentially it consists of +a configurator (MailboxType) and a queue implementation (DurableMessageQueue). + +The envelope contains the message sent to the actor, and information about sender. It is the +envelope that needs to be stored. As a help utility you can mixin DurableMessageSerialization +to serialize and deserialize the envelope using the ordinary :ref:`serialization-scala` +mechanism. This optional and you may store the envelope data in any way you like. + +.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: custom-mailbox + +To facilitate testing of a durable mailbox you may use ``DurableMailboxSpec`` as base class. +It implements a few basic tests and helps you setup the a fixture. More tests can be +added in concrete subclass like this: + +.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: custom-mailbox-test + +You find DurableMailboxDocSpec in ``akka-mailboxes-common-test-2.1-SNAPSHOT.jar``. +Add this dependency:: + + "com.typesafe.akka" % "akka-mailboxes-common-test" % "2.1-SNAPSHOT" + +For more inspiration you can look at the old implementations based on Redis, MongoDB, Beanstalk, +and ZooKeeper, which can be found in Akka git repository tag v2.0.1. \ No newline at end of file diff --git a/akka-docs/scala/microkernel.rst b/akka-docs/scala/microkernel.rst index 8fb1aec2c2..108a00588a 100644 --- a/akka-docs/scala/microkernel.rst +++ b/akka-docs/scala/microkernel.rst @@ -1,5 +1,5 @@ -.. _microkernel: +.. _microkernel-scala: Microkernel (Scala) =================== diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index be82e0fcb3..6c97142068 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -25,19 +25,17 @@ class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSp } } - def isDurableMailbox(m: Mailbox): Boolean = m.messageQueue.isInstanceOf[FileBasedMessageQueue] - - def clean { + def clean() { FileUtils.deleteDirectory(new java.io.File(queuePath)) } override def atStartup() { - clean + clean() super.atStartup() } override def atTermination() { - clean + clean() super.atTermination() } } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index ff436c227e..9081a5fcb0 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -3,14 +3,25 @@ */ package akka.actor.mailbox -import akka.testkit.AkkaSpec -import akka.testkit.TestLatch -import akka.util.duration._ -import java.io.InputStream -import scala.annotation.tailrec +import DurableMailboxSpecActorFactory.AccumulatorActor +import DurableMailboxSpecActorFactory.MailboxTestActor +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.LocalActorRef +import akka.actor.Props +import akka.actor.actorRef2Scala +import akka.dispatch.Mailbox +import akka.testkit.TestKit +import akka.util.duration.intToDurationInt import com.typesafe.config.Config -import akka.actor._ -import akka.dispatch.{ Mailbox, Await } +import com.typesafe.config.ConfigFactory +import java.io.InputStream +import java.util.concurrent.TimeoutException +import org.scalatest.BeforeAndAfterAll +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import scala.annotation.tailrec object DurableMailboxSpecActorFactory { @@ -28,13 +39,62 @@ object DurableMailboxSpecActorFactory { } +object DurableMailboxSpec { + def fallbackConfig: Config = ConfigFactory.parseString(""" + akka { + event-handlers = ["akka.testkit.TestEventListener"] + loglevel = "WARNING" + stdout-loglevel = "WARNING" + } + """) +} + /** + * Reusable test fixture for durable mailboxes. Implements a few basic tests. More + * tests can be added in concrete subclass. + * * Subclass must define dispatcher in the supplied config for the specific backend. * The id of the dispatcher must be the same as the `-dispatcher`. */ -abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) { +abstract class DurableMailboxSpec(system: ActorSystem, val backendName: String) + extends TestKit(system) with WordSpec with MustMatchers with BeforeAndAfterAll { + import DurableMailboxSpecActorFactory._ + /** + * Subclass must define dispatcher in the supplied config for the specific backend. + * The id of the dispatcher must be the same as the `-dispatcher`. + */ + def this(backendName: String, config: String) = { + this(ActorSystem(backendName + "BasedDurableMailboxSpec", + ConfigFactory.parseString(config).withFallback(DurableMailboxSpec.fallbackConfig)), + backendName) + } + + final override def beforeAll { + atStartup() + } + + /** + * May be implemented in concrete subclass to do additional things once before test + * cases are run. + */ + protected def atStartup() {} + + final override def afterAll { + system.shutdown() + try system.awaitTermination(5 seconds) catch { + case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + } + atTermination() + } + + /** + * May be implemented in concrete subclass to do additional things once after all + * test cases have been run. + */ + def atTermination() {} + protected def streamMustContain(in: InputStream, words: String): Unit = { val output = new Array[Byte](8192) @@ -60,7 +120,8 @@ abstract class DurableMailboxSpec(val backendName: String, config: String) exten case some ⇒ system.actorOf(props.withDispatcher(backendName + "-dispatcher"), some) } - def isDurableMailbox(m: Mailbox): Boolean + private def isDurableMailbox(m: Mailbox): Boolean = + m.messageQueue.isInstanceOf[DurableMessageQueue] "A " + backendName + " based mailbox backed actor" must { diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index fd763e6bad..f24ea49b8c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -5,7 +5,7 @@ package akka.testkit import org.scalatest.{ WordSpec, BeforeAndAfterAll, Tag } import org.scalatest.matchers.MustMatchers -import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.actor.ActorSystem import akka.actor.{ Actor, ActorRef, Props } import akka.event.{ Logging, LoggingAdapter } import akka.util.duration._ @@ -72,7 +72,7 @@ abstract class AkkaSpec(_system: ActorSystem) final override def afterAll { system.shutdown() - try Await.ready(system.asInstanceOf[ActorSystemImpl].terminationFuture, 5 seconds) catch { + try system.awaitTermination(5 seconds) catch { case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) } atTermination() diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 4804c0f796..a489d57c8b 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -154,7 +154,9 @@ object AkkaBuild extends Build { base = file("akka-durable-mailboxes/akka-mailboxes-common"), dependencies = Seq(remote, testkit % "compile;test->test"), settings = defaultSettings ++ Seq( - libraryDependencies ++= Dependencies.mailboxes + libraryDependencies ++= Dependencies.mailboxes, + // DurableMailboxSpec published in akka-mailboxes-common-test + publishArtifact in Test := true ) ) @@ -257,7 +259,8 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel), + dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", + remote, cluster, slf4j, agent, transactor, fileMailbox, zeroMQ, camel), settings = defaultSettings ++ Sphinx.settings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs,