Changed so that the configured FQCN of the mailboxType must be a MailboxType, not the Mailbox. See #1458

This commit is contained in:
Patrik Nordwall 2011-12-21 19:37:18 +01:00
parent 6eb7e1d438
commit c4401f1ca8
17 changed files with 71 additions and 50 deletions

View file

@ -8,6 +8,7 @@ import akka.util.duration._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.ActorContext import akka.actor.ActorContext
import com.typesafe.config.Config
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -152,10 +153,14 @@ class PriorityMailboxSpec extends MailboxSpec {
object CustomMailboxSpec { object CustomMailboxSpec {
val config = """ val config = """
my-dispatcher { my-dispatcher {
mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox" mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailboxType"
} }
""" """
class MyMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new MyMailbox(owner)
}
class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]() final val queue = new ConcurrentLinkedQueue[Envelope]()

View file

@ -166,7 +166,8 @@ akka {
mailbox-push-timeout-time = 10s mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded # FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used. # mailbox is used. The Class of the FQCN must have a constructor with a
# com.typesafe.config.Config parameter.
mailboxType = "" mailboxType = ""
} }

View file

@ -17,6 +17,7 @@ import akka.event.EventStream
import akka.actor.ActorSystem.Settings import akka.actor.ActorSystem.Settings
import com.typesafe.config.Config import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.util.ReflectiveAccess
final case class Envelope(val message: Any, val sender: ActorRef) { final case class Envelope(val message: Any, val sender: ActorRef) {
if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null") if (message.isInstanceOf[AnyRef] && (message.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message is null")
@ -282,9 +283,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
/** /**
* Returns a factory for the [[akka.dispatch.Mailbox]] given the configuration. * Returns a factory for the [[akka.dispatch.Mailbox]] given the configuration.
* Default implementation use [[akka.dispatch.CustomMailboxType]] if * Default implementation instantiate the [[akka.dispatch.MailboxType]] specified
* mailboxType config property is specified, otherwise [[akka.dispatch.UnboundedMailbox]] * as FQCN in mailboxType config property. If mailboxType is unspecified (empty)
* when capacity is < 1, otherwise [[akka.dispatch.BoundedMailbox]]. * then [[akka.dispatch.UnboundedMailbox]] is used when capacity is < 1,
* otherwise [[akka.dispatch.BoundedMailbox]].
*/ */
def mailboxType(): MailboxType = { def mailboxType(): MailboxType = {
config.getString("mailboxType") match { config.getString("mailboxType") match {
@ -295,7 +297,16 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration) BoundedMailbox(capacity, duration)
} }
case fqn new CustomMailboxType(fqn) case fqcn
val constructorSignature = Array[Class[_]](classOf[Config])
ReflectiveAccess.createInstance[MailboxType](fqcn, constructorSignature, Array[AnyRef](config)) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException(
("Cannot instantiate MailboxType [%s], defined in [%s], " +
"make sure it has constructor with a [com.typesafe.config.Config] parameter")
.format(fqcn, config.getString("id")), exception)
}
} }
} }

View file

@ -37,6 +37,13 @@ object Mailbox {
/** /**
* Custom mailbox implementations are implemented by extending this class. * Custom mailbox implementations are implemented by extending this class.
* E.g.
* <pre<code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/ */
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
@ -373,29 +380,3 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
} }
} }
/**
* Mailbox factory that creates instantiates the implementation from a
* fully qualified class name. The implementation class must have
* a constructor with a [[akka.actor.ActorContext]] parameter.
* E.g.
* <pre<code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
override def create(receiver: ActorContext): Mailbox = {
val constructorSignature = Array[Class[_]](classOf[ActorContext])
ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s".
format(mailboxFQN, exception.toString))
}
}
}

View file

@ -23,7 +23,7 @@ object DurableMailboxDocSpec {
val config = """ val config = """
//#dispatcher-config //#dispatcher-config
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox mailboxType = akka.actor.mailbox.FileBasedMailboxType
} }
//#dispatcher-config //#dispatcher-config
""" """

View file

@ -99,7 +99,7 @@ You configure durable mailboxes through the dispatcher, as described in
Config:: Config::
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox mailboxType = akka.actor.mailbox.FileBasedMailboxType
} }
You can also configure and tune the file-based durable mailbox. This is done in You can also configure and tune the file-based durable mailbox. This is done in
@ -124,7 +124,7 @@ You configure durable mailboxes through the dispatcher, as described in
Config:: Config::
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.RedisBasedMailbox mailboxType = akka.actor.mailbox.RedisBasedMailboxType
} }
You also need to configure the IP and port for the Redis server. This is done in You also need to configure the IP and port for the Redis server. This is done in
@ -150,7 +150,7 @@ You configure durable mailboxes through the dispatcher, as described in
Config:: Config::
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox mailboxType = akka.actor.mailbox.ZooKeeperBasedMailboxType
} }
You also need to configure ZooKeeper server addresses, timeouts, etc. This is You also need to configure ZooKeeper server addresses, timeouts, etc. This is
@ -173,7 +173,7 @@ You configure durable mailboxes through the dispatcher, as described in
Config:: Config::
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox mailboxType = akka.actor.mailbox.BeanstalkBasedMailboxType
} }
You also need to configure the IP, and port, and so on, for the Beanstalk You also need to configure the IP, and port, and so on, for the Beanstalk
@ -202,7 +202,7 @@ You configure durable mailboxes through the dispatcher, as described in
Config:: Config::
my-dispatcher { my-dispatcher {
mailboxType = akka.actor.mailbox.MongoBasedMailbox mailboxType = akka.actor.mailbox.MongoBasedMailboxType
} }
You will need to configure the URI for the MongoDB server, using the URI Format specified in the You will need to configure the URI for the MongoDB server, using the URI Format specified in the

View file

@ -13,9 +13,15 @@ import akka.actor.ActorContext
import akka.dispatch.Envelope import akka.dispatch.Envelope
import akka.event.Logging import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new BeanstalkBasedMailbox(owner)
}
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */

