include system.settings when constructing MailboxType, see #1864
- necessary to not have to construct one Settings object per MessageQueue - added system.settings to DispatcherPrerequisites
This commit is contained in:
parent
abd7fb40ca
commit
eaee16c7d3
22 changed files with 61 additions and 47 deletions
|
|
@ -8,6 +8,7 @@ import akka.util.duration._
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
|
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||||
|
|
@ -156,7 +157,7 @@ object CustomMailboxSpec {
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
class MyMailboxType(config: Config) extends MailboxType {
|
class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
override def create(owner: Option[ActorContext]) = owner match {
|
override def create(owner: Option[ActorContext]) = owner match {
|
||||||
case Some(o) ⇒ new MyMailbox(o)
|
case Some(o) ⇒ new MyMailbox(o)
|
||||||
case None ⇒ throw new Exception("no mailbox owner given")
|
case None ⇒ throw new Exception("no mailbox owner given")
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import akka.pattern.ask
|
||||||
import akka.util.duration._
|
import akka.util.duration._
|
||||||
import akka.testkit.DefaultTimeout
|
import akka.testkit.DefaultTimeout
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
object PriorityDispatcherSpec {
|
object PriorityDispatcherSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -17,12 +18,12 @@ object PriorityDispatcherSpec {
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({
|
class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({
|
||||||
case i: Int ⇒ i //Reverse order
|
case i: Int ⇒ i //Reverse order
|
||||||
case 'Result ⇒ Int.MaxValue
|
case 'Result ⇒ Int.MaxValue
|
||||||
}: Any ⇒ Int))
|
}: Any ⇒ Int))
|
||||||
|
|
||||||
class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({
|
class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedPriorityMailbox(PriorityGenerator({
|
||||||
case i: Int ⇒ i //Reverse order
|
case i: Int ⇒ i //Reverse order
|
||||||
case 'Result ⇒ Int.MaxValue
|
case 'Result ⇒ Int.MaxValue
|
||||||
}: Any ⇒ Int), 1000, 10 seconds)
|
}: Any ⇒ Int), 1000, 10 seconds)
|
||||||
|
|
|
||||||
|
|
@ -242,8 +242,8 @@ akka {
|
||||||
mailbox-push-timeout-time = 10s
|
mailbox-push-timeout-time = 10s
|
||||||
|
|
||||||
# FQCN of the MailboxType, if not specified the default bounded or unbounded
|
# FQCN of the MailboxType, if not specified the default bounded or unbounded
|
||||||
# mailbox is used. The Class of the FQCN must have a constructor with a
|
# mailbox is used. The Class of the FQCN must have a constructor with
|
||||||
# com.typesafe.config.Config parameter.
|
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
|
||||||
mailbox-type = ""
|
mailbox-type = ""
|
||||||
|
|
||||||
# For BalancingDispatcher: If the balancing dispatcher should attempt to
|
# For BalancingDispatcher: If the balancing dispatcher should attempt to
|
||||||
|
|
|
||||||
|
|
@ -481,7 +481,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
|
||||||
def locker: Locker = provider.locker
|
def locker: Locker = provider.locker
|
||||||
|
|
||||||
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
|
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
|
||||||
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess))
|
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
|
||||||
|
|
||||||
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
|
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -384,17 +384,17 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
||||||
config.getString("mailbox-type") match {
|
config.getString("mailbox-type") match {
|
||||||
case "" ⇒
|
case "" ⇒
|
||||||
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
|
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
|
||||||
else new BoundedMailbox(config)
|
else new BoundedMailbox(prerequisites.settings, config)
|
||||||
case "unbounded" ⇒ UnboundedMailbox()
|
case "unbounded" ⇒ UnboundedMailbox()
|
||||||
case "bounded" ⇒ new BoundedMailbox(config)
|
case "bounded" ⇒ new BoundedMailbox(prerequisites.settings, config)
|
||||||
case fqcn ⇒
|
case fqcn ⇒
|
||||||
val args = Seq(classOf[Config] -> config)
|
val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
|
||||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
||||||
case Right(instance) ⇒ instance
|
case Right(instance) ⇒ instance
|
||||||
case Left(exception) ⇒
|
case Left(exception) ⇒
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
("Cannot instantiate MailboxType [%s], defined in [%s], " +
|
("Cannot instantiate MailboxType [%s], defined in [%s], " +
|
||||||
"make sure it has constructor with a [com.typesafe.config.Config] parameter")
|
"make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters")
|
||||||
.format(fqcn, config.getString("id")), exception)
|
.format(fqcn, config.getString("id")), exception)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ trait DispatcherPrerequisites {
|
||||||
def deadLetterMailbox: Mailbox
|
def deadLetterMailbox: Mailbox
|
||||||
def scheduler: Scheduler
|
def scheduler: Scheduler
|
||||||
def dynamicAccess: DynamicAccess
|
def dynamicAccess: DynamicAccess
|
||||||
|
def settings: ActorSystem.Settings
|
||||||
}
|
}
|
||||||
|
|
||||||
case class DefaultDispatcherPrerequisites(
|
case class DefaultDispatcherPrerequisites(
|
||||||
|
|
@ -29,7 +30,8 @@ case class DefaultDispatcherPrerequisites(
|
||||||
val eventStream: EventStream,
|
val eventStream: EventStream,
|
||||||
val deadLetterMailbox: Mailbox,
|
val deadLetterMailbox: Mailbox,
|
||||||
val scheduler: Scheduler,
|
val scheduler: Scheduler,
|
||||||
val dynamicAccess: DynamicAccess) extends DispatcherPrerequisites
|
val dynamicAccess: DynamicAccess,
|
||||||
|
val settings: ActorSystem.Settings) extends DispatcherPrerequisites
|
||||||
|
|
||||||
object Dispatchers {
|
object Dispatchers {
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import annotation.tailrec
|
||||||
import akka.event.Logging.Error
|
import akka.event.Logging.Error
|
||||||
import akka.actor.ActorContext
|
import akka.actor.ActorContext
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||||
|
|
||||||
|
|
@ -357,7 +358,7 @@ trait MailboxType {
|
||||||
*/
|
*/
|
||||||
case class UnboundedMailbox() extends MailboxType {
|
case class UnboundedMailbox() extends MailboxType {
|
||||||
|
|
||||||
def this(config: Config) = this()
|
def this(settings: ActorSystem.Settings, config: Config) = this()
|
||||||
|
|
||||||
final override def create(owner: Option[ActorContext]): MessageQueue =
|
final override def create(owner: Option[ActorContext]): MessageQueue =
|
||||||
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||||
|
|
@ -367,7 +368,7 @@ case class UnboundedMailbox() extends MailboxType {
|
||||||
|
|
||||||
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
|
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
|
||||||
|
|
||||||
def this(config: Config) = this(config.getInt("mailbox-capacity"),
|
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
|
||||||
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||||
|
|
||||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||||
|
|
|
||||||
|
|
@ -124,7 +124,7 @@ public class DispatcherDocTestBase {
|
||||||
|
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
public static class PrioMailbox extends UnboundedPriorityMailbox {
|
public static class PrioMailbox extends UnboundedPriorityMailbox {
|
||||||
public PrioMailbox(Config config) { // needed for reflective instantiation
|
public PrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation
|
||||||
super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
||||||
@Override
|
@Override
|
||||||
public int gen(Object message) {
|
public int gen(Object message) {
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import akka.actor.PoisonPill
|
||||||
import akka.dispatch.MessageDispatcherConfigurator
|
import akka.dispatch.MessageDispatcherConfigurator
|
||||||
import akka.dispatch.MessageDispatcher
|
import akka.dispatch.MessageDispatcher
|
||||||
import akka.dispatch.DispatcherPrerequisites
|
import akka.dispatch.DispatcherPrerequisites
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
object DispatcherDocSpec {
|
object DispatcherDocSpec {
|
||||||
val config = """
|
val config = """
|
||||||
|
|
@ -113,7 +114,7 @@ object DispatcherDocSpec {
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
// We create a new Priority dispatcher and seed it with the priority generator
|
||||||
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(
|
class PrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(
|
||||||
PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||||
case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible
|
case 'highpriority ⇒ 0 // 'highpriority messages should be treated first if possible
|
||||||
case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible
|
case 'lowpriority ⇒ 100 // 'lowpriority messages should be treated last if possible
|
||||||
|
|
|
||||||
|
|
@ -14,12 +14,14 @@ import akka.dispatch.MailboxType
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.dispatch.MessageQueue
|
import akka.dispatch.MessageQueue
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
|
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
|
||||||
|
|
||||||
class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
|
class BeanstalkBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
|
private val settings = new BeanstalkMailboxSettings(systemSettings, config)
|
||||||
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
||||||
case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, config)
|
case Some(o) ⇒ new BeanstalkBasedMessageQueue(o, settings)
|
||||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -27,9 +29,8 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class BeanstalkBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
class BeanstalkBasedMessageQueue(_owner: ActorContext, val settings: BeanstalkMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
||||||
|
|
||||||
private val settings = new BeanstalkMailboxSettings(owner.system, _config)
|
|
||||||
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
|
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
|
||||||
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt
|
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,8 @@ import akka.util.Duration
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class BeanstalkMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
|
class BeanstalkMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||||
|
extends DurableMailboxSettings {
|
||||||
|
|
||||||
def name = "beanstalk"
|
def name = "beanstalk"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,19 +13,20 @@ import akka.dispatch.MailboxType
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.util.NonFatal
|
import akka.util.NonFatal
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class FileBasedMailboxType(config: Config) extends MailboxType {
|
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
|
private val settings = new FileBasedMailboxSettings(systemSettings, config)
|
||||||
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
||||||
case Some(o) ⇒ new FileBasedMessageQueue(o, config)
|
case Some(o) ⇒ new FileBasedMessageQueue(o, settings)
|
||||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class FileBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
||||||
|
|
||||||
val log = Logging(system, "FileBasedMessageQueue")
|
val log = Logging(system, "FileBasedMessageQueue")
|
||||||
|
|
||||||
private val settings = new FileBasedMailboxSettings(owner.system, _config)
|
|
||||||
val queuePath = settings.QueuePath
|
val queuePath = settings.QueuePath
|
||||||
|
|
||||||
private val queue = try {
|
private val queue = try {
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,8 @@ import akka.util.Duration
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class FileBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
|
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||||
|
extends DurableMailboxSettings {
|
||||||
|
|
||||||
def name = "file-based"
|
def name = "file-based"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ object FileBasedMailboxSpec {
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
|
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
|
||||||
|
|
||||||
val queuePath = new FileBasedMailboxSettings(system, system.settings.config.getConfig("File-dispatcher")).QueuePath
|
val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath
|
||||||
|
|
||||||
"FileBasedMailboxSettings" must {
|
"FileBasedMailboxSettings" must {
|
||||||
"read the file-based section" in {
|
"read the file-based section" in {
|
||||||
|
|
|
||||||
|
|
@ -71,7 +71,7 @@ trait DurableMailboxSettings {
|
||||||
/**
|
/**
|
||||||
* A reference to the enclosing actor system.
|
* A reference to the enclosing actor system.
|
||||||
*/
|
*/
|
||||||
def system: ActorSystem
|
def systemSettings: ActorSystem.Settings
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A reference to the config section which the user specified for this mailbox’s dispatcher.
|
* A reference to the config section which the user specified for this mailbox’s dispatcher.
|
||||||
|
|
@ -100,7 +100,7 @@ trait DurableMailboxSettings {
|
||||||
*/
|
*/
|
||||||
def initialize: Config =
|
def initialize: Config =
|
||||||
if (userConfig.hasPath(name))
|
if (userConfig.hasPath(name))
|
||||||
userConfig.getConfig(name).withFallback(system.settings.config.getConfig("akka.actor.mailbox." + name))
|
userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name))
|
||||||
else system.settings.config.getConfig("akka.actor.mailbox." + name)
|
else systemSettings.config.getConfig("akka.actor.mailbox." + name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,14 @@ import akka.dispatch.MailboxType
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.dispatch.MessageQueue
|
import akka.dispatch.MessageQueue
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class MongoBasedMailboxException(message: String) extends AkkaException(message)
|
class MongoBasedMailboxException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
class MongoBasedMailboxType(config: Config) extends MailboxType {
|
class MongoBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
|
private val settings = new MongoBasedMailboxSettings(systemSettings, config)
|
||||||
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
||||||
case Some(o) ⇒ new MongoBasedMessageQueue(o, config)
|
case Some(o) ⇒ new MongoBasedMessageQueue(o, settings)
|
||||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -37,15 +39,13 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
|
||||||
*
|
*
|
||||||
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
|
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
|
||||||
*/
|
*/
|
||||||
class MongoBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) {
|
class MongoBasedMessageQueue(_owner: ActorContext, val settings: MongoBasedMailboxSettings) extends DurableMessageQueue(_owner) {
|
||||||
// this implicit object provides the context for reading/writing things as MongoDurableMessage
|
// this implicit object provides the context for reading/writing things as MongoDurableMessage
|
||||||
implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
|
implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
|
||||||
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
|
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
|
||||||
|
|
||||||
private val dispatcher = owner.dispatcher
|
private val dispatcher = owner.dispatcher
|
||||||
|
|
||||||
private val settings = new MongoBasedMailboxSettings(owner.system, _config)
|
|
||||||
|
|
||||||
val log = Logging(system, "MongoBasedMessageQueue")
|
val log = Logging(system, "MongoBasedMessageQueue")
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,8 @@ import akka.util.Duration
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class MongoBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
|
class MongoBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||||
|
extends DurableMailboxSettings {
|
||||||
|
|
||||||
def name = "mongodb"
|
def name = "mongodb"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -14,19 +14,19 @@ import com.typesafe.config.Config
|
||||||
import akka.util.NonFatal
|
import akka.util.NonFatal
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.dispatch.MessageQueue
|
import akka.dispatch.MessageQueue
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class RedisBasedMailboxException(message: String) extends AkkaException(message)
|
class RedisBasedMailboxException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
class RedisBasedMailboxType(config: Config) extends MailboxType {
|
class RedisBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
|
private val settings = new RedisBasedMailboxSettings(systemSettings, config)
|
||||||
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
||||||
case Some(o) ⇒ new RedisBasedMessageQueue(o, config)
|
case Some(o) ⇒ new RedisBasedMessageQueue(o, settings)
|
||||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class RedisBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
class RedisBasedMessageQueue(_owner: ActorContext, val settings: RedisBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
||||||
|
|
||||||
private val settings = new RedisBasedMailboxSettings(owner.system, _config)
|
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
|
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,8 @@ package akka.actor.mailbox
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class RedisBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
|
class RedisBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||||
|
extends DurableMailboxSettings {
|
||||||
|
|
||||||
def name = "redis"
|
def name = "redis"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,19 +15,20 @@ import com.typesafe.config.Config
|
||||||
import akka.util.NonFatal
|
import akka.util.NonFatal
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import akka.dispatch.MessageQueue
|
import akka.dispatch.MessageQueue
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
|
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
|
class ZooKeeperBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||||
|
private val settings = new ZooKeeperBasedMailboxSettings(systemSettings, config)
|
||||||
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
|
||||||
case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, config)
|
case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o, settings)
|
||||||
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ZooKeeperBasedMessageQueue(_owner: ActorContext, _config: Config) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
class ZooKeeperBasedMessageQueue(_owner: ActorContext, val settings: ZooKeeperBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
|
||||||
|
|
||||||
private val settings = new ZooKeeperBasedMailboxSettings(owner.system, _config)
|
|
||||||
val queueNode = "/queues"
|
val queueNode = "/queues"
|
||||||
val queuePathTemplate = queueNode + "/%s"
|
val queuePathTemplate = queueNode + "/%s"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,8 @@ import akka.util.Duration
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
||||||
class ZooKeeperBasedMailboxSettings(val system: ActorSystem, val userConfig: Config) extends DurableMailboxSettings {
|
class ZooKeeperBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
|
||||||
|
extends DurableMailboxSettings {
|
||||||
|
|
||||||
def name = "zookeeper"
|
def name = "zookeeper"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
|
||||||
|
|
||||||
"ZookeeperBasedMailboxSettings" must {
|
"ZookeeperBasedMailboxSettings" must {
|
||||||
"read the right settings" in {
|
"read the right settings" in {
|
||||||
new ZooKeeperBasedMailboxSettings(system, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds)
|
new ZooKeeperBasedMailboxSettings(system.settings, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue