diff --git a/.gitignore b/.gitignore index b80964477d..25d3fa2323 100755 --- a/.gitignore +++ b/.gitignore @@ -62,4 +62,7 @@ akka.sublime-workspace .multi-jvm _mb schoir.props -worker*.log \ No newline at end of file +worker*.log +mongoDB/ +redis/ +beanstalk/ diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index f4d355333a..8759f1aad9 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -8,6 +8,7 @@ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef } import com.typesafe.config.Config +import akka.actor.ActorSystem @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -156,7 +157,7 @@ object CustomMailboxSpec { } """ - class MyMailboxType(config: Config) extends MailboxType { + class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { override def create(owner: Option[ActorContext]) = owner match { case Some(o) ⇒ new MyMailbox(o) case None ⇒ throw new Exception("no mailbox owner given") diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 855c4f6965..a9855fef7d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -6,6 +6,7 @@ import akka.pattern.ask import akka.util.duration._ import akka.testkit.DefaultTimeout import com.typesafe.config.Config +import akka.actor.ActorSystem object PriorityDispatcherSpec { val config = """ @@ -17,12 +18,12 @@ object PriorityDispatcherSpec { } """ - class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({ + class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({ case i: Int ⇒ i //Reverse order case 'Result ⇒ Int.MaxValue }: Any ⇒ Int)) - class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({ + class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedPriorityMailbox(PriorityGenerator({ case i: Int ⇒ i //Reverse order case 'Result ⇒ Int.MaxValue }: Any ⇒ Int), 1000, 10 seconds) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index e1a603c971..a0306b806c 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -242,8 +242,8 @@ akka { mailbox-push-timeout-time = 10s # FQCN of the MailboxType, if not specified the default bounded or unbounded - # mailbox is used. The Class of the FQCN must have a constructor with a - # com.typesafe.config.Config parameter. + # mailbox is used. The Class of the FQCN must have a constructor with + # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "" # For BalancingDispatcher: If the balancing dispatcher should attempt to @@ -254,7 +254,8 @@ akka { debug { # enable function of Actor.loggable(), which is to log any received message at - # DEBUG level + # DEBUG level, see the “Testing Actor Systems” section of the Akka Documentation + # at http://akka.io/docs receive = off # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 72c5d4a13f..de1689e730 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -481,7 +481,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def locker: Locker = provider.locker val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( - threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess)) + threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 631e84ece7..e8389c69fc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -384,17 +384,17 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit config.getString("mailbox-type") match { case "" ⇒ if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox() - else new BoundedMailbox(config) + else new BoundedMailbox(prerequisites.settings, config) case "unbounded" ⇒ UnboundedMailbox() - case "bounded" ⇒ new BoundedMailbox(config) + case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config) case fqcn ⇒ - val args = Seq(classOf[Config] -> config) + val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config) prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( ("Cannot instantiate MailboxType [%s], defined in [%s], " + - "make sure it has constructor with a [com.typesafe.config.Config] parameter") + "make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters") .format(fqcn, config.getString("id")), exception) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 5f4528146d..93d44e007d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -22,6 +22,7 @@ trait DispatcherPrerequisites { def deadLetterMailbox: Mailbox def scheduler: Scheduler def dynamicAccess: DynamicAccess + def settings: ActorSystem.Settings } case class DefaultDispatcherPrerequisites( @@ -29,7 +30,8 @@ case class DefaultDispatcherPrerequisites( val eventStream: EventStream, val deadLetterMailbox: Mailbox, val scheduler: Scheduler, - val dynamicAccess: DynamicAccess) extends DispatcherPrerequisites + val dynamicAccess: DynamicAccess, + val settings: ActorSystem.Settings) extends DispatcherPrerequisites object Dispatchers { /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 29260a9f8d..7698d3f2f1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.event.Logging.Error import akka.actor.ActorContext import com.typesafe.config.Config +import akka.actor.ActorSystem class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -357,7 +358,7 @@ trait MailboxType { */ case class UnboundedMailbox() extends MailboxType { - def this(config: Config) = this() + def this(settings: ActorSystem.Settings, config: Config) = this() final override def create(owner: Option[ActorContext]): MessageQueue = new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics { @@ -367,7 +368,7 @@ case class UnboundedMailbox() extends MailboxType { case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { - def this(config: Config) = this(config.getInt("mailbox-capacity"), + def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)) if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") diff --git a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java index 14291fc27e..5c3bc7b8ad 100644 --- a/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/akka/docs/dispatcher/DispatcherDocTestBase.java @@ -124,7 +124,7 @@ public class DispatcherDocTestBase { //#prio-mailbox public static class MyPrioMailbox extends UnboundedPriorityMailbox { - public MyPrioMailbox(Config config) { // needed for reflective instantiation + public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation // Create a new PriorityGenerator, lower prio means more important super(new PriorityGenerator() { @Override diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index e602c2c77c..8940cce616 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -152,7 +152,7 @@ Mailbox configuration examples How to create a PriorityMailbox: -.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherTestBase.java#prio-mailbox +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-mailbox And then add it to the configuration: @@ -160,4 +160,14 @@ And then add it to the configuration: And then an example on how you would use it: -.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher \ No newline at end of file +.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher + +.. note:: + + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. + diff --git a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala index 32ecfa1705..1452d72088 100644 --- a/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala @@ -9,7 +9,7 @@ import akka.testkit.AkkaSpec import akka.event.Logging import akka.event.LoggingAdapter import akka.util.duration._ -import akka.actor.{ Props, Actor, PoisonPill } +import akka.actor.{ Props, Actor, PoisonPill, ActorSystem } object DispatcherDocSpec { val config = """ @@ -110,7 +110,7 @@ object DispatcherDocSpec { // We inherit, in this case, from UnboundedPriorityMailbox // and seed it with the priority generator - class MyPrioMailbox(config: Config) extends UnboundedPriorityMailbox( + class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox( // Create a new PriorityGenerator, lower prio means more important PriorityGenerator { // 'highpriority messages should be treated first if possible @@ -146,7 +146,7 @@ object DispatcherDocSpec { } // This constructor signature must exist, it will be called by Akka - def this(config: Config) = this() + def this(settings: ActorSystem.Settings, config: Config) = this() // The create method is called to create the MessageQueue final override def create(owner: Option[ActorContext]): MessageQueue = diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index 18169fb2ef..1dd050684e 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -169,4 +169,14 @@ An example is worth a thousand quacks: .. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example -And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. \ No newline at end of file +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. + +.. note:: + + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. + diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 649b365beb..35a09303fa 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -14,12 +14,14 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} -class BeanstalkBasedMailboxType(config: Config) extends MailboxType { +class BeanstalkBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new BeanstalkMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new BeanstalkBasedMessageQueue(o) + case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -27,9 +29,8 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType { /** * @author Jonas Bonér */ -class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class BeanstalkBasedMessageQueue(_owner: ActorContext, val settings: BeanstalkMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala deleted file mode 100644 index 36ab10393a..0000000000 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config) -} - -class BeanstalkMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.beanstalk.hostname") - val Port = getInt("akka.actor.mailbox.beanstalk.port") - val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS) - val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS) - val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS) - val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala new file mode 100644 index 0000000000..0450b2c172 --- /dev/null +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxSettings.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class BeanstalkMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { + + def name = "beanstalk" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") + val ReconnectWindow = Duration(getMilliseconds("reconnect-window"), MILLISECONDS) + val MessageSubmitDelay = Duration(getMilliseconds("message-submit-delay"), MILLISECONDS) + val MessageSubmitTimeout = Duration(getMilliseconds("message-submit-timeout"), MILLISECONDS) + val MessageTimeToLive = Duration(getMilliseconds("message-time-to-live"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 4eb370ab63..3b538fdf4c 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -5,9 +5,26 @@ object BeanstalkBasedMailboxSpec { Beanstalkd-dispatcher { mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType throughput = 1 + beanstalk { + hostname = "127.0.0.1" + port = 11400 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) { + + lazy val beanstalkd = new ProcessBuilder("beanstalkd", "-b", "beanstalk", "-l", "127.0.0.1", "-p", "11400").start() + + override def atStartup(): Unit = { + new java.io.File("beanstalk").mkdir() + beanstalkd + Thread.sleep(3000) + } + + override def atTermination(): Unit = beanstalkd.destroy() + +} + diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala similarity index 81% rename from akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala rename to akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala index 8be117d89e..343217ba12 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailbox.scala @@ -13,19 +13,20 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException +import akka.actor.ActorSystem -class FileBasedMailboxType(config: Config) extends MailboxType { +class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new FileBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new FileBasedMessageQueue(o) + case Some(o) ⇒ new FileBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMessageQueue") - private val settings = FileBasedMailboxExtension(owner.system) val queuePath = settings.QueuePath private val queue = try { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala deleted file mode 100644 index f7e6527499..0000000000 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config) -} - -class FileBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val QueuePath = getString("akka.actor.mailbox.file-based.directory-path") - - val MaxItems = getInt("akka.actor.mailbox.file-based.max-items") - val MaxSize = getBytes("akka.actor.mailbox.file-based.max-size") - val MaxItemSize = getBytes("akka.actor.mailbox.file-based.max-item-size") - val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS) - val MaxJournalSize = getBytes("akka.actor.mailbox.file-based.max-journal-size") - val MaxMemorySize = getBytes("akka.actor.mailbox.file-based.max-memory-size") - val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow") - val MaxJournalSizeAbsolute = getBytes("akka.actor.mailbox.file-based.max-journal-size-absolute") - val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full") - val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal") - val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala new file mode 100644 index 0000000000..6511bf9e00 --- /dev/null +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxSettings.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { + + def name = "file-based" + + val config = initialize + + import config._ + + val QueuePath = getString("directory-path") + + val MaxItems = getInt("max-items") + val MaxSize = getBytes("max-size") + val MaxItemSize = getBytes("max-item-size") + val MaxAge = Duration(getMilliseconds("max-age"), MILLISECONDS) + val MaxJournalSize = getBytes("max-journal-size") + val MaxMemorySize = getBytes("max-memory-size") + val MaxJournalOverflow = getInt("max-journal-overflow") + val MaxJournalSizeAbsolute = getBytes("max-journal-size-absolute") + val DiscardOldWhenFull = getBoolean("discard-old-when-full") + val KeepJournal = getBoolean("keep-journal") + val SyncJournal = getBoolean("sync-journal") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 274bc36cc1..5d428e0776 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,12 +1,14 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory object FileBasedMailboxSpec { val config = """ File-dispatcher { mailbox-type = akka.actor.mailbox.FileBasedMailboxType throughput = 1 + file-based.directory-path = "file-based" } """ } @@ -14,8 +16,15 @@ object FileBasedMailboxSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) { + val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath + + "FileBasedMailboxSettings" must { + "read the file-based section" in { + queuePath must be("file-based") + } + } + def clean { - val queuePath = FileBasedMailboxExtension(system).QueuePath FileUtils.deleteDirectory(new java.io.File(queuePath)) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 77b932911d..41ec6d7307 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -7,6 +7,8 @@ import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } import akka.dispatch.{ Envelope, MessageQueue } import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } +import com.typesafe.config.Config +import akka.actor.ActorSystem private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -50,3 +52,55 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒ } +/** + * Conventional organization of durable mailbox settings: + * + * {{{ + * my-durable-dispatcher { + * mailbox-type = "my.durable.mailbox" + * my-durable-mailbox { + * setting1 = 1 + * setting2 = 2 + * } + * } + * }}} + * + * where name=“my-durable-mailbox” in this example. + */ +trait DurableMailboxSettings { + /** + * A reference to the enclosing actor system. + */ + def systemSettings: ActorSystem.Settings + + /** + * A reference to the config section which the user specified for this mailbox’s dispatcher. + */ + def userConfig: Config + + /** + * The extracted config section for this mailbox, which is the “name” + * section (if that exists), falling back to system defaults. Typical + * implementation looks like: + * + * {{{ + * val config = initialize + * }}} + */ + def config: Config + + /** + * Name of this mailbox type for purposes of configuration scoping. Reference + * defaults go into “akka.actor.mailbox.”. + */ + def name: String + + /** + * Obtain default extracted mailbox config section from userConfig and system. + */ + def initialize: Config = + if (userConfig.hasPath(name)) + userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name)) + else systemSettings.config.getConfig("akka.actor.mailbox." + name) +} + diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index 444edfa72d..970e0d6f1d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -11,6 +11,9 @@ import akka.dispatch.Await import akka.testkit.AkkaSpec import akka.testkit.TestLatch import akka.util.duration._ +import java.io.InputStream +import scala.annotation.tailrec +import com.typesafe.config.Config object DurableMailboxSpecActorFactory { @@ -31,6 +34,26 @@ object DurableMailboxSpecActorFactory { abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) { import DurableMailboxSpecActorFactory._ + protected def streamMustContain(in: InputStream, words: String): Unit = { + val output = new Array[Byte](8192) + + def now = System.currentTimeMillis + + def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // don’t want parse errors + + @tailrec def read(end: Int = 0, start: Long = now): Int = + in.read(output, end, output.length - end) match { + case -1 ⇒ end + case x ⇒ + val next = end + x + if (string(next).contains(words) || now - start > 10000 || next == output.length) next + else read(next, start) + } + + val result = string(read()) + if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result) + } + def createMailboxTestActor(id: String): ActorRef = system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher")) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 23de168370..fa3b0b0c87 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -16,12 +16,14 @@ import akka.dispatch.MailboxType import com.typesafe.config.Config import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class MongoBasedMailboxException(message: String) extends AkkaException(message) -class MongoBasedMailboxType(config: Config) extends MailboxType { +class MongoBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new MongoBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new MongoBasedMessageQueue(o) + case Some(o) ⇒ new MongoBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } @@ -37,15 +39,13 @@ class MongoBasedMailboxType(config: Config) extends MailboxType { * * @author Brendan W. McAdams */ -class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) { +class MongoBasedMessageQueue(_owner: ActorContext, val settings: MongoBasedMailboxSettings) extends DurableMessageQueue(_owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! private val dispatcher = owner.dispatcher - private val settings = MongoBasedMailboxExtension(owner.system) - val log = Logging(system, "MongoBasedMessageQueue") @volatile @@ -98,9 +98,8 @@ class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_ def hasMessages: Boolean = numberOfMessages > 0 private[akka] def connect() = { - require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey)) log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI) - val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match { + val _dbh = MongoConnection.fromURI(settings.MongoURI) match { case (conn, None, None) ⇒ { throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'") } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala deleted file mode 100644 index fac0ad9050..0000000000 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config) -} - -class MongoBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val UriConfigKey = "akka.actor.mailbox.mongodb.uri" - val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None - val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS) - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala new file mode 100644 index 0000000000..3ed62c2010 --- /dev/null +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxSettings.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class MongoBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { + + def name = "mongodb" + + val config = initialize + + import config._ + + val MongoURI = getString("uri") + val WriteTimeout = Duration(config.getMilliseconds("timeout.write"), MILLISECONDS) + val ReadTimeout = Duration(config.getMilliseconds("timeout.read"), MILLISECONDS) + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index 7001d8de99..0579215df0 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -14,6 +14,7 @@ object MongoBasedMailboxSpec { mongodb-dispatcher { mailbox-type = akka.actor.mailbox.MongoBasedMailboxType throughput = 1 + mongodb.uri = "mongodb://localhost:27123/akka.mailbox" } """ } @@ -23,9 +24,23 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail import com.mongodb.async._ - val mongo = MongoConnection("localhost", 27017)("akka") + lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start() + lazy val mongo = MongoConnection("localhost", 27123)("akka") - mongo.dropDatabase() { success ⇒ } + override def atStartup(): Unit = { + // start MongoDB daemon + new java.io.File("mongoDB").mkdir() + val in = mongod.getInputStream + + try { + streamMustContain(in, "waiting for connections on port") + mongo.dropDatabase() { success ⇒ } + } catch { + case e ⇒ mongod.destroy(); throw e + } + } + + override def atTermination(): Unit = mongod.destroy() } diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index fbc5830a30..236c8fb6f3 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -14,19 +14,19 @@ import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class RedisBasedMailboxException(message: String) extends AkkaException(message) -class RedisBasedMailboxType(config: Config) extends MailboxType { +class RedisBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new RedisBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new RedisBasedMessageQueue(o) + case Some(o) ⇒ new RedisBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - - private val settings = RedisBasedMailboxExtension(owner.system) +class RedisBasedMessageQueue(_owner: ActorContext, val settings: RedisBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala deleted file mode 100644 index 629f08b145..0000000000 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.actor._ - -object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config) -} - -class RedisBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val Hostname = getString("akka.actor.mailbox.redis.hostname") - val Port = getInt("akka.actor.mailbox.redis.port") -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala new file mode 100644 index 0000000000..a628216fb1 --- /dev/null +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxSettings.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.actor.ActorSystem + +class RedisBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { + + def name = "redis" + + val config = initialize + + import config._ + + val Hostname = getString("hostname") + val Port = getInt("port") +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 6e78d6b74a..8dd0d09ce4 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -5,9 +5,40 @@ object RedisBasedMailboxSpec { Redis-dispatcher { mailbox-type = akka.actor.mailbox.RedisBasedMailboxType throughput = 1 + redis { + hostname = "127.0.0.1" + port = 6479 + } } """ } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) { + + lazy val redisServer = new ProcessBuilder("redis-server", "-").start() + + override def atStartup(): Unit = { + new java.io.File("redis").mkdir() + + val out = redisServer.getOutputStream + + val config = """ + port 6479 + bind 127.0.0.1 + dir redis + """.getBytes("UTF-8") + + try { + out.write(config) + out.close() + + streamMustContain(redisServer.getInputStream, "ready to accept connections on port") + } catch { + case e ⇒ redisServer.destroy(); throw e + } + } + + override def atTermination(): Unit = redisServer.destroy() + +} diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 203a94b620..e54423d0e0 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -15,19 +15,20 @@ import com.typesafe.config.Config import akka.util.NonFatal import akka.config.ConfigurationException import akka.dispatch.MessageQueue +import akka.actor.ActorSystem class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) -class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { +class ZooKeeperBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType { + private val settings = new ZooKeeperBasedMailboxSettings(systemSettings, config) override def create(owner: Option[ActorContext]): MessageQueue = owner match { - case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o) + case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, settings) case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)") } } -class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization { +class ZooKeeperBasedMessageQueue(_owner: ActorContext, val settings: ZooKeeperBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization { - private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala deleted file mode 100644 index 4f3dcfb42f..0000000000 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.actor.mailbox - -import com.typesafe.config.Config -import akka.util.Duration -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor._ - -object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { - override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system) - def lookup() = this - def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config) -} -class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension { - - import config._ - - val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses") - val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) - val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) - val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") - -} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala new file mode 100644 index 0000000000..58cd67c3c1 --- /dev/null +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSettings.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.actor.mailbox + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.ActorSystem + +class ZooKeeperBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config) + extends DurableMailboxSettings { + + def name = "zookeeper" + + val config = initialize + + import config._ + + val ZkServerAddresses = getString("server-addresses") + val SessionTimeout = Duration(getMilliseconds("session-timeout"), MILLISECONDS) + val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) + val BlockingQueue = getBoolean("blocking-queue") + +} \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 9264fbccce..a2d9028bfe 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -5,12 +5,15 @@ import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher import akka.actor.ActorRef +import com.typesafe.config.ConfigFactory +import akka.util.duration._ object ZooKeeperBasedMailboxSpec { val config = """ ZooKeeper-dispatcher { mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType throughput = 1 + zookeeper.session-timeout = 30s } """ } @@ -21,6 +24,12 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" + "ZookeeperBasedMailboxSettings" must { + "read the right settings" in { + new ZooKeeperBasedMailboxSettings(system.settings, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds) + } + } + var zkServer: ZkServer = _ override def atStartup() { diff --git a/build.sbt b/build.sbt index 13467e1654..a2be7bb5c3 100644 --- a/build.sbt +++ b/build.sbt @@ -5,3 +5,4 @@ (externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/") (description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM." + diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 905e345dec..e7d3a071c7 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -26,6 +26,7 @@ object AkkaBuild extends Build { id = "akka", base = file("."), settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( + testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean, parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), @@ -141,6 +142,8 @@ object AkkaBuild extends Build { // ) // ) + val testMailbox = SettingKey[Boolean]("test-mailbox") + lazy val mailboxes = Project( id = "akka-durable-mailboxes", base = file("akka-durable-mailboxes"), @@ -165,8 +168,7 @@ object AkkaBuild extends Build { dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.beanstalkMailbox, - testBeanstalkMailbox := false, - testOptions in Test <+= testBeanstalkMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -179,16 +181,13 @@ object AkkaBuild extends Build { ) ) - val testRedisMailbox = SettingKey[Boolean]("test-redis-mailbox") - lazy val redisMailbox = Project( id = "akka-redis-mailbox", base = file("akka-durable-mailboxes/akka-redis-mailbox"), dependencies = Seq(mailboxesCommon % "compile;test->test"), settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.redisMailbox, - testRedisMailbox := false, - testOptions in Test <+= testRedisMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) ) @@ -201,8 +200,6 @@ object AkkaBuild extends Build { ) ) - val testMongoMailbox = SettingKey[Boolean]("test-mongo-mailbox") - lazy val mongoMailbox = Project( id = "akka-mongo-mailbox", base = file("akka-durable-mailboxes/akka-mongo-mailbox"), @@ -210,8 +207,7 @@ object AkkaBuild extends Build { settings = defaultSettings ++ Seq( libraryDependencies ++= Dependencies.mongoMailbox, ivyXML := Dependencies.mongoMailboxExcludes, - testMongoMailbox := false, - testOptions in Test <+= testMongoMailbox map { test => Tests.Filter(s => test) } + testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) } ) )