From b3dd85f6dd50309073e58837d6edeedaa43f071f Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 23 Feb 2012 17:13:40 +0100 Subject: [PATCH 1/3] switch to dispatcher-scoped settings for durable mailboxes, see #1836 - also switch SBT settings to enable testing of durable mailboxes centrally (if so desired, just uncomment testMailbox line in build.sbt) - automatically start mongod, beanstalkd or redis-server when running the respective tests (assumes that the binaries are in PATH) - unify settings extraction from dispatcher config, sub-scoping by mailbox type name --- .gitignore | 5 +- akka-actor/src/main/resources/reference.conf | 2 +- .../actor/mailbox/BeanstalkBasedMailbox.scala | 6 +-- .../BeanstalkBasedMailboxExtension.scala | 28 ---------- .../BeanstalkBasedMailboxSettings.scala | 26 +++++++++ .../mailbox/BeanstalkBasedMailboxSpec.scala | 19 ++++++- ...edMailbox.scala => FileBasedMailbox.scala} | 6 +-- .../mailbox/FileBasedMailboxExtension.scala | 35 ------------ .../mailbox/FileBasedMailboxSettings.scala | 33 ++++++++++++ .../actor/mailbox/FileBasedMailboxSpec.scala | 11 +++- .../akka/actor/mailbox/DurableMailbox.scala | 54 +++++++++++++++++++ .../actor/mailbox/DurableMailboxSpec.scala | 23 ++++++++ .../actor/mailbox/MongoBasedMailbox.scala | 9 ++-- .../mailbox/MongoBasedMailboxExtension.scala | 26 --------- .../mailbox/MongoBasedMailboxSettings.scala | 23 ++++++++ .../actor/mailbox/MongoBasedMailboxSpec.scala | 19 ++++++- .../actor/mailbox/RedisBasedMailbox.scala | 6 +-- .../mailbox/RedisBasedMailboxExtension.scala | 21 -------- .../mailbox/RedisBasedMailboxSettings.scala | 19 +++++++ .../actor/mailbox/RedisBasedMailboxSpec.scala | 33 +++++++++++- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 6 +-- .../ZooKeeperBasedMailboxExtension.scala | 25 --------- .../ZooKeeperBasedMailboxSettings.scala | 24 +++++++++ .../mailbox/ZooKeeperBasedMailboxSpec.scala | 9 ++++ build.sbt | 2 + project/AkkaBuild.scala | 16 +++--- 26 files changed, 317 insertions(+), 169 deletions(-) delete mode 100644 akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala create mode 100644 akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala rename akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/{FiledBasedMailbox.scala => FileBasedMailbox.scala} (87%) delete mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala create mode 100644 akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala delete mode 100644 akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala create mode 100644 akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala delete mode 100644 akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala create mode 100644 akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala delete mode 100644 akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala create mode 100644 akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala diff --git a/.gitignore b/.gitignore index b80964477d..25d3fa2323 100755 --- a/.gitignore +++ b/.gitignore @@ -62,4 +62,7 @@ akka.sublime-workspace .multi-jvm _mb schoir.props -worker*.log \ No newline at end of file +worker*.log +mongoDB/ +redis/ +beanstalk/ diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index e1a603c971..c900bc6b70 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -254,7 +254,7 @@ akka { debug { # enable function of Actor.loggable(), which is to log any received message at - # DEBUG level + # DEBUG level, see “Testing Actor Systems” receive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 649b365beb..03f438ac5d 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess class BeanstalkBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new BeanstalkBasedMessageQueue(o) + case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -27,9 +27,9 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class BeanstalkBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = BeanstalkBasedMailboxExtension(owner.system) + private val settings = new BeanstalkMailboxSettings(owner.system, _config) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala deleted file mode 100644 index 36ab10393a..0000000000 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config) -} - -class BeanstalkMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.beanstalk.hostname") - val Port = getInt("akka.actor.mailbox.beanstalk.port") - val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS) - val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS) - val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS) - val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala new file mode 100644 index 0000000000..6cf8868a9c --- /dev/null +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class BeanstalkMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "beanstalk" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") + val ReconnectWindow = Duration(getMilliseconds("reconnect-window"), MILLISECONDS) + val MessageSubmitDelay = Duration(getMilliseconds("message-submit-delay"), MILLISECONDS) + val MessageSubmitTimeout = Duration(getMilliseconds("message-submit-timeout"), MILLISECONDS) + val MessageTimeToLive = Duration(getMilliseconds("message-time-to-live"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 4eb370ab63..3b538fdf4c 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -5,9 +5,26 @@ object BeanstalkBasedMailboxSpec { Beanstalkd-dispatcher { mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType throughput = 1 + beanstalk { + hostname = "127.0.0.1" + port = 11400 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) { + + lazy val beanstalkd = new ProcessBuilder("beanstalkd", "-b", "beanstalk", "-l", "127.0.0.1", "-p", "11400").start() + + override def atStartup(): Unit = { + new java.io.File("beanstalk").mkdir() + beanstalkd + Thread.sleep(3000) + } + + override def atTermination(): Unit = beanstalkd.destroy() + +} + diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala similarity index 87% rename from akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala rename to akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index 8be117d89e..d750be17b8 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -16,16 +16,16 @@ import akka.config.ConfigurationException class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMessageQueue(o) + case Some(o) ⇒ new FileBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMessageQueue") - private val settings = FileBasedMailboxExtension(owner.system) + private val settings = new FileBasedMailboxSettings(owner.system, _config) val queuePath = settings.QueuePath private val queue = try { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala deleted file mode 100644 index f7e6527499..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config) -} - -class FileBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val QueuePath = getString("akka.actor.mailbox.file-based.directory-path") - - val MaxItems = getInt("akka.actor.mailbox.file-based.max-items") - val MaxSize = getBytes("akka.actor.mailbox.file-based.max-size") - val MaxItemSize = getBytes("akka.actor.mailbox.file-based.max-item-size") - val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS) - val MaxJournalSize = getBytes("akka.actor.mailbox.file-based.max-journal-size") - val MaxMemorySize = getBytes("akka.actor.mailbox.file-based.max-memory-size") - val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow") - val MaxJournalSizeAbsolute = getBytes("akka.actor.mailbox.file-based.max-journal-size-absolute") - val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full") - val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal") - val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala new file mode 100644 index 0000000000..84d06ab571 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class FileBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "file-based" + + val config = initialize + + import config._ + + val QueuePath = getString("directory-path") + + val MaxItems = getInt("max-items") + val MaxSize = getBytes("max-size") + val MaxItemSize = getBytes("max-item-size") + val MaxAge = Duration(getMilliseconds("max-age"), MILLISECONDS) + val MaxJournalSize = getBytes("max-journal-size") + val MaxMemorySize = getBytes("max-memory-size") + val MaxJournalOverflow = getInt("max-journal-overflow") + val MaxJournalSizeAbsolute = getBytes("max-journal-size-absolute") + val DiscardOldWhenFull = getBoolean("discard-old-when-full") + val KeepJournal = getBoolean("keep-journal") + val SyncJournal = getBoolean("sync-journal") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 274bc36cc1..75a889ee26 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,12 +1,14 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory object FileBasedMailboxSpec { val config = """ File-dispatcher { mailbox-type = akka.actor.mailbox.FileBasedMailboxType throughput = 1 + file-based.directory-path = "file-based" } """ } @@ -14,8 +16,15 @@ object FileBasedMailboxSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { + val queuePath = new FileBasedMailboxSettings(system, system.settings.config.getConfig("File-dispatcher")).QueuePath + + "FileBasedMailboxSettings" must { + "read the file-based section" in { + queuePath must be("file-based") + } + } + def clean { - val queuePath = FileBasedMailboxExtension(system).QueuePath FileUtils.deleteDirectory(new java.io.File(queuePath)) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 77b932911d..6645e17e58 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -7,6 +7,8 @@ import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } import akka.dispatch.{ Envelope, MessageQueue } import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } +import com.typesafe.config.Config +import akka.actor.ActorSystem private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -50,3 +52,55 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ } +/** + * Conventional organization of durable mailbox settings: + * + * {{{ + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } + * }}} + * + * where name=“my-durable-mailbox” in this example. + */ +trait DurableMailboxSettings { + /** + * A reference to the enclosing actor system. + */ + def system: ActorSystem + + /** + * A reference to the config section which the user specified for this mailbox’s dispatcher. + */ + def userConfig: Config + + /** + * The extracted config section for this mailbox, which is the “name” + * section (if that exists), falling back to system defaults. Typical + * implementation looks like: + * + * {{{ + * val config = initialize + * }}} + */ + def config: Config + + /** + * Name of this mailbox type for purposes of configuration scoping. Reference + * defaults go into “akka.actor.mailbox.”. + */ + def name: String + + /** + * Obtain default extracted mailbox config section from userConfig and system. + */ + def initialize: Config = + if (userConfig.hasPath(name)) + userConfig.getConfig(name).withFallback(system.settings.config.getConfig("akka.actor.mailbox." + name)) + else system.settings.config.getConfig("akka.actor.mailbox." + name) +} + diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 444edfa72d..970e0d6f1d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -11,6 +11,9 @@ import akka.dispatch.Await import akka.testkit.AkkaSpec import akka.testkit.TestLatch import akka.util.duration._ +import java.io.InputStream +import scala.annotation.tailrec +import com.typesafe.config.Config object DurableMailboxSpecActorFactory { @@ -31,6 +34,26 @@ object DurableMailboxSpecActorFactory { abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) { import DurableMailboxSpecActorFactory._ + protected def streamMustContain(in: InputStream, words: String): Unit = { + val output = new Array[Byte](8192) + + def now = System.currentTimeMillis + + def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // don’t want parse errors + + @tailrec def read(end: Int = 0, start: Long = now): Int = + in.read(output, end, output.length - end) match { + case -1 ⇒ end + case x ⇒ + val next = end + x + if (string(next).contains(words) || now - start > 10000 || next == output.length) next + else read(next, start) + } + + val result = string(read()) + if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) + } + def createMailboxTestActor(id: String): ActorRef = system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher")) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 23de168370..58ca7228eb 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -21,7 +21,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MongoBasedMessageQueue(o) + case Some(o) ⇒ new MongoBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -37,14 +37,14 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) { +class MongoBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! private val dispatcher = owner.dispatcher - private val settings = MongoBasedMailboxExtension(owner.system) + private val settings = new MongoBasedMailboxSettings(owner.system, _config) val log = Logging(system, "MongoBasedMessageQueue") @@ -98,9 +98,8 @@ class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_ def hasMessages: Boolean = numberOfMessages > 0 private[akka] def connect() = { - require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey)) log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI) - val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match { + val _dbh = MongoConnection.fromURI(settings.MongoURI) match { case (conn, None, None) ⇒ { throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'") } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala deleted file mode 100644 index fac0ad9050..0000000000 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config) -} - -class MongoBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val UriConfigKey = "akka.actor.mailbox.mongodb.uri" - val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None - val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala new file mode 100644 index 0000000000..469865e002 --- /dev/null +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class MongoBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "mongodb" + + val config = initialize + + import config._ + + val MongoURI = getString("uri") + val WriteTimeout = Duration(config.getMilliseconds("timeout.write"), MILLISECONDS) + val ReadTimeout = Duration(config.getMilliseconds("timeout.read"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index 7001d8de99..0579215df0 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -14,6 +14,7 @@ object MongoBasedMailboxSpec { mongodb-dispatcher { mailbox-type = akka.actor.mailbox.MongoBasedMailboxType throughput = 1 + mongodb.uri = "mongodb://localhost:27123/akka.mailbox" } """ } @@ -23,9 +24,23 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail import com.mongodb.async._ - val mongo = MongoConnection("localhost", 27017)("akka") + lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start() + lazy val mongo = MongoConnection("localhost", 27123)("akka") - mongo.dropDatabase() { success ⇒ } + override def atStartup(): Unit = { + // start MongoDB daemon + new java.io.File("mongoDB").mkdir() + val in = mongod.getInputStream + + try { + streamMustContain(in, "waiting for connections on port") + mongo.dropDatabase() { success ⇒ } + } catch { + case e ⇒ mongod.destroy(); throw e + } + } + + override def atTermination(): Unit = mongod.destroy() } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index fbc5830a30..9203180e9f 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -19,14 +19,14 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new RedisBasedMessageQueue(o) + case Some(o) ⇒ new RedisBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class RedisBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = RedisBasedMailboxExtension(owner.system) + private val settings = new RedisBasedMailboxSettings(owner.system, _config) @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala deleted file mode 100644 index 629f08b145..0000000000 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.actor._ - -object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config) -} - -class RedisBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.redis.hostname") - val Port = getInt("akka.actor.mailbox.redis.port") -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala new file mode 100644 index 0000000000..e302b8dd50 --- /dev/null +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.actor.ActorSystem + +class RedisBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "redis" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 6e78d6b74a..8dd0d09ce4 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -5,9 +5,40 @@ object RedisBasedMailboxSpec { Redis-dispatcher { mailbox-type = akka.actor.mailbox.RedisBasedMailboxType throughput = 1 + redis { + hostname = "127.0.0.1" + port = 6479 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) { + + lazy val redisServer = new ProcessBuilder("redis-server", "-").start() + + override def atStartup(): Unit = { + new java.io.File("redis").mkdir() + + val out = redisServer.getOutputStream + + val config = """ + port 6479 + bind 127.0.0.1 + dir redis + """.getBytes("UTF-8") + + try { + out.write(config) + out.close() + + streamMustContain(redisServer.getInputStream, "ready to accept connections on port") + } catch { + case e ⇒ redisServer.destroy(); throw e + } + } + + override def atTermination(): Unit = redisServer.destroy() + +} diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 203a94b620..862d88d451 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -20,14 +20,14 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o) + case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class ZooKeeperBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = ZooKeeperBasedMailboxExtension(owner.system) + private val settings = new ZooKeeperBasedMailboxSettings(owner.system, _config) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala deleted file mode 100644 index 4f3dcfb42f..0000000000 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config) -} -class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses") - val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) - val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) - val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala new file mode 100644 index 0000000000..17fd8f3f25 --- /dev/null +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class ZooKeeperBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "zookeeper" + + val config = initialize + + import config._ + + val ZkServerAddresses = getString("server-addresses") + val SessionTimeout = Duration(getMilliseconds("session-timeout"), MILLISECONDS) + val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) + val BlockingQueue = getBoolean("blocking-queue") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 9264fbccce..e4d735de30 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -5,12 +5,15 @@ import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher import akka.actor.ActorRef +import com.typesafe.config.ConfigFactory +import akka.util.duration._ object ZooKeeperBasedMailboxSpec { val config = """ ZooKeeper-dispatcher { mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType throughput = 1 + zookeeper.session-timeout = 30s } """ } @@ -21,6 +24,12 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" + "ZookeeperBasedMailboxSettings" must { + "read the right settings" in { + new ZooKeeperBasedMailboxSettings(system, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds) + } + } + var zkServer: ZkServer = _ override def atStartup() { diff --git a/build.sbt b/build.sbt index 13467e1654..07de39759d 100644 --- a/build.sbt +++ b/build.sbt @@ -5,3 +5,5 @@ (externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/") (description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM." + + //testMailbox in Global := true diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 905e345dec..a201895405 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -26,6 +26,7 @@ object AkkaBuild extends Build { id = "akka", base = file("."), settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( + testMailbox in GlobalScope := false, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), @@ -141,6 +142,8 @@ object AkkaBuild extends Build { // ) // ) + val testMailbox = SettingKey[Boolean]("test-mailbox") + lazy val mailboxes = Project( id = "akka-durable-mailboxes", base = file("akka-durable-mailboxes"), @@ -165,8 +168,7 @@ object AkkaBuild extends Build { dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.beanstalkMailbox, - testBeanstalkMailbox := false, - testOptions in Test <+= testBeanstalkMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -179,16 +181,13 @@ object AkkaBuild extends Build { ) ) - val testRedisMailbox = SettingKey[Boolean]("test-redis-mailbox") - lazy val redisMailbox = Project( id = "akka-redis-mailbox", base = file("akka-durable-mailboxes/akka-redis-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.redisMailbox, - testRedisMailbox := false, - testOptions in Test <+= testRedisMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -201,8 +200,6 @@ object AkkaBuild extends Build { ) ) - val testMongoMailbox = SettingKey[Boolean]("test-mongo-mailbox") - lazy val mongoMailbox = Project( id = "akka-mongo-mailbox", base = file("akka-durable-mailboxes/akka-mongo-mailbox"), @@ -210,8 +207,7 @@ object AkkaBuild extends Build { settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.mongoMailbox, ivyXML := Dependencies.mongoMailboxExcludes, - testMongoMailbox := false, - testOptions in Test <+= testMongoMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) From abd7fb40cafb10e9bc8c30590db2a9914bf16513 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Feb 2012 19:57:05 +0100 Subject: [PATCH 2/3] take testMaibox setting from akka.testMailbox system property also improve doc comment in reference.conf --- akka-actor/src/main/resources/reference.conf | 3 ++- build.sbt | 1 - project/AkkaBuild.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index c900bc6b70..344f3c1126 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -254,7 +254,8 @@ akka { debug { # enable function of Actor.loggable(), which is to log any received message at - # DEBUG level, see “Testing Actor Systems” + # DEBUG level, see the “Testing Actor Systems” section of the Akka Documentation + # at http://akka.io/docs receive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) diff --git a/build.sbt b/build.sbt index 07de39759d..a2be7bb5c3 100644 --- a/build.sbt +++ b/build.sbt @@ -6,4 +6,3 @@ (description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM." - //testMailbox in Global := true diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index a201895405..e7d3a071c7 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -26,7 +26,7 @@ object AkkaBuild extends Build { id = "akka", base = file("."), settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( - testMailbox in GlobalScope := false, + testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), From eaee16c7d325288489683329b1a935e483a062e1 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 26 Feb 2012 21:26:25 +0100 Subject: [PATCH 3/3] include system.settings when constructing MailboxType, see #1864 - necessary to not have to construct one Settings object per MessageQueue - added system.settings to DispatcherPrerequisites --- .../test/scala/akka/dispatch/MailboxConfigSpec.scala | 3 ++- .../scala/akka/dispatch/PriorityDispatcherSpec.scala | 5 +++-- akka-actor/src/main/resources/reference.conf | 4 ++-- akka-actor/src/main/scala/akka/actor/ActorSystem.scala | 2 +- .../main/scala/akka/dispatch/AbstractDispatcher.scala | 8 ++++---- .../src/main/scala/akka/dispatch/Dispatchers.scala | 4 +++- akka-actor/src/main/scala/akka/dispatch/Mailbox.scala | 5 +++-- .../akka/docs/dispatcher/DispatcherDocTestBase.java | 2 +- .../code/akka/docs/dispatcher/DispatcherDocSpec.scala | 3 ++- .../akka/actor/mailbox/BeanstalkBasedMailbox.scala | 9 +++++---- .../actor/mailbox/BeanstalkBasedMailboxSettings.scala | 3 ++- .../scala/akka/actor/mailbox/FileBasedMailbox.scala | 9 +++++---- .../akka/actor/mailbox/FileBasedMailboxSettings.scala | 3 ++- .../akka/actor/mailbox/FileBasedMailboxSpec.scala | 2 +- .../main/scala/akka/actor/mailbox/DurableMailbox.scala | 6 +++--- .../scala/akka/actor/mailbox/MongoBasedMailbox.scala | 10 +++++----- .../akka/actor/mailbox/MongoBasedMailboxSettings.scala | 3 ++- .../scala/akka/actor/mailbox/RedisBasedMailbox.scala | 10 +++++----- .../akka/actor/mailbox/RedisBasedMailboxSettings.scala | 3 ++- .../akka/actor/mailbox/ZooKeeperBasedMailbox.scala | 9 +++++---- .../actor/mailbox/ZooKeeperBasedMailboxSettings.scala | 3 ++- .../akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala | 2 +- 22 files changed, 61 insertions(+), 47 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index f4d355333a..8759f1aad9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -8,6 +8,7 @@ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef } import com.typesafe.config.Config +import akka.actor.ActorSystem @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -156,7 +157,7 @@ object CustomMailboxSpec { } """ - class MyMailboxType(config: Config) extends MailboxType { + class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { override def create(owner: Option[ActorContext]) = owner match { case Some(o) ⇒ new MyMailbox(o) case None ⇒ throw new Exception("no mailbox owner given") diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 855c4f6965..a9855fef7d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -6,6 +6,7 @@ import akka.pattern.ask import akka.util.duration._ import akka.testkit.DefaultTimeout import com.typesafe.config.Config +import akka.actor.ActorSystem object PriorityDispatcherSpec { val config = """ @@ -17,12 +18,12 @@ object PriorityDispatcherSpec { } """ - class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({ + class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({ case i: Int ⇒ i //Reverse order case 'Result ⇒ Int.MaxValue }: Any ⇒ Int)) - class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({ + class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedPriorityMailbox(PriorityGenerator({ case i: Int ⇒ i //Reverse order case 'Result ⇒ Int.MaxValue }: Any ⇒ Int), 1000, 10 seconds) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 344f3c1126..a0306b806c 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -242,8 +242,8 @@ akka { mailbox-push-timeout-time = 10s # FQCN of the MailboxType, if not specified the default bounded or unbounded - # mailbox is used. The Class of the FQCN must have a constructor with a - # com.typesafe.config.Config parameter. + # mailbox is used. The Class of the FQCN must have a constructor with + # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "" # For BalancingDispatcher: If the balancing dispatcher should attempt to diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 72c5d4a13f..de1689e730 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -481,7 +481,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def locker: Locker = provider.locker val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess)) + threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 631e84ece7..e8389c69fc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -384,17 +384,17 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit config.getString("mailbox-type") match { case "" ⇒ if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox() - else new BoundedMailbox(config) + else new BoundedMailbox(prerequisites.settings, config) case "unbounded" ⇒ UnboundedMailbox() - case "bounded" ⇒ new BoundedMailbox(config) + case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) case fqcn ⇒ - val args = Seq(classOf[Config] -> config) + val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( ("Cannot instantiate MailboxType [%s], defined in [%s], " + - "make sure it has constructor with a [com.typesafe.config.Config] parameter") + "make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters") .format(fqcn, config.getString("id")), exception) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 5f4528146d..93d44e007d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -22,6 +22,7 @@ trait DispatcherPrerequisites { def deadLetterMailbox: Mailbox def scheduler: Scheduler def dynamicAccess: DynamicAccess + def settings: ActorSystem.Settings } case class DefaultDispatcherPrerequisites( @@ -29,7 +30,8 @@ case class DefaultDispatcherPrerequisites( val eventStream: EventStream, val deadLetterMailbox: Mailbox, val scheduler: Scheduler, - val dynamicAccess: DynamicAccess) extends DispatcherPrerequisites + val dynamicAccess: DynamicAccess, + val settings: ActorSystem.Settings) extends DispatcherPrerequisites object Dispatchers { /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 6ae7f3ddc7..881d036f05 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.event.Logging.Error import akka.actor.ActorContext import com.typesafe.config.Config +import akka.actor.ActorSystem class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -357,7 +358,7 @@ trait MailboxType { */ case class UnboundedMailbox() extends MailboxType { - def this(config: Config) = this() + def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorContext]): MessageQueue = new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { @@ -367,7 +368,7 @@ case class UnboundedMailbox() extends MailboxType { case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { - def this(config: Config) = this(config.getInt("mailbox-capacity"), + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)) if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 1aaa76ee11..1616e349bc 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -124,7 +124,7 @@ public class DispatcherDocTestBase { //#prio-mailbox public static class PrioMailbox extends UnboundedPriorityMailbox { - public PrioMailbox(Config config) { // needed for reflective instantiation + public PrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important @Override public int gen(Object message) { diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 6717ad96cf..bb3831ea4a 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -15,6 +15,7 @@ import akka.actor.PoisonPill import akka.dispatch.MessageDispatcherConfigurator import akka.dispatch.MessageDispatcher import akka.dispatch.DispatcherPrerequisites +import akka.actor.ActorSystem object DispatcherDocSpec { val config = """ @@ -113,7 +114,7 @@ object DispatcherDocSpec { import com.typesafe.config.Config // We create a new Priority dispatcher and seed it with the priority generator - class PrioMailbox(config: Config) extends UnboundedPriorityMailbox( + class PrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox( PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 03f438ac5d..35a09303fa 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -14,12 +14,14 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} -class BeanstalkBasedMailboxType(config: Config) extends MailboxType { +class BeanstalkBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new BeanstalkMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, config) + case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -27,9 +29,8 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class BeanstalkBasedMessageQueue(_owner: ActorContext, val settings: BeanstalkMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = new BeanstalkMailboxSettings(owner.system, _config) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala index 6cf8868a9c..0450b2c172 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala @@ -8,7 +8,8 @@ import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.ActorSystem -class BeanstalkMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { +class BeanstalkMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { def name = "beanstalk" diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index d750be17b8..343217ba12 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -13,19 +13,20 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException +import akka.actor.ActorSystem -class FileBasedMailboxType(config: Config) extends MailboxType { +class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new FileBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMessageQueue(o, config) + case Some(o) ⇒ new FileBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMessageQueue") - private val settings = new FileBasedMailboxSettings(owner.system, _config) val queuePath = settings.QueuePath private val queue = try { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala index 84d06ab571..6511bf9e00 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -8,7 +8,8 @@ import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.ActorSystem -class FileBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { +class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { def name = "file-based" diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 75a889ee26..5d428e0776 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -16,7 +16,7 @@ object FileBasedMailboxSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { - val queuePath = new FileBasedMailboxSettings(system, system.settings.config.getConfig("File-dispatcher")).QueuePath + val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath "FileBasedMailboxSettings" must { "read the file-based section" in { diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 6645e17e58..41ec6d7307 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -71,7 +71,7 @@ trait DurableMailboxSettings { /** * A reference to the enclosing actor system. */ - def system: ActorSystem + def systemSettings: ActorSystem.Settings /** * A reference to the config section which the user specified for this mailbox’s dispatcher. @@ -100,7 +100,7 @@ trait DurableMailboxSettings { */ def initialize: Config = if (userConfig.hasPath(name)) - userConfig.getConfig(name).withFallback(system.settings.config.getConfig("akka.actor.mailbox." + name)) - else system.settings.config.getConfig("akka.actor.mailbox." + name) + userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name)) + else systemSettings.config.getConfig("akka.actor.mailbox." + name) } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 58ca7228eb..fa3b0b0c87 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -16,12 +16,14 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class MongoBasedMailboxException(message: String) extends AkkaException(message) -class MongoBasedMailboxType(config: Config) extends MailboxType { +class MongoBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new MongoBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MongoBasedMessageQueue(o, config) + case Some(o) ⇒ new MongoBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -37,15 +39,13 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) { +class MongoBasedMessageQueue(_owner: ActorContext, val settings: MongoBasedMailboxSettings) extends DurableMessageQueue(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! private val dispatcher = owner.dispatcher - private val settings = new MongoBasedMailboxSettings(owner.system, _config) - val log = Logging(system, "MongoBasedMessageQueue") @volatile diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala index 469865e002..3ed62c2010 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala @@ -8,7 +8,8 @@ import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.ActorSystem -class MongoBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { +class MongoBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { def name = "mongodb" diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 9203180e9f..236c8fb6f3 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -14,19 +14,19 @@ import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class RedisBasedMailboxException(message: String) extends AkkaException(message) -class RedisBasedMailboxType(config: Config) extends MailboxType { +class RedisBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new RedisBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new RedisBasedMessageQueue(o, config) + case Some(o) ⇒ new RedisBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class RedisBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - - private val settings = new RedisBasedMailboxSettings(owner.system, _config) +class RedisBasedMessageQueue(_owner: ActorContext, val settings: RedisBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala index e302b8dd50..a628216fb1 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala @@ -6,7 +6,8 @@ package akka.actor.mailbox import com.typesafe.config.Config import akka.actor.ActorSystem -class RedisBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { +class RedisBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { def name = "redis" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 862d88d451..e54423d0e0 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -15,19 +15,20 @@ import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) -class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { +class ZooKeeperBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new ZooKeeperBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, config) + case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class ZooKeeperBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class ZooKeeperBasedMessageQueue(_owner: ActorContext, val settings: ZooKeeperBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = new ZooKeeperBasedMailboxSettings(owner.system, _config) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala index 17fd8f3f25..58cd67c3c1 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala @@ -8,7 +8,8 @@ import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.ActorSystem -class ZooKeeperBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { +class ZooKeeperBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { def name = "zookeeper" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index e4d735de30..a2d9028bfe 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -26,7 +26,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe "ZookeeperBasedMailboxSettings" must { "read the right settings" in { - new ZooKeeperBasedMailboxSettings(system, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds) + new ZooKeeperBasedMailboxSettings(system.settings, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds) } }