diff --git a/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala index 018b2c1726..8254d86872 100644 --- a/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala @@ -319,13 +319,15 @@ object TransactionLog { new TransactionLog(ledger, id, isAsync) /** - * TODO document method + * Shuts down the transaction log. */ def shutdown() { isConnected switchOff { try { - zkClient.close - bookieClient.halt + EventHandler.info(this, "Shutting down transaction log...") + zkClient.close() + bookieClient.halt() + EventHandler.info(this, "Transaction log shut down successfully") } catch { case e => handleError(e) } @@ -378,8 +380,7 @@ object TransactionLog { "] meta-data in ZooKeeper for UUID [" + id +"]")) } - EventHandler.info(this, - "Created new transaction log [%s] for UUID [%s]".format(logId, id)) + EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id)) TransactionLog(ledger, id, isAsync) } @@ -459,6 +460,7 @@ object LocalBookKeeperEnsemble { localBookKeeper.runZookeeper(port) localBookKeeper.initializeZookeper localBookKeeper.runBookies + EventHandler.info(this, "LocalBookKeeperEnsemble started successfully") } } @@ -467,10 +469,17 @@ object LocalBookKeeperEnsemble { */ def shutdown() { isRunning switchOff { - localBookKeeper.bs.foreach(_.shutdown) // stop bookies - localBookKeeper.zkc.close // stop zk client - localBookKeeper.zks.shutdown // stop zk server - localBookKeeper.serverFactory.shutdown // stop zk NIOServer + EventHandler.info(this, "Shutting down LocalBookKeeperEnsemble...") + println("***************************** 1") + localBookKeeper.bs.foreach(_.shutdown()) // stop bookies + println("***************************** 2") + localBookKeeper.zkc.close() // stop zk client + println("***************************** 3") + localBookKeeper.zks.shutdown() // stop zk server + println("***************************** 4") + localBookKeeper.serverFactory.shutdown() // stop zk NIOServer + println("***************************** 5") + EventHandler.info(this, "LocalBookKeeperEnsemble shut down successfully") } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 7568a6a580..7290c396e1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -36,6 +36,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter oldDeployment must equal(newDeployment.get) } } + /* "be able to create an actor deployed using ClusterDeployer" in { val pi = Actor.actorOf[Pi]("service-pi") @@ -58,7 +59,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter } override def afterAll() { - Deployer.shutdown() + ClusterDeployer.shutdown() Cluster.shutdownLocalCluster() Actor.registry.local.shutdownAll() } diff --git a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala index 84b48f8fdc..1120c977ed 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReplicationSpec.scala @@ -271,11 +271,11 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll } override def beforeAll() = { - LocalBookKeeperEnsemble.start + LocalBookKeeperEnsemble.start() } override def afterAll() = { - TransactionLog.shutdown - LocalBookKeeperEnsemble.shutdown + TransactionLog.shutdown() + LocalBookKeeperEnsemble.shutdown() } } diff --git a/akka-docs/cluster/cluster/more.png b/akka-docs/cluster/cluster/more.png new file mode 100644 index 0000000000..3eb7b05c84 Binary files /dev/null and b/akka-docs/cluster/cluster/more.png differ diff --git a/akka-docs/cluster/durable-mailbox.rst b/akka-docs/cluster/durable-mailbox.rst new file mode 100644 index 0000000000..212308e8d6 --- /dev/null +++ b/akka-docs/cluster/durable-mailbox.rst @@ -0,0 +1,228 @@ +Durable Mailboxes +================= + +Overview +-------- + +Akka supports a set of durable mailboxes. A durable mailbox is a +replacement for the standard actor mailbox that is durable. What this means in +practice is that if there are pending messages in the actor's mailbox when the +node of the actor resides on crashes, then when you restart the node, the actor +will be able to continue processing as if nothing had happened; with all pending +messages still in its mailbox. + +.. sidebar:: **IMPORTANT** + + None of these mailboxes work with blocking message send, e.g. the message send + operations that are relying on futures; ``!!``, ``!!!``, ``sendRequestReply`` + and ``sendRequestReplyFuture``. If the node has crashed and then restarted, the + thread that was blocked waiting for the reply is gone and there is no way we can + deliver the message. + +The durable mailboxes currently supported are: + + - ``FileDurableMailboxStorage`` -- backed by a journaling transaction log on the local file system + - ``RedisDurableMailboxStorage`` -- backed by Redis + - ``ZooKeeperDurableMailboxStorage`` -- backed by ZooKeeper + - ``BeanstalkDurableMailboxStorage`` -- backed by Beanstalkd + +We'll walk through each one of these in detail in the sections below. + +Soon Akka will also have: + + - ``AmqpDurableMailboxStorage`` -- AMQP based mailbox (default RabbitMQ) + - ``JmsDurableMailboxStorage`` -- JMS based mailbox (default ActiveMQ) + + +File-based durable mailbox +-------------------------- + +This mailbox is backed by a journaling transaction log on the local file +system. It is the simplest want to use since it does not require an extra +infrastructure piece to administer, but it is usually sufficient and just what +you need. + +The durable dispatchers and their configuration options reside in the ``akka.actor.mailbox`` package. + +You configure durable mailboxes through the "Akka"-only durable dispatchers, the actor +is oblivious to which type of mailbox it is using. Here is an example:: + + val dispatcher = DurableEventBasedDispatcher( + "my:service", + FileDurableMailboxStorage) + //Then set the actors dispatcher to this dispatcher + +or for a thread-based durable dispatcher. :: + + self.dispatcher = DurableThreadBasedDispatcher( + self, + FileDurableMailboxStorage) + +There are 2 different durable dispatchers, +``DurableEventBasedDispatcher`` and ``DurableThreadBasedDispatcher``, +which are durable versions of ``ExecutorBasedEventDrivenDispatcher`` and ``ThreadBasedDispatcher``. + +This gives you an excellent way of creating bulkheads in your application, +where groups of actors sharing the same dispatcher also share the same backing storage. + +|more| Read more about that in the :ref:`dispatchers-scala` documentation. + +You can also configure and tune the file-based durable mailbox. This is done in +the ``akka.actor.mailbox.file-based`` section in the ``akka.conf`` configuration +file. + +.. code-block:: conf + + akka { + actor { + mailbox { + file-based { + directory-path = "./_mb" + max-items = 2147483647 + max-size = 2147483647 + max-items = 2147483647 + max-age = 0 + max-journal-size = 16777216 # 16 * 1024 * 1024 + max-memory-size = 134217728 # 128 * 1024 * 1024 + max-journal-overflow = 10 + max-journal-size-absolute = 9223372036854775807 + discard-old-when-full = on + keep-journal = on + sync-journal = off + } + } + } + } + +.. todo:: explain all the above options in detail + + +Redis-based durable mailbox +--------------------------- + +This mailbox is backed by a Redis queue. `Redis `_ Is a very +fast NOSQL database that has a wide range of data structure abstractions, one of +them is a queue which is what we are using in this implementation. This means +that you have to start up a Redis server that can host these durable +mailboxes. Read more in the Redis documentation on how to do that. + +Here is an example of how you can configure your dispatcher to use this mailbox:: + + val dispatcher = DurableEventBasedDispatcher( + "my:service", + RedisDurableMailboxStorage) + +or for a thread-based durable dispatcher. :: + + self.dispatcher = DurableThreadBasedDispatcher( + self, + RedisDurableMailboxStorage) + +You also need to configure the IP and port for the Redis server. This is done in +the ``akka.actor.mailbox.redis`` section in the ``akka.conf`` configuration +file. + +.. code-block:: conf + + akka { + actor { + mailbox { + redis { + hostname = "127.0.0.1" + port = 6379 + } + } + } + } + + +ZooKeeper-based durable mailbox +------------------------------- + +This mailbox is backed by `ZooKeeper `_. ZooKeeper +is a centralized service for maintaining configuration information, naming, +providing distributed synchronization, and providing group services This means +that you have to start up a ZooKeeper server (for production a ZooKeeper server +ensamble) that can host these durable mailboxes. Read more in the ZooKeeper +documentation on how to do that. + +Akka is using ZooKeeper for many other things, for example the clustering +support so if you're using that you love to run a ZooKeeper server anyway and +there will not be that much more work to set up this durable mailbox. + +Here is an example of how you can configure your dispatcher to use this mailbox:: + + val dispatcher = DurableEventBasedDispatcher( + "my:service", + ZooKeeperDurableMailboxStorage) + +or for a thread-based durable dispatcher. :: + + self.dispatcher = DurableThreadBasedDispatcher( + self, + ZooKeeperDurableMailboxStorage) + +You also need to configure ZooKeeper server addresses, timeouts, etc. This is +done in the ``akka.actor.mailbox.zookeeper`` section in the ``akka.conf`` +configuration file. + +.. code-block:: conf + + akka { + actor { + mailbox { + zookeeper { + server-addresses = "localhost:2181" + session-timeout = 60 + connection-timeout = 30 + blocking-queue = on + } + } + } + } + + +Beanstalk-based durable mailbox +------------------------------- + +This mailbox is backed by `Beanstalkd `_. +Beanstalk is a simple, fast work queue. This means that you have to start up a +Beanstalk server that can host these durable mailboxes. Read more in the +Beanstalk documentation on how to do that. :: + + val dispatcher = DurableEventBasedDispatcher( + "my:service", + BeanstalkDurableMailboxStorage) + +or for a thread-based durable dispatcher. :: + + self.dispatcher = DurableThreadBasedDispatcher( + self, + BeanstalkDurableMailboxStorage) + +You also need to configure the IP, and port, and so on, for the Beanstalk +server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the +``akka.conf`` configuration file. + +.. code-block:: conf + + akka { + actor { + mailbox { + beanstalk { + hostname = "127.0.0.1" + port = 11300 + reconnect-window = 5 + message-submit-delay = 0 + message-submit-timeout = 5 + message-time-to-live = 120 + } + } + } + } + + +.. |more| image:: more.png + :align: middle + :alt: More info + diff --git a/akka-docs/cluster/index.rst b/akka-docs/cluster/index.rst new file mode 100644 index 0000000000..b9ffb28378 --- /dev/null +++ b/akka-docs/cluster/index.rst @@ -0,0 +1,7 @@ +Cluster +======= + +.. toctree:: + :maxdepth: 2 + + durable-mailbox diff --git a/akka-docs/index.rst b/akka-docs/index.rst index 738b8e636f..4c94d7a68a 100644 --- a/akka-docs/index.rst +++ b/akka-docs/index.rst @@ -9,6 +9,7 @@ Contents common/index scala/index java/index + cluster/index dev/index project/index additional/index diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 578fcd4ff5..36bd22d8ba 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -1,3 +1,5 @@ +.. _dispatchers-java: + Dispatchers (Java) ================== diff --git a/akka-docs/more.png b/akka-docs/more.png new file mode 100644 index 0000000000..3eb7b05c84 Binary files /dev/null and b/akka-docs/more.png differ diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 35285c20fa..91f3799d00 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -1,3 +1,5 @@ +.. _dispatchers-scala: + Dispatchers (Scala) =================== diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 156fa02dc7..26c630634f 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -329,24 +329,27 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec // ------------------------------------------------------------------------------------------------------------------- class AkkaClusterProject(info: ProjectInfo) extends AkkaDefaultProject(info) with MultiJvmTests { - val bookkeeper = Dependencies.bookkeeper - val zookeeper = Dependencies.zookeeper + val bookkeeper = Dependencies.bookkeeper + val zookeeper = Dependencies.zookeeper val zookeeper_lock = Dependencies.zookeeper_lock - val zkClient = Dependencies.zkClient - val commons_io = Dependencies.commons_io - val log4j = Dependencies.log4j + val zkClient = Dependencies.zkClient + val commons_io = Dependencies.commons_io + val log4j = Dependencies.log4j // test dependencies - - val scalatest = Dependencies.scalatest - val junit = Dependencies.junit + val scalatest = Dependencies.scalatest + val junit = Dependencies.junit // multi jvm tests - lazy val clusterTest = multiJvmTest lazy val clusterRun = multiJvmRun override def multiJvmOptions = Seq("-Xmx256M") + + lazy val replicationTestsEnabled = systemOptional[Boolean]("cluster.test.replication", false) + + override def testOptions = + super.testOptions ++ (if (!replicationTestsEnabled.value) Seq(testFilter("Replication")) else Seq.empty) } // -------------------------------------------------------------------------------------------------------------------