diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index de3ec03fae..534c4a757d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -8,6 +8,7 @@ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.actor.ActorRef import akka.actor.ActorContext +import com.typesafe.config.Config @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -152,10 +153,14 @@ class PriorityMailboxSpec extends MailboxSpec { object CustomMailboxSpec { val config = """ my-dispatcher { - mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox" + mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailboxType" } """ + class MyMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new MyMailbox(owner) + } + class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index eb2f79f5fc..42993515e8 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -166,7 +166,8 @@ akka { mailbox-push-timeout-time = 10s # FQCN of the MailboxType, if not specified the default bounded or unbounded - # mailbox is used. + # mailbox is used. The Class of the FQCN must have a constructor with a + # com.typesafe.config.Config parameter. mailboxType = "" } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index d6addcb144..997623195f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -17,6 +17,7 @@ import akka.event.EventStream import akka.actor.ActorSystem.Settings import com.typesafe.config.Config import java.util.concurrent.atomic.AtomicReference +import akka.util.ReflectiveAccess final case class Envelope(val message: Any, val sender: ActorRef) { if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") @@ -282,9 +283,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit /** * Returns a factory for the [[akka.dispatch.Mailbox]] given the configuration. - * Default implementation use [[akka.dispatch.CustomMailboxType]] if - * mailboxType config property is specified, otherwise [[akka.dispatch.UnboundedMailbox]] - * when capacity is < 1, otherwise [[akka.dispatch.BoundedMailbox]]. + * Default implementation instantiate the [[akka.dispatch.MailboxType]] specified + * as FQCN in mailboxType config property. If mailboxType is unspecified (empty) + * then [[akka.dispatch.UnboundedMailbox]] is used when capacity is < 1, + * otherwise [[akka.dispatch.BoundedMailbox]]. */ def mailboxType(): MailboxType = { config.getString("mailboxType") match { @@ -295,7 +297,16 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) BoundedMailbox(capacity, duration) } - case fqn ⇒ new CustomMailboxType(fqn) + case fqcn ⇒ + val constructorSignature = Array[Class[_]](classOf[Config]) + ReflectiveAccess.createInstance[MailboxType](fqcn, constructorSignature, Array[AnyRef](config)) match { + case Right(instance) ⇒ instance + case Left(exception) ⇒ + throw new IllegalArgumentException( + ("Cannot instantiate MailboxType [%s], defined in [%s], " + + "make sure it has constructor with a [com.typesafe.config.Config] parameter") + .format(fqcn, config.getString("id")), exception) + } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 86b286e916..bb0f845aba 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -37,6 +37,13 @@ object Mailbox { /** * Custom mailbox implementations are implemented by extending this class. + * E.g. + * + * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) + * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + * val queue = new ConcurrentLinkedQueue[Envelope]() + * } + * */ abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) @@ -373,29 +380,3 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va } } -/** - * Mailbox factory that creates instantiates the implementation from a - * fully qualified class name. The implementation class must have - * a constructor with a [[akka.actor.ActorContext]] parameter. - * E.g. - * - * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) - * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - * val queue = new ConcurrentLinkedQueue[Envelope]() - * } - * - */ -class CustomMailboxType(mailboxFQN: String) extends MailboxType { - - override def create(receiver: ActorContext): Mailbox = { - val constructorSignature = Array[Class[_]](classOf[ActorContext]) - ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { - case Right(instance) ⇒ instance - case Left(exception) ⇒ - throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". - format(mailboxFQN, exception.toString)) - } - } - -} - diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index c7d396ec4a..9e7a0fd6dc 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -23,7 +23,7 @@ object DurableMailboxDocSpec { val config = """ //#dispatcher-config my-dispatcher { - mailboxType = akka.actor.mailbox.FileBasedMailbox + mailboxType = akka.actor.mailbox.FileBasedMailboxType } //#dispatcher-config """ diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 2f6ca9e261..dc83522705 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -99,7 +99,7 @@ You configure durable mailboxes through the dispatcher, as described in Config:: my-dispatcher { - mailboxType = akka.actor.mailbox.FileBasedMailbox + mailboxType = akka.actor.mailbox.FileBasedMailboxType } You can also configure and tune the file-based durable mailbox. This is done in @@ -124,7 +124,7 @@ You configure durable mailboxes through the dispatcher, as described in Config:: my-dispatcher { - mailboxType = akka.actor.mailbox.RedisBasedMailbox + mailboxType = akka.actor.mailbox.RedisBasedMailboxType } You also need to configure the IP and port for the Redis server. This is done in @@ -150,7 +150,7 @@ You configure durable mailboxes through the dispatcher, as described in Config:: my-dispatcher { - mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox + mailboxType = akka.actor.mailbox.ZooKeeperBasedMailboxType } You also need to configure ZooKeeper server addresses, timeouts, etc. This is @@ -173,7 +173,7 @@ You configure durable mailboxes through the dispatcher, as described in Config:: my-dispatcher { - mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox + mailboxType = akka.actor.mailbox.BeanstalkBasedMailboxType } You also need to configure the IP, and port, and so on, for the Beanstalk @@ -202,7 +202,7 @@ You configure durable mailboxes through the dispatcher, as described in Config:: my-dispatcher { - mailboxType = akka.actor.mailbox.MongoBasedMailbox + mailboxType = akka.actor.mailbox.MongoBasedMailboxType } You will need to configure the URI for the MongoDB server, using the URI Format specified in the diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 57d7b3e098..cd9186388e 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -13,9 +13,15 @@ import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef +import akka.dispatch.MailboxType +import com.typesafe.config.Config class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} +class BeanstalkBasedMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new BeanstalkBasedMailbox(owner) +} + /** * @author Jonas Bonér */ diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index e306545056..f7ed2e71ac 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -1,11 +1,9 @@ package akka.actor.mailbox -import akka.dispatch.CustomMailboxType - object BeanstalkBasedMailboxSpec { val config = """ Beanstalkd-dispatcher { - mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox + mailboxType = akka.actor.mailbox.BeanstalkBasedMailboxType throughput = 1 } """ diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 55c96ea65c..2accd9fb20 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -9,6 +9,12 @@ import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef +import akka.dispatch.MailboxType +import com.typesafe.config.Config + +class FileBasedMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new FileBasedMailbox(owner) +} class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 30278fca5a..3f202ddc5a 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,12 +1,11 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import akka.dispatch.CustomMailboxType object FileBasedMailboxSpec { val config = """ File-dispatcher { - mailboxType = akka.actor.mailbox.FileBasedMailbox + mailboxType = akka.actor.mailbox.FileBasedMailboxType throughput = 1 } """ diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index c8f9026cbf..6a474c8ab7 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -23,7 +23,6 @@ import akka.remote.RemoteActorRefProvider import akka.remote.netty.NettyRemoteServer import akka.serialization.Serialization import com.typesafe.config.Config -import akka.dispatch.CustomMailboxType private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index b404e3c844..dfb8e3a481 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -12,9 +12,15 @@ import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } import java.util.concurrent.TimeoutException +import akka.dispatch.MailboxType +import com.typesafe.config.Config class MongoBasedMailboxException(message: String) extends AkkaException(message) +class MongoBasedMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new MongoBasedMailbox(owner) +} + /** * A "naive" durable mailbox which uses findAndRemove; it's possible if the actor crashes * after consuming a message that the message could be lost. diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index 59e3c3785c..0167af12aa 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -8,12 +8,11 @@ import akka.actor._ import akka.actor.Actor._ import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher -import akka.dispatch.CustomMailboxType object MongoBasedMailboxSpec { val config = """ mongodb-dispatcher { - mailboxType = akka.actor.mailbox.MongoBasedMailbox + mailboxType = akka.actor.mailbox.MongoBasedMailboxType throughput = 1 } """ diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 21c5555590..6d0f173bbf 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -10,9 +10,15 @@ import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef +import akka.dispatch.MailboxType +import com.typesafe.config.Config class RedisBasedMailboxException(message: String) extends AkkaException(message) +class RedisBasedMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new RedisBasedMailbox(owner) +} + class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index efcf483915..15bad81d2f 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -1,10 +1,9 @@ package akka.actor.mailbox -import akka.dispatch.CustomMailboxType object RedisBasedMailboxSpec { val config = """ Redis-dispatcher { - mailboxType = akka.actor.mailbox.RedisBasedMailbox + mailboxType = akka.actor.mailbox.RedisBasedMailboxType throughput = 1 } """ diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 1cdedba25d..4309da402a 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -14,9 +14,15 @@ import akka.dispatch.Envelope import akka.event.Logging import akka.cluster.zookeeper.ZooKeeperQueue import akka.actor.ActorRef +import akka.dispatch.MailboxType +import com.typesafe.config.Config class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) +class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { + override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner) +} + class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index ce13d9fffc..e5af760f40 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -4,13 +4,12 @@ import akka.actor.{ Actor, LocalActorRef } import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher -import akka.dispatch.CustomMailboxType import akka.actor.ActorRef object ZooKeeperBasedMailboxSpec { val config = """ ZooKeeper-dispatcher { - mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox + mailboxType = akka.actor.mailbox.ZooKeeperBasedMailboxType throughput = 1 } """