switch to dispatcher-scoped settings for durable mailboxes, see #1836

- also switch SBT settings to enable testing of durable mailboxes
  centrally (if so desired, just uncomment testMailbox line in build.sbt)
- automatically start mongod, beanstalkd or redis-server when running
  the respective tests (assumes that the binaries are in PATH)
- unify settings extraction from dispatcher config, sub-scoping by
  mailbox type name
This commit is contained in:
Roland 2012-02-23 17:13:40 +01:00
parent d987ad2228
commit b3dd85f6dd
26 changed files with 317 additions and 169 deletions

3
.gitignore vendored
View file

@ -63,3 +63,6 @@ akka.sublime-workspace
_mb
schoir.props
worker*.log
mongoDB/
redis/
beanstalk/

View file

@ -254,7 +254,7 @@ akka {
debug {
# enable function of Actor.loggable(), which is to log any received message at
# DEBUG level
# DEBUG level, see “Testing Actor Systems”
receive = off
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like)

View file

@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess
class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new BeanstalkBasedMessageQueue(o)
case Some(o) new BeanstalkBasedMessageQueue(o, config)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
@ -27,9 +27,9 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class BeanstalkBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val settings = new BeanstalkMailboxSettings(owner.system, _config)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config)
}
class BeanstalkMailboxSettings(val config: Config) extends Extension {
import config._
val Hostname = getString("akka.actor.mailbox.beanstalk.hostname")
val Port = getInt("akka.actor.mailbox.beanstalk.port")
val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS)
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class BeanstalkMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
def name = "beanstalk"
val config = initialize
import config._
val Hostname = getString("hostname")
val Port = getInt("port")
val ReconnectWindow = Duration(getMilliseconds("reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("message-time-to-live"), MILLISECONDS)
}

View file

@ -5,9 +5,26 @@ object BeanstalkBasedMailboxSpec {
Beanstalkd-dispatcher {
mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType
throughput = 1
beanstalk {
hostname = "127.0.0.1"
port = 11400
}
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config)
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) {
lazy val beanstalkd = new ProcessBuilder("beanstalkd", "-b", "beanstalk", "-l", "127.0.0.1", "-p", "11400").start()
override def atStartup(): Unit = {
new java.io.File("beanstalk").mkdir()
beanstalkd
Thread.sleep(3000)
}
override def atTermination(): Unit = beanstalkd.destroy()
}

View file

@ -16,16 +16,16 @@ import akka.config.ConfigurationException
class FileBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new FileBasedMessageQueue(o)
case Some(o) new FileBasedMessageQueue(o, config)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class FileBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMessageQueue")
private val settings = FileBasedMailboxExtension(owner.system)
private val settings = new FileBasedMailboxSettings(owner.system, _config)
val queuePath = settings.QueuePath
private val queue = try {

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config)
}
class FileBasedMailboxSettings(val config: Config) extends Extension {
import config._
val QueuePath = getString("akka.actor.mailbox.file-based.directory-path")
val MaxItems = getInt("akka.actor.mailbox.file-based.max-items")
val MaxSize = getBytes("akka.actor.mailbox.file-based.max-size")
val MaxItemSize = getBytes("akka.actor.mailbox.file-based.max-item-size")
val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS)
val MaxJournalSize = getBytes("akka.actor.mailbox.file-based.max-journal-size")
val MaxMemorySize = getBytes("akka.actor.mailbox.file-based.max-memory-size")
val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow")
val MaxJournalSizeAbsolute = getBytes("akka.actor.mailbox.file-based.max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full")
val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal")
val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal")
}

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class FileBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
def name = "file-based"
val config = initialize
import config._
val QueuePath = getString("directory-path")
val MaxItems = getInt("max-items")
val MaxSize = getBytes("max-size")
val MaxItemSize = getBytes("max-item-size")
val MaxAge = Duration(getMilliseconds("max-age"), MILLISECONDS)
val MaxJournalSize = getBytes("max-journal-size")
val MaxMemorySize = getBytes("max-memory-size")
val MaxJournalOverflow = getInt("max-journal-overflow")
val MaxJournalSizeAbsolute = getBytes("max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("discard-old-when-full")
val KeepJournal = getBoolean("keep-journal")
val SyncJournal = getBoolean("sync-journal")
}

View file

@ -1,12 +1,14 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
object FileBasedMailboxSpec {
val config = """
File-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
throughput = 1
file-based.directory-path = "file-based"
}
"""
}
@ -14,8 +16,15 @@ object FileBasedMailboxSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
val queuePath = new FileBasedMailboxSettings(system, system.settings.config.getConfig("File-dispatcher")).QueuePath
"FileBasedMailboxSettings" must {
"read the file-based section" in {
queuePath must be("file-based")
}
}
def clean {
val queuePath = FileBasedMailboxExtension(system).QueuePath
FileUtils.deleteDirectory(new java.io.File(queuePath))
}

View file

@ -7,6 +7,8 @@ import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem }
import akka.dispatch.{ Envelope, MessageQueue }
import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol }
import com.typesafe.config.Config
import akka.actor.ActorSystem
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
@ -50,3 +52,55 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
}
/**
* Conventional organization of durable mailbox settings:
*
* {{{
* my-durable-dispatcher {
* mailbox-type = "my.durable.mailbox"
* my-durable-mailbox {
* setting1 = 1
* setting2 = 2
* }
* }
* }}}
*
* where name=my-durable-mailbox in this example.
*/
trait DurableMailboxSettings {
/**
* A reference to the enclosing actor system.
*/
def system: ActorSystem
/**
* A reference to the config section which the user specified for this mailboxs dispatcher.
*/
def userConfig: Config
/**
* The extracted config section for this mailbox, which is the name
* section (if that exists), falling back to system defaults. Typical
* implementation looks like:
*
* {{{
* val config = initialize
* }}}
*/
def config: Config
/**
* Name of this mailbox type for purposes of configuration scoping. Reference
* defaults go into akka.actor.mailbox.<name>.
*/
def name: String
/**
* Obtain default extracted mailbox config section from userConfig and system.
*/
def initialize: Config =
if (userConfig.hasPath(name))
userConfig.getConfig(name).withFallback(system.settings.config.getConfig("akka.actor.mailbox." + name))
else system.settings.config.getConfig("akka.actor.mailbox." + name)
}

View file

