Merge pull request #346 from jboner/wip-1843-MailboxTypes-∂π
clean up mailbox types configurability, see #1843
This commit is contained in:
commit
db5bf9dd75
4 changed files with 29 additions and 10 deletions
|
|
@ -383,12 +383,10 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
def mailboxType(): MailboxType = {
|
||||
config.getString("mailbox-type") match {
|
||||
case "" ⇒
|
||||
val capacity = config.getInt("mailbox-capacity")
|
||||
if (capacity < 1) UnboundedMailbox()
|
||||
else {
|
||||
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
|
||||
BoundedMailbox(capacity, duration)
|
||||
}
|
||||
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
|
||||
else new BoundedMailbox(config)
|
||||
case "unbounded" ⇒ UnboundedMailbox()
|
||||
case "bounded" ⇒ new BoundedMailbox(config)
|
||||
case fqcn ⇒
|
||||
val args = Seq(classOf[Config] -> config)
|
||||
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import java.util.concurrent._
|
|||
import annotation.tailrec
|
||||
import akka.event.Logging.Error
|
||||
import akka.actor.ActorContext
|
||||
import com.typesafe.config.Config
|
||||
|
||||
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
|
||||
|
||||
|
|
@ -355,6 +356,9 @@ trait MailboxType {
|
|||
* It's a case class for Java (new UnboundedMailbox)
|
||||
*/
|
||||
case class UnboundedMailbox() extends MailboxType {
|
||||
|
||||
def this(config: Config) = this()
|
||||
|
||||
final override def create(owner: Option[ActorContext]): MessageQueue =
|
||||
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
|
|
@ -363,6 +367,9 @@ case class UnboundedMailbox() extends MailboxType {
|
|||
|
||||
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
|
||||
|
||||
def this(config: Config) = this(config.getInt("mailbox-capacity"),
|
||||
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||
|
||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
||||
|
|
|
|||
|
|
@ -20,8 +20,11 @@ import akka.event.LoggingAdapter;
|
|||
//#imports-prio
|
||||
|
||||
//#imports-prio-mailbox
|
||||
import akka.actor.ActorContext;
|
||||
import akka.dispatch.PriorityGenerator;
|
||||
import akka.dispatch.UnboundedPriorityMailbox;
|
||||
import akka.dispatch.MailboxType;
|
||||
import akka.dispatch.MessageQueue;
|
||||
import com.typesafe.config.Config;
|
||||
|
||||
//#imports-prio-mailbox
|
||||
|
|
@ -120,7 +123,7 @@ public class DispatcherDocTestBase {
|
|||
}
|
||||
|
||||
//#prio-mailbox
|
||||
public static class PrioMailbox extends UnboundedPriorityMailbox {
|
||||
public static class PrioMailbox implements MailboxType {
|
||||
|
||||
static final PriorityGenerator generator = new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
|
||||
@Override
|
||||
|
|
@ -135,9 +138,15 @@ public class DispatcherDocTestBase {
|
|||
return 50; // We default to 50
|
||||
}
|
||||
};
|
||||
|
||||
private UnboundedPriorityMailbox priorityMailbox;
|
||||
|
||||
public PrioMailbox(Config config) {
|
||||
super(generator);
|
||||
public PrioMailbox(Config config) { // needed for reflective instantiation
|
||||
priorityMailbox = new UnboundedPriorityMailbox(generator);
|
||||
}
|
||||
|
||||
public MessageQueue create(Option<ActorContext> owner) {
|
||||
return priorityMailbox.create(owner);
|
||||
}
|
||||
}
|
||||
//#prio-mailbox
|
||||
|
|
|
|||
|
|
@ -108,6 +108,8 @@ object DispatcherDocSpec {
|
|||
//#prio-mailbox
|
||||
import akka.dispatch.PriorityGenerator
|
||||
import akka.dispatch.UnboundedPriorityMailbox
|
||||
import akka.dispatch.MailboxType
|
||||
import akka.actor.ActorContext
|
||||
import com.typesafe.config.Config
|
||||
|
||||
val generator = PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
|
||||
|
|
@ -118,7 +120,10 @@ object DispatcherDocSpec {
|
|||
}
|
||||
|
||||
// We create a new Priority dispatcher and seed it with the priority generator
|
||||
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(generator)
|
||||
class PrioMailbox(config: Config) extends MailboxType {
|
||||
val priorityMailbox = UnboundedPriorityMailbox(generator)
|
||||
def create(owner: Option[ActorContext]) = priorityMailbox.create(owner)
|
||||
}
|
||||
//#prio-mailbox
|
||||
|
||||
class MyActor extends Actor {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue