From 0f3a720f3e4974437b6e804eafa5487452cfcafe Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 19 Dec 2011 13:35:10 +0100 Subject: [PATCH 1/7] Adding general badassery --- .../src/main/scala/akka/event/Logging.scala | 61 +++++++++++++------ .../scala/akka/remote/RemoteInterface.scala | 20 +++++- 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index face9a20d9..eab5ca716b 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -16,6 +16,7 @@ import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await +import annotation.switch object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream @@ -516,34 +517,54 @@ trait LoggingAdapter { */ def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) } - def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) error(cause, format(template, arg1)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3, arg4)) } + def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) } def error(message: String) { if (isErrorEnabled) notifyError(message) } - def error(template: String, arg1: Any) { if (isErrorEnabled) error(format(template, arg1)) } - def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(format(template, arg1, arg2)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3, arg4)) } + def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format(template, arg1)) } + def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) } def warning(message: String) { if (isWarningEnabled) notifyWarning(message) } - def warning(template: String, arg1: Any) { if (isWarningEnabled) warning(format(template, arg1)) } - def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3, arg4)) } + def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1)) } + def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) } def info(message: String) { if (isInfoEnabled) notifyInfo(message) } - def info(template: String, arg1: Any) { if (isInfoEnabled) info(format(template, arg1)) } - def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) info(format(template, arg1, arg2)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3, arg4)) } + def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1)) } + def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) } def debug(message: String) { if (isDebugEnabled) notifyDebug(message) } - def debug(template: String, arg1: Any) { if (isDebugEnabled) debug(format(template, arg1)) } - def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3, arg4)) } + def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1)) } + def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) } + + def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) } + def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) } + + final def isEnabled(level: Logging.LogLevel): Boolean = level match { + case Logging.ErrorLevel ⇒ isErrorEnabled + case Logging.WarningLevel ⇒ isWarningEnabled + case Logging.InfoLevel ⇒ isInfoEnabled + case Logging.DebugLevel ⇒ isDebugEnabled + } + + final def notifyLog(level: Logging.LogLevel, message: String): Unit = level match { + case Logging.ErrorLevel ⇒ if (isErrorEnabled) notifyError(message) + case Logging.WarningLevel ⇒ if (isWarningEnabled) notifyWarning(message) + case Logging.InfoLevel ⇒ if (isInfoEnabled) notifyInfo(message) + case Logging.DebugLevel ⇒ if (isDebugEnabled) notifyDebug(message) + } def format(t: String, arg: Any*) = { val sb = new StringBuilder diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala index 4e6730f65b..ff01e8eb26 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala @@ -14,6 +14,7 @@ import java.net.URISyntaxException import java.net.InetAddress import java.net.UnknownHostException import java.net.UnknownServiceException +import akka.event.Logging /** * Interface for remote transports to encode their addresses. The three parts @@ -135,7 +136,9 @@ trait RemoteModule { /** * Remote life-cycle events. */ -sealed trait RemoteLifeCycleEvent +sealed trait RemoteLifeCycleEvent { + def logLevel: Logging.LogLevel +} /** * Life-cycle events for RemoteClient. @@ -148,6 +151,7 @@ case class RemoteClientError[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.ErrorLevel override def toString = "RemoteClientError@" + remoteAddress + @@ -159,6 +163,7 @@ case class RemoteClientError[T <: ParsedTransportAddress]( case class RemoteClientDisconnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteClientDisconnected@" + remoteAddress } @@ -166,6 +171,7 @@ case class RemoteClientDisconnected[T <: ParsedTransportAddress]( case class RemoteClientConnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteClientConnected@" + remoteAddress } @@ -173,6 +179,7 @@ case class RemoteClientConnected[T <: ParsedTransportAddress]( case class RemoteClientStarted[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteClientStarted@" + remoteAddress } @@ -180,6 +187,7 @@ case class RemoteClientStarted[T <: ParsedTransportAddress]( case class RemoteClientShutdown[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteClientShutdown@" + remoteAddress } @@ -189,6 +197,7 @@ case class RemoteClientWriteFailed[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.WarningLevel override def toString = "RemoteClientWriteFailed@" + remoteAddress + @@ -206,12 +215,14 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent case class RemoteServerStarted[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteServerStarted@" + remote.name } case class RemoteServerShutdown[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteServerShutdown@" + remote.name } @@ -219,6 +230,7 @@ case class RemoteServerShutdown[T <: ParsedTransportAddress]( case class RemoteServerError[T <: ParsedTransportAddress]( @BeanProperty val cause: Throwable, @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.ErrorLevel override def toString = "RemoteServerError@" + remote.name + @@ -230,6 +242,7 @@ case class RemoteServerError[T <: ParsedTransportAddress]( case class RemoteServerClientConnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientConnected@" + remote.name + @@ -241,6 +254,7 @@ case class RemoteServerClientConnected[T <: ParsedTransportAddress]( case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientDisconnected@" + remote.name + @@ -252,6 +266,7 @@ case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( case class RemoteServerClientClosed[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientClosed@" + remote.name + @@ -265,6 +280,7 @@ case class RemoteServerWriteFailed[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.WarningLevel override def toString = "RemoteServerWriteFailed@" + remote + @@ -320,7 +336,7 @@ abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSyst protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = { system.eventStream.publish(message) - system.log.debug("REMOTE: {}", message) + system.log.log(message.logLevel, "REMOTE: {}", message) } override def toString = name From b7b1ea57e7c2d831a3860ff7e978915d0642a584 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 19 Dec 2011 13:49:04 +0100 Subject: [PATCH 2/7] Tweaking general badassery --- akka-actor/src/main/scala/akka/event/Logging.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index eab5ca716b..1797eb9d18 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -12,11 +12,9 @@ import akka.util.ReentrantGuard import akka.util.duration._ import akka.util.Timeout import java.util.concurrent.atomic.AtomicInteger -import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await -import annotation.switch object LoggingBus { implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream From 61813c6635e6a6232261c811fe16ded3293344d3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 19 Dec 2011 20:36:06 +0100 Subject: [PATCH 3/7] Make MailboxType implementation configurable. See #1484 * Added mailboxType property to dispatcher config * Changed durable mailboxes to use this * Updated docs for durable mailboxes --- .../test/scala/akka/config/ConfigSpec.scala | 1 + .../akka/dispatch/MailboxConfigSpec.scala | 26 +++++++ akka-actor/src/main/resources/reference.conf | 8 ++- .../akka/dispatch/AbstractDispatcher.scala | 14 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 30 ++++++++ .../actor/mailbox/DurableMailboxDocSpec.scala | 34 +++++++-- .../mailbox/DurableMailboxDocTestBase.java | 46 ++++++++++-- akka-docs/modules/durable-mailbox.rst | 65 ++++++++--------- .../akka/actor/mailbox/DurableMailbox.scala | 71 +++---------------- .../actor/mailbox/DurableMailboxSpec.scala | 7 +- 10 files changed, 183 insertions(+), 119 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index b0529e19cf..ff4aacc4b5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) + getString("akka.actor.default-dispatcher.mailboxType") must be("") getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index d0c2053243..0e9a1b20cf 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -4,6 +4,9 @@ import java.util.concurrent.{ TimeUnit, BlockingQueue } import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.actor.ActorRef +import akka.actor.ActorCell +import java.util.concurrent.ConcurrentLinkedQueue @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -144,3 +147,26 @@ class PriorityMailboxSpec extends MailboxSpec { case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) } } + +object CustomMailboxSpec { + val config = """ + my-dispatcher { + mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox" + } + """ + + class MyMailbox(owner: ActorCell) extends Mailbox(owner) + with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new ConcurrentLinkedQueue[Envelope]() + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { + "Dispatcher configuration" must { + "support custom mailboxType" in { + val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher") + dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) + } + } +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index d9447fa0c7..4eeb3a29fe 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -29,7 +29,7 @@ akka { logConfigOnStart = off # List FQCN of extensions which shall be loaded at actor system startup. - # Should be on the format: 'extensions = ["foo", "bar"]' etc. + # Should be on the format: 'extensions = ["foo", "bar"]' etc. # FIXME: clarify "extensions" here, "Akka Extensions ()" extensions = [] @@ -59,7 +59,7 @@ akka { deployment { - # deployment id pattern - on the format: /parent/child etc. + # deployment id pattern - on the format: /parent/child etc. default { # routing (load-balance) scheme to use @@ -160,6 +160,10 @@ akka { # Specifies the timeout to add a new message to a mailbox that is full - # negative number means infinite timeout mailbox-push-timeout-time = 10s + + # FQCN of the MailboxType, if not specified the default bounded or unbounded + # mailbox is used. + mailboxType = "" } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 35284879a4..b1ca951b36 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -271,11 +271,15 @@ abstract class MessageDispatcherConfigurator() { def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher def mailboxType(config: Config, settings: Settings): MailboxType = { - val capacity = config.getInt("mailbox-capacity") - if (capacity < 1) UnboundedMailbox() - else { - val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) - BoundedMailbox(capacity, duration) + config.getString("mailboxType") match { + case "" ⇒ + val capacity = config.getInt("mailbox-capacity") + if (capacity < 1) UnboundedMailbox() + else { + val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) + BoundedMailbox(capacity, duration) + } + case fqn ⇒ new CustomMailboxType(fqn) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index fdb46f0ec4..633898c4e9 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent._ import annotation.tailrec import akka.event.Logging.Error +import com.typesafe.config.Config +import java.lang.reflect.InvocationTargetException class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -361,3 +363,31 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va } } +class CustomMailboxType(mailboxFQN: String) extends MailboxType { + + def create(receiver: ActorCell): Mailbox = { + val constructorSignature = Array[Class[_]](classOf[ActorCell]) + ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { + case Right(instance) ⇒ instance.asInstanceOf[Mailbox] + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". + format(mailboxClass.getName, cause.toString)) + } + } + + private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { + case Right(clazz) ⇒ clazz + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + throw new IllegalArgumentException("Cannot find mailbox class [%s] due to: %s". + format(mailboxFQN, cause.toString)) + } +} + diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index 7a18da6182..7140da03bb 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -4,15 +4,18 @@ package akka.docs.actor.mailbox //#imports -import akka.actor.Actor import akka.actor.Props -import akka.actor.mailbox.FileDurableMailboxType //#imports +//#imports2 +import akka.actor.mailbox.FileDurableMailboxType +//#imports2 + import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec +import akka.actor.Actor class MyActor extends Actor { def receive = { @@ -20,14 +23,31 @@ class MyActor extends Actor { } } -class DurableMailboxDocSpec extends AkkaSpec { +object DurableMailboxDocSpec { + val config = """ + //#dispatcher-config + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } + //#dispatcher-config + """ +} - "define dispatcher with durable mailbox" in { - //#define-dispatcher +class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { + + "configuration of dispatcher with durable mailbox" in { + //#dispatcher-config-use + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher") + val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") + //#dispatcher-config-use + } + + "programatically define dispatcher with durable mailbox" in { + //#prog-define-dispatcher val dispatcher = system.dispatcherFactory.newDispatcher( "my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build - val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") - //#define-dispatcher + val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher)) + //#prog-define-dispatcher myActor ! "hello" } diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index ba411ffbb0..02066d8673 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -4,7 +4,6 @@ package akka.docs.actor.mailbox; //#imports -import akka.actor.mailbox.DurableMailboxType; import akka.dispatch.MessageDispatcher; import akka.actor.UntypedActorFactory; import akka.actor.UntypedActor; @@ -12,8 +11,17 @@ import akka.actor.Props; //#imports +//#imports2 +import akka.actor.mailbox.DurableMailboxType; +//#imports2 + +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import akka.testkit.AkkaSpec; +import akka.docs.dispatcher.DispatcherDocSpec; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -21,20 +29,44 @@ import static org.junit.Assert.*; public class DurableMailboxDocTestBase { + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("MySystem", + ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf())); + } + + @After + public void tearDown() { + system.shutdown(); + } + @Test - public void defineDispatcher() { - ActorSystem system = ActorSystem.create("MySystem"); - //#define-dispatcher + public void configDefinedDispatcher() { + //#dispatcher-config-use + MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher"); + ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { + public UntypedActor create() { + return new MyUntypedActor(); + } + }), "myactor"); + //#dispatcher-config-use + myActor.tell("test"); + } + + @Test + public void programaticallyDefinedDispatcher() { + //#prog-define-dispatcher MessageDispatcher dispatcher = system.dispatcherFactory() .newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build(); ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { public UntypedActor create() { return new MyUntypedActor(); } - })); - //#define-dispatcher + }), "myactor"); + //#prog-define-dispatcher myActor.tell("test"); - system.shutdown(); } public static class MyUntypedActor extends UntypedActor { diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 74af6c3ca5..2f6ca9e261 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the You configure durable mailboxes through the dispatcher. The actor is oblivious to which type of mailbox it is using. -Here is an example in Scala: + +In the configuration of the dispatcher you specify the fully qualified class name +of the mailbox: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: imports,define-dispatcher + :include: dispatcher-config + +Here is an example of how to create an actor with a durable dispatcher, in Scala: + +.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: imports,dispatcher-config-use Corresponding example in Java: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java - :include: imports,define-dispatcher + :include: imports,dispatcher-config-use The actor is oblivious to which type of mailbox it is using. @@ -89,14 +96,11 @@ you need. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.FileDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } You can also configure and tune the file-based durable mailbox. This is done in the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. @@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.RedisDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.RedisBasedMailbox + } You also need to configure the IP and port for the Redis server. This is done in the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`. @@ -146,13 +147,11 @@ documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox + } You also need to configure ZooKeeper server addresses, timeouts, etc. This is done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`. @@ -171,13 +170,11 @@ Beanstalk documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox + } You also need to configure the IP, and port, and so on, for the Beanstalk server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the @@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.MongoDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.MongoBasedMailbox + } You will need to configure the URI for the MongoDB server, using the URI Format specified in the `MongoDB Documentation `_. This is done in diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 7eb30b2fdb..2c5fd7cb88 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -24,6 +24,7 @@ import akka.remote.RemoteActorRefProvider import akka.remote.netty.NettyRemoteServer import akka.serialization.Serialization import com.typesafe.config.Config +import akka.dispatch.CustomMailboxType private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -73,39 +74,11 @@ trait DurableMessageSerialization { } -abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType { - val constructorSignature = Array[Class[_]](classOf[ActorCell]) - - val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { - case Right(clazz) ⇒ clazz - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString)) - } - - //TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc - def create(receiver: ActorCell): Mailbox = { - ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { - case Right(instance) ⇒ instance.asInstanceOf[Mailbox] - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString)) - } - } -} - -case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox") -case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox") -case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") -case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox") -case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") -case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN) +case object RedisDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox") +case object MongoDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox") +case object BeanstalkDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") +case object FileDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.FileBasedMailbox") +case object ZooKeeperDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") /** * Java API for the mailbox types. Usage: @@ -115,31 +88,9 @@ case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType( * */ object DurableMailboxType { - def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType - def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType - def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType - def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType - def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType - def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN) -} - -/** - * Configurator for the DurableMailbox - * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file", - * or a full class name of the Mailbox implementation. - */ -class DurableMailboxConfigurator { - // TODO PN #896: when and how is this class supposed to be used? Can we remove it? - - def mailboxType(config: Config): MailboxType = { - if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox") - config.getString("storage") match { - case "redis" ⇒ RedisDurableMailboxType - case "mongodb" ⇒ MongoDurableMailboxType - case "beanstalk" ⇒ BeanstalkDurableMailboxType - case "zookeeper" ⇒ ZooKeeperDurableMailboxType - case "file" ⇒ FileDurableMailboxType - case fqn ⇒ FqnDurableMailboxType(fqn) - } - } + def redisDurableMailboxType(): MailboxType = RedisDurableMailboxType + def mongoDurableMailboxType(): MailboxType = MongoDurableMailboxType + def beanstalkDurableMailboxType(): MailboxType = BeanstalkDurableMailboxType + def fileDurableMailboxType(): MailboxType = FileDurableMailboxType + def zooKeeperDurableMailboxType(): MailboxType = ZooKeeperDurableMailboxType } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index eec1b03192..5bce062203 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -1,15 +1,16 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit +import java.util.concurrent.CountDownLatch import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import akka.actor._ import akka.actor.Actor._ -import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher -import akka.testkit.AkkaSpec import akka.dispatch.Dispatchers +import akka.dispatch.MailboxType +import akka.testkit.AkkaSpec object DurableMailboxSpecActorFactory { @@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory { } -abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach { +abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach { import DurableMailboxSpecActorFactory._ implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build From 83b08b20d926e15ff8fb4ed8156d8d819251574f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 19 Dec 2011 21:46:37 +0100 Subject: [PATCH 4/7] Added CustomMailbox for user defined mailbox implementations with ActorContext instead of ActorCell. * Mailbox is still there, with ActorCell, for internal use. * Implemented durable mailboxes with CustomMailbox --- .../akka/dispatch/MailboxConfigSpec.scala | 7 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 37 ++++++++++++------- .../actor/mailbox/BeanstalkBasedMailbox.scala | 6 +-- .../actor/mailbox/FiledBasedMailbox.scala | 4 +- .../akka/actor/mailbox/DurableMailbox.scala | 8 ++-- .../actor/mailbox/MongoBasedMailbox.scala | 4 +- .../actor/mailbox/RedisBasedMailbox.scala | 4 +- .../actor/mailbox/ZooKeeperBasedMailbox.scala | 4 +- 8 files changed, 43 insertions(+), 31 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 0e9a1b20cf..d3dd9e9209 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,12 +1,13 @@ package akka.dispatch + import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import java.util.concurrent.{ TimeUnit, BlockingQueue } +import java.util.concurrent.ConcurrentLinkedQueue import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec import akka.actor.ActorRef -import akka.actor.ActorCell -import java.util.concurrent.ConcurrentLinkedQueue +import akka.actor.ActorContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -155,7 +156,7 @@ object CustomMailboxSpec { } """ - class MyMailbox(owner: ActorCell) extends Mailbox(owner) + class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 633898c4e9..9940b16aa3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.event.Logging.Error import com.typesafe.config.Config import java.lang.reflect.InvocationTargetException +import akka.actor.ActorContext class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -35,7 +36,17 @@ object Mailbox { final val debug = false } -abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { +/** + * Custom mailbox implementations are implemented by extending this class. + */ +abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) + +/** + * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, + * but can't be exposed to user defined mailbox subclasses. + * + */ +private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ @volatile @@ -319,15 +330,15 @@ trait QueueBasedMessageQueue extends MessageQueue { * Mailbox configuration. */ trait MailboxType { - def create(receiver: ActorCell): Mailbox + def create(receiver: ActorContext): Mailbox } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -337,16 +348,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } } @@ -356,8 +367,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } @@ -365,8 +376,8 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va class CustomMailboxType(mailboxFQN: String) extends MailboxType { - def create(receiver: ActorCell): Mailbox = { - val constructorSignature = Array[Class[_]](classOf[ActorCell]) + override def create(receiver: ActorContext): Mailbox = { + val constructorSignature = Array[Class[_]](classOf[ActorContext]) ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { case Right(instance) ⇒ instance.asInstanceOf[Mailbox] case Left(exception) ⇒ @@ -379,7 +390,7 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType { } } - private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { + private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorContext].getClassLoader) match { case Right(clazz) ⇒ clazz case Left(exception) ⇒ val cause = exception match { diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index c680511697..57d7b3e098 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess /** * @author Jonas Bonér */ -class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt @@ -78,7 +78,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) // TODO PN: Why volatile on local variable? @volatile var connected = false - // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue + // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue var attempts = 0 var client: Client = null while (!connected) { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 8fa7f81e25..55c96ea65c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -5,12 +5,12 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef -class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMailbox") diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 2c5fd7cb88..aa27a731e5 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -6,13 +6,13 @@ package akka.actor.mailbox import akka.util.ReflectiveAccess import java.lang.reflect.InvocationTargetException import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.actor.ActorRef import akka.actor.SerializedActorRef import akka.dispatch.Envelope import akka.dispatch.DefaultSystemMessageQueue import akka.dispatch.Dispatcher -import akka.dispatch.Mailbox +import akka.dispatch.CustomMailbox import akka.dispatch.MailboxType import akka.dispatch.MessageDispatcher import akka.dispatch.MessageQueue @@ -34,7 +34,7 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable) def this(message: String) = this(message, null) } -abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue { +abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ def system = owner.system @@ -46,7 +46,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa trait DurableMessageSerialization { - def owner: ActorCell + def owner: ActorContext def serialize(durableMessage: Envelope): Array[Byte] = { diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 6e1c28219d..b404e3c844 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -7,7 +7,7 @@ import akka.AkkaException import com.mongodb.async._ import com.mongodb.async.futures.RequestFutures import org.bson.collection._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } @@ -26,7 +26,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) * * @author Brendan W. McAdams */ -class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { +class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index f937be09e0..21c5555590 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -6,14 +6,14 @@ package akka.actor.mailbox import com.redis._ import akka.actor.LocalActorRef import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef class RedisBasedMailboxException(message: String) extends AkkaException(message) -class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 3a50b93e93..1cdedba25d 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -8,7 +8,7 @@ import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import org.I0Itec.zkclient.serialize._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.cluster.zookeeper.AkkaZkClient import akka.dispatch.Envelope import akka.event.Logging @@ -17,7 +17,7 @@ import akka.actor.ActorRef class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) -class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" From 5fd40e53caa8e4c380c0dd0b88037daa2a14542f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 20 Dec 2011 09:19:34 +0100 Subject: [PATCH 5/7] Updates after feedback --- .../main/scala/akka/dispatch/Mailbox.scala | 26 ++++++++++--------- .../actor/mailbox/DurableMailboxDocSpec.scala | 13 ---------- .../mailbox/DurableMailboxDocTestBase.java | 20 +------------- .../mailbox/BeanstalkBasedMailboxSpec.scala | 5 +++- .../actor/mailbox/FileBasedMailboxSpec.scala | 4 ++- .../akka/actor/mailbox/DurableMailbox.scala | 20 -------------- .../actor/mailbox/MongoBasedMailboxSpec.scala | 7 ++--- .../actor/mailbox/RedisBasedMailboxSpec.scala | 4 ++- .../mailbox/ZooKeeperBasedMailboxSpec.scala | 5 +++- 9 files changed, 33 insertions(+), 71 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 9940b16aa3..d9951c61e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -374,11 +374,23 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va } } +/** + * Mailbox factory that creates instantiates the implementation from a + * fully qualified class name. The implementation class must have + * a constructor with a [[akka.actor.ActorContext]] parameter. + * E.g. + * + * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) + * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + * val queue = new ConcurrentLinkedQueue[Envelope]() + * } + * + */ class CustomMailboxType(mailboxFQN: String) extends MailboxType { override def create(receiver: ActorContext): Mailbox = { val constructorSignature = Array[Class[_]](classOf[ActorContext]) - ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { + ReflectiveAccess.createInstance[AnyRef](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { case Right(instance) ⇒ instance.asInstanceOf[Mailbox] case Left(exception) ⇒ val cause = exception match { @@ -386,19 +398,9 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType { case _ ⇒ exception } throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". - format(mailboxClass.getName, cause.toString)) + format(mailboxFQN, cause.toString)) } } - private def mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorContext].getClassLoader) match { - case Right(clazz) ⇒ clazz - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new IllegalArgumentException("Cannot find mailbox class [%s] due to: %s". - format(mailboxFQN, cause.toString)) - } } diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index 7140da03bb..863c48a15b 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -8,10 +8,6 @@ import akka.actor.Props //#imports -//#imports2 -import akka.actor.mailbox.FileDurableMailboxType -//#imports2 - import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec @@ -42,13 +38,4 @@ class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { //#dispatcher-config-use } - "programatically define dispatcher with durable mailbox" in { - //#prog-define-dispatcher - val dispatcher = system.dispatcherFactory.newDispatcher( - "my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build - val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher)) - //#prog-define-dispatcher - myActor ! "hello" - } - } diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index 02066d8673..8b904f5ef6 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -11,16 +11,11 @@ import akka.actor.Props; //#imports -//#imports2 -import akka.actor.mailbox.DurableMailboxType; -//#imports2 - import org.junit.After; import org.junit.Before; import org.junit.Test; import akka.testkit.AkkaSpec; -import akka.docs.dispatcher.DispatcherDocSpec; import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -55,22 +50,9 @@ public class DurableMailboxDocTestBase { myActor.tell("test"); } - @Test - public void programaticallyDefinedDispatcher() { - //#prog-define-dispatcher - MessageDispatcher dispatcher = system.dispatcherFactory() - .newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build(); - ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { - public UntypedActor create() { - return new MyUntypedActor(); - } - }), "myactor"); - //#prog-define-dispatcher - myActor.tell("test"); - } - public static class MyUntypedActor extends UntypedActor { public void onReceive(Object message) { } } + } diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 88b1bf85ae..f011352d2b 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -1,4 +1,7 @@ package akka.actor.mailbox +import akka.dispatch.CustomMailboxType + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxType) +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", + new CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index d1a36d14eb..43b8b2c048 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,9 +1,11 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) { +class FileBasedMailboxSpec extends DurableMailboxSpec("File", + new CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")) { def clean { val queuePath = FileBasedMailboxExtension(system).QueuePath diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index aa27a731e5..f94dfe83ca 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -74,23 +74,3 @@ trait DurableMessageSerialization { } -case object RedisDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox") -case object MongoDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox") -case object BeanstalkDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") -case object FileDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.FileBasedMailbox") -case object ZooKeeperDurableMailboxType extends CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") - -/** - * Java API for the mailbox types. Usage: - *

- * MessageDispatcher dispatcher = system.dispatcherFactory()
- *   .newDispatcher("my-dispatcher", 1, DurableMailboxType.redisDurableMailboxType()).build();
- * 
- */ -object DurableMailboxType { - def redisDurableMailboxType(): MailboxType = RedisDurableMailboxType - def mongoDurableMailboxType(): MailboxType = MongoDurableMailboxType - def beanstalkDurableMailboxType(): MailboxType = BeanstalkDurableMailboxType - def fileDurableMailboxType(): MailboxType = FileDurableMailboxType - def zooKeeperDurableMailboxType(): MailboxType = ZooKeeperDurableMailboxType -} diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index ec13cff17f..4746a47242 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -1,18 +1,19 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit - import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } - import akka.actor._ import akka.actor.Actor._ import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoDurableMailboxType) { +class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", + new CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")) { + import org.apache.log4j.{ Logger, Level } import com.mongodb.async._ diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 8b8ea6d611..ecb700d383 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -1,4 +1,6 @@ package akka.actor.mailbox +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxType) +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", + new CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox")) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 3cdc734830..888c46c1ea 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -4,10 +4,13 @@ import akka.actor.{ Actor, LocalActorRef } import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher +import akka.dispatch.CustomMailboxType import akka.actor.ActorRef @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxType) { +class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", + new CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")) { + val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" From fb510d50187390bb514d8393e2f42633cc6872f2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 20 Dec 2011 09:59:08 +0100 Subject: [PATCH 6/7] Unwrap InvocationTargetException in ReflectiveAccess.createInstance. See #1555 --- .../src/main/scala/akka/actor/Actor.scala | 1 - .../main/scala/akka/actor/ActorSystem.scala | 6 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 7 +---- .../src/main/scala/akka/routing/Routing.scala | 1 - .../scala/akka/util/ReflectiveAccess.scala | 29 ++++++++++++++----- .../akka/actor/mailbox/DurableMailbox.scala | 1 - 6 files changed, 24 insertions(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1c94d4304d..ccf8b3c6c8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -16,7 +16,6 @@ import akka.AkkaException import scala.reflect.BeanProperty import scala.util.control.NoStackTrace import com.eaio.uuid.UUID -import java.lang.reflect.InvocationTargetException import java.util.concurrent.TimeUnit import java.util.{ Collection ⇒ JCollection } import java.util.regex.Pattern diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 203c8e3510..5e6f2e72e4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -19,7 +19,6 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigResolveOptions import com.typesafe.config.ConfigException -import java.lang.reflect.InvocationTargetException import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap } @@ -409,9 +408,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor val values: Array[AnyRef] = arguments map (_._2) toArray ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match { - case Left(e: InvocationTargetException) ⇒ throw e.getTargetException - case Left(e) ⇒ throw e - case Right(p) ⇒ p + case Left(e) ⇒ throw e + case Right(p) ⇒ p } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index d9951c61e8..1691713ea3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -11,7 +11,6 @@ import java.util.concurrent._ import annotation.tailrec import akka.event.Logging.Error import com.typesafe.config.Config -import java.lang.reflect.InvocationTargetException import akka.actor.ActorContext class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -393,12 +392,8 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType { ReflectiveAccess.createInstance[AnyRef](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { case Right(instance) ⇒ instance.asInstanceOf[Mailbox] case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". - format(mailboxFQN, cause.toString)) + format(mailboxFQN, exception.toString)) } } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b047af5888..0ce9e9adfb 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -5,7 +5,6 @@ package akka.routing import akka.actor._ import akka.japi.Creator -import java.lang.reflect.InvocationTargetException import akka.config.ConfigurationException import java.util.concurrent.atomic.AtomicInteger import akka.util.{ ReflectiveAccess, Timeout } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 065e11ba78..0e738541ae 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -5,6 +5,7 @@ package akka.util import akka.actor._ +import java.lang.reflect.InvocationTargetException object ReflectiveAccess { @@ -14,22 +15,19 @@ object ReflectiveAccess { def createInstance[T](clazz: Class[_], params: Array[Class[_]], - args: Array[AnyRef]): Either[Exception, T] = try { + args: Array[AnyRef]): Either[Exception, T] = withErrorHandling { assert(clazz ne null) assert(params ne null) assert(args ne null) val ctor = clazz.getDeclaredConstructor(params: _*) ctor.setAccessible(true) Right(ctor.newInstance(args: _*).asInstanceOf[T]) - } catch { - case e: Exception ⇒ - Left(e) } def createInstance[T](fqn: String, params: Array[Class[_]], args: Array[AnyRef], - classloader: ClassLoader = loader): Either[Exception, T] = try { + classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling { assert(params ne null) assert(args ne null) getClassFor(fqn, classloader) match { @@ -39,9 +37,6 @@ object ReflectiveAccess { Right(ctor.newInstance(args: _*).asInstanceOf[T]) case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly } - } catch { - case e: Exception ⇒ - Left(e) } //Obtains a reference to fqn.MODULE$ @@ -100,5 +95,23 @@ object ReflectiveAccess { case e: Exception ⇒ Left(e) } + /** + * Caught exception is returned as Left(exception). + * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. + * Other `Throwable`, such as `Error` is thrown. + */ + private def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = { + try { + body + } catch { + case e: InvocationTargetException ⇒ e.getTargetException match { + case t: Exception ⇒ Left(t) + case t ⇒ throw t + } + case e: Exception ⇒ + Left(e) + } + } + } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index f94dfe83ca..c8f9026cbf 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -4,7 +4,6 @@ package akka.actor.mailbox import akka.util.ReflectiveAccess -import java.lang.reflect.InvocationTargetException import akka.AkkaException import akka.actor.ActorContext import akka.actor.ActorRef From d87d9e2e80424f8fe103dc6147ba19e68f4d8110 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 20 Dec 2011 10:08:53 +0100 Subject: [PATCH 7/7] Additional feedback, thanks. --- akka-actor/src/main/scala/akka/dispatch/Mailbox.scala | 4 ++-- akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 1691713ea3..86b286e916 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -389,8 +389,8 @@ class CustomMailboxType(mailboxFQN: String) extends MailboxType { override def create(receiver: ActorContext): Mailbox = { val constructorSignature = Array[Class[_]](classOf[ActorContext]) - ReflectiveAccess.createInstance[AnyRef](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { - case Right(instance) ⇒ instance.asInstanceOf[Mailbox] + ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { + case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". format(mailboxFQN, exception.toString)) diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 0e738541ae..70b6fa5a03 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -100,7 +100,8 @@ object ReflectiveAccess { * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. * Other `Throwable`, such as `Error` is thrown. */ - private def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = { + @inline + private final def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = { try { body } catch {