Added docs for durable mailboxes, plus filtered out replication tests
This commit is contained in:
parent
160ff867cb
commit
962ee1ebba
11 changed files with 275 additions and 22 deletions
|
|
@ -319,13 +319,15 @@ object TransactionLog {
|
||||||
new TransactionLog(ledger, id, isAsync)
|
new TransactionLog(ledger, id, isAsync)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO document method
|
* Shuts down the transaction log.
|
||||||
*/
|
*/
|
||||||
def shutdown() {
|
def shutdown() {
|
||||||
isConnected switchOff {
|
isConnected switchOff {
|
||||||
try {
|
try {
|
||||||
zkClient.close
|
EventHandler.info(this, "Shutting down transaction log...")
|
||||||
bookieClient.halt
|
zkClient.close()
|
||||||
|
bookieClient.halt()
|
||||||
|
EventHandler.info(this, "Transaction log shut down successfully")
|
||||||
} catch {
|
} catch {
|
||||||
case e => handleError(e)
|
case e => handleError(e)
|
||||||
}
|
}
|
||||||
|
|
@ -378,8 +380,7 @@ object TransactionLog {
|
||||||
"] meta-data in ZooKeeper for UUID [" + id +"]"))
|
"] meta-data in ZooKeeper for UUID [" + id +"]"))
|
||||||
}
|
}
|
||||||
|
|
||||||
EventHandler.info(this,
|
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
|
||||||
"Created new transaction log [%s] for UUID [%s]".format(logId, id))
|
|
||||||
TransactionLog(ledger, id, isAsync)
|
TransactionLog(ledger, id, isAsync)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -459,6 +460,7 @@ object LocalBookKeeperEnsemble {
|
||||||
localBookKeeper.runZookeeper(port)
|
localBookKeeper.runZookeeper(port)
|
||||||
localBookKeeper.initializeZookeper
|
localBookKeeper.initializeZookeper
|
||||||
localBookKeeper.runBookies
|
localBookKeeper.runBookies
|
||||||
|
EventHandler.info(this, "LocalBookKeeperEnsemble started successfully")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -467,10 +469,17 @@ object LocalBookKeeperEnsemble {
|
||||||
*/
|
*/
|
||||||
def shutdown() {
|
def shutdown() {
|
||||||
isRunning switchOff {
|
isRunning switchOff {
|
||||||
localBookKeeper.bs.foreach(_.shutdown) // stop bookies
|
EventHandler.info(this, "Shutting down LocalBookKeeperEnsemble...")
|
||||||
localBookKeeper.zkc.close // stop zk client
|
println("***************************** 1")
|
||||||
localBookKeeper.zks.shutdown // stop zk server
|
localBookKeeper.bs.foreach(_.shutdown()) // stop bookies
|
||||||
localBookKeeper.serverFactory.shutdown // stop zk NIOServer
|
println("***************************** 2")
|
||||||
|
localBookKeeper.zkc.close() // stop zk client
|
||||||
|
println("***************************** 3")
|
||||||
|
localBookKeeper.zks.shutdown() // stop zk server
|
||||||
|
println("***************************** 4")
|
||||||
|
localBookKeeper.serverFactory.shutdown() // stop zk NIOServer
|
||||||
|
println("***************************** 5")
|
||||||
|
EventHandler.info(this, "LocalBookKeeperEnsemble shut down successfully")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
|
||||||
oldDeployment must equal(newDeployment.get)
|
oldDeployment must equal(newDeployment.get)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
"be able to create an actor deployed using ClusterDeployer" in {
|
"be able to create an actor deployed using ClusterDeployer" in {
|
||||||
val pi = Actor.actorOf[Pi]("service-pi")
|
val pi = Actor.actorOf[Pi]("service-pi")
|
||||||
|
|
@ -58,7 +59,7 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterAll() {
|
override def afterAll() {
|
||||||
Deployer.shutdown()
|
ClusterDeployer.shutdown()
|
||||||
Cluster.shutdownLocalCluster()
|
Cluster.shutdownLocalCluster()
|
||||||
Actor.registry.local.shutdownAll()
|
Actor.registry.local.shutdownAll()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -271,11 +271,11 @@ class ReplicationSpec extends WordSpec with MustMatchers with BeforeAndAfterAll
|
||||||
}
|
}
|
||||||
|
|
||||||
override def beforeAll() = {
|
override def beforeAll() = {
|
||||||
LocalBookKeeperEnsemble.start
|
LocalBookKeeperEnsemble.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterAll() = {
|
override def afterAll() = {
|
||||||
TransactionLog.shutdown
|
TransactionLog.shutdown()
|
||||||
LocalBookKeeperEnsemble.shutdown
|
LocalBookKeeperEnsemble.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
BIN
akka-docs/cluster/cluster/more.png
Normal file
BIN
akka-docs/cluster/cluster/more.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.5 KiB |
228
akka-docs/cluster/durable-mailbox.rst
Normal file
228
akka-docs/cluster/durable-mailbox.rst
Normal file
|
|
@ -0,0 +1,228 @@
|
||||||
|
Durable Mailboxes
|
||||||
|
=================
|
||||||
|
|
||||||
|
Overview
|
||||||
|
--------
|
||||||
|
|
||||||
|
Akka supports a set of durable mailboxes. A durable mailbox is a
|
||||||
|
replacement for the standard actor mailbox that is durable. What this means in
|
||||||
|
practice is that if there are pending messages in the actor's mailbox when the
|
||||||
|
node of the actor resides on crashes, then when you restart the node, the actor
|
||||||
|
will be able to continue processing as if nothing had happened; with all pending
|
||||||
|
messages still in its mailbox.
|
||||||
|
|
||||||
|
.. sidebar:: **IMPORTANT**
|
||||||
|
|
||||||
|
None of these mailboxes work with blocking message send, e.g. the message send
|
||||||
|
operations that are relying on futures; ``!!``, ``!!!``, ``sendRequestReply``
|
||||||
|
and ``sendRequestReplyFuture``. If the node has crashed and then restarted, the
|
||||||
|
thread that was blocked waiting for the reply is gone and there is no way we can
|
||||||
|
deliver the message.
|
||||||
|
|
||||||
|
The durable mailboxes currently supported are:
|
||||||
|
|
||||||
|
- ``FileDurableMailboxStorage`` -- backed by a journaling transaction log on the local file system
|
||||||
|
- ``RedisDurableMailboxStorage`` -- backed by Redis
|
||||||
|
- ``ZooKeeperDurableMailboxStorage`` -- backed by ZooKeeper
|
||||||
|
- ``BeanstalkDurableMailboxStorage`` -- backed by Beanstalkd
|
||||||
|
|
||||||
|
We'll walk through each one of these in detail in the sections below.
|
||||||
|
|
||||||
|
Soon Akka will also have:
|
||||||
|
|
||||||
|
- ``AmqpDurableMailboxStorage`` -- AMQP based mailbox (default RabbitMQ)
|
||||||
|
- ``JmsDurableMailboxStorage`` -- JMS based mailbox (default ActiveMQ)
|
||||||
|
|
||||||
|
|
||||||
|
File-based durable mailbox
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
This mailbox is backed by a journaling transaction log on the local file
|
||||||
|
system. It is the simplest want to use since it does not require an extra
|
||||||
|
infrastructure piece to administer, but it is usually sufficient and just what
|
||||||
|
you need.
|
||||||
|
|
||||||
|
The durable dispatchers and their configuration options reside in the ``akka.actor.mailbox`` package.
|
||||||
|
|
||||||
|
You configure durable mailboxes through the "Akka"-only durable dispatchers, the actor
|
||||||
|
is oblivious to which type of mailbox it is using. Here is an example::
|
||||||
|
|
||||||
|
val dispatcher = DurableEventBasedDispatcher(
|
||||||
|
"my:service",
|
||||||
|
FileDurableMailboxStorage)
|
||||||
|
//Then set the actors dispatcher to this dispatcher
|
||||||
|
|
||||||
|
or for a thread-based durable dispatcher. ::
|
||||||
|
|
||||||
|
self.dispatcher = DurableThreadBasedDispatcher(
|
||||||
|
self,
|
||||||
|
FileDurableMailboxStorage)
|
||||||
|
|
||||||
|
There are 2 different durable dispatchers,
|
||||||
|
``DurableEventBasedDispatcher`` and ``DurableThreadBasedDispatcher``,
|
||||||
|
which are durable versions of ``ExecutorBasedEventDrivenDispatcher`` and ``ThreadBasedDispatcher``.
|
||||||
|
|
||||||
|
This gives you an excellent way of creating bulkheads in your application,
|
||||||
|
where groups of actors sharing the same dispatcher also share the same backing storage.
|
||||||
|
|
||||||
|
|more| Read more about that in the :ref:`dispatchers-scala` documentation.
|
||||||
|
|
||||||
|
You can also configure and tune the file-based durable mailbox. This is done in
|
||||||
|
the ``akka.actor.mailbox.file-based`` section in the ``akka.conf`` configuration
|
||||||
|
file.
|
||||||
|
|
||||||
|
.. code-block:: conf
|
||||||
|
|
||||||
|
akka {
|
||||||
|
actor {
|
||||||
|
mailbox {
|
||||||
|
file-based {
|
||||||
|
directory-path = "./_mb"
|
||||||
|
max-items = 2147483647
|
||||||
|
max-size = 2147483647
|
||||||
|
max-items = 2147483647
|
||||||
|
max-age = 0
|
||||||
|
max-journal-size = 16777216 # 16 * 1024 * 1024
|
||||||
|
max-memory-size = 134217728 # 128 * 1024 * 1024
|
||||||
|
max-journal-overflow = 10
|
||||||
|
max-journal-size-absolute = 9223372036854775807
|
||||||
|
discard-old-when-full = on
|
||||||
|
keep-journal = on
|
||||||
|
sync-journal = off
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
.. todo:: explain all the above options in detail
|
||||||
|
|
||||||
|
|
||||||
|
Redis-based durable mailbox
|
||||||
|
---------------------------
|
||||||
|
|
||||||
|
This mailbox is backed by a Redis queue. `Redis <http://redis.io>`_ Is a very
|
||||||
|
fast NOSQL database that has a wide range of data structure abstractions, one of
|
||||||
|
them is a queue which is what we are using in this implementation. This means
|
||||||
|
that you have to start up a Redis server that can host these durable
|
||||||
|
mailboxes. Read more in the Redis documentation on how to do that.
|
||||||
|
|
||||||
|
Here is an example of how you can configure your dispatcher to use this mailbox::
|
||||||
|
|
||||||
|
val dispatcher = DurableEventBasedDispatcher(
|
||||||
|
"my:service",
|
||||||
|
RedisDurableMailboxStorage)
|
||||||
|
|
||||||
|
or for a thread-based durable dispatcher. ::
|
||||||
|
|
||||||
|
self.dispatcher = DurableThreadBasedDispatcher(
|
||||||
|
self,
|
||||||
|
RedisDurableMailboxStorage)
|
||||||
|
|
||||||
|
You also need to configure the IP and port for the Redis server. This is done in
|
||||||
|
the ``akka.actor.mailbox.redis`` section in the ``akka.conf`` configuration
|
||||||
|
file.
|
||||||
|
|
||||||
|
.. code-block:: conf
|
||||||
|
|
||||||
|
akka {
|
||||||
|
actor {
|
||||||
|
mailbox {
|
||||||
|
redis {
|
||||||
|
hostname = "127.0.0.1"
|
||||||
|
port = 6379
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ZooKeeper-based durable mailbox
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
This mailbox is backed by `ZooKeeper <http://zookeeper.apache.org/>`_. ZooKeeper
|
||||||
|
is a centralized service for maintaining configuration information, naming,
|
||||||
|
providing distributed synchronization, and providing group services This means
|
||||||
|
that you have to start up a ZooKeeper server (for production a ZooKeeper server
|
||||||
|
ensamble) that can host these durable mailboxes. Read more in the ZooKeeper
|
||||||
|
documentation on how to do that.
|
||||||
|
|
||||||
|
Akka is using ZooKeeper for many other things, for example the clustering
|
||||||
|
support so if you're using that you love to run a ZooKeeper server anyway and
|
||||||
|
there will not be that much more work to set up this durable mailbox.
|
||||||
|
|
||||||
|
Here is an example of how you can configure your dispatcher to use this mailbox::
|
||||||
|
|
||||||
|
val dispatcher = DurableEventBasedDispatcher(
|
||||||
|
"my:service",
|
||||||
|
ZooKeeperDurableMailboxStorage)
|
||||||
|
|
||||||
|
or for a thread-based durable dispatcher. ::
|
||||||
|
|
||||||
|
self.dispatcher = DurableThreadBasedDispatcher(
|
||||||
|
self,
|
||||||
|
ZooKeeperDurableMailboxStorage)
|
||||||
|
|
||||||
|
You also need to configure ZooKeeper server addresses, timeouts, etc. This is
|
||||||
|
done in the ``akka.actor.mailbox.zookeeper`` section in the ``akka.conf``
|
||||||
|
configuration file.
|
||||||
|
|
||||||
|
.. code-block:: conf
|
||||||
|
|
||||||
|
akka {
|
||||||
|
actor {
|
||||||
|
mailbox {
|
||||||
|
zookeeper {
|
||||||
|
server-addresses = "localhost:2181"
|
||||||
|
session-timeout = 60
|
||||||
|
connection-timeout = 30
|
||||||
|
blocking-queue = on
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Beanstalk-based durable mailbox
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
This mailbox is backed by `Beanstalkd <http://kr.github.com/beanstalkd/>`_.
|
||||||
|
Beanstalk is a simple, fast work queue. This means that you have to start up a
|
||||||
|
Beanstalk server that can host these durable mailboxes. Read more in the
|
||||||
|
Beanstalk documentation on how to do that. ::
|
||||||
|
|
||||||
|
val dispatcher = DurableEventBasedDispatcher(
|
||||||
|
"my:service",
|
||||||
|
BeanstalkDurableMailboxStorage)
|
||||||
|
|
||||||
|
or for a thread-based durable dispatcher. ::
|
||||||
|
|
||||||
|
self.dispatcher = DurableThreadBasedDispatcher(
|
||||||
|
self,
|
||||||
|
BeanstalkDurableMailboxStorage)
|
||||||
|
|
||||||
|
You also need to configure the IP, and port, and so on, for the Beanstalk
|
||||||
|
server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the
|
||||||
|
``akka.conf`` configuration file.
|
||||||
|
|
||||||
|
.. code-block:: conf
|
||||||
|
|
||||||
|
akka {
|
||||||
|
actor {
|
||||||
|
mailbox {
|
||||||
|
beanstalk {
|
||||||
|
hostname = "127.0.0.1"
|
||||||
|
port = 11300
|
||||||
|
reconnect-window = 5
|
||||||
|
message-submit-delay = 0
|
||||||
|
message-submit-timeout = 5
|
||||||
|
message-time-to-live = 120
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
.. |more| image:: more.png
|
||||||
|
:align: middle
|
||||||
|
:alt: More info
|
||||||
|
|
||||||
7
akka-docs/cluster/index.rst
Normal file
7
akka-docs/cluster/index.rst
Normal file
|
|
@ -0,0 +1,7 @@
|
||||||
|
Cluster
|
||||||
|
=======
|
||||||
|
|
||||||
|
.. toctree::
|
||||||
|
:maxdepth: 2
|
||||||
|
|
||||||
|
durable-mailbox
|
||||||
|
|
@ -9,6 +9,7 @@ Contents
|
||||||
common/index
|
common/index
|
||||||
scala/index
|
scala/index
|
||||||
java/index
|
java/index
|
||||||
|
cluster/index
|
||||||
dev/index
|
dev/index
|
||||||
project/index
|
project/index
|
||||||
additional/index
|
additional/index
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,5 @@
|
||||||
|
.. _dispatchers-java:
|
||||||
|
|
||||||
Dispatchers (Java)
|
Dispatchers (Java)
|
||||||
==================
|
==================
|
||||||
|
|
||||||
|
|
|
||||||
BIN
akka-docs/more.png
Normal file
BIN
akka-docs/more.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.5 KiB |
|
|
@ -1,3 +1,5 @@
|
||||||
|
.. _dispatchers-scala:
|
||||||
|
|
||||||
Dispatchers (Scala)
|
Dispatchers (Scala)
|
||||||
===================
|
===================
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -329,24 +329,27 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec
|
||||||
// -------------------------------------------------------------------------------------------------------------------
|
// -------------------------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
class AkkaClusterProject(info: ProjectInfo) extends AkkaDefaultProject(info) with MultiJvmTests {
|
class AkkaClusterProject(info: ProjectInfo) extends AkkaDefaultProject(info) with MultiJvmTests {
|
||||||
val bookkeeper = Dependencies.bookkeeper
|
val bookkeeper = Dependencies.bookkeeper
|
||||||
val zookeeper = Dependencies.zookeeper
|
val zookeeper = Dependencies.zookeeper
|
||||||
val zookeeper_lock = Dependencies.zookeeper_lock
|
val zookeeper_lock = Dependencies.zookeeper_lock
|
||||||
val zkClient = Dependencies.zkClient
|
val zkClient = Dependencies.zkClient
|
||||||
val commons_io = Dependencies.commons_io
|
val commons_io = Dependencies.commons_io
|
||||||
val log4j = Dependencies.log4j
|
val log4j = Dependencies.log4j
|
||||||
|
|
||||||
// test dependencies
|
// test dependencies
|
||||||
|
val scalatest = Dependencies.scalatest
|
||||||
val scalatest = Dependencies.scalatest
|
val junit = Dependencies.junit
|
||||||
val junit = Dependencies.junit
|
|
||||||
|
|
||||||
// multi jvm tests
|
// multi jvm tests
|
||||||
|
|
||||||
lazy val clusterTest = multiJvmTest
|
lazy val clusterTest = multiJvmTest
|
||||||
lazy val clusterRun = multiJvmRun
|
lazy val clusterRun = multiJvmRun
|
||||||
|
|
||||||
override def multiJvmOptions = Seq("-Xmx256M")
|
override def multiJvmOptions = Seq("-Xmx256M")
|
||||||
|
|
||||||
|
lazy val replicationTestsEnabled = systemOptional[Boolean]("cluster.test.replication", false)
|
||||||
|
|
||||||
|
override def testOptions =
|
||||||
|
super.testOptions ++ (if (!replicationTestsEnabled.value) Seq(testFilter("Replication")) else Seq.empty)
|
||||||
}
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------------------------------------------------
|
// -------------------------------------------------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue