diff --git a/.gitignore b/.gitignore index b80964477d..25d3fa2323 100755 --- a/.gitignore +++ b/.gitignore @@ -62,4 +62,7 @@ akka.sublime-workspace .multi-jvm _mb schoir.props -worker*.log \ No newline at end of file +worker*.log +mongoDB/ +redis/ +beanstalk/ diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index e1a603c971..c900bc6b70 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -254,7 +254,7 @@ akka { debug { # enable function of Actor.loggable(), which is to log any received message at - # DEBUG level + # DEBUG level, see “Testing Actor Systems” receive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 649b365beb..03f438ac5d 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess class BeanstalkBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new BeanstalkBasedMessageQueue(o) + case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -27,9 +27,9 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class BeanstalkBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = BeanstalkBasedMailboxExtension(owner.system) + private val settings = new BeanstalkMailboxSettings(owner.system, _config) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala deleted file mode 100644 index 36ab10393a..0000000000 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config) -} - -class BeanstalkMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.beanstalk.hostname") - val Port = getInt("akka.actor.mailbox.beanstalk.port") - val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS) - val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS) - val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS) - val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala new file mode 100644 index 0000000000..6cf8868a9c --- /dev/null +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala @@ -0,0 +1,26 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class BeanstalkMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "beanstalk" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") + val ReconnectWindow = Duration(getMilliseconds("reconnect-window"), MILLISECONDS) + val MessageSubmitDelay = Duration(getMilliseconds("message-submit-delay"), MILLISECONDS) + val MessageSubmitTimeout = Duration(getMilliseconds("message-submit-timeout"), MILLISECONDS) + val MessageTimeToLive = Duration(getMilliseconds("message-time-to-live"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 4eb370ab63..3b538fdf4c 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -5,9 +5,26 @@ object BeanstalkBasedMailboxSpec { Beanstalkd-dispatcher { mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType throughput = 1 + beanstalk { + hostname = "127.0.0.1" + port = 11400 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) { + + lazy val beanstalkd = new ProcessBuilder("beanstalkd", "-b", "beanstalk", "-l", "127.0.0.1", "-p", "11400").start() + + override def atStartup(): Unit = { + new java.io.File("beanstalk").mkdir() + beanstalkd + Thread.sleep(3000) + } + + override def atTermination(): Unit = beanstalkd.destroy() + +} + diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala similarity index 87% rename from akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala rename to akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index 8be117d89e..d750be17b8 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -16,16 +16,16 @@ import akka.config.ConfigurationException class FileBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMessageQueue(o) + case Some(o) ⇒ new FileBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMessageQueue") - private val settings = FileBasedMailboxExtension(owner.system) + private val settings = new FileBasedMailboxSettings(owner.system, _config) val queuePath = settings.QueuePath private val queue = try { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala deleted file mode 100644 index f7e6527499..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config) -} - -class FileBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val QueuePath = getString("akka.actor.mailbox.file-based.directory-path") - - val MaxItems = getInt("akka.actor.mailbox.file-based.max-items") - val MaxSize = getBytes("akka.actor.mailbox.file-based.max-size") - val MaxItemSize = getBytes("akka.actor.mailbox.file-based.max-item-size") - val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS) - val MaxJournalSize = getBytes("akka.actor.mailbox.file-based.max-journal-size") - val MaxMemorySize = getBytes("akka.actor.mailbox.file-based.max-memory-size") - val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow") - val MaxJournalSizeAbsolute = getBytes("akka.actor.mailbox.file-based.max-journal-size-absolute") - val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full") - val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal") - val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala new file mode 100644 index 0000000000..84d06ab571 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class FileBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "file-based" + + val config = initialize + + import config._ + + val QueuePath = getString("directory-path") + + val MaxItems = getInt("max-items") + val MaxSize = getBytes("max-size") + val MaxItemSize = getBytes("max-item-size") + val MaxAge = Duration(getMilliseconds("max-age"), MILLISECONDS) + val MaxJournalSize = getBytes("max-journal-size") + val MaxMemorySize = getBytes("max-memory-size") + val MaxJournalOverflow = getInt("max-journal-overflow") + val MaxJournalSizeAbsolute = getBytes("max-journal-size-absolute") + val DiscardOldWhenFull = getBoolean("discard-old-when-full") + val KeepJournal = getBoolean("keep-journal") + val SyncJournal = getBoolean("sync-journal") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 274bc36cc1..75a889ee26 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,12 +1,14 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory object FileBasedMailboxSpec { val config = """ File-dispatcher { mailbox-type = akka.actor.mailbox.FileBasedMailboxType throughput = 1 + file-based.directory-path = "file-based" } """ } @@ -14,8 +16,15 @@ object FileBasedMailboxSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { + val queuePath = new FileBasedMailboxSettings(system, system.settings.config.getConfig("File-dispatcher")).QueuePath + + "FileBasedMailboxSettings" must { + "read the file-based section" in { + queuePath must be("file-based") + } + } + def clean { - val queuePath = FileBasedMailboxExtension(system).QueuePath FileUtils.deleteDirectory(new java.io.File(queuePath)) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 77b932911d..6645e17e58 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -7,6 +7,8 @@ import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } import akka.dispatch.{ Envelope, MessageQueue } import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } +import com.typesafe.config.Config +import akka.actor.ActorSystem private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -50,3 +52,55 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ } +/** + * Conventional organization of durable mailbox settings: + * + * {{{ + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } + * }}} + * + * where name=“my-durable-mailbox” in this example. + */ +trait DurableMailboxSettings { + /** + * A reference to the enclosing actor system. + */ + def system: ActorSystem + + /** + * A reference to the config section which the user specified for this mailbox’s dispatcher. + */ + def userConfig: Config + + /** + * The extracted config section for this mailbox, which is the “name” + * section (if that exists), falling back to system defaults. Typical + * implementation looks like: + * + * {{{ + * val config = initialize + * }}} + */ + def config: Config + + /** + * Name of this mailbox type for purposes of configuration scoping. Reference + * defaults go into “akka.actor.mailbox.”. + */ + def name: String + + /** + * Obtain default extracted mailbox config section from userConfig and system. + */ + def initialize: Config = + if (userConfig.hasPath(name)) + userConfig.getConfig(name).withFallback(system.settings.config.getConfig("akka.actor.mailbox." + name)) + else system.settings.config.getConfig("akka.actor.mailbox." + name) +} + diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 444edfa72d..970e0d6f1d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -11,6 +11,9 @@ import akka.dispatch.Await import akka.testkit.AkkaSpec import akka.testkit.TestLatch import akka.util.duration._ +import java.io.InputStream +import scala.annotation.tailrec +import com.typesafe.config.Config object DurableMailboxSpecActorFactory { @@ -31,6 +34,26 @@ object DurableMailboxSpecActorFactory { abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) { import DurableMailboxSpecActorFactory._ + protected def streamMustContain(in: InputStream, words: String): Unit = { + val output = new Array[Byte](8192) + + def now = System.currentTimeMillis + + def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // don’t want parse errors + + @tailrec def read(end: Int = 0, start: Long = now): Int = + in.read(output, end, output.length - end) match { + case -1 ⇒ end + case x ⇒ + val next = end + x + if (string(next).contains(words) || now - start > 10000 || next == output.length) next + else read(next, start) + } + + val result = string(read()) + if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) + } + def createMailboxTestActor(id: String): ActorRef = system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher")) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 23de168370..58ca7228eb 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -21,7 +21,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MongoBasedMessageQueue(o) + case Some(o) ⇒ new MongoBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -37,14 +37,14 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) { +class MongoBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! private val dispatcher = owner.dispatcher - private val settings = MongoBasedMailboxExtension(owner.system) + private val settings = new MongoBasedMailboxSettings(owner.system, _config) val log = Logging(system, "MongoBasedMessageQueue") @@ -98,9 +98,8 @@ class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_ def hasMessages: Boolean = numberOfMessages > 0 private[akka] def connect() = { - require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey)) log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI) - val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match { + val _dbh = MongoConnection.fromURI(settings.MongoURI) match { case (conn, None, None) ⇒ { throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'") } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala deleted file mode 100644 index fac0ad9050..0000000000 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config) -} - -class MongoBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val UriConfigKey = "akka.actor.mailbox.mongodb.uri" - val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None - val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala new file mode 100644 index 0000000000..469865e002 --- /dev/null +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class MongoBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "mongodb" + + val config = initialize + + import config._ + + val MongoURI = getString("uri") + val WriteTimeout = Duration(config.getMilliseconds("timeout.write"), MILLISECONDS) + val ReadTimeout = Duration(config.getMilliseconds("timeout.read"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index 7001d8de99..0579215df0 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -14,6 +14,7 @@ object MongoBasedMailboxSpec { mongodb-dispatcher { mailbox-type = akka.actor.mailbox.MongoBasedMailboxType throughput = 1 + mongodb.uri = "mongodb://localhost:27123/akka.mailbox" } """ } @@ -23,9 +24,23 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail import com.mongodb.async._ - val mongo = MongoConnection("localhost", 27017)("akka") + lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start() + lazy val mongo = MongoConnection("localhost", 27123)("akka") - mongo.dropDatabase() { success ⇒ } + override def atStartup(): Unit = { + // start MongoDB daemon + new java.io.File("mongoDB").mkdir() + val in = mongod.getInputStream + + try { + streamMustContain(in, "waiting for connections on port") + mongo.dropDatabase() { success ⇒ } + } catch { + case e ⇒ mongod.destroy(); throw e + } + } + + override def atTermination(): Unit = mongod.destroy() } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index fbc5830a30..9203180e9f 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -19,14 +19,14 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new RedisBasedMessageQueue(o) + case Some(o) ⇒ new RedisBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class RedisBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = RedisBasedMailboxExtension(owner.system) + private val settings = new RedisBasedMailboxSettings(owner.system, _config) @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala deleted file mode 100644 index 629f08b145..0000000000 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.actor._ - -object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config) -} - -class RedisBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.redis.hostname") - val Port = getInt("akka.actor.mailbox.redis.port") -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala new file mode 100644 index 0000000000..e302b8dd50 --- /dev/null +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.actor.ActorSystem + +class RedisBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "redis" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 6e78d6b74a..8dd0d09ce4 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -5,9 +5,40 @@ object RedisBasedMailboxSpec { Redis-dispatcher { mailbox-type = akka.actor.mailbox.RedisBasedMailboxType throughput = 1 + redis { + hostname = "127.0.0.1" + port = 6479 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) { + + lazy val redisServer = new ProcessBuilder("redis-server", "-").start() + + override def atStartup(): Unit = { + new java.io.File("redis").mkdir() + + val out = redisServer.getOutputStream + + val config = """ + port 6479 + bind 127.0.0.1 + dir redis + """.getBytes("UTF-8") + + try { + out.write(config) + out.close() + + streamMustContain(redisServer.getInputStream, "ready to accept connections on port") + } catch { + case e ⇒ redisServer.destroy(); throw e + } + } + + override def atTermination(): Unit = redisServer.destroy() + +} diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 203a94b620..862d88d451 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -20,14 +20,14 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o) + case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, config) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class ZooKeeperBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = ZooKeeperBasedMailboxExtension(owner.system) + private val settings = new ZooKeeperBasedMailboxSettings(owner.system, _config) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala deleted file mode 100644 index 4f3dcfb42f..0000000000 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config) -} -class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses") - val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) - val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) - val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala new file mode 100644 index 0000000000..17fd8f3f25 --- /dev/null +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class ZooKeeperBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings { + + def name = "zookeeper" + + val config = initialize + + import config._ + + val ZkServerAddresses = getString("server-addresses") + val SessionTimeout = Duration(getMilliseconds("session-timeout"), MILLISECONDS) + val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) + val BlockingQueue = getBoolean("blocking-queue") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 9264fbccce..e4d735de30 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -5,12 +5,15 @@ import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher import akka.actor.ActorRef +import com.typesafe.config.ConfigFactory +import akka.util.duration._ object ZooKeeperBasedMailboxSpec { val config = """ ZooKeeper-dispatcher { mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType throughput = 1 + zookeeper.session-timeout = 30s } """ } @@ -21,6 +24,12 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" + "ZookeeperBasedMailboxSettings" must { + "read the right settings" in { + new ZooKeeperBasedMailboxSettings(system, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds) + } + } + var zkServer: ZkServer = _ override def atStartup() { diff --git a/build.sbt b/build.sbt index 13467e1654..07de39759d 100644 --- a/build.sbt +++ b/build.sbt @@ -5,3 +5,5 @@ (externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/") (description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM." + + //testMailbox in Global := true diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 905e345dec..a201895405 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -26,6 +26,7 @@ object AkkaBuild extends Build { id = "akka", base = file("."), settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( + testMailbox in GlobalScope := false, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), @@ -141,6 +142,8 @@ object AkkaBuild extends Build { // ) // ) + val testMailbox = SettingKey[Boolean]("test-mailbox") + lazy val mailboxes = Project( id = "akka-durable-mailboxes", base = file("akka-durable-mailboxes"), @@ -165,8 +168,7 @@ object AkkaBuild extends Build { dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.beanstalkMailbox, - testBeanstalkMailbox := false, - testOptions in Test <+= testBeanstalkMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -179,16 +181,13 @@ object AkkaBuild extends Build { ) ) - val testRedisMailbox = SettingKey[Boolean]("test-redis-mailbox") - lazy val redisMailbox = Project( id = "akka-redis-mailbox", base = file("akka-durable-mailboxes/akka-redis-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.redisMailbox, - testRedisMailbox := false, - testOptions in Test <+= testRedisMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -201,8 +200,6 @@ object AkkaBuild extends Build { ) ) - val testMongoMailbox = SettingKey[Boolean]("test-mongo-mailbox") - lazy val mongoMailbox = Project( id = "akka-mongo-mailbox", base = file("akka-durable-mailboxes/akka-mongo-mailbox"), @@ -210,8 +207,7 @@ object AkkaBuild extends Build { settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.mongoMailbox, ivyXML := Dependencies.mongoMailboxExcludes, - testMongoMailbox := false, - testOptions in Test <+= testMongoMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) )