View file

@ -1,11 +1,9 @@
package akka.actor.mailbox package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
object BeanstalkBasedMailboxSpec { object BeanstalkBasedMailboxSpec {
val config = """ val config = """
Beanstalkd-dispatcher { Beanstalkd-dispatcher {
mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox mailboxType = akka.actor.mailbox.BeanstalkBasedMailboxType
throughput = 1 throughput = 1
} }
""" """

View file

@ -9,6 +9,12 @@ import akka.actor.ActorContext
import akka.dispatch.Envelope import akka.dispatch.Envelope
import akka.event.Logging import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
class FileBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new FileBasedMailbox(owner)
}
class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {

View file

@ -1,12 +1,11 @@
package akka.actor.mailbox package akka.actor.mailbox
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import akka.dispatch.CustomMailboxType
object FileBasedMailboxSpec { object FileBasedMailboxSpec {
val config = """ val config = """
File-dispatcher { File-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox mailboxType = akka.actor.mailbox.FileBasedMailboxType
throughput = 1 throughput = 1
} }
""" """

View file

@ -23,7 +23,6 @@ import akka.remote.RemoteActorRefProvider
import akka.remote.netty.NettyRemoteServer import akka.remote.netty.NettyRemoteServer
import akka.serialization.Serialization import akka.serialization.Serialization
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.dispatch.CustomMailboxType
private[akka] object DurableExecutableMailboxConfig { private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r val Name = "[\\.\\/\\$\\s]".r

View file

@ -12,9 +12,15 @@ import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise }
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.MailboxType
import com.typesafe.config.Config
class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxException(message: String) extends AkkaException(message)
class MongoBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new MongoBasedMailbox(owner)
}
/** /**
* A "naive" durable mailbox which uses findAndRemove; it's possible if the actor crashes * A "naive" durable mailbox which uses findAndRemove; it's possible if the actor crashes
* after consuming a message that the message could be lost. * after consuming a message that the message could be lost.

View file

@ -8,12 +8,11 @@ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
object MongoBasedMailboxSpec { object MongoBasedMailboxSpec {
val config = """ val config = """
mongodb-dispatcher { mongodb-dispatcher {
mailboxType = akka.actor.mailbox.MongoBasedMailbox mailboxType = akka.actor.mailbox.MongoBasedMailboxType
throughput = 1 throughput = 1
} }
""" """

View file

@ -10,9 +10,15 @@ import akka.actor.ActorContext
import akka.dispatch.Envelope import akka.dispatch.Envelope
import akka.event.Logging import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new RedisBasedMailbox(owner)
}
class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system) private val settings = RedisBasedMailboxExtension(owner.system)

View file

@ -1,10 +1,9 @@
package akka.actor.mailbox package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
object RedisBasedMailboxSpec { object RedisBasedMailboxSpec {
val config = """ val config = """
Redis-dispatcher { Redis-dispatcher {
mailboxType = akka.actor.mailbox.RedisBasedMailbox mailboxType = akka.actor.mailbox.RedisBasedMailboxType
throughput = 1 throughput = 1
} }
""" """

View file

@ -14,9 +14,15 @@ import akka.dispatch.Envelope
import akka.event.Logging import akka.event.Logging
import akka.cluster.zookeeper.ZooKeeperQueue import akka.cluster.zookeeper.ZooKeeperQueue
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner)
}
class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system) private val settings = ZooKeeperBasedMailboxExtension(owner.system)

View file

@ -4,13 +4,12 @@ import akka.actor.{ Actor, LocalActorRef }
import akka.cluster.zookeeper._ import akka.cluster.zookeeper._
import org.I0Itec.zkclient._ import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
import akka.actor.ActorRef import akka.actor.ActorRef
object ZooKeeperBasedMailboxSpec { object ZooKeeperBasedMailboxSpec {
val config = """ val config = """
ZooKeeper-dispatcher { ZooKeeper-dispatcher {
mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox mailboxType = akka.actor.mailbox.ZooKeeperBasedMailboxType
throughput = 1 throughput = 1
} }
""" """