@ -11,6 +11,9 @@ import akka.dispatch.Await
import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import akka.util.duration._
import java.io.InputStream
import scala.annotation.tailrec
import com.typesafe.config.Config
object DurableMailboxSpecActorFactory {
@ -31,6 +34,26 @@ object DurableMailboxSpecActorFactory {
abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) {
import DurableMailboxSpecActorFactory._
protected def streamMustContain(in: InputStream, words: String): Unit = {
val output = new Array[Byte](8192)
def now = System.currentTimeMillis
def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // dont want parse errors
@tailrec def read(end: Int = 0, start: Long = now): Int =
in.read(output, end, output.length - end) match {
case -1 end
case x
val next = end + x
if (string(next).contains(words) || now - start > 10000 || next == output.length) next
else read(next, start)
}
val result = string(read())
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
}
def createMailboxTestActor(id: String): ActorRef =
system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher"))

View file

@ -21,7 +21,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message)
class MongoBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MongoBasedMessageQueue(o)
case Some(o) new MongoBasedMessageQueue(o, config)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
@ -37,14 +37,14 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
*
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/
class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) {
class MongoBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
private val dispatcher = owner.dispatcher
private val settings = MongoBasedMailboxExtension(owner.system)
private val settings = new MongoBasedMailboxSettings(owner.system, _config)
val log = Logging(system, "MongoBasedMessageQueue")
@ -98,9 +98,8 @@ class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_
def hasMessages: Boolean = numberOfMessages > 0
private[akka] def connect() = {
require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey))
log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI)
val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match {
val _dbh = MongoConnection.fromURI(settings.MongoURI) match {
case (conn, None, None) {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}

View file

@ -1,26 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config)
}
class MongoBasedMailboxSettings(val config: Config) extends Extension {
import config._
val UriConfigKey = "akka.actor.mailbox.mongodb.uri"
val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None
val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS)
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class MongoBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
def name = "mongodb"
val config = initialize
import config._
val MongoURI = getString("uri")
val WriteTimeout = Duration(config.getMilliseconds("timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("timeout.read"), MILLISECONDS)
}

View file

@ -14,6 +14,7 @@ object MongoBasedMailboxSpec {
mongodb-dispatcher {
mailbox-type = akka.actor.mailbox.MongoBasedMailboxType
throughput = 1
mongodb.uri = "mongodb://localhost:27123/akka.mailbox"
}
"""
}
@ -23,9 +24,23 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
import com.mongodb.async._
val mongo = MongoConnection("localhost", 27017)("akka")
lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start()
lazy val mongo = MongoConnection("localhost", 27123)("akka")
override def atStartup(): Unit = {
// start MongoDB daemon
new java.io.File("mongoDB").mkdir()
val in = mongod.getInputStream
try {
streamMustContain(in, "waiting for connections on port")
mongo.dropDatabase() { success }
} catch {
case e mongod.destroy(); throw e
}
}
override def atTermination(): Unit = mongod.destroy()
}

View file

@ -19,14 +19,14 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new RedisBasedMessageQueue(o)
case Some(o) new RedisBasedMessageQueue(o, config)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class RedisBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)
private val settings = new RedisBasedMailboxSettings(owner.system, _config)
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.actor._
object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config)
}
class RedisBasedMailboxSettings(val config: Config) extends Extension {
import config._
val Hostname = getString("akka.actor.mailbox.redis.hostname")
val Port = getInt("akka.actor.mailbox.redis.port")
}

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.actor.ActorSystem
class RedisBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
def name = "redis"
val config = initialize
import config._
val Hostname = getString("hostname")
val Port = getInt("port")
}

View file

@ -5,9 +5,40 @@ object RedisBasedMailboxSpec {
Redis-dispatcher {
mailbox-type = akka.actor.mailbox.RedisBasedMailboxType
throughput = 1
redis {
hostname = "127.0.0.1"
port = 6479
}
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config)
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) {
lazy val redisServer = new ProcessBuilder("redis-server", "-").start()
override def atStartup(): Unit = {
new java.io.File("redis").mkdir()
val out = redisServer.getOutputStream
val config = """
port 6479
bind 127.0.0.1
dir redis
""".getBytes("UTF-8")
try {
out.write(config)
out.close()
streamMustContain(redisServer.getInputStream, "ready to accept connections on port")
} catch {
case e redisServer.destroy(); throw e
}
}
override def atTermination(): Unit = redisServer.destroy()
}

View file

@ -20,14 +20,14 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new ZooKeeperBasedMessageQueue(o)
case Some(o) new ZooKeeperBasedMessageQueue(o, config)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class ZooKeeperBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
private val settings = new ZooKeeperBasedMailboxSettings(owner.system, _config)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config)
}
class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension {
import config._
val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses")
val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class ZooKeeperBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
def name = "zookeeper"
val config = initialize
import config._
val ZkServerAddresses = getString("server-addresses")
val SessionTimeout = Duration(getMilliseconds("session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("blocking-queue")
}

View file

@ -5,12 +5,15 @@ import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher
import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.util.duration._
object ZooKeeperBasedMailboxSpec {
val config = """
ZooKeeper-dispatcher {
mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType
throughput = 1
zookeeper.session-timeout = 30s
}
"""
}
@ -21,6 +24,12 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
"ZookeeperBasedMailboxSettings" must {
"read the right settings" in {
new ZooKeeperBasedMailboxSettings(system, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds)
}
}
var zkServer: ZkServer = _
override def atStartup() {

View file

@ -5,3 +5,5 @@
(externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/")
(description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM."
//testMailbox in Global := true

View file

@ -26,6 +26,7 @@ object AkkaBuild extends Build {
id = "akka",
base = file("."),
settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq(
testMailbox in GlobalScope := false,
parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean,
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
@ -141,6 +142,8 @@ object AkkaBuild extends Build {
// )
// )
val testMailbox = SettingKey[Boolean]("test-mailbox")
lazy val mailboxes = Project(
id = "akka-durable-mailboxes",
base = file("akka-durable-mailboxes"),
@ -165,8 +168,7 @@ object AkkaBuild extends Build {
dependencies = Seq(mailboxesCommon % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.beanstalkMailbox,
testBeanstalkMailbox := false,
testOptions in Test <+= testBeanstalkMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)
@ -179,16 +181,13 @@ object AkkaBuild extends Build {
)
)
val testRedisMailbox = SettingKey[Boolean]("test-redis-mailbox")
lazy val redisMailbox = Project(
id = "akka-redis-mailbox",
base = file("akka-durable-mailboxes/akka-redis-mailbox"),
dependencies = Seq(mailboxesCommon % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.redisMailbox,
testRedisMailbox := false,
testOptions in Test <+= testRedisMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)
@ -201,8 +200,6 @@ object AkkaBuild extends Build {
)
)
val testMongoMailbox = SettingKey[Boolean]("test-mongo-mailbox")
lazy val mongoMailbox = Project(
id = "akka-mongo-mailbox",
base = file("akka-durable-mailboxes/akka-mongo-mailbox"),
@ -210,8 +207,7 @@ object AkkaBuild extends Build {
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.mongoMailbox,
ivyXML := Dependencies.mongoMailboxExcludes,
testMongoMailbox := false,
testOptions in Test <+= testMongoMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)