Merge branch 'wip-1844-mailboxes-∂π'

This commit is contained in:
Roland 2012-02-21 14:07:30 +01:00
commit 2d765718cf
14 changed files with 176 additions and 144 deletions

View file

@ -6,15 +6,14 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._ import akka.util._
import akka.util.duration._ import akka.util.duration._
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.ActorRef import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
import akka.actor.ActorContext
import com.typesafe.config.Config import com.typesafe.config.Config
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
def name: String def name: String
def factory: MailboxType Mailbox def factory: MailboxType MessageQueue
name should { name should {
"create an unbounded mailbox" in { "create an unbounded mailbox" in {
@ -77,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system)
def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
q must not be null q must not be null
q match { q match {
case aQueue: BlockingQueue[_] case aQueue: BlockingQueue[_]
@ -136,8 +135,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
class DefaultMailboxSpec extends MailboxSpec { class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation" lazy val name = "The default mailbox implementation"
def factory = { def factory = {
case u: UnboundedMailbox u.create(null) case u: UnboundedMailbox u.create(None)
case b: BoundedMailbox b.create(null) case b: BoundedMailbox b.create(None)
} }
} }
@ -145,8 +144,8 @@ class PriorityMailboxSpec extends MailboxSpec {
val comparator = PriorityGenerator(_.##) val comparator = PriorityGenerator(_.##)
lazy val name = "The priority mailbox implementation" lazy val name = "The priority mailbox implementation"
def factory = { def factory = {
case UnboundedMailbox() UnboundedPriorityMailbox(comparator).create(null) case UnboundedMailbox() UnboundedPriorityMailbox(comparator).create(None)
case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None)
} }
} }
@ -158,11 +157,13 @@ object CustomMailboxSpec {
""" """
class MyMailboxType(config: Config) extends MailboxType { class MyMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new MyMailbox(owner) override def create(owner: Option[ActorContext]) = owner match {
case Some(o) new MyMailbox(o)
case None throw new Exception("no mailbox owner given")
}
} }
class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]() final val queue = new ConcurrentLinkedQueue[Envelope]()
} }
} }
@ -171,8 +172,9 @@ object CustomMailboxSpec {
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must { "Dispatcher configuration" must {
"support custom mailboxType" in { "support custom mailboxType" in {
val dispatcher = system.dispatchers.lookup("my-dispatcher") val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher"))
dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue
queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox])
} }
} }
} }

View file

@ -9,6 +9,7 @@ import com.typesafe.config.Config
import akka.dispatch.DispatcherPrerequisites import akka.dispatch.DispatcherPrerequisites
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageDispatcherConfigurator import akka.dispatch.MessageDispatcherConfigurator
import akka.dispatch.UnboundedMailbox
object CallingThreadDispatcherModelSpec { object CallingThreadDispatcherModelSpec {
import ActorModelSpec._ import ActorModelSpec._
@ -31,7 +32,7 @@ object CallingThreadDispatcherModelSpec {
extends MessageDispatcherConfigurator(config, prerequisites) { extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher = private val instance: MessageDispatcher =
new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor {
override def id: String = config.getString("id") override def id: String = config.getString("id")
} }

View file

@ -463,15 +463,19 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def deadLetters: ActorRef = provider.deadLetters def deadLetters: ActorRef = provider.deadLetters
val deadLetterMailbox: Mailbox = new Mailbox(null) { val deadLetterQueue: MessageQueue = new MessageQueue {
def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
def dequeue() = null
def hasMessages = false
def numberOfMessages = 0
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) {
becomeClosed() becomeClosed()
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver)
override def dequeue() = null def systemDrain(): SystemMessage = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } def hasSystemMessages = false
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
} }
def locker: Locker = provider.locker def locker: Locker = provider.locker

View file

@ -40,47 +40,9 @@ class BalancingDispatcher(
def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path
})) }))
val messageQueue: MessageQueue = mailboxType match { val messageQueue: MessageQueue = mailboxType.create(None)
case UnboundedMailbox()
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
case BoundedMailbox(cap, timeout) protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue)
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](cap)
final val pushTimeOut = timeout
}
case other throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]")
}
protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor)
class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue {
final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle)
final def dequeue(): Envelope = messageQueue.dequeue()
final def numberOfMessages: Int = messageQueue.numberOfMessages
final def hasMessages: Boolean = messageQueue.hasMessages
override def cleanUp(): Unit = {
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) {
val dlq = actor.systemImpl.deadLetterMailbox
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
message = next
}
}
}
}
protected[akka] override def register(actor: ActorCell): Unit = { protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor) super.register(actor)
@ -111,4 +73,23 @@ class BalancingDispatcher(
scheduleOne() scheduleOne()
} }
} }
class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue)
extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue {
override def cleanUp(): Unit = {
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
if (hasSystemMessages) {
val dlq = actor.systemImpl.deadLetterMailbox
var message = systemDrain()
while (message ne null) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlq.systemEnqueue(actor.self, message)
message = next
}
}
}
}

View file

@ -65,7 +65,7 @@ class Dispatcher(
} }
} }
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue
protected[akka] def shutdown: Unit = protected[akka] def shutdown: Unit =
Option(executorService.getAndSet(new ExecutorServiceDelegate { Option(executorService.getAndSet(new ExecutorServiceDelegate {

View file

@ -34,26 +34,21 @@ object Mailbox {
final val debug = false final val debug = false
} }
/**
* Custom mailbox implementations are implemented by extending this class.
* E.g.
* <pre<code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
/** /**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses. * but can't be exposed to user defined mailbox subclasses.
* *
*/ */
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
import Mailbox._ import Mailbox._
def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
def dequeue(): Envelope = messageQueue.dequeue()
def hasMessages: Boolean = messageQueue.hasMessages
def numberOfMessages: Int = messageQueue.numberOfMessages
@volatile @volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@ -216,25 +211,20 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
*/ */
protected[dispatch] def cleanUp(): Unit = protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox if (actor ne null) { // actor is null for the deadLetterMailbox
val dlq = actor.systemImpl.deadLetterMailbox val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) { if (hasSystemMessages) {
var message = systemDrain() var message = systemDrain()
while (message ne null) { while (message ne null) {
// message must be virgin before being able to systemEnqueue again // message must be virgin before being able to systemEnqueue again
val next = message.next val next = message.next
message.next = null message.next = null
dlq.systemEnqueue(actor.self, message) dlm.systemEnqueue(actor.self, message)
message = next message = next
} }
} }
if (hasMessages) { if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
var envelope = dequeue messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
while (envelope ne null) {
dlq.enqueue(actor.self, envelope)
envelope = dequeue
}
}
} }
} }
@ -260,9 +250,20 @@ trait MessageQueue {
* Indicates whether this queue is non-empty. * Indicates whether this queue is non-empty.
*/ */
def hasMessages: Boolean def hasMessages: Boolean
/**
* Called when the mailbox this queue belongs to is disposed of. Normally it
* is expected to transfer all remaining messages into the dead letter queue
* which is passed in. The owner of this MessageQueue is passed in if
* available (e.g. for creating DeadLetters()), /deadletters otherwise.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
} }
trait SystemMessageQueue { /**
* Internal mailbox implementation detail.
*/
private[akka] trait SystemMessageQueue {
/** /**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/ */
@ -276,7 +277,10 @@ trait SystemMessageQueue {
def hasSystemMessages: Boolean def hasSystemMessages: Boolean
} }
trait DefaultSystemMessageQueue { self: Mailbox /**
* Internal mailbox implementation detail.
*/
private[akka] trait DefaultSystemMessageQueue { self: Mailbox
@tailrec @tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
@ -329,21 +333,30 @@ trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope] def queue: Queue[Envelope]
def numberOfMessages = queue.size def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
envelope = dequeue
}
}
}
} }
/** /**
* Mailbox configuration. * Mailbox configuration.
*/ */
trait MailboxType { trait MailboxType {
def create(receiver: ActorContext): Mailbox def create(owner: Option[ActorContext]): MessageQueue
} }
/** /**
* It's a case class for Java (new UnboundedMailbox) * It's a case class for Java (new UnboundedMailbox)
*/ */
case class UnboundedMailbox() extends MailboxType { case class UnboundedMailbox() extends MailboxType {
final override def create(receiver: ActorContext): Mailbox = final override def create(owner: Option[ActorContext]): MessageQueue =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]() final val queue = new ConcurrentLinkedQueue[Envelope]()
} }
} }
@ -353,16 +366,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(receiver: ActorContext): Mailbox = final override def create(owner: Option[ActorContext]): MessageQueue =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](capacity) final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut final val pushTimeOut = BoundedMailbox.this.pushTimeOut
} }
} }
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
final override def create(receiver: ActorContext): Mailbox = final override def create(owner: Option[ActorContext]): MessageQueue =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp) final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
} }
} }
@ -372,8 +385,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(receiver: ActorContext): Mailbox = final override def create(owner: Option[ActorContext]): MessageQueue =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
} }

View file

@ -12,23 +12,28 @@ import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType import akka.dispatch.MailboxType
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {} class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
class BeanstalkBasedMailboxType(config: Config) extends MailboxType { class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new BeanstalkBasedMailbox(owner) override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new BeanstalkBasedMessageQueue(o)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
} }
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system) private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt
val log = Logging(system, "BeanstalkBasedMailbox") val log = Logging(system, "BeanstalkBasedMessageQueue")
private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) } private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) }
@ -107,4 +112,6 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
private def reconnect(name: String): ThreadLocal[Client] = { private def reconnect(name: String): ThreadLocal[Client] = {
new ThreadLocal[Client] { override def initialValue: Client = connect(name) } new ThreadLocal[Client] { override def initialValue: Client = connect(name) }
} }
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
} }

View file

@ -6,20 +6,24 @@ package akka.actor.mailbox
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import akka.actor.ActorContext import akka.actor.ActorContext
import akka.dispatch.Envelope import akka.dispatch.{ Envelope, MessageQueue }
import akka.event.Logging import akka.event.Logging
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.dispatch.MailboxType import akka.dispatch.MailboxType
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.NonFatal import akka.util.NonFatal
import akka.config.ConfigurationException
class FileBasedMailboxType(config: Config) extends MailboxType { class FileBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new FileBasedMailbox(owner) override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new FileBasedMessageQueue(o)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
} }
class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMailbox") val log = Logging(system, "FileBasedMessageQueue")
private val settings = FileBasedMailboxExtension(owner.system) private val settings = FileBasedMailboxExtension(owner.system)
val queuePath = settings.QueuePath val queuePath = settings.QueuePath
@ -69,4 +73,6 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with
case NonFatal(_) false case NonFatal(_) false
} }
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
} }

View file

@ -4,7 +4,7 @@
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem }
import akka.dispatch.{ Envelope, DefaultSystemMessageQueue, CustomMailbox } import akka.dispatch.{ Envelope, MessageQueue }
import akka.remote.MessageSerializer import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol }
@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r val Name = "[\\.\\/\\$\\s]".r
} }
abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue {
import DurableExecutableMailboxConfig._ import DurableExecutableMailboxConfig._
def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem]
@ -22,7 +22,7 @@ abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(own
} }
trait DurableMessageSerialization { this: DurableMailbox trait DurableMessageSerialization { this: DurableMessageQueue
def serialize(durableMessage: Envelope): Array[Byte] = { def serialize(durableMessage: Envelope): Array[Byte] = {

View file

@ -17,7 +17,7 @@ import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer import akka.remote.MessageSerializer
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] { class BSONSerializableMessageQueue(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] {
protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = { protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = {
@ -25,7 +25,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
val b = Map.newBuilder[String, Any] val b = Map.newBuilder[String, Any]
b += "_id" -> msg._id b += "_id" -> msg._id
b += "ownerPath" -> msg.ownerPath b += "ownerPath" -> msg.ownerPath
b += "senderPath" -> msg.sender.path b += "senderPath" -> msg.sender.path.toString
/** /**
* TODO - Figure out a way for custom serialization of the message instance * TODO - Figure out a way for custom serialization of the message instance

View file

@ -14,11 +14,16 @@ import akka.dispatch.{ Await, Promise, Envelope }
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.MailboxType import akka.dispatch.MailboxType
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
class MongoBasedMailboxException(message: String) extends AkkaException(message) class MongoBasedMailboxException(message: String) extends AkkaException(message)
class MongoBasedMailboxType(config: Config) extends MailboxType { class MongoBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new MongoBasedMailbox(owner) override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MongoBasedMessageQueue(o)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
} }
/** /**
@ -32,14 +37,16 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
* *
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a> * @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/ */
class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) { class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage // this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
private val dispatcher = owner.dispatcher
private val settings = MongoBasedMailboxExtension(owner.system) private val settings = MongoBasedMailboxExtension(owner.system)
val log = Logging(system, "MongoBasedMailbox") val log = Logging(system, "MongoBasedMessageQueue")
@volatile @volatile
private var mongo = connect() private var mongo = connect()
@ -129,4 +136,6 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
} }
} }
} }
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
} }

View file

@ -12,21 +12,26 @@ import akka.actor.ActorRef
import akka.dispatch.MailboxType import akka.dispatch.MailboxType
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.NonFatal import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
class RedisBasedMailboxException(message: String) extends AkkaException(message) class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailboxType(config: Config) extends MailboxType { class RedisBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new RedisBasedMailbox(owner) override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new RedisBasedMessageQueue(o)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
} }
class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system) private val settings = RedisBasedMailboxExtension(owner.system)
@volatile @volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
val log = Logging(system, "RedisBasedMailbox") val log = Logging(system, "RedisBasedMessageQueue")
def enqueue(receiver: ActorRef, envelope: Envelope) { def enqueue(receiver: ActorRef, envelope: Envelope) {
withErrorHandling { withErrorHandling {
@ -75,5 +80,7 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit
throw error throw error
} }
} }
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
} }

View file

@ -13,20 +13,25 @@ import akka.actor.ActorRef
import akka.dispatch.MailboxType import akka.dispatch.MailboxType
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.NonFatal import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType { class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner) override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new ZooKeeperBasedMessageQueue(o)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
} }
class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization { class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system) private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues" val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s" val queuePathTemplate = queueNode + "/%s"
val log = Logging(system, "ZooKeeperBasedMailbox") val log = Logging(system, "ZooKeeperBasedMessageQueue")
private val zkClient = new AkkaZkClient( private val zkClient = new AkkaZkClient(
settings.ZkServerAddresses, settings.ZkServerAddresses,
@ -59,7 +64,7 @@ class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
case e: Exception false case e: Exception false
} }
override def cleanUp() { def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
try { try {
zkClient.close() zkClient.close()
} catch { } catch {

View file

@ -9,10 +9,12 @@ import java.util.LinkedList
import scala.annotation.tailrec import scala.annotation.tailrec
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.dispatch.{ MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration } import akka.util.{ Switch, Duration }
import akka.util.NonFatal import akka.util.NonFatal
import akka.actor.ActorContext
import akka.dispatch.MessageQueue
/* /*
* Locking rules: * Locking rules:
@ -75,9 +77,12 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
val q = ref.get val q = ref.get
if (q ne null) && (q ne own) if (q ne null) && (q ne own)
} { } {
while (q.peek ne null) { val owner = mbox.actor.self
var msg = q.q.dequeue()
while (msg ne null) {
// this is safe because this method is only ever called while holding the suspendSwitch monitor // this is safe because this method is only ever called while holding the suspendSwitch monitor
own.push(q.pop) own.q.enqueue(owner, msg)
msg = q.q.dequeue()
} }
} }
} }
@ -115,6 +120,7 @@ object CallingThreadDispatcher {
*/ */
class CallingThreadDispatcher( class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites, _prerequisites: DispatcherPrerequisites,
val mailboxType: MailboxType,
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) { val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._ import CallingThreadDispatcher._
@ -122,7 +128,7 @@ class CallingThreadDispatcher(
override def id: String = Id override def id: String = Id
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def shutdown() {} protected[akka] override def shutdown() {}
@ -183,10 +189,10 @@ class CallingThreadDispatcher(
case mbox: CallingThreadMailbox case mbox: CallingThreadMailbox
val queue = mbox.queue val queue = mbox.queue
val execute = mbox.suspendSwitch.fold { val execute = mbox.suspendSwitch.fold {
queue.push(handle) queue.q.enqueue(receiver.self, handle)
false false
} { } {
queue.push(handle) queue.q.enqueue(receiver.self, handle)
if (!queue.isActive) { if (!queue.isActive) {
queue.enter queue.enter
true true
@ -219,7 +225,7 @@ class CallingThreadDispatcher(
queue.leave queue.leave
null null
} { } {
val ret = if (mbox.isClosed) null else queue.pop val ret = if (mbox.isClosed) null else queue.q.dequeue()
if (ret eq null) queue.leave if (ret eq null) queue.leave
ret ret
} }
@ -261,19 +267,13 @@ class CallingThreadDispatcher(
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) { extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new CallingThreadDispatcher(prerequisites)
private val instance = new CallingThreadDispatcher(prerequisites, mailboxType())
override def dispatcher(): MessageDispatcher = instance override def dispatcher(): MessageDispatcher = instance
} }
class NestingQueue { class NestingQueue(val q: MessageQueue) {
private var q = new LinkedList[Envelope]()
def size = q.size
def isEmpty = q.isEmpty
def push(handle: Envelope) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
@volatile @volatile
private var active = false private var active = false
def enter { if (active) sys.error("already active") else active = true } def enter { if (active) sys.error("already active") else active = true }
@ -281,11 +281,11 @@ class NestingQueue {
def isActive = active def isActive = active
} }
class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue {
private val q = new ThreadLocal[NestingQueue]() { private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = { override def initialValue = {
val queue = new NestingQueue val queue = new NestingQueue(mailboxType.create(Some(actor)))
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue) CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
queue queue
} }
@ -296,11 +296,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
val ctdLock = new ReentrantLock val ctdLock = new ReentrantLock
val suspendSwitch = new Switch val suspendSwitch = new Switch
override def enqueue(receiver: ActorRef, msg: Envelope) {}
override def dequeue() = null
override def hasMessages = queue.isEmpty
override def numberOfMessages = queue.size
override def cleanUp(): Unit = { override def cleanUp(): Unit = {
/* /*
* This is called from dispatcher.unregister, i.e. under this.lock. If * This is called from dispatcher.unregister, i.e. under this.lock. If
@ -308,8 +303,10 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
* the gather operation, tough luck: no guaranteed delivery to deadLetters. * the gather operation, tough luck: no guaranteed delivery to deadLetters.
*/ */
suspendSwitch.locked { suspendSwitch.locked {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue) val q = queue
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
super.cleanUp() super.cleanUp()
q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue)
} }
} }
} }