Updates after feedback
This commit is contained in:
parent
83b08b20d9
commit
5fd40e53ca
9 changed files with 33 additions and 71 deletions
|
|
@ -374,11 +374,23 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mailbox factory that creates instantiates the implementation from a
|
||||
* fully qualified class name. The implementation class must have
|
||||
* a constructor with a [[akka.actor.ActorContext]] parameter.
|
||||
* E.g.
|
||||
* <pre<code>
|
||||
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
|
||||
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
|
||||
* val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
* }
|
||||
* </code></pre>
|
||||
*/
|
||||
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
|
||||
|
||||
override def create(receiver: ActorContext): Mailbox = {
|
||||
val constructorSignature = Array[Class[_]](classOf[ActorContext])
|
||||
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
|
||||
ReflectiveAccess.createInstance[AnyRef](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match {
|
||||
case Right(instance) ⇒ instance.asInstanceOf[Mailbox]
|
||||
case Left(exception) ⇒
|
||||
val cause = exception match {
|
||||
|
|
@ -386,19 +398,9 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType {
|
|||
case _ ⇒ exception
|
||||
}
|
||||
throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s".
|
||||
format(mailboxClass.getName, cause.toString))
|
||||
format(mailboxFQN, cause.toString))
|
||||
}
|
||||
}
|
||||
|
||||
private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorContext].getClassLoader) match {
|
||||
case Right(clazz) ⇒ clazz
|
||||
case Left(exception) ⇒
|
||||
val cause = exception match {
|
||||
case i: InvocationTargetException ⇒ i.getTargetException
|
||||
case _ ⇒ exception
|
||||
}
|
||||
throw new IllegalArgumentException("Cannot find mailbox class [%s] due to: %s".
|
||||
format(mailboxFQN, cause.toString))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,6 @@ import akka.actor.Props
|
|||
|
||||
//#imports
|
||||
|
||||
//#imports2
|
||||
import akka.actor.mailbox.FileDurableMailboxType
|
||||
//#imports2
|
||||
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import akka.testkit.AkkaSpec
|
||||
|
|
@ -42,13 +38,4 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
|
|||
//#dispatcher-config-use
|
||||
}
|
||||
|
||||
"programatically define dispatcher with durable mailbox" in {
|
||||
//#prog-define-dispatcher
|
||||
val dispatcher = system.dispatcherFactory.newDispatcher(
|
||||
"my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build
|
||||
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher))
|
||||
//#prog-define-dispatcher
|
||||
myActor ! "hello"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,16 +11,11 @@ import akka.actor.Props;
|
|||
|
||||
//#imports
|
||||
|
||||
//#imports2
|
||||
import akka.actor.mailbox.DurableMailboxType;
|
||||
//#imports2
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import akka.testkit.AkkaSpec;
|
||||
import akka.docs.dispatcher.DispatcherDocSpec;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
|
|
@ -55,22 +50,9 @@ public class DurableMailboxDocTestBase {
|
|||
myActor.tell("test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void programaticallyDefinedDispatcher() {
|
||||
//#prog-define-dispatcher
|
||||
MessageDispatcher dispatcher = system.dispatcherFactory()
|
||||
.newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build();
|
||||
ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new MyUntypedActor();
|
||||
}
|
||||
}), "myactor");
|
||||
//#prog-define-dispatcher
|
||||
myActor.tell("test");
|
||||
}
|
||||
|
||||
public static class MyUntypedActor extends UntypedActor {
|
||||
public void onReceive(Object message) {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,7 @@
|
|||
package akka.actor.mailbox
|
||||
|
||||
import akka.dispatch.CustomMailboxType
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxType)
|
||||
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd",
|
||||
new CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox"))
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
package akka.actor.mailbox
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import akka.dispatch.CustomMailboxType
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) {
|
||||
class FileBasedMailboxSpec extends DurableMailboxSpec("File",
|
||||
new CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")) {
|
||||
|
||||
def clean {
|
||||
val queuePath = FileBasedMailboxExtension(system).QueuePath
|
||||
|
|
|
|||
|
|
@ -74,23 +74,3 @@ trait DurableMessageSerialization {
|
|||
|
||||
}
|
||||
|
||||
case object RedisDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox")
|
||||
case object MongoDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")
|
||||
case object BeanstalkDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")
|
||||
case object FileDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")
|
||||
case object ZooKeeperDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")
|
||||
|
||||
/**
|
||||
* Java API for the mailbox types. Usage:
|
||||
* <pre><code>
|
||||
* MessageDispatcher dispatcher = system.dispatcherFactory()
|
||||
* .newDispatcher("my-dispatcher", 1, DurableMailboxType.redisDurableMailboxType()).build();
|
||||
* </code></pre>
|
||||
*/
|
||||
object DurableMailboxType {
|
||||
def redisDurableMailboxType(): MailboxType = RedisDurableMailboxType
|
||||
def mongoDurableMailboxType(): MailboxType = MongoDurableMailboxType
|
||||
def beanstalkDurableMailboxType(): MailboxType = BeanstalkDurableMailboxType
|
||||
def fileDurableMailboxType(): MailboxType = FileDurableMailboxType
|
||||
def zooKeeperDurableMailboxType(): MailboxType = ZooKeeperDurableMailboxType
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,18 +1,19 @@
|
|||
package akka.actor.mailbox
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
||||
|
||||
import akka.actor._
|
||||
import akka.actor.Actor._
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.dispatch.MessageDispatcher
|
||||
import akka.dispatch.CustomMailboxType
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoDurableMailboxType) {
|
||||
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb",
|
||||
new CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")) {
|
||||
|
||||
import org.apache.log4j.{ Logger, Level }
|
||||
import com.mongodb.async._
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
package akka.actor.mailbox
|
||||
import akka.dispatch.CustomMailboxType
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxType)
|
||||
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis",
|
||||
new CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox"))
|
||||
|
|
|
|||
|
|
@ -4,10 +4,13 @@ import akka.actor.{ Actor, LocalActorRef }
|
|||
import akka.cluster.zookeeper._
|
||||
import org.I0Itec.zkclient._
|
||||
import akka.dispatch.MessageDispatcher
|
||||
import akka.dispatch.CustomMailboxType
|
||||
import akka.actor.ActorRef
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxType) {
|
||||
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper",
|
||||
new CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")) {
|
||||
|
||||
val dataPath = "_akka_cluster/data"
|
||||
val logPath = "_akka_cluster/log"
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue