Added CustomMailbox for user defined mailbox implementations with ActorContext instead of ActorCell.
* Mailbox is still there, with ActorCell, for internal use. * Implemented durable mailboxes with CustomMailbox
This commit is contained in:
parent
61813c6635
commit
83b08b20d9
8 changed files with 43 additions and 31 deletions
|
|
@ -12,6 +12,7 @@ import annotation.tailrec
|
|||
import akka.event.Logging.Error
|
||||
import com.typesafe.config.Config
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import akka.actor.ActorContext
|
||||
|
||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
|
|
@ -35,7 +36,17 @@ object Mailbox {
|
|||
final val debug = false
|
||||
}
|
||||
|
||||
abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
|
||||
/**
|
||||
* Custom mailbox implementations are implemented by extending this class.
|
||||
*/
|
||||
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
|
||||
|
||||
/**
|
||||
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
|
||||
* but can't be exposed to user defined mailbox subclasses.
|
||||
*
|
||||
*/
|
||||
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
|
||||
import Mailbox._
|
||||
|
||||
@volatile
|
||||
|
|
@ -319,15 +330,15 @@ trait QueueBasedMessageQueue extends MessageQueue {
|
|||
* Mailbox configuration.
|
||||
*/
|
||||
trait MailboxType {
|
||||
def create(receiver: ActorCell): Mailbox
|
||||
def create(receiver: ActorContext): Mailbox
|
||||
}
|
||||
|
||||
/**
|
||||
* It's a case class for Java (new UnboundedMailbox)
|
||||
*/
|
||||
case class UnboundedMailbox() extends MailboxType {
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
override def create(receiver: ActorContext) =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
}
|
||||
}
|
||||
|
|
@ -337,16 +348,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
override def create(receiver: ActorContext) =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new LinkedBlockingQueue[Envelope](capacity)
|
||||
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
|
||||
}
|
||||
}
|
||||
|
||||
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
override def create(receiver: ActorContext) =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
|
||||
}
|
||||
}
|
||||
|
|
@ -356,8 +367,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
|
|||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
override def create(receiver: ActorCell) =
|
||||
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
override def create(receiver: ActorContext) =
|
||||
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
|
||||
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
|
||||
}
|
||||
|
|
@ -365,8 +376,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
|
|||
|
||||
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
|
||||
|
||||
def create(receiver: ActorCell): Mailbox = {
|
||||
val constructorSignature = Array[Class[_]](classOf[ActorCell])
|
||||
override def create(receiver: ActorContext): Mailbox = {
|
||||
val constructorSignature = Array[Class[_]](classOf[ActorContext])
|
||||
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
|
||||
case Right(instance) ⇒ instance.asInstanceOf[Mailbox]
|
||||
case Left(exception) ⇒
|
||||
|
|
@ -379,7 +390,7 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType {
|
|||
}
|
||||
}
|
||||
|
||||
private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match {
|
||||
private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorContext].getClassLoader) match {
|
||||
case Right(clazz) ⇒ clazz
|
||||
case Left(exception) ⇒
|
||||
val cause = exception match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue