diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 4f787a730f..21c6c75cb2 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -6,15 +6,14 @@ import java.util.concurrent.ConcurrentLinkedQueue import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec -import akka.actor.ActorRef -import akka.actor.ActorContext +import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef } import com.typesafe.config.Config @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { def name: String - def factory: MailboxType ⇒ Mailbox + def factory: MailboxType ⇒ MessageQueue name should { "create an unbounded mailbox" in { @@ -77,7 +76,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn def createMessageInvocation(msg: Any): Envelope = Envelope(msg, system.deadLetters)(system) - def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { + def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { q must not be null q match { case aQueue: BlockingQueue[_] ⇒ @@ -136,8 +135,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case u: UnboundedMailbox ⇒ u.create(null) - case b: BoundedMailbox ⇒ b.create(null) + case u: UnboundedMailbox ⇒ u.create(None) + case b: BoundedMailbox ⇒ b.create(None) } } @@ -145,8 +144,8 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null) - case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) + case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(None) + case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None) } } @@ -158,11 +157,13 @@ object CustomMailboxSpec { """ class MyMailboxType(config: Config) extends MailboxType { - override def create(owner: ActorContext) = new MyMailbox(owner) + override def create(owner: Option[ActorContext]) = owner match { + case Some(o) ⇒ new MyMailbox(o) + case None ⇒ throw new Exception("no mailbox owner given") + } } - class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) - with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + class MyMailbox(owner: ActorContext) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -171,8 +172,9 @@ object CustomMailboxSpec { class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { "Dispatcher configuration" must { "support custom mailboxType" in { - val dispatcher = system.dispatchers.lookup("my-dispatcher") - dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) + val actor = system.actorOf(Props.empty.withDispatcher("my-dispatcher")) + val queue = actor.asInstanceOf[LocalActorRef].underlying.mailbox.messageQueue + queue.getClass must be(classOf[CustomMailboxSpec.MyMailbox]) } } } diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index 4693a56536..5b023054d4 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -9,6 +9,7 @@ import com.typesafe.config.Config import akka.dispatch.DispatcherPrerequisites import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcherConfigurator +import akka.dispatch.UnboundedMailbox object CallingThreadDispatcherModelSpec { import ActorModelSpec._ @@ -31,7 +32,7 @@ object CallingThreadDispatcherModelSpec { extends MessageDispatcherConfigurator(config, prerequisites) { private val instance: MessageDispatcher = - new CallingThreadDispatcher(prerequisites) with MessageDispatcherInterceptor { + new CallingThreadDispatcher(prerequisites, UnboundedMailbox()) with MessageDispatcherInterceptor { override def id: String = config.getString("id") } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 63bb2caa54..72c5d4a13f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -463,15 +463,19 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf def deadLetters: ActorRef = provider.deadLetters - val deadLetterMailbox: Mailbox = new Mailbox(null) { + val deadLetterQueue: MessageQueue = new MessageQueue { + def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + def dequeue() = null + def hasMessages = false + def numberOfMessages = 0 + def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = () + } + + val deadLetterMailbox: Mailbox = new Mailbox(null, deadLetterQueue) { becomeClosed() - override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } - override def dequeue() = null - override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 + def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) + def systemDrain(): SystemMessage = null + def hasSystemMessages = false } def locker: Locker = provider.locker diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a7f03db5a0..e95f54b88b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -40,47 +40,9 @@ class BalancingDispatcher( def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path })) - val messageQueue: MessageQueue = mailboxType match { - case UnboundedMailbox() ⇒ - new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } + val messageQueue: MessageQueue = mailboxType.create(None) - case BoundedMailbox(cap, timeout) ⇒ - new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } - - case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") - } - - protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - - class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) - - final def dequeue(): Envelope = messageQueue.dequeue() - - final def numberOfMessages: Int = messageQueue.numberOfMessages - - final def hasMessages: Boolean = messageQueue.hasMessages - - override def cleanUp(): Unit = { - //Don't call the original implementation of this since it scraps all messages, and we don't want to do that - if (hasSystemMessages) { - val dlq = actor.systemImpl.deadLetterMailbox - var message = systemDrain() - while (message ne null) { - // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - dlq.systemEnqueue(actor.self, message) - message = next - } - } - } - } + protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor, messageQueue) protected[akka] override def register(actor: ActorCell): Unit = { super.register(actor) @@ -111,4 +73,23 @@ class BalancingDispatcher( scheduleOne() } -} \ No newline at end of file +} + +class SharingMailbox(_actor: ActorCell, _messageQueue: MessageQueue) + extends Mailbox(_actor, _messageQueue) with DefaultSystemMessageQueue { + + override def cleanUp(): Unit = { + //Don't call the original implementation of this since it scraps all messages, and we don't want to do that + if (hasSystemMessages) { + val dlq = actor.systemImpl.deadLetterMailbox + var message = systemDrain() + while (message ne null) { + // message must be “virgin” before being able to systemEnqueue again + val next = message.next + message.next = null + dlq.systemEnqueue(actor.self, message) + message = next + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 2046f02286..5537b01244 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -65,7 +65,7 @@ class Dispatcher( } } - protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) + protected[akka] def createMailbox(actor: ActorCell): Mailbox = new Mailbox(actor, mailboxType.create(Some(actor))) with DefaultSystemMessageQueue protected[akka] def shutdown: Unit = Option(executorService.getAndSet(new ExecutorServiceDelegate { diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 4c50cb5c8d..f25c6571e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -34,26 +34,21 @@ object Mailbox { final val debug = false } -/** - * Custom mailbox implementations are implemented by extending this class. - * E.g. - *
- * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
- * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
- * val queue = new ConcurrentLinkedQueue[Envelope]()
- * }
- *
- */
-abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
-
/**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
*/
-private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
+private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
+ extends SystemMessageQueue with Runnable {
+
import Mailbox._
+ def enqueue(receiver: ActorRef, msg: Envelope): Unit = messageQueue.enqueue(receiver, msg)
+ def dequeue(): Envelope = messageQueue.dequeue()
+ def hasMessages: Boolean = messageQueue.hasMessages
+ def numberOfMessages: Int = messageQueue.numberOfMessages
+
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
@@ -216,25 +211,20 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue
*/
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
- val dlq = actor.systemImpl.deadLetterMailbox
+ val dlm = actor.systemImpl.deadLetterMailbox
if (hasSystemMessages) {
var message = systemDrain()
while (message ne null) {
// message must be “virgin” before being able to systemEnqueue again
val next = message.next
message.next = null
- dlq.systemEnqueue(actor.self, message)
+ dlm.systemEnqueue(actor.self, message)
message = next
}
}
- if (hasMessages) {
- var envelope = dequeue
- while (envelope ne null) {
- dlq.enqueue(actor.self, envelope)
- envelope = dequeue
- }
- }
+ if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
+ messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
}
}
@@ -260,9 +250,20 @@ trait MessageQueue {
* Indicates whether this queue is non-empty.
*/
def hasMessages: Boolean
+
+ /**
+ * Called when the mailbox this queue belongs to is disposed of. Normally it
+ * is expected to transfer all remaining messages into the dead letter queue
+ * which is passed in. The owner of this MessageQueue is passed in if
+ * available (e.g. for creating DeadLetters()), “/deadletters” otherwise.
+ */
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
}
-trait SystemMessageQueue {
+/**
+ * Internal mailbox implementation detail.
+ */
+private[akka] trait SystemMessageQueue {
/**
* Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
*/
@@ -276,7 +277,10 @@ trait SystemMessageQueue {
def hasSystemMessages: Boolean
}
-trait DefaultSystemMessageQueue { self: Mailbox ⇒
+/**
+ * Internal mailbox implementation detail.
+ */
+private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
@@ -329,21 +333,30 @@ trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
+ if (hasMessages) {
+ var envelope = dequeue
+ while (envelope ne null) {
+ deadLetters.enqueue(owner.self, envelope)
+ envelope = dequeue
+ }
+ }
+ }
}
/**
* Mailbox configuration.
*/
trait MailboxType {
- def create(receiver: ActorContext): Mailbox
+ def create(owner: Option[ActorContext]): MessageQueue
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
- final override def create(receiver: ActorContext): Mailbox =
- new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
+ final override def create(owner: Option[ActorContext]): MessageQueue =
+ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@@ -353,16 +366,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
- final override def create(receiver: ActorContext): Mailbox =
- new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
+ final override def create(owner: Option[ActorContext]): MessageQueue =
+ new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
- final override def create(receiver: ActorContext): Mailbox =
- new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
+ final override def create(owner: Option[ActorContext]): MessageQueue =
+ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
}
}
@@ -372,8 +385,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
- final override def create(receiver: ActorContext): Mailbox =
- new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
+ final override def create(owner: Option[ActorContext]): MessageQueue =
+ new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
index f42efd1da1..649b365beb 100644
--- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala
@@ -12,23 +12,28 @@ import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
+import akka.config.ConfigurationException
+import akka.dispatch.MessageQueue
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
- override def create(owner: ActorContext) = new BeanstalkBasedMailbox(owner)
+ override def create(owner: Option[ActorContext]): MessageQueue = owner match {
+ case Some(o) ⇒ new BeanstalkBasedMessageQueue(o)
+ case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
+ }
}
/**
* @author Jonas Bonér
*/
-class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
+class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt
- val log = Logging(system, "BeanstalkBasedMailbox")
+ val log = Logging(system, "BeanstalkBasedMessageQueue")
private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) }
@@ -107,4 +112,6 @@ class BeanstalkBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
private def reconnect(name: String): ThreadLocal[Client] = {
new ThreadLocal[Client] { override def initialValue: Client = connect(name) }
}
+
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
index 1d9f72a579..8be117d89e 100644
--- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala
@@ -6,20 +6,24 @@ package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import akka.actor.ActorContext
-import akka.dispatch.Envelope
+import akka.dispatch.{ Envelope, MessageQueue }
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
+import akka.config.ConfigurationException
class FileBasedMailboxType(config: Config) extends MailboxType {
- override def create(owner: ActorContext) = new FileBasedMailbox(owner)
+ override def create(owner: Option[ActorContext]): MessageQueue = owner match {
+ case Some(o) ⇒ new FileBasedMessageQueue(o)
+ case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
+ }
}
-class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
+class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
- val log = Logging(system, "FileBasedMailbox")
+ val log = Logging(system, "FileBasedMessageQueue")
private val settings = FileBasedMailboxExtension(owner.system)
val queuePath = settings.QueuePath
@@ -69,4 +73,6 @@ class FileBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with
case NonFatal(_) ⇒ false
}
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
+
}
diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
index 69f7fb50c1..77b932911d 100644
--- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
+++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala
@@ -4,7 +4,7 @@
package akka.actor.mailbox
import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem }
-import akka.dispatch.{ Envelope, DefaultSystemMessageQueue, CustomMailbox }
+import akka.dispatch.{ Envelope, MessageQueue }
import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol }
@@ -12,7 +12,7 @@ private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
}
-abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
+abstract class DurableMessageQueue(val owner: ActorContext) extends MessageQueue {
import DurableExecutableMailboxConfig._
def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem]
@@ -22,7 +22,7 @@ abstract class DurableMailbox(val owner: ActorContext) extends CustomMailbox(own
}
-trait DurableMessageSerialization { this: DurableMailbox ⇒
+trait DurableMessageSerialization { this: DurableMessageQueue ⇒
def serialize(durableMessage: Envelope): Array[Byte] = {
diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala
index 86e190a5f7..27152a4f51 100644
--- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala
+++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala
@@ -17,7 +17,7 @@ import akka.remote.RemoteProtocol.MessageProtocol
import akka.remote.MessageSerializer
import akka.actor.ExtendedActorSystem
-class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] {
+class BSONSerializableMessageQueue(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] {
protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = {
@@ -25,7 +25,7 @@ class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableB
val b = Map.newBuilder[String, Any]
b += "_id" -> msg._id
b += "ownerPath" -> msg.ownerPath
- b += "senderPath" -> msg.sender.path
+ b += "senderPath" -> msg.sender.path.toString
/**
* TODO - Figure out a way for custom serialization of the message instance
diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
index f2ca76df77..23de168370 100644
--- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala
@@ -14,11 +14,16 @@ import akka.dispatch.{ Await, Promise, Envelope }
import java.util.concurrent.TimeoutException
import akka.dispatch.MailboxType
import com.typesafe.config.Config
+import akka.config.ConfigurationException
+import akka.dispatch.MessageQueue
class MongoBasedMailboxException(message: String) extends AkkaException(message)
class MongoBasedMailboxType(config: Config) extends MailboxType {
- override def create(owner: ActorContext) = new MongoBasedMailbox(owner)
+ override def create(owner: Option[ActorContext]): MessageQueue = owner match {
+ case Some(o) ⇒ new MongoBasedMessageQueue(o)
+ case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
+ }
}
/**
@@ -32,14 +37,16 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
*
* @author Brendan W. McAdams
*/
-class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
+class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
- implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
+ implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
+ private val dispatcher = owner.dispatcher
+
private val settings = MongoBasedMailboxExtension(owner.system)
- val log = Logging(system, "MongoBasedMailbox")
+ val log = Logging(system, "MongoBasedMessageQueue")
@volatile
private var mongo = connect()
@@ -129,4 +136,6 @@ class MongoBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) {
}
}
}
+
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
index f5248ac635..fbc5830a30 100644
--- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala
@@ -12,21 +12,26 @@ import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
+import akka.config.ConfigurationException
+import akka.dispatch.MessageQueue
class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailboxType(config: Config) extends MailboxType {
- override def create(owner: ActorContext) = new RedisBasedMailbox(owner)
+ override def create(owner: Option[ActorContext]): MessageQueue = owner match {
+ case Some(o) ⇒ new RedisBasedMessageQueue(o)
+ case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
+ }
}
-class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
+class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
- val log = Logging(system, "RedisBasedMailbox")
+ val log = Logging(system, "RedisBasedMessageQueue")
def enqueue(receiver: ActorRef, envelope: Envelope) {
withErrorHandling {
@@ -75,5 +80,7 @@ class RedisBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) wit
throw error
}
}
+
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = ()
}
diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
index 2a88c565e6..203a94b620 100644
--- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
+++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala
@@ -13,20 +13,25 @@ import akka.actor.ActorRef
import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
+import akka.config.ConfigurationException
+import akka.dispatch.MessageQueue
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
- override def create(owner: ActorContext) = new ZooKeeperBasedMailbox(owner)
+ override def create(owner: Option[ActorContext]): MessageQueue = owner match {
+ case Some(o) ⇒ new ZooKeeperBasedMessageQueue(o)
+ case None ⇒ throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
+ }
}
-class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner) with DurableMessageSerialization {
+class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"
- val log = Logging(system, "ZooKeeperBasedMailbox")
+ val log = Logging(system, "ZooKeeperBasedMessageQueue")
private val zkClient = new AkkaZkClient(
settings.ZkServerAddresses,
@@ -59,7 +64,7 @@ class ZooKeeperBasedMailbox(_owner: ActorContext) extends DurableMailbox(_owner)
case e: Exception ⇒ false
}
- override def cleanUp() {
+ def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
try {
zkClient.close()
} catch {
diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
index 8b2d15a079..aba582ae68 100644
--- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala
@@ -9,10 +9,12 @@ import java.util.LinkedList
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
-import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
+import akka.dispatch.{ MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.util.duration.intToDurationInt
import akka.util.{ Switch, Duration }
import akka.util.NonFatal
+import akka.actor.ActorContext
+import akka.dispatch.MessageQueue
/*
* Locking rules:
@@ -75,9 +77,12 @@ private[testkit] class CallingThreadDispatcherQueues extends Extension {
val q = ref.get
if (q ne null) && (q ne own)
} {
- while (q.peek ne null) {
+ val owner = mbox.actor.self
+ var msg = q.q.dequeue()
+ while (msg ne null) {
// this is safe because this method is only ever called while holding the suspendSwitch monitor
- own.push(q.pop)
+ own.q.enqueue(owner, msg)
+ msg = q.q.dequeue()
}
}
}
@@ -115,6 +120,7 @@ object CallingThreadDispatcher {
*/
class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites,
+ val mailboxType: MailboxType,
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._
@@ -122,7 +128,7 @@ class CallingThreadDispatcher(
override def id: String = Id
- protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
+ protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor, mailboxType)
protected[akka] override def shutdown() {}
@@ -183,10 +189,10 @@ class CallingThreadDispatcher(
case mbox: CallingThreadMailbox ⇒
val queue = mbox.queue
val execute = mbox.suspendSwitch.fold {
- queue.push(handle)
+ queue.q.enqueue(receiver.self, handle)
false
} {
- queue.push(handle)
+ queue.q.enqueue(receiver.self, handle)
if (!queue.isActive) {
queue.enter
true
@@ -219,7 +225,7 @@ class CallingThreadDispatcher(
queue.leave
null
} {
- val ret = if (mbox.isClosed) null else queue.pop
+ val ret = if (mbox.isClosed) null else queue.q.dequeue()
if (ret eq null) queue.leave
ret
}
@@ -261,19 +267,13 @@ class CallingThreadDispatcher(
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
- private val instance = new CallingThreadDispatcher(prerequisites)
+
+ private val instance = new CallingThreadDispatcher(prerequisites, mailboxType())
override def dispatcher(): MessageDispatcher = instance
}
-class NestingQueue {
- private var q = new LinkedList[Envelope]()
- def size = q.size
- def isEmpty = q.isEmpty
- def push(handle: Envelope) { q.offer(handle) }
- def peek = q.peek
- def pop = q.poll
-
+class NestingQueue(val q: MessageQueue) {
@volatile
private var active = false
def enter { if (active) sys.error("already active") else active = true }
@@ -281,11 +281,11 @@ class NestingQueue {
def isActive = active
}
-class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
+class CallingThreadMailbox(_receiver: ActorCell, val mailboxType: MailboxType) extends Mailbox(_receiver, null) with DefaultSystemMessageQueue {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = {
- val queue = new NestingQueue
+ val queue = new NestingQueue(mailboxType.create(Some(actor)))
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
queue
}
@@ -296,11 +296,6 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
val ctdLock = new ReentrantLock
val suspendSwitch = new Switch
- override def enqueue(receiver: ActorRef, msg: Envelope) {}
- override def dequeue() = null
- override def hasMessages = queue.isEmpty
- override def numberOfMessages = queue.size
-
override def cleanUp(): Unit = {
/*
* This is called from dispatcher.unregister, i.e. under this.lock. If
@@ -308,8 +303,10 @@ class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
*/
suspendSwitch.locked {
- CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue)
+ val q = queue
+ CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, q)
super.cleanUp()
+ q.q.cleanUp(actor, actor.systemImpl.deadLetterQueue)
}
}
}