diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index b0529e19cf..ff4aacc4b5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) + getString("akka.actor.default-dispatcher.mailboxType") must be("") getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index d0c2053243..0e9a1b20cf 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -4,6 +4,9 @@ import java.util.concurrent.{ TimeUnit, BlockingQueue } import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.actor.ActorRef +import akka.actor.ActorCell +import java.util.concurrent.ConcurrentLinkedQueue @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -144,3 +147,26 @@ class PriorityMailboxSpec extends MailboxSpec { case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) } } + +object CustomMailboxSpec { + val config = """ + my-dispatcher { + mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox" + } + """ + + class MyMailbox(owner: ActorCell) extends Mailbox(owner) + with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new ConcurrentLinkedQueue[Envelope]() + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { + "Dispatcher configuration" must { + "support custom mailboxType" in { + val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher") + dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) + } + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index d9447fa0c7..4eeb3a29fe 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -29,7 +29,7 @@ akka { logConfigOnStart = off # List FQCN of extensions which shall be loaded at actor system startup. - # Should be on the format: 'extensions = ["foo", "bar"]' etc. + # Should be on the format: 'extensions = ["foo", "bar"]' etc. # FIXME: clarify "extensions" here, "Akka Extensions ()" extensions = [] @@ -59,7 +59,7 @@ akka { deployment { - # deployment id pattern - on the format: /parent/child etc. + # deployment id pattern - on the format: /parent/child etc. default { # routing (load-balance) scheme to use @@ -160,6 +160,10 @@ akka { # Specifies the timeout to add a new message to a mailbox that is full - # negative number means infinite timeout mailbox-push-timeout-time = 10s + + # FQCN of the MailboxType, if not specified the default bounded or unbounded + # mailbox is used. + mailboxType = "" } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 35284879a4..b1ca951b36 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -271,11 +271,15 @@ abstract class MessageDispatcherConfigurator() { def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher def mailboxType(config: Config, settings: Settings): MailboxType = { - val capacity = config.getInt("mailbox-capacity") - if (capacity < 1) UnboundedMailbox() - else { - val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) - BoundedMailbox(capacity, duration) + config.getString("mailboxType") match { + case "" ⇒ + val capacity = config.getInt("mailbox-capacity") + if (capacity < 1) UnboundedMailbox() + else { + val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) + BoundedMailbox(capacity, duration) + } + case fqn ⇒ new CustomMailboxType(fqn) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index fdb46f0ec4..633898c4e9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent._ import annotation.tailrec import akka.event.Logging.Error +import com.typesafe.config.Config +import java.lang.reflect.InvocationTargetException class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -361,3 +363,31 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va } } +class CustomMailboxType(mailboxFQN: String) extends MailboxType { + + def create(receiver: ActorCell): Mailbox = { + val constructorSignature = Array[Class[_]](classOf[ActorCell]) + ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { + case Right(instance) ⇒ instance.asInstanceOf[Mailbox] + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". + format(mailboxClass.getName, cause.toString)) + } + } + + private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { + case Right(clazz) ⇒ clazz + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new IllegalArgumentException("Cannot find mailbox class [%s] due to: %s". + format(mailboxFQN, cause.toString)) + } +} + diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index 7a18da6182..7140da03bb 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -4,15 +4,18 @@ package akka.docs.actor.mailbox //#imports -import akka.actor.Actor import akka.actor.Props -import akka.actor.mailbox.FileDurableMailboxType //#imports +//#imports2 +import akka.actor.mailbox.FileDurableMailboxType +//#imports2 + import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec +import akka.actor.Actor class MyActor extends Actor { def receive = { @@ -20,14 +23,31 @@ class MyActor extends Actor { } } -class DurableMailboxDocSpec extends AkkaSpec { +object DurableMailboxDocSpec { + val config = """ + //#dispatcher-config + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } + //#dispatcher-config + """ +} - "define dispatcher with durable mailbox" in { - //#define-dispatcher +class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { + + "configuration of dispatcher with durable mailbox" in { + //#dispatcher-config-use + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher") + val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") + //#dispatcher-config-use + } + + "programatically define dispatcher with durable mailbox" in { + //#prog-define-dispatcher val dispatcher = system.dispatcherFactory.newDispatcher( "my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build - val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") - //#define-dispatcher + val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher)) + //#prog-define-dispatcher myActor ! "hello" } diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index ba411ffbb0..02066d8673 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -4,7 +4,6 @@ package akka.docs.actor.mailbox; //#imports -import akka.actor.mailbox.DurableMailboxType; import akka.dispatch.MessageDispatcher; import akka.actor.UntypedActorFactory; import akka.actor.UntypedActor; @@ -12,8 +11,17 @@ import akka.actor.Props; //#imports +//#imports2 +import akka.actor.mailbox.DurableMailboxType; +//#imports2 + +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import akka.testkit.AkkaSpec; +import akka.docs.dispatcher.DispatcherDocSpec; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -21,20 +29,44 @@ import static org.junit.Assert.*; public class DurableMailboxDocTestBase { + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("MySystem", + ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf())); + } + + @After + public void tearDown() { + system.shutdown(); + } + @Test - public void defineDispatcher() { - ActorSystem system = ActorSystem.create("MySystem"); - //#define-dispatcher + public void configDefinedDispatcher() { + //#dispatcher-config-use + MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher"); + ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { + public UntypedActor create() { + return new MyUntypedActor(); + } + }), "myactor"); + //#dispatcher-config-use + myActor.tell("test"); + } + + @Test + public void programaticallyDefinedDispatcher() { + //#prog-define-dispatcher MessageDispatcher dispatcher = system.dispatcherFactory() .newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build(); ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { public UntypedActor create() { return new MyUntypedActor(); } - })); - //#define-dispatcher + }), "myactor"); + //#prog-define-dispatcher myActor.tell("test"); - system.shutdown(); } public static class MyUntypedActor extends UntypedActor { diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 74af6c3ca5..2f6ca9e261 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the You configure durable mailboxes through the dispatcher. The actor is oblivious to which type of mailbox it is using. -Here is an example in Scala: + +In the configuration of the dispatcher you specify the fully qualified class name +of the mailbox: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: imports,define-dispatcher + :include: dispatcher-config + +Here is an example of how to create an actor with a durable dispatcher, in Scala: + +.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: imports,dispatcher-config-use Corresponding example in Java: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java - :include: imports,define-dispatcher + :include: imports,dispatcher-config-use The actor is oblivious to which type of mailbox it is using. @@ -89,14 +96,11 @@ you need. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.FileDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } You can also configure and tune the file-based durable mailbox. This is done in the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. @@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.RedisDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.RedisBasedMailbox + } You also need to configure the IP and port for the Redis server. This is done in the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`. @@ -146,13 +147,11 @@ documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox + } You also need to configure ZooKeeper server addresses, timeouts, etc. This is done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`. @@ -171,13 +170,11 @@ Beanstalk documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox + } You also need to configure the IP, and port, and so on, for the Beanstalk server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the @@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.MongoDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.MongoBasedMailbox + } You will need to configure the URI for the MongoDB server, using the URI Format specified in the `MongoDB Documentation `_. This is done in diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 7eb30b2fdb..2c5fd7cb88 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -24,6 +24,7 @@ import akka.remote.RemoteActorRefProvider import akka.remote.netty.NettyRemoteServer import akka.serialization.Serialization import com.typesafe.config.Config +import akka.dispatch.CustomMailboxType private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -73,39 +74,11 @@ trait DurableMessageSerialization { } -abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType { - val constructorSignature = Array[Class[_]](classOf[ActorCell]) - - val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { - case Right(clazz) ⇒ clazz - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString)) - } - - //TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc - def create(receiver: ActorCell): Mailbox = { - ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { - case Right(instance) ⇒ instance.asInstanceOf[Mailbox] - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString)) - } - } -} - -case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox") -case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox") -case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") -case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox") -case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") -case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN) +case object RedisDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox") +case object MongoDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox") +case object BeanstalkDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") +case object FileDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.FileBasedMailbox") +case object ZooKeeperDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") /** * Java API for the mailbox types. Usage: @@ -115,31 +88,9 @@ case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType( * */ object DurableMailboxType { - def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType - def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType - def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType - def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType - def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType - def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN) -} - -/** - * Configurator for the DurableMailbox - * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file", - * or a full class name of the Mailbox implementation. - */ -class DurableMailboxConfigurator { - // TODO PN #896: when and how is this class supposed to be used? Can we remove it? - - def mailboxType(config: Config): MailboxType = { - if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox") - config.getString("storage") match { - case "redis" ⇒ RedisDurableMailboxType - case "mongodb" ⇒ MongoDurableMailboxType - case "beanstalk" ⇒ BeanstalkDurableMailboxType - case "zookeeper" ⇒ ZooKeeperDurableMailboxType - case "file" ⇒ FileDurableMailboxType - case fqn ⇒ FqnDurableMailboxType(fqn) - } - } + def redisDurableMailboxType(): MailboxType = RedisDurableMailboxType + def mongoDurableMailboxType(): MailboxType = MongoDurableMailboxType + def beanstalkDurableMailboxType(): MailboxType = BeanstalkDurableMailboxType + def fileDurableMailboxType(): MailboxType = FileDurableMailboxType + def zooKeeperDurableMailboxType(): MailboxType = ZooKeeperDurableMailboxType } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index eec1b03192..5bce062203 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -1,15 +1,16 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit +import java.util.concurrent.CountDownLatch import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import akka.actor._ import akka.actor.Actor._ -import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher -import akka.testkit.AkkaSpec import akka.dispatch.Dispatchers +import akka.dispatch.MailboxType +import akka.testkit.AkkaSpec object DurableMailboxSpecActorFactory { @@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory { } -abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach { +abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach { import DurableMailboxSpecActorFactory._ implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build