Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-12-20 10:58:31 +01:00
commit 7fc19ec99d
26 changed files with 280 additions and 203 deletions

View file

@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getString("akka.actor.default-dispatcher.mailboxType") must be("")
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
settings.DispatcherDefaultShutdown must equal(1 second)
getInt("akka.actor.default-dispatcher.throughput") must equal(5)

View file

@ -1,9 +1,13 @@
package akka.dispatch
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.ActorContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -144,3 +148,26 @@ class PriorityMailboxSpec extends MailboxSpec {
case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
}
}
object CustomMailboxSpec {
val config = """
my-dispatcher {
mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox"
}
"""
class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must {
"support custom mailboxType" in {
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox])
}
}
}

View file

@ -29,7 +29,7 @@ akka {
logConfigOnStart = off
# List FQCN of extensions which shall be loaded at actor system startup.
# Should be on the format: 'extensions = ["foo", "bar"]' etc.
# Should be on the format: 'extensions = ["foo", "bar"]' etc.
# FIXME: clarify "extensions" here, "Akka Extensions (<link to docs>)"
extensions = []
@ -59,7 +59,7 @@ akka {
deployment {
# deployment id pattern - on the format: /parent/child etc.
# deployment id pattern - on the format: /parent/child etc.
default {
# routing (load-balance) scheme to use
@ -160,6 +160,10 @@ akka {
# Specifies the timeout to add a new message to a mailbox that is full -
# negative number means infinite timeout
mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used.
mailboxType = ""
}
debug {

View file

@ -16,7 +16,6 @@ import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import com.eaio.uuid.UUID
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.TimeUnit
import java.util.{ Collection JCollection }
import java.util.regex.Pattern

View file

@ -19,7 +19,6 @@ import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigResolveOptions
import com.typesafe.config.ConfigException
import java.lang.reflect.InvocationTargetException
import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap }
@ -409,9 +408,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
case Left(e) throw e
case Right(p) p
}
}

View file

@ -271,11 +271,15 @@ abstract class MessageDispatcherConfigurator() {
def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher
def mailboxType(config: Config, settings: Settings): MailboxType = {
val capacity = config.getInt("mailbox-capacity")
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration)
config.getString("mailboxType") match {
case ""
val capacity = config.getInt("mailbox-capacity")
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration)
}
case fqn new CustomMailboxType(fqn)
}
}

View file

@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
import com.typesafe.config.Config
import akka.actor.ActorContext
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -33,7 +35,17 @@ object Mailbox {
final val debug = false
}
abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
/**
* Custom mailbox implementations are implemented by extending this class.
*/
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
/**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
*/
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
import Mailbox._
@volatile
@ -317,15 +329,15 @@ trait QueueBasedMessageQueue extends MessageQueue {
* Mailbox configuration.
*/
trait MailboxType {
def create(receiver: ActorCell): Mailbox
def create(receiver: ActorContext): Mailbox
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@ -335,16 +347,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
}
}
@ -354,10 +366,36 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
/**
* Mailbox factory that creates instantiates the implementation from a
* fully qualified class name. The implementation class must have
* a constructor with a [[akka.actor.ActorContext]] parameter.
* E.g.
* <pre<code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
override def create(receiver: ActorContext): Mailbox = {
val constructorSignature = Array[Class[_]](classOf[ActorContext])
ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s".
format(mailboxFQN, exception.toString))
}
}
}

View file

@ -12,7 +12,6 @@ import akka.util.ReentrantGuard
import akka.util.duration._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
@ -516,34 +515,54 @@ trait LoggingAdapter {
*/
def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) error(cause, format(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(message: String) { if (isErrorEnabled) notifyError(message) }
def error(template: String, arg1: Any) { if (isErrorEnabled) error(format(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3, arg4)) }
def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) }
def warning(message: String) { if (isWarningEnabled) notifyWarning(message) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) warning(format(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3, arg4)) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) }
def info(message: String) { if (isInfoEnabled) notifyInfo(message) }
def info(template: String, arg1: Any) { if (isInfoEnabled) info(format(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) info(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3, arg4)) }
def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) }
def debug(message: String) { if (isDebugEnabled) notifyDebug(message) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) debug(format(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3, arg4)) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) }
def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) }
def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) }
final def isEnabled(level: Logging.LogLevel): Boolean = level match {
case Logging.ErrorLevel isErrorEnabled
case Logging.WarningLevel isWarningEnabled
case Logging.InfoLevel isInfoEnabled
case Logging.DebugLevel isDebugEnabled
}
final def notifyLog(level: Logging.LogLevel, message: String): Unit = level match {
case Logging.ErrorLevel if (isErrorEnabled) notifyError(message)
case Logging.WarningLevel if (isWarningEnabled) notifyWarning(message)
case Logging.InfoLevel if (isInfoEnabled) notifyInfo(message)
case Logging.DebugLevel if (isDebugEnabled) notifyDebug(message)
}
def format(t: String, arg: Any*) = {
val sb = new StringBuilder

View file

@ -5,7 +5,6 @@ package akka.routing
import akka.actor._
import akka.japi.Creator
import java.lang.reflect.InvocationTargetException
import akka.config.ConfigurationException
import java.util.concurrent.atomic.AtomicInteger
import akka.util.{ ReflectiveAccess, Timeout }

View file

@ -5,6 +5,7 @@
package akka.util
import akka.actor._
import java.lang.reflect.InvocationTargetException
object ReflectiveAccess {
@ -14,22 +15,19 @@ object ReflectiveAccess {
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Either[Exception, T] = try {
args: Array[AnyRef]): Either[Exception, T] = withErrorHandling {
assert(clazz ne null)
assert(params ne null)
assert(args ne null)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: Exception
Left(e)
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Either[Exception, T] = try {
classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling {
assert(params ne null)
assert(args ne null)
getClassFor(fqn, classloader) match {
@ -39,9 +37,6 @@ object ReflectiveAccess {
Right(ctor.newInstance(args: _*).asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
//Obtains a reference to fqn.MODULE$
@ -100,5 +95,24 @@ object ReflectiveAccess {
case e: Exception Left(e)
}
/**
* Caught exception is returned as Left(exception).
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
* Other `Throwable`, such as `Error` is thrown.
*/
@inline
private final def withErrorHandling[T](body: Either[Exception, T]): Either[Exception, T] = {
try {
body
} catch {
case e: InvocationTargetException e.getTargetException match {
case t: Exception Left(t)
case t throw t
}
case e: Exception
Left(e)
}
}
}

View file

@ -4,15 +4,14 @@
package akka.docs.actor.mailbox
//#imports
import akka.actor.Actor
import akka.actor.Props
import akka.actor.mailbox.FileDurableMailboxType
//#imports
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
@ -20,15 +19,23 @@ class MyActor extends Actor {
}
}
class DurableMailboxDocSpec extends AkkaSpec {
object DurableMailboxDocSpec {
val config = """
//#dispatcher-config
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
//#dispatcher-config
"""
}
"define dispatcher with durable mailbox" in {
//#define-dispatcher
val dispatcher = system.dispatcherFactory.newDispatcher(
"my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build
class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
//#define-dispatcher
myActor ! "hello"
//#dispatcher-config-use
}
}

View file

@ -4,7 +4,6 @@
package akka.docs.actor.mailbox;
//#imports
import akka.actor.mailbox.DurableMailboxType;
import akka.dispatch.MessageDispatcher;
import akka.actor.UntypedActorFactory;
import akka.actor.UntypedActor;
@ -12,8 +11,12 @@ import akka.actor.Props;
//#imports
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@ -21,24 +24,35 @@ import static org.junit.Assert.*;
public class DurableMailboxDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem",
ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void defineDispatcher() {
ActorSystem system = ActorSystem.create("MySystem");
//#define-dispatcher
MessageDispatcher dispatcher = system.dispatcherFactory()
.newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build();
public void configDefinedDispatcher() {
//#dispatcher-config-use
MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
}));
//#define-dispatcher
}), "myactor");
//#dispatcher-config-use
myActor.tell("test");
system.shutdown();
}
public static class MyUntypedActor extends UntypedActor {
public void onReceive(Object message) {
}
}
}

View file

@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the
You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using.
Here is an example in Scala:
In the configuration of the dispatcher you specify the fully qualified class name
of the mailbox:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,define-dispatcher
:include: dispatcher-config
Here is an example of how to create an actor with a durable dispatcher, in Scala:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,dispatcher-config-use
Corresponding example in Java:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java
:include: imports,define-dispatcher
:include: imports,dispatcher-config-use
The actor is oblivious to which type of mailbox it is using.
@ -89,14 +96,11 @@ you need.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
mailbox = akka.actor.mailbox.FileDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType()
Config::
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
You can also configure and tune the file-based durable mailbox. This is done in
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
mailbox = akka.actor.mailbox.RedisDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType()
Config::
my-dispatcher {
mailboxType = akka.actor.mailbox.RedisBasedMailbox
}
You also need to configure the IP and port for the Redis server. This is done in
the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`.
@ -146,13 +147,11 @@ documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox
}
You also need to configure ZooKeeper server addresses, timeouts, etc. This is
done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`.
@ -171,13 +170,11 @@ Beanstalk documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox
}
You also need to configure the IP, and port, and so on, for the Beanstalk
server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the
@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.MongoDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.MongoBasedMailbox
}
You will need to configure the URI for the MongoDB server, using the URI Format specified in the
`MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in

View file

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
@ -78,7 +78,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
// TODO PN: Why volatile on local variable?
@volatile
var connected = false
// TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue
// TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue
var attempts = 0
var client: Client = null
while (!connected) {

View file

@ -1,4 +1,7 @@
package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxType)
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd",
new CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox"))

View file

@ -5,12 +5,12 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMailbox")

View file

@ -1,9 +1,11 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) {
class FileBasedMailboxSpec extends DurableMailboxSpec("File",
new CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")) {
def clean {
val queuePath = FileBasedMailboxExtension(system).QueuePath

View file

@ -4,15 +4,14 @@
package akka.actor.mailbox
import akka.util.ReflectiveAccess
import java.lang.reflect.InvocationTargetException
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.SerializedActorRef
import akka.dispatch.Envelope
import akka.dispatch.DefaultSystemMessageQueue
import akka.dispatch.Dispatcher
import akka.dispatch.Mailbox
import akka.dispatch.CustomMailbox
import akka.dispatch.MailboxType
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageQueue
@ -24,6 +23,7 @@ import akka.remote.RemoteActorRefProvider
import akka.remote.netty.NettyRemoteServer
import akka.serialization.Serialization
import com.typesafe.config.Config
import akka.dispatch.CustomMailboxType
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
@ -33,7 +33,7 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue {
abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
import DurableExecutableMailboxConfig._
def system = owner.system
@ -45,7 +45,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa
trait DurableMessageSerialization {
def owner: ActorCell
def owner: ActorContext
def serialize(durableMessage: Envelope): Array[Byte] = {
@ -73,73 +73,3 @@ trait DurableMessageSerialization {
}
abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType {
val constructorSignature = Array[Class[_]](classOf[ActorCell])
val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match {
case Right(clazz) clazz
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString))
}
//TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc
def create(receiver: ActorCell): Mailbox = {
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance.asInstanceOf[Mailbox]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString))
}
}
}
case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox")
case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox")
case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")
case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox")
case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")
case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN)
/**
* Java API for the mailbox types. Usage:
* <pre><code>
* MessageDispatcher dispatcher = system.dispatcherFactory()
* .newDispatcher("my-dispatcher", 1, DurableMailboxType.redisDurableMailboxType()).build();
* </code></pre>
*/
object DurableMailboxType {
def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType
def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType
def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType
def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType
def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType
def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN)
}
/**
* Configurator for the DurableMailbox
* Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file",
* or a full class name of the Mailbox implementation.
*/
class DurableMailboxConfigurator {
// TODO PN #896: when and how is this class supposed to be used? Can we remove it?
def mailboxType(config: Config): MailboxType = {
if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox")
config.getString("storage") match {
case "redis" RedisDurableMailboxType
case "mongodb" MongoDurableMailboxType
case "beanstalk" BeanstalkDurableMailboxType
case "zookeeper" ZooKeeperDurableMailboxType
case "file" FileDurableMailboxType
case fqn FqnDurableMailboxType(fqn)
}
}
}

View file

@ -1,15 +1,16 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit
import java.util.concurrent.CountDownLatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher
import akka.testkit.AkkaSpec
import akka.dispatch.Dispatchers
import akka.dispatch.MailboxType
import akka.testkit.AkkaSpec
object DurableMailboxSpecActorFactory {
@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory {
}
abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach {
abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach {
import DurableMailboxSpecActorFactory._
implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build

View file

@ -7,7 +7,7 @@ import akka.AkkaException
import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise }
@ -26,7 +26,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message)
*
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/
class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!

View file

@ -1,18 +1,19 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoDurableMailboxType) {
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb",
new CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")) {
import org.apache.log4j.{ Logger, Level }
import com.mongodb.async._

View file

@ -6,14 +6,14 @@ package akka.actor.mailbox
import com.redis._
import akka.actor.LocalActorRef
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)

View file

@ -1,4 +1,6 @@
package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxType)
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis",
new CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox"))

View file

@ -8,7 +8,7 @@ import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import org.I0Itec.zkclient.serialize._
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.cluster.zookeeper.AkkaZkClient
import akka.dispatch.Envelope
import akka.event.Logging
@ -17,7 +17,7 @@ import akka.actor.ActorRef
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues"

View file

@ -4,10 +4,13 @@ import akka.actor.{ Actor, LocalActorRef }
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
import akka.actor.ActorRef
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxType) {
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper",
new CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")) {
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"

View file

@ -14,6 +14,7 @@ import java.net.URISyntaxException
import java.net.InetAddress
import java.net.UnknownHostException
import java.net.UnknownServiceException
import akka.event.Logging
/**
* Interface for remote transports to encode their addresses. The three parts
@ -135,7 +136,9 @@ trait RemoteModule {
/**
* Remote life-cycle events.
*/
sealed trait RemoteLifeCycleEvent
sealed trait RemoteLifeCycleEvent {
def logLevel: Logging.LogLevel
}
/**
* Life-cycle events for RemoteClient.
@ -148,6 +151,7 @@ case class RemoteClientError[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteClientError@" +
remoteAddress +
@ -159,6 +163,7 @@ case class RemoteClientError[T <: ParsedTransportAddress](
case class RemoteClientDisconnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteClientDisconnected@" + remoteAddress
}
@ -166,6 +171,7 @@ case class RemoteClientDisconnected[T <: ParsedTransportAddress](
case class RemoteClientConnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteClientConnected@" + remoteAddress
}
@ -173,6 +179,7 @@ case class RemoteClientConnected[T <: ParsedTransportAddress](
case class RemoteClientStarted[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteClientStarted@" + remoteAddress
}
@ -180,6 +187,7 @@ case class RemoteClientStarted[T <: ParsedTransportAddress](
case class RemoteClientShutdown[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteClientShutdown@" + remoteAddress
}
@ -189,6 +197,7 @@ case class RemoteClientWriteFailed[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteClientWriteFailed@" +
remoteAddress +
@ -206,12 +215,14 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
case class RemoteServerStarted[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteServerStarted@" + remote.name
}
case class RemoteServerShutdown[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteServerShutdown@" + remote.name
}
@ -219,6 +230,7 @@ case class RemoteServerShutdown[T <: ParsedTransportAddress](
case class RemoteServerError[T <: ParsedTransportAddress](
@BeanProperty val cause: Throwable,
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteServerError@" +
remote.name +
@ -230,6 +242,7 @@ case class RemoteServerError[T <: ParsedTransportAddress](
case class RemoteServerClientConnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientConnected@" +
remote.name +
@ -241,6 +254,7 @@ case class RemoteServerClientConnected[T <: ParsedTransportAddress](
case class RemoteServerClientDisconnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientDisconnected@" +
remote.name +
@ -252,6 +266,7 @@ case class RemoteServerClientDisconnected[T <: ParsedTransportAddress](
case class RemoteServerClientClosed[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientClosed@" +
remote.name +
@ -265,6 +280,7 @@ case class RemoteServerWriteFailed[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteServerWriteFailed@" +
remote +
@ -320,7 +336,7 @@ abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSyst
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
system.eventStream.publish(message)
system.log.debug("REMOTE: {}", message)
system.log.log(message.logLevel, "REMOTE: {}", message)
}
override def toString = name