clean up mailbox types configurability, see #1843
- add (config) constructors to std mailbox types - update docs for prio mailbox to directly implement MailboxType
This commit is contained in:
parent
f1e12a8298
commit
c84daf83e8
4 changed files with 29 additions and 10 deletions
|
|
@ -383,12 +383,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
||||||
def mailboxType(): MailboxType = {
|
def mailboxType(): MailboxType = {
|
||||||
config.getString("mailbox-type") match {
|
config.getString("mailbox-type") match {
|
||||||
case "" ⇒
|
case "" ⇒
|
||||||
val capacity = config.getInt("mailbox-capacity")
|
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
|
||||||
if (capacity < 1) UnboundedMailbox()
|
else new BoundedMailbox(config)
|
||||||
else {
|
case "unbounded" ⇒ UnboundedMailbox()
|
||||||
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
|
case "bounded" ⇒ new BoundedMailbox(config)
|
||||||
BoundedMailbox(capacity, duration)
|
|
||||||
}
|
|
||||||
case fqcn ⇒
|
case fqcn ⇒
|
||||||
val args = Seq(classOf[Config] -> config)
|
val args = Seq(classOf[Config] -> config)
|
||||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import java.util.concurrent._
|
||||||
import annotation.tailrec
|
import annotation.tailrec
|
||||||
import akka.event.Logging.Error
|
import akka.event.Logging.Error
|
||||||
import akka.actor.ActorContext
|
import akka.actor.ActorContext
|
||||||
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||||
|
|
||||||
|
|
@ -355,6 +356,9 @@ trait MailboxType {
|
||||||
* It's a case class for Java (new UnboundedMailbox)
|
* It's a case class for Java (new UnboundedMailbox)
|
||||||
*/
|
*/
|
||||||
case class UnboundedMailbox() extends MailboxType {
|
case class UnboundedMailbox() extends MailboxType {
|
||||||
|
|
||||||
|
def this(config: Config) = this()
|
||||||
|
|
||||||
final override def create(owner: Option[ActorContext]): MessageQueue =
|
final override def create(owner: Option[ActorContext]): MessageQueue =
|
||||||
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||||
|
|
@ -363,6 +367,9 @@ case class UnboundedMailbox() extends MailboxType {
|
||||||
|
|
||||||
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
|
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
|
||||||
|
|
||||||
|
def this(config: Config) = this(config.getInt("mailbox-capacity"),
|
||||||
|
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||||
|
|
||||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,11 @@ import akka.event.LoggingAdapter;
|
||||||
//#imports-prio
|
//#imports-prio
|
||||||
|
|
||||||
//#imports-prio-mailbox
|
//#imports-prio-mailbox
|
||||||
|
import akka.actor.ActorContext;
|
||||||
import akka.dispatch.PriorityGenerator;
|
import akka.dispatch.PriorityGenerator;
|
||||||
import akka.dispatch.UnboundedPriorityMailbox;
|
import akka.dispatch.UnboundedPriorityMailbox;
|
||||||
|
import akka.dispatch.MailboxType;
|
||||||
|
import akka.dispatch.MessageQueue;
|
||||||
import com.typesafe.config.Config;
|
import com.typesafe.config.Config;
|
||||||
|
|
||||||
//#imports-prio-mailbox
|
//#imports-prio-mailbox
|
||||||
|
|
@ -120,7 +123,7 @@ public class DispatcherDocTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
public static class PrioMailbox extends UnboundedPriorityMailbox {
|
public static class PrioMailbox implements MailboxType {
|
||||||
|
|
||||||
static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -136,8 +139,14 @@ public class DispatcherDocTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
public PrioMailbox(Config config) {
|
private UnboundedPriorityMailbox priorityMailbox;
|
||||||
super(generator);
|
|
||||||
|
public PrioMailbox(Config config) { // needed for reflective instantiation
|
||||||
|
priorityMailbox = new UnboundedPriorityMailbox(generator);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageQueue create(Option<ActorContext> owner) {
|
||||||
|
return priorityMailbox.create(owner);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
|
|
|
||||||
|
|
@ -108,6 +108,8 @@ object DispatcherDocSpec {
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
import akka.dispatch.PriorityGenerator
|
import akka.dispatch.PriorityGenerator
|
||||||
import akka.dispatch.UnboundedPriorityMailbox
|
import akka.dispatch.UnboundedPriorityMailbox
|
||||||
|
import akka.dispatch.MailboxType
|
||||||
|
import akka.actor.ActorContext
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
val generator = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
val generator = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||||
|
|
@ -118,7 +120,10 @@ object DispatcherDocSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We create a new Priority dispatcher and seed it with the priority generator
|
// We create a new Priority dispatcher and seed it with the priority generator
|
||||||
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(generator)
|
class PrioMailbox(config: Config) extends MailboxType {
|
||||||
|
val priorityMailbox = UnboundedPriorityMailbox(generator)
|
||||||
|
def create(owner: Option[ActorContext]) = priorityMailbox.create(owner)
|
||||||
|
}
|
||||||
//#prio-mailbox
|
//#prio-mailbox
|
||||||
|
|
||||||
class MyActor extends Actor {
|
class MyActor extends Actor {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue