Make MailboxType implementation configurable. See #1484

* Added mailboxType property to dispatcher config
* Changed durable mailboxes to use this
* Updated docs for durable mailboxes
This commit is contained in:
Patrik Nordwall 2011-12-19 20:36:06 +01:00
parent 6e3c2cb682
commit 61813c6635
10 changed files with 183 additions and 119 deletions

View file

@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getString("akka.actor.default-dispatcher.mailboxType") must be("")
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
settings.DispatcherDefaultShutdown must equal(1 second) settings.DispatcherDefaultShutdown must equal(1 second)
getInt("akka.actor.default-dispatcher.throughput") must equal(5) getInt("akka.actor.default-dispatcher.throughput") must equal(5)

View file

@ -4,6 +4,9 @@ import java.util.concurrent.{ TimeUnit, BlockingQueue }
import akka.util._ import akka.util._
import akka.util.duration._ import akka.util.duration._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.ActorCell
import java.util.concurrent.ConcurrentLinkedQueue
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -144,3 +147,26 @@ class PriorityMailboxSpec extends MailboxSpec {
case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
} }
} }
object CustomMailboxSpec {
val config = """
my-dispatcher {
mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox"
}
"""
class MyMailbox(owner: ActorCell) extends Mailbox(owner)
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must {
"support custom mailboxType" in {
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox])
}
}
}

View file

@ -160,6 +160,10 @@ akka {
# Specifies the timeout to add a new message to a mailbox that is full - # Specifies the timeout to add a new message to a mailbox that is full -
# negative number means infinite timeout # negative number means infinite timeout
mailbox-push-timeout-time = 10s mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used.
mailboxType = ""
} }
debug { debug {

View file

@ -271,12 +271,16 @@ abstract class MessageDispatcherConfigurator() {
def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher
def mailboxType(config: Config, settings: Settings): MailboxType = { def mailboxType(config: Config, settings: Settings): MailboxType = {
config.getString("mailboxType") match {
case ""
val capacity = config.getInt("mailbox-capacity") val capacity = config.getInt("mailbox-capacity")
if (capacity < 1) UnboundedMailbox() if (capacity < 1) UnboundedMailbox()
else { else {
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration) BoundedMailbox(capacity, duration)
} }
case fqn new CustomMailboxType(fqn)
}
} }
def configureThreadPool( def configureThreadPool(

View file

@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._ import java.util.concurrent._
import annotation.tailrec import annotation.tailrec
import akka.event.Logging.Error import akka.event.Logging.Error
import com.typesafe.config.Config
import java.lang.reflect.InvocationTargetException
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -361,3 +363,31 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
} }
} }
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
def create(receiver: ActorCell): Mailbox = {
val constructorSignature = Array[Class[_]](classOf[ActorCell])
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance.asInstanceOf[Mailbox]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s".
format(mailboxClass.getName, cause.toString))
}
}
private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match {
case Right(clazz) clazz
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new IllegalArgumentException("Cannot find mailbox class [%s] due to: %s".
format(mailboxFQN, cause.toString))
}
}

View file

@ -4,15 +4,18 @@
package akka.docs.actor.mailbox package akka.docs.actor.mailbox
//#imports //#imports
import akka.actor.Actor
import akka.actor.Props import akka.actor.Props
import akka.actor.mailbox.FileDurableMailboxType
//#imports //#imports
//#imports2
import akka.actor.mailbox.FileDurableMailboxType
//#imports2
import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.Actor
class MyActor extends Actor { class MyActor extends Actor {
def receive = { def receive = {
@ -20,14 +23,31 @@ class MyActor extends Actor {
} }
} }
class DurableMailboxDocSpec extends AkkaSpec { object DurableMailboxDocSpec {
val config = """
//#dispatcher-config
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
//#dispatcher-config
"""
}
"define dispatcher with durable mailbox" in { class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
//#define-dispatcher
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
//#dispatcher-config-use
}
"programatically define dispatcher with durable mailbox" in {
//#prog-define-dispatcher
val dispatcher = system.dispatcherFactory.newDispatcher( val dispatcher = system.dispatcherFactory.newDispatcher(
"my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build "my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher))
//#define-dispatcher //#prog-define-dispatcher
myActor ! "hello" myActor ! "hello"
} }

View file

@ -4,7 +4,6 @@
package akka.docs.actor.mailbox; package akka.docs.actor.mailbox;
//#imports //#imports
import akka.actor.mailbox.DurableMailboxType;
import akka.dispatch.MessageDispatcher; import akka.dispatch.MessageDispatcher;
import akka.actor.UntypedActorFactory; import akka.actor.UntypedActorFactory;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
@ -12,8 +11,17 @@ import akka.actor.Props;
//#imports //#imports
//#imports2
import akka.actor.mailbox.DurableMailboxType;
//#imports2
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import akka.testkit.AkkaSpec;
import akka.docs.dispatcher.DispatcherDocSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
@ -21,20 +29,44 @@ import static org.junit.Assert.*;
public class DurableMailboxDocTestBase { public class DurableMailboxDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem",
ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.shutdown();
}
@Test @Test
public void defineDispatcher() { public void configDefinedDispatcher() {
ActorSystem system = ActorSystem.create("MySystem"); //#dispatcher-config-use
//#define-dispatcher MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
}), "myactor");
//#dispatcher-config-use
myActor.tell("test");
}
@Test
public void programaticallyDefinedDispatcher() {
//#prog-define-dispatcher
MessageDispatcher dispatcher = system.dispatcherFactory() MessageDispatcher dispatcher = system.dispatcherFactory()
.newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build(); .newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build();
ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() {
public UntypedActor create() { public UntypedActor create() {
return new MyUntypedActor(); return new MyUntypedActor();
} }
})); }), "myactor");
//#define-dispatcher //#prog-define-dispatcher
myActor.tell("test"); myActor.tell("test");
system.shutdown();
} }
public static class MyUntypedActor extends UntypedActor { public static class MyUntypedActor extends UntypedActor {

View file

@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the
You configure durable mailboxes through the dispatcher. The You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using. actor is oblivious to which type of mailbox it is using.
Here is an example in Scala:
In the configuration of the dispatcher you specify the fully qualified class name
of the mailbox:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,define-dispatcher :include: dispatcher-config
Here is an example of how to create an actor with a durable dispatcher, in Scala:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,dispatcher-config-use
Corresponding example in Java: Corresponding example in Java:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java
:include: imports,define-dispatcher :include: imports,dispatcher-config-use
The actor is oblivious to which type of mailbox it is using. The actor is oblivious to which type of mailbox it is using.
@ -89,14 +96,11 @@ you need.
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type. :ref:`DurableMailbox.General` with the following mailbox type.
Scala:: Config::
mailbox = akka.actor.mailbox.FileDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
You can also configure and tune the file-based durable mailbox. This is done in You can also configure and tune the file-based durable mailbox. This is done in
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type. :ref:`DurableMailbox.General` with the following mailbox type.
Scala:: Config::
mailbox = akka.actor.mailbox.RedisDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.RedisBasedMailbox
}
You also need to configure the IP and port for the Redis server. This is done in You also need to configure the IP and port for the Redis server. This is done in
the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`. the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`.
@ -146,13 +147,11 @@ documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type. :ref:`DurableMailbox.General` with the following mailbox type.
Scala:: Config::
mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType my-dispatcher {
mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox
Java:: }
akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType()
You also need to configure ZooKeeper server addresses, timeouts, etc. This is You also need to configure ZooKeeper server addresses, timeouts, etc. This is
done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`. done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`.
@ -171,13 +170,11 @@ Beanstalk documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type. :ref:`DurableMailbox.General` with the following mailbox type.
Scala:: Config::
mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType my-dispatcher {
mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox
Java:: }
akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType()
You also need to configure the IP, and port, and so on, for the Beanstalk You also need to configure the IP, and port, and so on, for the Beanstalk
server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the
@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as
You configure durable mailboxes through the dispatcher, as described in You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type. :ref:`DurableMailbox.General` with the following mailbox type.
Scala:: Config::
mailbox = akka.actor.mailbox.MongoDurableMailboxType my-dispatcher {
mailboxType = akka.actor.mailbox.MongoBasedMailbox
Java:: }
akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType()
You will need to configure the URI for the MongoDB server, using the URI Format specified in the You will need to configure the URI for the MongoDB server, using the URI Format specified in the
`MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in `MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in

View file

@ -24,6 +24,7 @@ import akka.remote.RemoteActorRefProvider
import akka.remote.netty.NettyRemoteServer import akka.remote.netty.NettyRemoteServer
import akka.serialization.Serialization import akka.serialization.Serialization
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.dispatch.CustomMailboxType
private[akka] object DurableExecutableMailboxConfig { private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r val Name = "[\\.\\/\\$\\s]".r
@ -73,39 +74,11 @@ trait DurableMessageSerialization {
} }
abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType { case object RedisDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox")
val constructorSignature = Array[Class[_]](classOf[ActorCell]) case object MongoDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")
case object BeanstalkDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")
val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { case object FileDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")
case Right(clazz) clazz case object ZooKeeperDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString))
}
//TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc
def create(receiver: ActorCell): Mailbox = {
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance.asInstanceOf[Mailbox]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString))
}
}
}
case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox")
case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox")
case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")
case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox")
case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")
case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN)
/** /**
* Java API for the mailbox types. Usage: * Java API for the mailbox types. Usage:
@ -115,31 +88,9 @@ case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(
* </code></pre> * </code></pre>
*/ */
object DurableMailboxType { object DurableMailboxType {
def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType def redisDurableMailboxType(): MailboxType = RedisDurableMailboxType
def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType def mongoDurableMailboxType(): MailboxType = MongoDurableMailboxType
def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType def beanstalkDurableMailboxType(): MailboxType = BeanstalkDurableMailboxType
def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType def fileDurableMailboxType(): MailboxType = FileDurableMailboxType
def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType def zooKeeperDurableMailboxType(): MailboxType = ZooKeeperDurableMailboxType
def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN)
}
/**
* Configurator for the DurableMailbox
* Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file",
* or a full class name of the Mailbox implementation.
*/
class DurableMailboxConfigurator {
// TODO PN #896: when and how is this class supposed to be used? Can we remove it?
def mailboxType(config: Config): MailboxType = {
if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox")
config.getString("storage") match {
case "redis" RedisDurableMailboxType
case "mongodb" MongoDurableMailboxType
case "beanstalk" BeanstalkDurableMailboxType
case "zookeeper" ZooKeeperDurableMailboxType
case "file" FileDurableMailboxType
case fqn FqnDurableMailboxType(fqn)
}
}
} }

View file

@ -1,15 +1,16 @@
package akka.actor.mailbox package akka.actor.mailbox
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.CountDownLatch
import org.scalatest.WordSpec import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.testkit.AkkaSpec
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.dispatch.MailboxType
import akka.testkit.AkkaSpec
object DurableMailboxSpecActorFactory { object DurableMailboxSpecActorFactory {
@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory {
} }
abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach { abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach {
import DurableMailboxSpecActorFactory._ import DurableMailboxSpecActorFactory._
implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build